ES bulk源码分析——ES 5.0
对bulk request的处理流程:
1、遍历所有的request,对其做一些加工,主要包括:获取routing(如果mapping里有的话)、指定的timestamp(如果没有带timestamp会使用当前时间),如果没有指定id字段,在action.bulk.action.allow_id_generation配置为true的情况下,会自动生成一个base64UUID作为id字段,并会将request的opType字段置为CREATE,因为如果是使用es自动生成的id的话,默认就是createdocument而不是updatedocument。(注:坑爹啊,我从github上面下的最新的ES代码,发现自动生成id这一段已经没有设置opType字段了,看起来和有指定id是一样的处理逻辑了,见https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java)。
2、创建一个shardId--> Operation的Map,再次遍历所有的request,获取获取每个request应该发送到的shardId,获取的过程是这样的:request有routing就直接返回,如果没有,会先对id求一个hash,这里的hash函数默认是Murmur3,当然你也可以通过配置index.legacy.routing.hash.type来决定使用的hash函数,决定发到哪个shard:
return MathUtils.mod(hash, indexMetaData.getNumberOfShards()); 注意:最新版ES代码已经改变!
即用hash对shard的总数求模来获取shardId,将shardId作为key,通过遍历的index和request组成BulkItemRequest的集合作为value放入之前说的map中(为什么要拿到遍历的index,因为在bulk response中可以看到对每个request的请求处理结果的),其实说了这么多就是要对request按shard来分组(为负载均衡)。
3、遍历上面得到的map,对不同的分组创建一个bulkShardRequest,包含配置consistencyLevel和timeout。并从集群state中获得primary shard,如果primary在本机就直接执行,如果不在会再发送到其shard所在的node。
源码位置:https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {final ClusterState clusterState = clusterService.state();// TODO use timeout to wait here if its blocked... clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);MetaData metaData = clusterState.metaData();for (int i = 0; i < bulkRequest.requests.size(); i++) {DocWriteRequest docWriteRequest = bulkRequest.requests.get(i);//the request can only be null because we set it to null in the previous step, so it gets ignoredif (docWriteRequest == null) {continue;}if (addFailureIfIndexIsUnavailable(docWriteRequest, bulkRequest, responses, i, concreteIndices, metaData)) {continue;}Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);try {switch (docWriteRequest.opType()) {case CREATE:case INDEX:IndexRequest indexRequest = (IndexRequest) docWriteRequest;MappingMetaData mappingMd = null;final IndexMetaData indexMetaData = metaData.index(concreteIndex);if (indexMetaData != null) {mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());}indexRequest.resolveRouting(metaData);indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());break;case UPDATE:TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);break;case DELETE:TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (DeleteRequest) docWriteRequest);break;default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");}} catch (ElasticsearchParseException | RoutingMissingException e) {BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id(), e);BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);responses.set(i, bulkItemResponse);// make sure the request gets never processed againbulkRequest.requests.set(i, null);}}// first, go over all the requests and create a ShardId -> Operations mappingMap<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();for (int i = 0; i < bulkRequest.requests.size(); i++) {DocWriteRequest request = bulkRequest.requests.get(i);if (request == null) {continue;}String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());shardRequests.add(new BulkItemRequest(i, request));}if (requestsByShard.isEmpty()) {listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));return;}final AtomicInteger counter = new AtomicInteger(requestsByShard.size());String nodeId = clusterService.localNode().getId();for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {final ShardId shardId = entry.getKey();final List<BulkItemRequest> requests = entry.getValue();BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),requests.toArray(new BulkItemRequest[requests.size()]));bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());bulkShardRequest.timeout(bulkRequest.timeout());if (task != null) {bulkShardRequest.setParentTask(nodeId, task.getId());}shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {@Overridepublic void onResponse(BulkShardResponse bulkShardResponse) {for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {// we may have no response if item failedif (bulkItemResponse.getResponse() != null) {bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());}responses.set(bulkItemResponse.getItemId(), bulkItemResponse);}if (counter.decrementAndGet() == 0) {finishHim();}}});}}
路由代码:
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
转载于:https://www.cnblogs.com/bonelee/p/6078947.html
ES bulk源码分析——ES 5.0相关推荐
- 【es】es API源码分析
1.概述 转载:[6]elasticsearch源码深入分析--API源码分析 2.RestController的继承关系 从Node实例化的过程中,我们知道ActionModule是Node提供Re ...
- vuex 源码分析_Vuex 2.0 源码分析(下)
大家好,我叫黄轶,来自滴滴公共前端团队,最近在幕课网上线了一门 Vue.js 的实战课程--<Vue.js高仿饿了么外卖App 2016最火前端框架>,同时,我们团队最近写了一本书 --& ...
- Android 框架学习2:源码分析 EventBus 3.0 如何实现事件总线
Go beyond yourself rather than beyond others. 上篇文章 深入理解 EventBus 3.0 之使用篇 我们了解了 EventBus 的特性以及如何使用,这 ...
- Linux0.1源码分析,《Linux 0.01内核分析与操作系统设计——创造你自己的操作系统》...
该楼层疑似违规已被系统折叠 隐藏此楼查看此楼 readme 本光盘为<Linux 0.0.1内核分析与操作系统设计--创造你自己的操作系统>一书的配套光盘,读者可自由取其源文件做学习,研究 ...
- elasticsearch index、create和update的源码分析
https://segmentfault.com/a/1190000011272749 社区里面有人问了如下一个问题: 执行 bulk 索引文档的时候,用 index 或者 create 类型并且自定 ...
- 一步步实现windows版ijkplayer系列文章之三——Ijkplayer播放器源码分析之音视频输出——音频篇
https://www.cnblogs.com/harlanc/p/9693983.html 目录 OpenSL ES & AudioTrack 源码分析 创建播放器音频输出对象 配置并创建音 ...
- Redis源码分析:基础概念介绍与启动概述
Redis源码分析 基于Redis-5.0.4版本,进行基础的源码分析,主要就是分析一些平常使用过程中的内容.仅作为相关内容的学习记录,有关Redis源码学习阅读比较广泛的便是<Redis设计与 ...
- jQuery源码分析系列
声明:本文为原创文章,如需转载,请注明来源并保留原文链接Aaron,谢谢! 版本截止到2013.8.24 jQuery官方发布最新的的2.0.3为准 附上每一章的源码注释分析 :https://git ...
- Storm源码分析之四: Trident源码分析
Storm源码分析之四: Trident源码分析 @(STORM)[storm] Storm源码分析之四 Trident源码分析 一概述 0小结 1简介 2关键类 1Spout的创建 2spout的消 ...
最新文章
- ASP.NET中相对路径的使用总结
- QT的QVarLengthArray类的使用
- 数组的最长平台c语言,2010台湾省C语言版高级
- vue-cli 中使用 less 插件
- html 和jsp 引入jquery_不用jsp怎么实现前后端交互?给萌新后端的ajax教程(1)
- jwt同一会话_在会话中使用JWT
- ROS下多个kinect在一台电脑上同时运行
- getHibernateTemplate()为NUll
- pytorch中RNN注意事项(关于input和output维度)
- 【BZOJ4503】两个串(FFT)
- php写的仿爱帮网电话号码字符串处理(防采集)。
- java课题设计实验报告,JAVA简单记事本程序设计实验报告
- 数据结构之红黑树插入案例详解
- Sugar BI数据可视化图表标注
- 2022年起重机司机(限桥式起重机)考试模拟100题及答案
- SQL Server 2016 Always Encrypted(始终加密)
- 用于单图像超分辨率的增强深度残差网络
- SpringCloudGateway路由定义存至Mysql数据库
- openwrt查看linux内核,OpenWrt教程-如何升级linux内核版本
- ibm服务器报错代码大全_IBM x系列服务器报错代码
热门文章
- 移动端HTML5video视频播放优化实践
- python数组每个元素加1_python-根据键转换numpy数组中的每个元素
- python最流行的框架_2020年最流行Python web开发框架(下)
- python贪吃蛇_如何用Python写一个贪吃蛇?
- 使用商业模式的九大模块去分析京东的商业模式_商业模式—筑基篇(1)
- opencv 安装_如何在 CentOS 8 上安装 OpenCV
- 怎么判断网络回路_收藏|电梯安全回路分析说明及故障判断
- catia过载属性使用方法_catia简明操作手册
- python开发框架 代码生成_我的第一个python web开发框架(28)——定制ORM(四)...
- 【机器学习入门到精通系列】应用机器学习的建议(方差和偏差)