Table of Contents

Abstract:

Sequence diagram:

The flow is as follows:

Source code walkthrough:

Starting the service:

Spawning a new goroutine for each new connection:

Reading data and parsing the request:

Request handling, using executeSelectStatement as an example:


Abstract:

An analysis of how the InfluxDB cluster service starts up and handles incoming requests.

Sequence diagram:

The flow is as follows:

  1. After startup, the service listens on its port.
  2. When a connection arrives, a new goroutine is created to handle that connection's data.
    1. Each connection gets its own goroutine.
  3. Data sent over the connection is read and parsed into a request.
  4. The request is dispatched to a different handler depending on its type (a minimal sketch of the whole pattern follows this list).
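
To make the flow concrete, here is a minimal, self-contained Go sketch of the same pattern (the port and message types are hypothetical, not the actual FreeTSDB code): listen on a port, accept in a loop, hand each connection to its own goroutine, then dispatch on a one-byte message type.

package main

import (
    "io"
    "log"
    "net"
)

func main() {
    // 1. After startup, listen on the service port (8088 is an arbitrary choice here).
    ln, err := net.Listen("tcp", ":8088")
    if err != nil {
        log.Fatal(err)
    }
    for {
        // 2. Accept the next connection.
        conn, err := ln.Accept()
        if err != nil {
            log.Println("accept:", err)
            continue
        }
        // 2.1 Each connection gets its own goroutine.
        go handle(conn)
    }
}

func handle(conn net.Conn) {
    defer conn.Close()
    for {
        // 3. Read and parse the request; here the "request" is just a type byte.
        var typ [1]byte
        if _, err := io.ReadFull(conn, typ[:]); err != nil {
            return
        }
        // 4. Dispatch by request type.
        switch typ[0] {
        case 1: // hypothetical write request
        case 2: // hypothetical query request
        default:
            log.Println("unknown message type:", typ[0])
        }
    }
}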

Source code walkthrough:

Starting the service:

// Service processes data received over raw TCP connections.
type Service struct {
    mu      sync.RWMutex
    wg      sync.WaitGroup
    closing chan struct{}

    Listener net.Listener

    MetaClient interface {
        ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo)
    }

    TSDBStore TSDBStore

    Logger  *zap.Logger
    statMap *expvar.Map
}

// NewService returns a new instance of Service.
func NewService(c Config) *Service {
    return &Service{
        closing: make(chan struct{}),
        //Logger:  log.New(os.Stderr, "[cluster] ", log.LstdFlags),
        Logger:  zap.NewNop(),
        statMap: freetsdb.NewStatistics("cluster", "cluster", nil),
    }
}

// Open opens the network listener and begins serving requests.
func (s *Service) Open() error {
    s.Logger.Info("Starting cluster service")
    // Begin serving connections.
    s.wg.Add(1)
    go s.serve()
    return nil
}
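
A minimal usage sketch (hypothetical wiring; in the real server the listener is set up by the startup code): construct the service, attach a listener, and call Open.

// Hypothetical wiring, for illustration only (would live alongside the
// service package; needs "log" and "net" imports).
func ExampleService() {
    svc := NewService(Config{})
    ln, err := net.Listen("tcp", ":8088") // the port here is an assumption
    if err != nil {
        log.Fatal(err)
    }
    svc.Listener = ln
    if err := svc.Open(); err != nil {
        log.Fatal(err)
    }
    // ... serve until shutdown, then close the service.
}

Note that Open only starts the serve goroutine; the caller must assign Listener first, or the Accept call in serve would panic on a nil listener.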

Spawning a new goroutine for each new connection:


// serve accepts connections from the listener and handles them.
func (s *Service) serve() {
    defer s.wg.Done()

    for {
        // Check if the service is shutting down.
        select {
        case <-s.closing:
            return
        default:
        }

        // Accept the next connection.
        conn, err := s.Listener.Accept()
        if err != nil {
            if strings.Contains(err.Error(), "connection closed") {
                s.Logger.Info("cluster service accept", zap.Error(err))
                return
            }
            s.Logger.Info("accept", zap.Error(err))
            continue
        }

        // Delegate connection handling to a separate goroutine.
        s.wg.Add(1)
        go func() {
            defer s.wg.Done()
            s.handleConn(conn)
        }()
    }
}
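
serve exits either when s.closing is closed or when Accept returns a "connection closed" error. The matching Close method is not shown in the excerpt; a plausible sketch of how the closing channel and WaitGroup fit together (assumed, not the verbatim source):

// Close shuts down the listener, signals every goroutine to exit via the
// closing channel, and waits for them with the WaitGroup.
func (s *Service) Close() error {
    if s.Listener != nil {
        s.Listener.Close() // unblocks the Accept call in serve
    }
    close(s.closing) // unblocks serve's select and handleConn's watcher goroutine
    s.wg.Wait()      // wait for serve and all connection handlers
    return nil
}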

Reading data and parsing the request:


// handleConn services an individual TCP connection.
func (s *Service) handleConn(conn net.Conn) {
    // Ensure connection is closed when service is closed.
    closing := make(chan struct{})
    defer close(closing)
    go func() {
        select {
        case <-closing:
        case <-s.closing:
        }
        conn.Close()
    }()

    for {
        // Read type-length-value.
        typ, err := ReadType(conn)
        if err != nil {
            if strings.HasSuffix(err.Error(), "EOF") {
                return
            }
            s.Logger.Info("unable to read type", zap.Error(err))
            return
        }

        // Delegate message processing by type.
        switch typ {
        case writeShardRequestMessage:
            buf, err := ReadLV(conn)
            if err != nil {
                s.Logger.Info("unable to read length-value:", zap.Error(err))
                return
            }
            s.statMap.Add(writeShardReq, 1)
            err = s.processWriteShardRequest(buf)
            if err != nil {
                s.Logger.Info("process write shard error:", zap.Error(err))
            }
            s.writeShardResponse(conn, err)
        case executeStatementRequestMessage:
            buf, err := ReadLV(conn)
            if err != nil {
                s.Logger.Info("unable to read length-value:", zap.Error(err))
                return
            }
            err = s.processExecuteStatementRequest(buf)
            if err != nil {
                s.Logger.Info("process execute statement error:", zap.Error(err))
            }
            s.writeShardResponse(conn, err)
        case createIteratorRequestMessage:
            s.statMap.Add(createIteratorReq, 1)
            s.processCreateIteratorRequest(conn)
            return
        case fieldDimensionsRequestMessage:
            s.statMap.Add(fieldDimensionsReq, 1)
            s.processFieldDimensionsRequest(conn)
            return
        default:
            s.Logger.Info("coordinator service message type not found:", zap.Uint8("Type", uint8(typ)))
        }
    }
}
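
handleConn depends on two framing helpers, ReadType and ReadLV, which implement the type-length-value protocol but are not shown in the excerpt. A plausible sketch (the actual FreeTSDB implementations may differ, for example in the width of the length prefix):

import (
    "encoding/binary"
    "fmt"
    "io"
)

// ReadType reads the one-byte message type from the stream.
func ReadType(r io.Reader) (byte, error) {
    var typ [1]byte
    if _, err := io.ReadFull(r, typ[:]); err != nil {
        return 0, fmt.Errorf("read message type: %s", err)
    }
    return typ[0], nil
}

// ReadLV reads a big-endian length prefix, then that many payload bytes.
func ReadLV(r io.Reader) ([]byte, error) {
    var sz int64
    if err := binary.Read(r, binary.BigEndian, &sz); err != nil {
        return nil, fmt.Errorf("read message size: %s", err)
    }
    buf := make([]byte, sz)
    if _, err := io.ReadFull(r, buf); err != nil {
        return nil, fmt.Errorf("read message value: %s", err)
    }
    return buf, nil
}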

Request handling, using executeSelectStatement as an example:


func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) error {
    cur, err := e.createIterators(ctx, stmt, ctx.ExecutionOptions)
    if err != nil {
        return err
    }

    // Generate a row emitter from the iterator set.
    em := query.NewEmitter(cur, ctx.ChunkSize)
    defer em.Close()

    // Emit rows to the results channel.
    var writeN int64
    var emitted bool

    var pointsWriter *BufferedPointsWriter
    if stmt.Target != nil {
        pointsWriter = NewBufferedPointsWriter(e.PointsWriter, stmt.Target.Measurement.Database, stmt.Target.Measurement.RetentionPolicy, 10000)
    }

    for {
        row, partial, err := em.Emit()
        if err != nil {
            return err
        } else if row == nil {
            // Check if the query was interrupted while emitting.
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
            }
            break
        }

        // Write points back into system for INTO statements.
        if stmt.Target != nil {
            n, err := e.writeInto(pointsWriter, stmt, row)
            if err != nil {
                return err
            }
            writeN += n
            continue
        }

        result := &query.Result{
            Series:  []*models.Row{row},
            Partial: partial,
        }

        // Send results or exit if closing.
        if err := ctx.Send(result); err != nil {
            return err
        }

        emitted = true
    }

    // Flush remaining points and emit write count if an INTO statement.
    if stmt.Target != nil {
        if err := pointsWriter.Flush(); err != nil {
            return err
        }

        var messages []*query.Message
        if ctx.ReadOnly {
            messages = append(messages, query.ReadOnlyWarning(stmt.String()))
        }

        return ctx.Send(&query.Result{
            Messages: messages,
            Series: []*models.Row{{
                Name:    "result",
                Columns: []string{"time", "written"},
                Values:  [][]interface{}{{time.Unix(0, 0).UTC(), writeN}},
            }},
        })
    }

    // Always emit at least one result.
    if !emitted {
        return ctx.Send(&query.Result{
            Series: make([]*models.Row, 0),
        })
    }

    return nil
}

func (e *StatementExecutor) writeInto(w pointsWriter, stmt *influxql.SelectStatement, row *models.Row) (n int64, err error) {
    if stmt.Target.Measurement.Database == "" {
        return 0, errNoDatabaseInTarget
    }

    // It might seem a bit weird that this is where we do this, since we will have to
    // convert rows back to points. The Executors (both aggregate and raw) are complex
    // enough that changing them to write back to the DB is going to be clumsy.
    //
    // It might seem weird to have the write be in the Executor, but the interweaving of
    // limitedRowWriter and ExecuteAggregate/Raw makes it ridiculously hard to make sure that the
    // results will be the same as when queried normally.
    name := stmt.Target.Measurement.Name
    if name == "" {
        name = row.Name
    }

    points, err := convertRowToPoints(name, row)
    if err != nil {
        return 0, err
    }

    if err := w.WritePointsInto(&IntoWriteRequest{
        Database:        stmt.Target.Measurement.Database,
        RetentionPolicy: stmt.Target.Measurement.RetentionPolicy,
        Points:          points,
    }); err != nil {
        return 0, err
    }

    return int64(len(points)), nil
}
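
writeInto calls convertRowToPoints, which is not shown in the excerpt. A sketch of what it must do, assuming the usual models API (hypothetical implementation; the real helper lives in the coordinator package): the time column becomes the point timestamp, every other column becomes a field, and the row's tags carry over.

func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point, error) {
    // Find the index of the time column.
    timeIndex := -1
    for i, c := range row.Columns {
        if c == "time" {
            timeIndex = i
            break
        }
    }
    if timeIndex == -1 {
        return nil, errors.New("error finding time index in result")
    }

    points := make([]models.Point, 0, len(row.Values))
    for _, v := range row.Values {
        // Every non-time, non-nil column becomes a field.
        fields := make(models.Fields)
        for i, value := range v {
            if i == timeIndex || value == nil {
                continue
            }
            fields[row.Columns[i]] = value
        }
        p, err := models.NewPoint(measurementName, models.NewTags(row.Tags), fields, v[timeIndex].(time.Time))
        if err != nil {
            continue // skip rows that cannot be converted
        }
        points = append(points, p)
    }
    return points, nil
}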

// writeToShard writes points to a shard and ensures a write consistency level has been met. If the write
// partially succeeds, ErrPartialWrite is returned.
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
    consistency ConsistencyLevel, points []models.Point) error {
    atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points)))

    // The required number of writes to achieve the requested consistency level.
    required := len(shard.Owners)
    switch consistency {
    case ConsistencyLevelAny, ConsistencyLevelOne:
        required = 1
    case ConsistencyLevelQuorum:
        required = required/2 + 1
    }

    // Response channel for each shard writer goroutine.
    type AsyncWriteResult struct {
        Owner meta.ShardOwner
        Err   error
    }
    ch := make(chan *AsyncWriteResult, len(shard.Owners))

    for _, owner := range shard.Owners {
        go func(shardID uint64, owner meta.ShardOwner, points []models.Point) {
            if w.Node.ID == owner.NodeID {
                atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points)))

                err := w.TSDBStore.WriteToShard(shardID, points)
                // If we've written to a shard that should exist on the current node, but the store has
                // not actually created this shard, tell it to create it and retry the write.
                if err == tsdb.ErrShardNotFound {
                    err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID, true)
                    if err != nil {
                        ch <- &AsyncWriteResult{owner, err}
                        return
                    }
                    err = w.TSDBStore.WriteToShard(shardID, points)
                }
                ch <- &AsyncWriteResult{owner, err}
                return
            }

            atomic.AddInt64(&w.stats.PointWriteReqRemote, int64(len(points)))
            err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
            if err != nil && tsdb.IsRetryable(err) {
                // The remote write failed, so queue it via hinted handoff.
                atomic.AddInt64(&w.stats.WritePointReqHH, int64(len(points)))
                hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)
                if hherr != nil {
                    ch <- &AsyncWriteResult{owner, hherr}
                    return
                }

                // If the write consistency level is ANY, then a successful hinted handoff can
                // be considered a successful write, so send nil to the response channel.
                // Otherwise, let the original error propagate to the response channel.
                if hherr == nil && consistency == ConsistencyLevelAny {
                    ch <- &AsyncWriteResult{owner, nil}
                    return
                }
            }
            ch <- &AsyncWriteResult{owner, err}
        }(shard.ID, owner, points)
    }

    var wrote int
    timeout := time.After(w.WriteTimeout)
    var writeError error
    for range shard.Owners {
        select {
        case <-w.closing:
            return ErrWriteFailed
        case <-timeout:
            atomic.AddInt64(&w.stats.WriteTimeout, 1)
            // Return a timeout error to the caller.
            return ErrTimeout
        case result := <-ch:
            // If the write returned an error, continue to the next response.
            if result.Err != nil {
                atomic.AddInt64(&w.stats.WriteErr, 1)
                w.Logger.Info("write failed", zap.Uint64("shard", shard.ID), zap.Uint64("node", result.Owner.NodeID), zap.Error(result.Err))

                // Keep track of the first error we see to return back to the client.
                if writeError == nil {
                    writeError = result.Err
                }
                continue
            }
            wrote++

            // We wrote the required consistency level.
            if wrote >= required {
                atomic.AddInt64(&w.stats.WriteOK, 1)
                return nil
            }
        }
    }

    if wrote > 0 {
        atomic.AddInt64(&w.stats.WritePartial, 1)
        return ErrPartialWrite
    }

    if writeError != nil {
        return fmt.Errorf("write failed: %v", writeError)
    }

    return ErrWriteFailed
}
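
To make the consistency arithmetic at the top of writeToShard concrete, here is the same switch extracted into a standalone helper (for illustration only): with three shard owners, ANY or ONE needs 1 acknowledgement, QUORUM needs 3/2 + 1 = 2, and the default (all owners) needs 3.

// requiredAcks mirrors the consistency switch in writeToShard.
func requiredAcks(owners int, consistency ConsistencyLevel) int {
    required := owners // default: every owner must acknowledge
    switch consistency {
    case ConsistencyLevelAny, ConsistencyLevelOne:
        required = 1
    case ConsistencyLevelQuorum:
        required = owners/2 + 1
    }
    return required
}

// requiredAcks(3, ConsistencyLevelQuorum) == 2
// requiredAcks(3, ConsistencyLevelOne)    == 1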
