2022-02-14 influxdb集群coordinator启动及请求处理流程解析
目录
摘要:
时序图:
可以看出流程如下:
源码处理:
启动服务:
对每个新连接开辟新的协程:
读取数据并解析请求:
以executeSelectStatement为例的请求处理源码:
摘要:
解析influxdb集群启动及请求处理流程
时序图:
可以看出流程如下:
- 启动服务后监听对应端口
- 有请求连接后, 建立新协程处理该连接的数据
- 每个连接单独一个协程处理数据
- 读取连接发送的数据, 解析请求
- 根据请求的类型做不同的接口分发
源码处理:
启动服务:
// Service processes data received over raw TCP connections.
type Service struct {
	mu      sync.RWMutex
	wg      sync.WaitGroup
	closing chan struct{} // closed on shutdown; observed by serve/handleConn goroutines

	Listener net.Listener

	// MetaClient resolves which database/retention-policy/shard-group a
	// shard belongs to, via the cluster meta store.
	MetaClient interface {
		ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo)
	}

	TSDBStore TSDBStore

	Logger  *zap.Logger
	statMap *expvar.Map // per-service request counters (writes, iterator requests, ...)
}

// NewService returns a new instance of Service.
// NOTE(review): the Config parameter c is currently unused here — confirm
// whether fields from it should be applied.
func NewService(c Config) *Service {
	return &Service{
		closing: make(chan struct{}),
		//Logger: log.New(os.Stderr, "[cluster] ", log.LstdFlags),
		Logger:  zap.NewNop(),
		statMap: freetsdb.NewStatistics("cluster", "cluster", nil),
	}
}

