对bulk request的处理流程:

1、遍历所有的request,对其做一些加工,主要包括:获取routing(如果mapping里有的话)、指定的timestamp(如果没有带timestamp会使用当前时间),如果没有指定id字段,在action.bulk.action.allow_id_generation配置为true的情况下,会自动生成一个base64UUID作为id字段,并会将request的opType字段置为CREATE,因为如果是使用es自动生成的id的话,默认就是createdocument而不是updatedocument。(注:坑爹啊,我从github上面下的最新的ES代码,发现自动生成id这一段已经没有设置opType字段了,看起来和有指定id是一样的处理逻辑了,见https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java)。

2、创建一个shardId--> Operation的Map,再次遍历所有的request,获取获取每个request应该发送到的shardId,获取的过程是这样的:request有routing就直接返回,如果没有,会先对id求一个hash,这里的hash函数默认是Murmur3,当然你也可以通过配置index.legacy.routing.hash.type来决定使用的hash函数,决定发到哪个shard:

return MathUtils.mod(hash, indexMetaData.getNumberOfShards()); 注意:最新版ES代码已经改变!

即用hash对shard的总数求模来获取shardId,将shardId作为key,通过遍历的index和request组成BulkItemRequest的集合作为value放入之前说的map中(为什么要拿到遍历的index,因为在bulk response中可以看到对每个request的请求处理结果的),其实说了这么多就是要对request按shard来分组(为负载均衡)。

3、遍历上面得到的map,对不同的分组创建一个bulkShardRequest,包含配置consistencyLevel和timeout。并从集群state中获得primary shard,如果primary在本机就直接执行,如果不在会再发送到其shard所在的node。

