分析入口 
ignite.compute().call(IgniteCallable<R> job)
1, IgniteComputeImpl.callAsync0
2, GridClosureProcessor.callAsync
3, GridTaskProcessor.execute -> startTask 这个方法比较长 前面大部分处理 taskClass, deploy
分析对客户端运算类的处理
execute执行的时候,mode与job封装进T7, T7 继承 GridPeerDeployAwareTaskAdapter , 实现了 ComputeTask (map,reduce)
GridClosureProcessorctx.task().execute(new T7<>(mode, job), null, sys, execName);

T7实现了ComputeTask的map reduce方法

@Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {//这个方法的目的是根据调用方式是rebalance还是broadcast,将job与要执行的节点node匹配上return outMap(t.get1(), F.asList(t.get2()), subgrid, lb);}
private <T, R> ComputeTaskInternalFuture<R> startTask(@Nullable String taskName,@Nullable Class<?> taskCls,@Nullable ComputeTask<T, R> task,IgniteUuid sesId,@Nullable T arg,boolean sys,@Nullable String execName)

startTask 的第三个参数传入 T7,前两个参数传入的是空, 在方法前半段,对taskName,taskClass做了初始化,来源于task

4, 创建TaskSession 
// Creates task session with task name and task version.GridTaskSessionImpl ses = ctx.session().createTaskSession(sesId,ctx.localNodeId(),taskName,dep,taskCls == null ? null : taskCls.getName(),top,topPred,startTime,endTime,Collections.<ComputeJobSibling>emptyList(),Collections.emptyMap(),fullSup,internal,subjId,execName);
   5.创建 GridTaskWorker
 GridTaskWorker<?, ?> taskWorker = new GridTaskWorker<>(ctx,arg,ses,fut,taskCls,task,dep,new TaskEventListener(),map,subjId);

6. 调用 taskWorker.run(); 启动 job

下面进入 GridTaskWorker 看看run的动作

run方法属于 抽象类 GridWorker 中的方法,里面调用  body()

如下是GridTaskWorker里面的 body方法实现

 @Override protected void body() {evtLsnr.onTaskStarted(this);try {// Use either user task or deployed one.if (task == null) {assert taskCls != null;assert ComputeTask.class.isAssignableFrom(taskCls);try {task = newTask((Class<? extends ComputeTask<T, R>>)taskCls);}catch (IgniteCheckedException e) {// If cannot instantiate task, then assign internal flag based// on information available.internal = dep.internalTask(null, taskCls);recordTaskEvent(EVT_TASK_STARTED, "Task started.");throw e;}}internal = ses.isInternal();recordTaskEvent(EVT_TASK_STARTED, "Task started.");initializeSpis();ses.setClassLoader(dep.classLoader());// Nodes are ignored by affinity tasks.final List<ClusterNode> shuffledNodes =affCacheIds == null ? getTaskTopology() : Collections.<ClusterNode>emptyList();// Load balancer.ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, shuffledNodes);continuous = ctx.resource().isAnnotationPresent(dep, task, TaskContinuousMapperResource.class);if (log.isDebugEnabled())log.debug("Injected task resources [continuous=" + continuous + ']');// Inject resources.ctx.resource().inject(dep, task, ses, balancer, mapper);Map<? extends ComputeJob, ClusterNode> mappedJobs = U.wrapThreadLoader(dep.classLoader(),new Callable<Map<? extends ComputeJob, ClusterNode>>() {@Override public Map<? extends ComputeJob, ClusterNode> call() {return task.map(shuffledNodes, arg);}});if (log.isDebugEnabled())log.debug("Mapped task jobs to nodes [jobCnt=" + (mappedJobs != null ? mappedJobs.size() : 0) +", mappedJobs=" + mappedJobs + ", ses=" + ses + ']');if (F.isEmpty(mappedJobs)) {synchronized (mux) {// Check if some jobs are sent from continuous mapper.if (F.isEmpty(jobRes))throw new IgniteCheckedException("Task map operation produced no mapped jobs: " + ses);}}elseprocessMappedJobs(mappedJobs);synchronized (mux) {lockRespProc = false;}processDelayedResponses();}catch (ClusterGroupEmptyCheckedException e) {U.warn(log, "Failed to map task jobs to nodes (topology projection is empty): " + ses);finishTask(null, e);}catch (IgniteException | IgniteCheckedException e) {if (!fut.isCancelled()) {if (!(e instanceof VisorClusterGroupEmptyException))U.error(log, "Failed to map task jobs to nodes: " + ses, e);finishTask(null, e);}else if (log.isDebugEnabled())log.debug("Failed to map task jobs to nodes due to task cancellation: " + ses);}// Catch throwable to protect against bad user code.catch (Throwable e) {String errMsg = "Failed to map task jobs to nodes due to undeclared user exception" +" [cause=" + e.getMessage() + ", ses=" + ses + "]";U.error(log, errMsg, e);finishTask(null, new ComputeUserUndeclaredException(errMsg, e));if (e instanceof Error)throw e;}}

核心方法  1 , processMappedJobs(mappedJobs);

这个方法是发送job,如果job是本地类型,则本地执行,如果是远程的,发送到topic

下面的是直接调用了, 返回了结果集, 这个结果集是封装好的ComputeJob

 Map<? extends ComputeJob, ClusterNode> mappedJobs = U.wrapThreadLoader(dep.classLoader(),new Callable<Map<? extends ComputeJob, ClusterNode>>() {@Override public Map<? extends ComputeJob, ClusterNode> call() {return task.map(shuffledNodes, arg);}});

针对映射好的job->node封装进  GridJobResultImpl , ComputeJob的实现类 C2

jobResList.add(new GridJobResultImpl(job, jobId, node, sib));

然后调用sendRequest方法 , 发送请求

 /*** @param res Job result.*/private void sendRequest(ComputeJobResult res) {assert res != null;GridJobExecuteRequest req = null;ClusterNode node = res.getNode();try {ClusterNode curNode = ctx.discovery().node(node.id());// Check if node exists prior to sending to avoid cases when a discovery// listener notified about node leaving after topology resolution. Note// that we make this check because we cannot count on exception being// thrown in case of send failure.if (curNode == null) {U.warn(log, "Failed to send job request because remote node left grid (if fail-over is enabled, " +"will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() +", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');ctx.resource().invokeAnnotated(dep, res.getJob(), ComputeJobAfterSend.class);GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(),res.getJobContext().getJobId(), null, null, null, null, null, null, false, null);fakeRes.setFakeException(new ClusterTopologyException("Failed to send job due to node failure: " + node));onResponse(fakeRes);}else {long timeout = ses.getEndTime() == Long.MAX_VALUE ? Long.MAX_VALUE :ses.getEndTime() - U.currentTimeMillis();if (timeout > 0) {boolean loc = node.id().equals(ctx.discovery().localNode().id()) &&!ctx.config().isMarshalLocalJobs();Map<Object, Object> sesAttrs = ses.isFullSupport() ? ses.getAttributes() : null;Map<? extends Serializable, ? extends Serializable> jobAttrs =(Map<? extends Serializable, ? extends Serializable>)res.getJobContext().getAttributes();boolean forceLocDep = internal || !ctx.deploy().enabled();try {MarshallerUtils.jobReceiverVersion(node.version());req = new GridJobExecuteRequest(ses.getId(),res.getJobContext().getJobId(),ses.getTaskName(),ses.getUserVersion(),ses.getTaskClassName(),loc ? null : U.marshal(marsh, res.getJob()),loc ? res.getJob() : null,ses.getStartTime(),timeout,ses.getTopology(),loc ? ses.getTopologyPredicate() : null,loc ? null : U.marshal(marsh, ses.getTopologyPredicate()),loc ? null : U.marshal(marsh, ses.getJobSiblings()),loc ? ses.getJobSiblings() : null,loc ? null : U.marshal(marsh, sesAttrs),loc ? sesAttrs : null,loc ? null : U.marshal(marsh, jobAttrs),loc ? jobAttrs : null,ses.getCheckpointSpi(),dep.classLoaderId(),dep.deployMode(),continuous,dep.participants(),forceLocDep,ses.isFullSupport(),internal,subjId,affCacheIds,affPartId,mapTopVer,ses.executorName());}finally {MarshallerUtils.jobReceiverVersion(null);}//如果是本地调用,则直接执行if (loc)ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req);
//远程调用,则发送到GridTopicelse {byte plc;if (internal)plc = MANAGEMENT_POOL;else {Byte ctxPlc = getThreadContext(TC_IO_POLICY);if (ctxPlc != null)plc = ctxPlc;elseplc = PUBLIC_POOL;}// Send job execution request.ctx.io().sendToGridTopic(node, TOPIC_JOB, req, plc);if (log.isDebugEnabled())log.debug("Sent job request [req=" + req + ", node=" + node + ']');}if (!loc)ctx.resource().invokeAnnotated(dep, res.getJob(), ComputeJobAfterSend.class);}elseU.warn(log, "Job timed out prior to sending job execution request: " + res.getJob());}}catch (IgniteCheckedException e) {IgniteException fakeErr = null;try {boolean deadNode = e instanceof ClusterTopologyCheckedException || isDeadNode(res.getNode().id());// Avoid stack trace if node has left grid.if (deadNode) {U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " +"will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() +", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');fakeErr = new ClusterTopologyException("Failed to send job due to node failure: " + node, e);}elseU.error(log, "Failed to send job request: " + req, e);}catch (IgniteClientDisconnectedCheckedException e0) {if (log.isDebugEnabled())log.debug("Failed to send job request, client disconnected [node=" + node +", taskName=" + ses.getTaskName() + ", taskSesId=" + ses.getId() + ", jobSesId=" +res.getJobContext().getJobId() + ']');fakeErr = U.convertException(e0);}GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(),res.getJobContext().getJobId(), null, null, null, null, null, null, false, null);if (fakeErr == null)fakeErr = U.convertException(e);fakeRes.setFakeException(fakeErr);onResponse(fakeRes);}}

如果是发送本地的,则新建  GridJobWorker , 然后执行初始化 , run -> body() -> execute0()

res = U.wrapThreadLoader(dep.classLoader(), new Callable<Object>() {@Nullable @Override public Object call() {try {if (internal && ctx.config().isPeerClassLoadingEnabled())ctx.job().internal(true);return job.execute();}finally {if (internal && ctx.config().isPeerClassLoadingEnabled())ctx.job().internal(false);}}});

最终执行的地方在这里, 调用了job的execute()方法, 而job 是 ComputeJob 的实现类 , 是GridJobExecuteRequest 中的getJob

再次回到 sendRequest , 追根溯源 ,定义GridJobExecuteRequest  的地方 , 第7个参数是 ComputeJob, 追到 processMappedJobs

查看上面写的 U.wrapThreadLoader, 这个方法将task转成了computeJob, 实现类 C2, 而C2里面的execute方法如下

/** {@inheritDoc} */@Override public Object execute() {try {return c.call();}catch (Exception e) {throw new IgniteException(e);}}

也就是说,本地执行,就是调用callable里面的call方法

如果是发送到远程服务器 则调用

// Send job execution request.
 ctx.io().sendToGridTopic(node, TOPIC_JOB, req, plc);

 /*** @param node Destination node.* @param topic Topic to send the message to.* @param topicOrd GridTopic enumeration ordinal.* @param msg Message to send.* @param plc Type of processing.* @param ordered Ordered flag.* @param timeout Timeout.* @param skipOnTimeout Whether message can be skipped on timeout.* @param ackC Ack closure.* @param async If {@code true} message for local node will be processed in pool, otherwise in current thread.* @throws IgniteCheckedException Thrown in case of any errors.*/private void send(ClusterNode node,Object topic,int topicOrd,Message msg,byte plc,boolean ordered,long timeout,boolean skipOnTimeout,IgniteInClosure<IgniteException> ackC,boolean async) throws IgniteCheckedException {assert node != null;assert topic != null;assert msg != null;assert !async || msg instanceof GridIoUserMessage : msg; // Async execution was added only for IgniteMessaging.assert topicOrd >= 0 || !(topic instanceof GridTopic) : msg;GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);if (locNodeId.equals(node.id())) {assert plc != P2P_POOL;CommunicationListener commLsnr = this.commLsnr;if (commLsnr == null)throw new IgniteCheckedException("Trying to send message when grid is not fully started.");if (ordered)processOrderedMessage(locNodeId, ioMsg, plc, null);else if (async)processRegularMessage(locNodeId, ioMsg, plc, NOOP);elseprocessRegularMessage0(ioMsg, locNodeId);if (ackC != null)ackC.apply(null);}else {if (topicOrd < 0)ioMsg.topicBytes(U.marshal(marsh, topic));try {if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi)((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackC);elsegetSpi().sendMessage(node, ioMsg);}catch (IgniteSpiException e) {if (e.getCause() instanceof ClusterTopologyCheckedException)throw (ClusterTopologyCheckedException)e.getCause();if (!ctx.discovery().alive(node))throw new ClusterTopologyCheckedException("Failed to send message, node left: " + node.id(), e);throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +"TCP connection cannot be established due to firewall issues) " +"[node=" + node + ", topic=" + topic +", msg=" + msg + ", policy=" + plc + ']', e);}}}

发送数据使用了类 GridTcpNioCommunicationClient, 实现接口 GridCommunicationClient, 此接口还有一个实现类

GridShmemCommunicationClient

核心方法2 processDelayedResponses 这个方法debug了一下,发现没有什么用处,从队列里拿到的数据是null

这个方法从队列 delayedRess (ConcurrentLinkedDeque)中获取返回数据

GridJobExecuteResponse res = delayedRess.poll();

poll()方法表示,如果队列是空,则返回null, 那么要想取到数据,就必然要确保delayedRess队列有值, 下面分析一下这个response是在何时放入这个队列的

找到位置 , 通过debug , 发现 processMappedJobs 这个方法最后的 processDelayedResponses();
是有作用的,结果就是通过这个方法拿到的.

下面重新梳理一下整个调用过程,及结果的获取

IgniteComputeImpl -> call(IgniteCallable<R> job) 有返回值
IgniteComputeImpl -> callAsync0(IgniteCallable<R> job) 有返回值 IgniteInternalFuture<R> 
GridClosureProcessor-> callAsync 返回 ComputeTaskInternalFuture
GridTaskProcessor -> ComputeTaskInternalFuture<R> execute
GridTaskProcessor -> startTask 返回 ComputeTaskInternalFuture  这个是最后的返回, 里面执行的是taskWorker.run()

也就是说在执行startTask的时候,后面的返回结果会放进 ComputeTaskInternalFuture ,异步等待结果

在定义GridTaskWorker的时候, ComputeTaskInternalFuture的实例放进了 GridTaskWorker 中

ComputeTaskInternalFuture 的get方法会判断状态,当状态是空,或者不是错误的时候,返回state,这个state就是结果

在onResponse方法中

下面的代码就是反序列化结果的地方

 Object res0 = loc ? res.getJobResult() : U.unmarshal(marsh, res.getJobResultBytes(),U.resolveClassLoader(clsLdr, ctx.config()));

在上一篇文章中 https://mp.csdn.net/postedit/89333644

报错 Cluster group is empty 就是因为这个返回结果中的list显示9个,但实际只有一个,其余是空

继续在上一篇中分析问题的原因.这篇只分析调用过程

Ignite GridTaskWorker 执行分析相关推荐

  1. 【Windows 逆向】CheatEngine 工具 ( 汉化版 CE 工具推荐 | 编写简单 C++ 程序 | C++ 程序执行分析 | 使用 CE 修改上述 C++ 程序 )

    文章目录 一.汉化版 CE 工具推荐 二.编写简单 C++ 程序 三.C++ 程序执行分析 四.使用 CE 修改上述 C++ 程序 一.汉化版 CE 工具推荐 推荐一个汉化版的 CE 工具 : htt ...

  2. MyBatisPlus插件扩展_SqlExplainInterceptor执行分析插件的使用

    场景 项目搭建专栏: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/column/info/37194 简介 SQL 执行分析拦截器[ 目前只支持 MYSQL-5 ...

  3. 2022-08-12 mysql/stonedb-Q16-并行执行分析

    摘要: mysql/stonedb-Q16-并行执行分析 问题记录: 并行执行无法获取数据: (gdb) bt #0 0x0000000002d51ca8 in Tianmu::core::Filte ...

  4. 墨者学院12 命令注入执行分析

    问题描述 题目链接:命令注入执行分析 背景介绍:某单位IT运维人员,在服务器上留了一个页面用来ping内部服务器连通情况.安全工程师"墨者"检查发现这一文件存在漏洞,要求运维人员立 ...

  5. Android Adobe Reader 任意代码执行分析(附POC)

    livers · 2014/04/18 14:05 0x00 描述 前几天老外在fd还有exploit-db上,公布了Adobe Reader任意代码执行的漏洞. 漏洞编号: CVE: 2014-05 ...

  6. 远程过程调用失败0x800706be_WordPress5.0 远程代码执行分析

    本文作者:七月火 2019年2月19日,RIPS 团队官方博客放出 WordPress5.0.0 RCE 漏洞详情,漏洞利用比较有趣,但其中多处细节部分并未放出,特别是其中利用到的 LFI 并未指明, ...

  7. 操作数栈的字节码指令执行分析

    一:写个demo 二:点击Recompile编译 三:使用javap命令反编译class文件        javap -v 类名.class 四:查看结果 五:分析执行过程 首先bipush意思是把 ...

  8. mysql malloc lib_CVE-2016-6662-MySQL ‘malloc_lib’变量重写命令执行分析 | CN-SEC 中文网...

    摘要 今天有个关于MySQL的漏洞被披露出来,编号CVE-2016-6662.该漏洞主要涉及到 mysqld_safe 脚本中在加速/处理内存时会采用 "malloc_lib"变量 ...

  9. DiscuzX系列命令执行分析公开(三连弹)

    tang3 · 2015/01/15 18:55 0x00 漏洞概要 昨天360补天发了这样的一条微博: 然后打听了一下细节,发现居然是我13年7月报给TSRC的漏洞,看今天大家玩的挺开心,与TSRC ...

最新文章

  1. 白雪 | NLP加持知识图谱在金融事件挖掘中的应用
  2. EasyUI Combobox 设置默认值
  3. [leetcode]227. 基本计算器 II
  4. 国网英语计算机职称考试技巧,计算机职称考试通关的三大技巧
  5. StreamInsight查询系列汇总
  6. Linux vsftpd配置大全
  7. Excel中纵向查找函数-VLOOKUP函数
  8. php 手机版 答题系统,基于ThinkPHP框架开发的驾考在线答题系统_WAP手机自适应界面+手机在线驾考宝典答题系统...
  9. TAGE Predictor
  10. React-router 嵌套路由传值(render和children)
  11. 迷室3第三章难点问题解读
  12. python羊车门问题
  13. Lodash的一些基本使用
  14. 机器人制证系统大屏可视化
  15. matlab上万大型矩阵求逆,要好好总结一下超大矩阵求逆的技巧了
  16. 站住,Maven依赖的scope作用域,还记得几个?
  17. java 线程 组成_java多线程
  18. 【深度优先搜索】弹珠游戏
  19. 第十一届“泰迪杯”数据挖掘挑战赛赛前指导安排
  20. DDoS攻击详解ufonet、Mirai分布式拒绝服务攻击工具的介绍

热门文章

  1. JavaScript 个人笔记3(详细BOMDOM)
  2. FlashFXP,flashfxp怎么上传文件
  3. 小白前端自制红警登陆界面(试水前端)
  4. saltstack自动化运维部署--安装apache\原码安装nginx服务
  5. Apple Watch开发
  6. 睢宁县微服务平台_微服务(Weifuwu)国内微信公众服务平台
  7. ubuntu 配置 vino-server
  8. 真值的原码补码和反码
  9. Table表格的一些记录
  10. Block学习-关于Block是如何实现的,以及block中参数传递