// Open opens the network listener and begins serving requests.
// It assumes Listener has already been assigned by the caller.
func (s *Service) Open() error {
	s.Logger.Info("Starting cluster service")
	// Begin serving connections.
	s.wg.Add(1)
	go s.serve()
	return nil
}
对每个新连接开辟新的协程:
// serve accepts connections from the listener and handles them.
func (s *Service) serve() {defer s.wg.Done()for {// Check if the service is shutting down.select {case <-s.closing:returndefault:}// Accept the next connection.conn, err := s.Listener.Accept()if err != nil {if strings.Contains(err.Error(), "connection closed") {s.Logger.Info("cluster service accept", zap.Error(err))return}s.Logger.Info("accept", zap.Error(err))continue}// Delegate connection handling to a separate goroutine.s.wg.Add(1)go func() {defer s.wg.Done()s.handleConn(conn)}()}
}
读取数据并解析请求:
// handleConn services an individual TCP connection.
// It reads type-length-value framed requests in a loop and dispatches each
// one by message type; it returns (closing the connection via the watcher
// goroutine below) on read errors, EOF, or after handing the connection to
// a streaming handler.
func (s *Service) handleConn(conn net.Conn) {
	// Ensure connection is closed when service is closed.
	closing := make(chan struct{})
	defer close(closing)
	go func() {
		select {
		case <-closing: // this handler returned
		case <-s.closing: // service-wide shutdown
		}
		conn.Close()
	}()

	for {
		// Read type-length-value.
		typ, err := ReadType(conn)
		if err != nil {
			if strings.HasSuffix(err.Error(), "EOF") {
				// Peer closed the connection; not an error worth logging.
				return
			}
			s.Logger.Info("unable to read type", zap.Error(err))
			return
		}

		// Delegate message processing by type.
		switch typ {
		case writeShardRequestMessage:
			buf, err := ReadLV(conn)
			if err != nil {
				s.Logger.Info("unable to read length-value:", zap.Error(err))
				return
			}
			s.statMap.Add(writeShardReq, 1)
			err = s.processWriteShardRequest(buf)
			if err != nil {
				s.Logger.Info("process write shard error:", zap.Error(err))
			}
			// A response is written even on processing failure, carrying err.
			s.writeShardResponse(conn, err)
		case executeStatementRequestMessage:
			// NOTE(review): unlike the other cases, no statMap counter is
			// incremented here — confirm whether that is intentional.
			buf, err := ReadLV(conn)
			if err != nil {
				s.Logger.Info("unable to read length-value:", zap.Error(err))
				return
			}
			err = s.processExecuteStatementRequest(buf)
			if err != nil {
				s.Logger.Info("process execute statement error:", zap.Error(err))
			}
			s.writeShardResponse(conn, err)
		case createIteratorRequestMessage:
			// Streaming request: the handler takes over the connection, so
			// this loop ends here.
			s.statMap.Add(createIteratorReq, 1)
			s.processCreateIteratorRequest(conn)
			return
		case fieldDimensionsRequestMessage:
			// Streaming request: same ownership transfer as above.
			s.statMap.Add(fieldDimensionsReq, 1)
			s.processFieldDimensionsRequest(conn)
			return
		default:
			// Unknown message type: log and keep reading the next frame.
			s.Logger.Info("coordinator service message type not found:", zap.Uint8("Type", uint8(typ)))
		}
	}
}
以executeSelectStatement为例的请求处理源码:
// executeSelectStatement executes a SELECT statement, streaming result rows
// to the execution context. For SELECT ... INTO statements the rows are
// instead converted to points and written back into the target measurement,
// and a single summary row ("written" count) is emitted at the end.
func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) error {
	cur, err := e.createIterators(ctx, stmt, ctx.ExecutionOptions)
	if err != nil {
		return err
	}

	// Generate a row emitter from the iterator set.
	em := query.NewEmitter(cur, ctx.ChunkSize)
	defer em.Close()

	// Emit rows to the results channel.
	var writeN int64 // total points written for an INTO statement
	var emitted bool // whether at least one result was sent to ctx
	var pointsWriter *BufferedPointsWriter
	if stmt.Target != nil {
		// Buffer up to 10000 points before flushing them to the target
		// database/retention policy.
		pointsWriter = NewBufferedPointsWriter(e.PointsWriter, stmt.Target.Measurement.Database, stmt.Target.Measurement.RetentionPolicy, 10000)
	}

	for {
		row, partial, err := em.Emit()
		if err != nil {
			return err
		} else if row == nil {
			// Check if the query was interrupted while emitting.
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}
			break
		}

		// Write points back into system for INTO statements.
		if stmt.Target != nil {
			n, err := e.writeInto(pointsWriter, stmt, row)
			if err != nil {
				return err
			}
			writeN += n
			continue
		}

		result := &query.Result{
			Series:  []*models.Row{row},
			Partial: partial,
		}

		// Send results or exit if closing.
		if err := ctx.Send(result); err != nil {
			return err
		}
		emitted = true
	}

	// Flush remaining points and emit write count if an INTO statement.
	if stmt.Target != nil {
		if err := pointsWriter.Flush(); err != nil {
			return err
		}

		var messages []*query.Message
		if ctx.ReadOnly {
			messages = append(messages, query.ReadOnlyWarning(stmt.String()))
		}

		// Emit a single summary row with the number of points written.
		return ctx.Send(&query.Result{
			Messages: messages,
			Series: []*models.Row{{
				Name:    "result",
				Columns: []string{"time", "written"},
				Values:  [][]interface{}{{time.Unix(0, 0).UTC(), writeN}},
			}},
		})
	}

	// Always emit at least one result.
	if !emitted {
		return ctx.Send(&query.Result{
			Series: make([]*models.Row, 0),
		})
	}

	return nil
}
// writeInto converts a single result row into points and writes them to the
// INTO target of stmt via w, reporting how many points were written.
//
// Performing the write at this level (instead of inside the aggregate/raw
// executors) is deliberate: those executors are complex enough that teaching
// them to write back to the DB would be clumsy, and doing it here keeps the
// persisted rows identical to what a normal query would have returned.
func (e *StatementExecutor) writeInto(w pointsWriter, stmt *influxql.SelectStatement, row *models.Row) (n int64, err error) {
	target := stmt.Target.Measurement
	if target.Database == "" {
		return 0, errNoDatabaseInTarget
	}

	// Fall back to the row's own name when the target measurement is unnamed.
	name := target.Name
	if name == "" {
		name = row.Name
	}

	pts, err := convertRowToPoints(name, row)
	if err != nil {
		return 0, err
	}

	req := &IntoWriteRequest{
		Database:        target.Database,
		RetentionPolicy: target.RetentionPolicy,
		Points:          pts,
	}
	if err := w.WritePointsInto(req); err != nil {
		return 0, err
	}
	return int64(len(pts)), nil
}
// writeToShard writes points to a shard and ensures a write consistency level has been met. If the write
// partially succeeds, ErrPartialWrite is returned.
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
	consistency ConsistencyLevel, points []models.Point) error {
	// NOTE(review): PointWriteReqLocal is also incremented per local owner
	// inside the goroutine below — this top-level add looks double-counted
	// for the local path; confirm whether it was meant to be a different
	// (overall request) counter.
	atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points)))

	// The required number of writes to achieve the requested consistency level
	required := len(shard.Owners)
	switch consistency {
	case ConsistencyLevelAny, ConsistencyLevelOne:
		required = 1
	case ConsistencyLevelQuorum:
		required = required/2 + 1
	}

	// response channel for each shard writer go routine
	type AsyncWriteResult struct {
		Owner meta.ShardOwner
		Err   error
	}
	// Buffered to the owner count so writer goroutines never block on send,
	// even if this function returns early (timeout/close).
	ch := make(chan *AsyncWriteResult, len(shard.Owners))

	// Fan out one writer goroutine per shard owner.
	for _, owner := range shard.Owners {
		go func(shardID uint64, owner meta.ShardOwner, points []models.Point) {
			if w.Node.ID == owner.NodeID {
				// Local write path: go straight to the TSDB store.
				atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points)))
				err := w.TSDBStore.WriteToShard(shardID, points)
				// If we've written to shard that should exist on the current node, but the store has
				// not actually created this shard, tell it to create it and retry the write
				if err == tsdb.ErrShardNotFound {
					err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID, true)
					if err != nil {
						ch <- &AsyncWriteResult{owner, err}
						return
					}
					err = w.TSDBStore.WriteToShard(shardID, points)
				}
				ch <- &AsyncWriteResult{owner, err}
				return
			}

			// Remote write path: forward to the owning node.
			atomic.AddInt64(&w.stats.PointWriteReqRemote, int64(len(points)))
			err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
			if err != nil && tsdb.IsRetryable(err) {
				// The remote write failed so queue it via hinted handoff
				atomic.AddInt64(&w.stats.WritePointReqHH, int64(len(points)))
				hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)
				if hherr != nil {
					ch <- &AsyncWriteResult{owner, hherr}
					return
				}
				// If the write consistency level is ANY, then a successful hinted handoff can
				// be considered a successful write so send nil to the response channel
				// otherwise, let the original error propagate to the response channel
				if hherr == nil && consistency == ConsistencyLevelAny {
					ch <- &AsyncWriteResult{owner, nil}
					return
				}
			}
			ch <- &AsyncWriteResult{owner, err}
		}(shard.ID, owner, points)
	}

	// Collect responses until the consistency level is met, the service
	// closes, or the write times out.
	var wrote int
	timeout := time.After(w.WriteTimeout)
	var writeError error
	for range shard.Owners {
		select {
		case <-w.closing:
			return ErrWriteFailed
		case <-timeout:
			atomic.AddInt64(&w.stats.WriteTimeout, 1)
			// return timeout error to caller
			return ErrTimeout
		case result := <-ch:
			// If the write returned an error, continue to the next response
			if result.Err != nil {
				atomic.AddInt64(&w.stats.WriteErr, 1)
				w.Logger.Info("write failed", zap.Uint64("shard", shard.ID), zap.Uint64("node", result.Owner.NodeID), zap.Error(result.Err))
				// Keep track of the first error we see to return back to the client
				if writeError == nil {
					writeError = result.Err
				}
				continue
			}
			wrote++
			// We wrote the required consistency level
			if wrote >= required {
				atomic.AddInt64(&w.stats.WriteOK, 1)
				return nil
			}
		}
	}

	// Not enough successful writes: partial if any succeeded, otherwise
	// surface the first error seen (or a generic failure).
	if wrote > 0 {
		atomic.AddInt64(&w.stats.WritePartial, 1)
		return ErrPartialWrite
	}
	if writeError != nil {
		return fmt.Errorf("write failed: %v", writeError)
	}
	return ErrWriteFailed
}
2022-02-14 influxdb集群coordinator启动及请求处理流程解析相关推荐
- 互联网级监控系统必备-时序数据库之Influxdb集群及踩过的坑
上篇博文中,我们介绍了做互联网级监控系统的必备-Influxdb的关键特性、数据读写、应用场景:互联网级监控系统必备-时序数据库之Influxdb 本文中,我们介绍Influxdb数据库集群的搭建,同 ...
- influxDB集群模式实践
女主宣言 influxDB数据库以其优秀的时序数据存储和查询能力,在处理和分析类似资源监控等时序数据方面得到了广泛的应用。而influxDB自带的各种特殊函数如求平均值、标准差、随机取数据,使数据分析 ...
- windows下配置redis集群,启动节点报错:createing server TCP listening socket *:7000:listen:Unknown error...
windows下配置redis集群,启动节点报错:createing server TCP listening socket *:7000:listen:Unknown error 学习了:https ...
- Zookeeper源码分析:集群模式启动概述
参考资料 <<从PAXOS到ZOOKEEPER分布式一致性原理与实践>> zookeeper-3.0.0 Zookeeper概述 Zookeeper是一个分布式的,开放源码的分 ...
- zookeeper做集群后启动不了,大部分原因是防火墙未关闭
zookeeper做单机版,可以正常启动:但是zookeeper做集群后启动不了,大部分原因是防火墙未关闭. centos的关闭防火墙方法比较独立. systemctl stop firewalld. ...
- 大数据之-Hadoop完全分布式_集群的启动和停止方式总结---大数据之hadoop工作笔记0039
然后我们来总结一下hadoop集群的,启动停止方式 可以看到上面的方法可以在一个服务器上,去停止或启动服务. 然后后面我们会用脚本,去启动 start-dfs.sh 或者 stop-dfs.sh 这个 ...
- yarn集群下启动spark错误WARN:66 - Neither spark.yarn.jars nor spark.yarn.archive is set
yarn集群下启动spark错误如下: WARN Client:66 - Neither spark.yarn.jars nor spark.yarn.archive is set, falling ...
- 利用Docker配置influxdb集群
利用Docker配置influxdb集群 –by leiyong 2022-4-27 1.文档历史 2.安装influxdb docker run -d --name influxdb-server1 ...
- Hadoop集群如何启动
Hadoop集群如何启动 1 hadoop软件安装的目录:/opt/app 有如下软件:hbase.spark-2.1.0-bin-hadoop2.4.hbconf.sqoop.hdconf.hive ...
最新文章
- Mysql将日期转为字符串
- 命令行里对SAP Spartacus执行命令ng test core
- spark sql读取hive底层_[大数据]spark sql读写Hive数据不一致
- Outlook Express 自動回信設定
- 快速清除oracle多个表,Oracle数据库之批量清除一个表的数据,TB级别数据
- SAMBA用户访问指定的目录
- 判断一个二叉树是不是对称二叉树
- 阿里云计算新增ODPS、SLS两款云产品
- 32. iostat
- Python读CookBook之数据结构和算法
- Oracle的expdp导出、impdp导出命令
- zzulioj1001C语言答案,ZZULIOJ
- 微信小程序mpvue框架
- 使用豆瓣源下载python包
- MATLAB中text函数使用
- Edge浏览器Alt+Tab快捷键切换其他应用窗口
- MySQL从删库到跑路(9):group by——给漂亮小姐姐分个组
- 老生常谈:让软件留下临终遗言并优雅地关闭
- 计算机网络室工作总结,计算机教室工作总结范文
- 嵌入式系统开发技术(00)
热门文章
- Psins代码解析之常用的子函数
- 郝健: Linux内存管理学习笔记-第3节课
- Pyside2,Pycharm中右键转py文件的时候,出来的结果总是c++代码
- 鸡兔同笼。鸡兔一共有 50 只,脚一共有 160 只//,问鸡和兔各多少只?要求鸡兔至少一样一只
- thinkphp5 错误调试之模块不存在
- c++判断某年某月天数
- 【QT】C++ GUI Qt4 学习笔记3
- 大学生计算机python_人人都能学计算机:计算机科学入门与Python编程_学堂在线章节测试答案...
- 微信小程序怎么样取代传统收款设备的流程
- 虚拟机测试服务器最大带宽,利用VMware ESXi测量网络延迟 你了解多少