源码位置:https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

    void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {final ClusterState clusterState = clusterService.state();// TODO use timeout to wait here if its blocked...
        clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);MetaData metaData = clusterState.metaData();for (int i = 0; i < bulkRequest.requests.size(); i++) {DocWriteRequest docWriteRequest = bulkRequest.requests.get(i);//the request can only be null because we set it to null in the previous step, so it gets ignoredif (docWriteRequest == null) {continue;}if (addFailureIfIndexIsUnavailable(docWriteRequest, bulkRequest, responses, i, concreteIndices, metaData)) {continue;}Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);try {switch (docWriteRequest.opType()) {case CREATE:case INDEX:IndexRequest indexRequest = (IndexRequest) docWriteRequest;MappingMetaData mappingMd = null;final IndexMetaData indexMetaData = metaData.index(concreteIndex);if (indexMetaData != null) {mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());}indexRequest.resolveRouting(metaData);indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());break;case UPDATE:TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);break;case DELETE:TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (DeleteRequest) docWriteRequest);break;default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");}} catch (ElasticsearchParseException | RoutingMissingException e) {BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id(), e);BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);responses.set(i, bulkItemResponse);// make sure the request gets never processed againbulkRequest.requests.set(i, null);}}// first, go over all the requests and create a ShardId -> Operations mappingMap<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();for (int i = 0; i < bulkRequest.requests.size(); i++) {DocWriteRequest request = bulkRequest.requests.get(i);if (request == null) {continue;}String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());shardRequests.add(new BulkItemRequest(i, request));}if (requestsByShard.isEmpty()) {listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));return;}final AtomicInteger counter = new AtomicInteger(requestsByShard.size());String nodeId = clusterService.localNode().getId();for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {final ShardId shardId = entry.getKey();final List<BulkItemRequest> requests = entry.getValue();BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),requests.toArray(new BulkItemRequest[requests.size()]));bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());bulkShardRequest.timeout(bulkRequest.timeout());if (task != null) {bulkShardRequest.setParentTask(nodeId, task.getId());}shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {@Overridepublic void onResponse(BulkShardResponse bulkShardResponse) {for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {// we may have no response if item failedif (bulkItemResponse.getResponse() != null) {bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());}responses.set(bulkItemResponse.getItemId(), bulkItemResponse);}if (counter.decrementAndGet() == 0) {finishHim();}}});}}

路由代码:

ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();

转载于:https://www.cnblogs.com/bonelee/p/6078947.html

ES bulk源码分析——ES 5.0相关推荐

  1. 【es】es API源码分析

    1.概述 转载:[6]elasticsearch源码深入分析--API源码分析 2.RestController的继承关系 从Node实例化的过程中,我们知道ActionModule是Node提供Re ...

  2. vuex 源码分析_Vuex 2.0 源码分析(下)

    大家好,我叫黄轶,来自滴滴公共前端团队,最近在幕课网上线了一门 Vue.js 的实战课程--<Vue.js高仿饿了么外卖App 2016最火前端框架>,同时,我们团队最近写了一本书 --& ...

  3. Android 框架学习2:源码分析 EventBus 3.0 如何实现事件总线

    Go beyond yourself rather than beyond others. 上篇文章 深入理解 EventBus 3.0 之使用篇 我们了解了 EventBus 的特性以及如何使用,这 ...

  4. Linux0.1源码分析,《Linux 0.01内核分析与操作系统设计——创造你自己的操作系统》...

    该楼层疑似违规已被系统折叠 隐藏此楼查看此楼 readme 本光盘为<Linux 0.0.1内核分析与操作系统设计--创造你自己的操作系统>一书的配套光盘,读者可自由取其源文件做学习,研究 ...

  5. elasticsearch index、create和update的源码分析

    https://segmentfault.com/a/1190000011272749 社区里面有人问了如下一个问题: 执行 bulk 索引文档的时候,用 index 或者 create 类型并且自定 ...

  6. 一步步实现windows版ijkplayer系列文章之三——Ijkplayer播放器源码分析之音视频输出——音频篇

    https://www.cnblogs.com/harlanc/p/9693983.html 目录 OpenSL ES & AudioTrack 源码分析 创建播放器音频输出对象 配置并创建音 ...

  7. Redis源码分析:基础概念介绍与启动概述

    Redis源码分析 基于Redis-5.0.4版本,进行基础的源码分析,主要就是分析一些平常使用过程中的内容.仅作为相关内容的学习记录,有关Redis源码学习阅读比较广泛的便是<Redis设计与 ...

  8. jQuery源码分析系列

    声明:本文为原创文章,如需转载,请注明来源并保留原文链接Aaron,谢谢! 版本截止到2013.8.24 jQuery官方发布最新的的2.0.3为准 附上每一章的源码注释分析 :https://git ...

  9. Storm源码分析之四: Trident源码分析

    Storm源码分析之四: Trident源码分析 @(STORM)[storm] Storm源码分析之四 Trident源码分析 一概述 0小结 1简介 2关键类 1Spout的创建 2spout的消 ...

最新文章

  1. ASP.NET中相对路径的使用总结
  2. QT的QVarLengthArray类的使用
  3. 数组的最长平台c语言,2010台湾省C语言版高级
  4. vue-cli 中使用 less 插件
  5. html 和jsp 引入jquery_不用jsp怎么实现前后端交互?给萌新后端的ajax教程(1)
  6. jwt同一会话_在会话中使用JWT
  7. ROS下多个kinect在一台电脑上同时运行
  8. getHibernateTemplate()为NUll
  9. pytorch中RNN注意事项(关于input和output维度)
  10. 【BZOJ4503】两个串(FFT)
  11. php写的仿爱帮网电话号码字符串处理(防采集)。
  12. java课题设计实验报告,JAVA简单记事本程序设计实验报告
  13. 数据结构之红黑树插入案例详解
  14. Sugar BI数据可视化图表标注
  15. 2022年起重机司机(限桥式起重机)考试模拟100题及答案
  16. SQL Server 2016 Always Encrypted(始终加密)
  17. 用于单图像超分辨率的增强深度残差网络
  18. SpringCloudGateway路由定义存至Mysql数据库
  19. openwrt查看linux内核,OpenWrt教程-如何升级linux内核版本
  20. ibm服务器报错代码大全_IBM x系列服务器报错代码

热门文章

  1. 移动端HTML5video视频播放优化实践
  2. python数组每个元素加1_python-根据键转换numpy数组中的每个元素
  3. python最流行的框架_2020年最流行Python web开发框架(下)
  4. python贪吃蛇_如何用Python写一个贪吃蛇?
  5. 使用商业模式的九大模块去分析京东的商业模式_商业模式—筑基篇(1)
  6. opencv 安装_如何在 CentOS 8 上安装 OpenCV
  7. 怎么判断网络回路_收藏|电梯安全回路分析说明及故障判断
  8. catia过载属性使用方法_catia简明操作手册
  9. python开发框架 代码生成_我的第一个python web开发框架(28)——定制ORM(四)...
  10. 【机器学习入门到精通系列】应用机器学习的建议(方差和偏差)