ignite.compute().call(IgniteCallable<R> job)
1. IgniteComputeImpl.callAsync0
2. GridClosureProcessor.callAsync
3. GridTaskProcessor.execute -> startTask. This method is fairly long; most of its first half deals with taskClass and deployment.
When execute is called, the call mode and the job are wrapped into T7. T7 extends GridPeerDeployAwareTaskAdapter and implements ComputeTask (map, reduce). The call in GridClosureProcessor is:

    ctx.task().execute(new T7<>(mode, job), null, sys, execName);

T7 implements the map and reduce methods of ComputeTask. Its map method:

    @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
        // Match the job with the node(s) that should run it, depending on the call mode (BALANCE or BROADCAST).
        return outMap(t.get1(), F.asList(t.get2()), subgrid, lb);
    }
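The following simplified Java model shows what a map like outMap produces in each mode. It is not Ignite's implementation. CallMode, MapSketch and the round-robin RoundRobinBalancer are hypothetical stand-ins. In Ignite, the balancer is obtained through ctx.loadBalancing(), as GridTaskWorker.body() shows. Nodes are plain strings here, and throwing on an empty subgrid is an assumption made for the sketch.

```java
import java.util.*;

/** Hypothetical mirror of the two call modes discussed above. */
enum CallMode { BROADCAST, BALANCE }

class MapSketch {
    /** Hypothetical round-robin balancer; not Ignite's ComputeLoadBalancer. */
    static class RoundRobinBalancer {
        private int next;

        String pick(List<String> nodes) {
            String node = nodes.get(next % nodes.size());
            next++;
            return node;
        }
    }

    /** Returns job -> node assignments; broadcast creates one job copy per node. */
    static Map<String, String> map(CallMode mode, String job, List<String> subgrid, RoundRobinBalancer lb) {
        if (subgrid.isEmpty())
            throw new IllegalStateException("Cluster group is empty"); // assumption for the sketch

        Map<String, String> out = new LinkedHashMap<>();

        if (mode == CallMode.BROADCAST) {
            // One copy of the job per node, so every node runs it.
            for (int i = 0; i < subgrid.size(); i++)
                out.put(job + "#" + i, subgrid.get(i));
        }
        else {
            // BALANCE: a single node chosen by the balancer.
            out.put(job, lb.pick(subgrid));
        }

        return out;
    }
}
```

With three nodes, BROADCAST yields three job -> node entries and BALANCE yields one. Repeated BALANCE calls rotate through the nodes.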
startTask has the following signature:

    private <T, R> ComputeTaskInternalFuture<R> startTask(
        @Nullable String taskName,
        @Nullable Class<?> taskCls,
        @Nullable ComputeTask<T, R> task,
        IgniteUuid sesId,
        @Nullable T arg,
        boolean sys,
        @Nullable String execName)

startTask receives the T7 instance as its third argument (task). The first two arguments (taskName and taskCls) are null. The first half of the method initializes taskName and taskCls from the task.

4. Create the task session:

    // Creates task session with task name and task version.
    GridTaskSessionImpl ses = ctx.session().createTaskSession(
        sesId,
        ctx.localNodeId(),
        taskName,
        dep,
        taskCls == null ? null : taskCls.getName(),
        top,
        topPred,
        startTime,
        endTime,
        Collections.<ComputeJobSibling>emptyList(),
        Collections.emptyMap(),
        fullSup,
        internal,
        subjId,
        execName);

5. Create the GridTaskWorker:

    GridTaskWorker<?, ?> taskWorker = new GridTaskWorker<>(
        ctx,
        arg,
        ses,
        fut,
        taskCls,
        task,
        dep,
        new TaskEventListener(),
        map,
        subjId);

6. Call taskWorker.run() to start the job.

Next, let's step into GridTaskWorker and see what run() does.

run() is defined in the abstract class GridWorker and calls body().
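run() and body() follow the template-method pattern. The sketch below assumes a much simpler base class than the real GridWorker, which does additional bookkeeping not modeled here. WorkerSketch and TaskWorkerSketch are hypothetical names.

```java
/** Hypothetical, simplified base class in the spirit of GridWorker. */
abstract class WorkerSketch implements Runnable {
    private volatile boolean finished;
    private volatile Throwable error;

    // Fixed skeleton: subclasses cannot override run(), only body().
    @Override public final void run() {
        try {
            body();
        }
        catch (Throwable t) {
            error = t; // simplification: keep the failure so callers can inspect it
        }
        finally {
            finished = true;
        }
    }

    /** The variable step; GridTaskWorker puts its map/send logic here. */
    protected abstract void body() throws Exception;

    boolean isFinished() { return finished; }

    Throwable error() { return error; }
}

/** Hypothetical subclass standing in for GridTaskWorker. */
class TaskWorkerSketch extends WorkerSketch {
    final StringBuilder steps = new StringBuilder();

    @Override protected void body() {
        steps.append("map;");
        steps.append("send;");
    }
}
```

Because run() is final, every worker gets the same surrounding lifecycle. Only body() differs between worker types.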

Here is the body() implementation in GridTaskWorker:

    @Override protected void body() {
        evtLsnr.onTaskStarted(this);

        try {
            // Use either user task or deployed one.
            if (task == null) {
                assert taskCls != null;
                assert ComputeTask.class.isAssignableFrom(taskCls);

                try {
                    task = newTask((Class<? extends ComputeTask<T, R>>)taskCls);
                }
                catch (IgniteCheckedException e) {
                    // If cannot instantiate task, then assign internal flag based
                    // on information available.
                    internal = dep.internalTask(null, taskCls);

                    recordTaskEvent(EVT_TASK_STARTED, "Task started.");

                    throw e;
                }
            }

            internal = ses.isInternal();

            recordTaskEvent(EVT_TASK_STARTED, "Task started.");

            initializeSpis();

            ses.setClassLoader(dep.classLoader());

            // Nodes are ignored by affinity tasks.
            final List<ClusterNode> shuffledNodes =
                affCacheIds == null ? getTaskTopology() : Collections.<ClusterNode>emptyList();

            // Load balancer.
            ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, shuffledNodes);

            continuous = ctx.resource().isAnnotationPresent(dep, task, TaskContinuousMapperResource.class);

            if (log.isDebugEnabled())
                log.debug("Injected task resources [continuous=" + continuous + ']');

            // Inject resources.
            ctx.resource().inject(dep, task, ses, balancer, mapper);

            Map<? extends ComputeJob, ClusterNode> mappedJobs = U.wrapThreadLoader(dep.classLoader(),
                new Callable<Map<? extends ComputeJob, ClusterNode>>() {
                    @Override public Map<? extends ComputeJob, ClusterNode> call() {
                        return task.map(shuffledNodes, arg);
                    }
                });

            if (log.isDebugEnabled())
                log.debug("Mapped task jobs to nodes [jobCnt=" + (mappedJobs != null ? mappedJobs.size() : 0) +
                    ", mappedJobs=" + mappedJobs + ", ses=" + ses + ']');

            if (F.isEmpty(mappedJobs)) {
                synchronized (mux) {
                    // Check if some jobs are sent from continuous mapper.
                    if (F.isEmpty(jobRes))
                        throw new IgniteCheckedException("Task map operation produced no mapped jobs: " + ses);
                }
            }
            else
                processMappedJobs(mappedJobs);

            synchronized (mux) {
                lockRespProc = false;
            }

            processDelayedResponses();
        }
        catch (ClusterGroupEmptyCheckedException e) {
            U.warn(log, "Failed to map task jobs to nodes (topology projection is empty): " + ses);

            finishTask(null, e);
        }
        catch (IgniteException | IgniteCheckedException e) {
            if (!fut.isCancelled()) {
                if (!(e instanceof VisorClusterGroupEmptyException))
                    U.error(log, "Failed to map task jobs to nodes: " + ses, e);

                finishTask(null, e);
            }
            else if (log.isDebugEnabled())
                log.debug("Failed to map task jobs to nodes due to task cancellation: " + ses);
        }
        // Catch throwable to protect against bad user code.
        catch (Throwable e) {
            String errMsg = "Failed to map task jobs to nodes due to undeclared user exception" +
                " [cause=" + e.getMessage() + ", ses=" + ses + "]";

            U.error(log, errMsg, e);

            finishTask(null, new ComputeUserUndeclaredException(errMsg, e));

            if (e instanceof Error)
                throw e;
        }
    }

Core method 1: processMappedJobs(mappedJobs);

The call below invokes task.map directly and returns its result. The result is a map whose keys are the wrapped ComputeJob instances and whose values are the target nodes.

    Map<? extends ComputeJob, ClusterNode> mappedJobs = U.wrapThreadLoader(dep.classLoader(),
        new Callable<Map<? extends ComputeJob, ClusterNode>>() {
            @Override public Map<? extends ComputeJob, ClusterNode> call() {
                return task.map(shuffledNodes, arg);
            }
        });
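U.wrapThreadLoader presumably runs the callable with the deployment's class loader (dep.classLoader()) installed as the thread context class loader. That way, user classes resolve while task.map runs. The sketch below assumes a swap-and-restore behaviour. ClassLoaderSketch is a hypothetical name, and to keep the sketch short it rethrows checked exceptions as IllegalStateException.

```java
import java.util.concurrent.Callable;

class ClassLoaderSketch {
    /** Runs c with ldr as the context class loader, then restores the previous loader. */
    static <R> R wrapThreadLoader(ClassLoader ldr, Callable<R> c) {
        Thread t = Thread.currentThread();
        ClassLoader old = t.getContextClassLoader();

        t.setContextClassLoader(ldr);

        try {
            return c.call();
        }
        catch (RuntimeException e) {
            throw e;
        }
        catch (Exception e) {
            throw new IllegalStateException(e); // simplification of the checked-exception handling
        }
        finally {
            // Always restore, even if the callable failed.
            t.setContextClassLoader(old);
        }
    }
}
```

The finally block is the important design choice. A pooled thread must not keep a task's class loader after the call returns.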

Each mapped job -> node pair is wrapped into a GridJobResultImpl. The ComputeJob implementation class here is C2:

    jobResList.add(new GridJobResultImpl(job, jobId, node, sib));

Then sendRequest is called to send the request:

    /**
     * @param res Job result.
     */
    private void sendRequest(ComputeJobResult res) {
        assert res != null;

        GridJobExecuteRequest req = null;

        ClusterNode node = res.getNode();

        try {
            ClusterNode curNode = ctx.discovery().node(node.id());

            // Check if node exists prior to sending to avoid cases when a discovery
            // listener notified about node leaving after topology resolution. Note
            // that we make this check because we cannot count on exception being
            // thrown in case of send failure.
            if (curNode == null) {
                U.warn(log, "Failed to send job request because remote node left grid (if fail-over is enabled, " +
                    "will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() +
                    ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');

                ctx.resource().invokeAnnotated(dep, res.getJob(), ComputeJobAfterSend.class);

                GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(),
                    res.getJobContext().getJobId(), null, null, null, null, null, null, false, null);

                fakeRes.setFakeException(new ClusterTopologyException("Failed to send job due to node failure: " + node));

                onResponse(fakeRes);
            }
            else {
                long timeout = ses.getEndTime() == Long.MAX_VALUE ? Long.MAX_VALUE :
                    ses.getEndTime() - U.currentTimeMillis();

                if (timeout > 0) {
                    boolean loc = node.id().equals(ctx.discovery().localNode().id()) &&
                        !ctx.config().isMarshalLocalJobs();

                    Map<Object, Object> sesAttrs = ses.isFullSupport() ? ses.getAttributes() : null;

                    Map<? extends Serializable, ? extends Serializable> jobAttrs =
                        (Map<? extends Serializable, ? extends Serializable>)res.getJobContext().getAttributes();

                    boolean forceLocDep = internal || !ctx.deploy().enabled();

                    try {
                        MarshallerUtils.jobReceiverVersion(node.version());

                        req = new GridJobExecuteRequest(
                            ses.getId(),
                            res.getJobContext().getJobId(),
                            ses.getTaskName(),
                            ses.getUserVersion(),
                            ses.getTaskClassName(),
                            loc ? null : U.marshal(marsh, res.getJob()),
                            loc ? res.getJob() : null,
                            ses.getStartTime(),
                            timeout,
                            ses.getTopology(),
                            loc ? ses.getTopologyPredicate() : null,
                            loc ? null : U.marshal(marsh, ses.getTopologyPredicate()),
                            loc ? null : U.marshal(marsh, ses.getJobSiblings()),
                            loc ? ses.getJobSiblings() : null,
                            loc ? null : U.marshal(marsh, sesAttrs),
                            loc ? sesAttrs : null,
                            loc ? null : U.marshal(marsh, jobAttrs),
                            loc ? jobAttrs : null,
                            ses.getCheckpointSpi(),
                            dep.classLoaderId(),
                            dep.deployMode(),
                            continuous,
                            dep.participants(),
                            forceLocDep,
                            ses.isFullSupport(),
                            internal,
                            subjId,
                            affCacheIds,
                            affPartId,
                            mapTopVer,
                            ses.executorName());
                    }
                    finally {
                        MarshallerUtils.jobReceiverVersion(null);
                    }

                    // Local call: execute the job directly.
                    if (loc)
                        ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req);
                    // Remote call: send the request to the grid topic.
                    else {
                        byte plc;

                        if (internal)
                            plc = MANAGEMENT_POOL;
                        else {
                            Byte ctxPlc = getThreadContext(TC_IO_POLICY);

                            if (ctxPlc != null)
                                plc = ctxPlc;
                            else
                                plc = PUBLIC_POOL;
                        }

                        // Send job execution request.
                        ctx.io().sendToGridTopic(node, TOPIC_JOB, req, plc);

                        if (log.isDebugEnabled())
                            log.debug("Sent job request [req=" + req + ", node=" + node + ']');
                    }

                    if (!loc)
                        ctx.resource().invokeAnnotated(dep, res.getJob(), ComputeJobAfterSend.class);
                }
                else
                    U.warn(log, "Job timed out prior to sending job execution request: " + res.getJob());
            }
        }
        catch (IgniteCheckedException e) {
            IgniteException fakeErr = null;

            try {
                boolean deadNode = e instanceof ClusterTopologyCheckedException || isDeadNode(res.getNode().id());

                // Avoid stack trace if node has left grid.
                if (deadNode) {
                    U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " +
                        "will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() +
                        ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');

                    fakeErr = new ClusterTopologyException("Failed to send job due to node failure: " + node, e);
                }
                else
                    U.error(log, "Failed to send job request: " + req, e);
            }
            catch (IgniteClientDisconnectedCheckedException e0) {
                if (log.isDebugEnabled())
                    log.debug("Failed to send job request, client disconnected [node=" + node +
                        ", taskName=" + ses.getTaskName() + ", taskSesId=" + ses.getId() + ", jobSesId=" +
                        res.getJobContext().getJobId() + ']');

                fakeErr = U.convertException(e0);
            }

            GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(),
                res.getJobContext().getJobId(), null, null, null, null, null, null, false, null);

            if (fakeErr == null)
                fakeErr = U.convertException(e);

            fakeRes.setFakeException(fakeErr);

            onResponse(fakeRes);
        }
    }
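The `loc ? ... : ...` pairs in the GridJobExecuteRequest constructor boil down to one rule: a local job travels as a live object, and a remote job travels as bytes. The sketch below makes two assumptions. First, Java serialization stands in for Ignite's marshaller (marsh). Second, the marshalLocalJobs flag plays the role of ctx.config().isMarshalLocalJobs(). DispatchSketch, Request and buildRequest are hypothetical names.

```java
import java.io.*;

class DispatchSketch {
    /** Hypothetical request: carries either the live job (local) or its bytes (remote). */
    static final class Request {
        final Object job;      // set only for local execution
        final byte[] jobBytes; // set only for remote execution

        Request(Object job, byte[] jobBytes) {
            this.job = job;
            this.jobBytes = jobBytes;
        }
    }

    /** Java serialization as a stand-in for U.marshal(marsh, ...). */
    static byte[] marshal(Serializable o) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
            oos.flush();
            return bos.toByteArray();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Object unmarshal(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Same decision as `loc` in sendRequest: local and not forced to marshal. */
    static Request buildRequest(Serializable job, String targetNode, String localNode, boolean marshalLocalJobs) {
        boolean loc = targetNode.equals(localNode) && !marshalLocalJobs;

        return loc ? new Request(job, null) : new Request(null, marshal(job));
    }
}
```

Skipping marshalling for local jobs saves a serialize/deserialize round trip. The flag still lets you force it, for example to catch serialization problems early.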

If the target is the local node, a GridJobWorker is created and initialized, and then run -> body() -> execute0() runs:

    res = U.wrapThreadLoader(dep.classLoader(), new Callable<Object>() {
        @Nullable @Override public Object call() {
            try {
                if (internal && ctx.config().isPeerClassLoadingEnabled())
                    ctx.job().internal(true);

                return job.execute();
            }
            finally {
                if (internal && ctx.config().isPeerClassLoadingEnabled())
                    ctx.job().internal(false);
            }
        }
    });

This is where the job finally executes: job.execute() is called. Here job is an instance of a ComputeJob implementation, obtained from GridJobExecuteRequest.getJob().

Back in sendRequest, we can trace where the job comes from. In the GridJobExecuteRequest constructor call, the 7th argument (loc ? res.getJob() : null) is the ComputeJob, and it traces back to processMappedJobs.

Look again at the U.wrapThreadLoader call shown earlier. The task.map call it wraps is what turns the task into ComputeJob instances, with C2 as the implementation class. C2's execute method is:

    /** {@inheritDoc} */
    @Override public Object execute() {
        try {
            return c.call();
        }
        catch (Exception e) {
            throw new IgniteException(e);
        }
    }
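C2 is essentially an adapter from IgniteCallable to ComputeJob. The same idea in plain Java uses two hypothetical substitutions: a one-method JobSketch interface in place of ComputeJob, and RuntimeException in place of IgniteException.

```java
import java.util.concurrent.Callable;

/** Hypothetical stand-in for ComputeJob: only the execute() method. */
interface JobSketch {
    Object execute();
}

/** Adapter in the spirit of C2: a Callable exposed as a job. */
final class CallableJob<R> implements JobSketch {
    private final Callable<R> c;

    CallableJob(Callable<R> c) {
        this.c = c;
    }

    @Override public Object execute() {
        try {
            return c.call();
        }
        catch (Exception e) {
            // C2 wraps into IgniteException; a plain RuntimeException is used here instead.
            throw new RuntimeException(e);
        }
    }
}
```

Wrapping is needed because Callable.call() may throw checked exceptions, while the job's execute() declares none.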


If the job is sent to a remote node, the following is called:

    // Send job execution request.
    ctx.io().sendToGridTopic(node, TOPIC_JOB, req, plc);

This ends up in the private send method of GridIoManager (ctx.io()):

    /**
     * @param node Destination node.
     * @param topic Topic to send the message to.
     * @param topicOrd GridTopic enumeration ordinal.
     * @param msg Message to send.
     * @param plc Type of processing.
     * @param ordered Ordered flag.
     * @param timeout Timeout.
     * @param skipOnTimeout Whether message can be skipped on timeout.
     * @param ackC Ack closure.
     * @param async If {@code true} message for local node will be processed in pool, otherwise in current thread.
     * @throws IgniteCheckedException Thrown in case of any errors.
     */
    private void send(
        ClusterNode node,
        Object topic,
        int topicOrd,
        Message msg,
        byte plc,
        boolean ordered,
        long timeout,
        boolean skipOnTimeout,
        IgniteInClosure<IgniteException> ackC,
        boolean async
    ) throws IgniteCheckedException {
        assert node != null;
        assert topic != null;
        assert msg != null;
        assert !async || msg instanceof GridIoUserMessage : msg; // Async execution was added only for IgniteMessaging.
        assert topicOrd >= 0 || !(topic instanceof GridTopic) : msg;

        GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);

        if (locNodeId.equals(node.id())) {
            assert plc != P2P_POOL;

            CommunicationListener commLsnr = this.commLsnr;

            if (commLsnr == null)
                throw new IgniteCheckedException("Trying to send message when grid is not fully started.");

            if (ordered)
                processOrderedMessage(locNodeId, ioMsg, plc, null);
            else if (async)
                processRegularMessage(locNodeId, ioMsg, plc, NOOP);
            else
                processRegularMessage0(ioMsg, locNodeId);

            if (ackC != null)
                ackC.apply(null);
        }
        else {
            if (topicOrd < 0)
                ioMsg.topicBytes(U.marshal(marsh, topic));

            try {
                if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi)
                    ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackC);
                else
                    getSpi().sendMessage(node, ioMsg);
            }
            catch (IgniteSpiException e) {
                if (e.getCause() instanceof ClusterTopologyCheckedException)
                    throw (ClusterTopologyCheckedException)e.getCause();

                if (!ctx.discovery().alive(node))
                    throw new ClusterTopologyCheckedException("Failed to send message, node left: " + node.id(), e);

                throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +
                    "TCP connection cannot be established due to firewall issues) " +
                    "[node=" + node + ", topic=" + topic +
                    ", msg=" + msg + ", policy=" + plc + ']', e);
            }
        }
    }

Data is sent through the class GridTcpNioCommunicationClient, which implements the GridCommunicationClient interface. That interface has one other implementation.


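Core method 2: processDelayedResponses. I stepped through this method in the debugger and it did not seem to do anything: the item taken from the queue was null.

This method takes response data from the queue delayedRess (a ConcurrentLinkedDeque):

    GridJobExecuteResponse res = delayedRess.poll();

poll() returns null when the queue is empty, so delayedRess must be filled before any data can come out. Next, let's look at when a response is put into this queue.

Debugging leads to the spot: the processDelayedResponses() call that comes last in body(), right after processMappedJobs and after lockRespProc has been set to false.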
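body() sets lockRespProc = false under mux and only then calls processDelayedResponses(). One plausible reading, which is an assumption not verified here against onResponse, is this: responses that arrive while lockRespProc is still true are parked in delayedRess and drained afterwards. The simplified model below follows that reading. It would also explain the null returned by poll(): if no response arrives before the flag is cleared, the deque is simply empty. DelayedResponsesSketch and its methods are hypothetical.

```java
import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;

/** Simplified model (assumption) of the lockRespProc / delayedRess handshake. */
class DelayedResponsesSketch {
    private final Object mux = new Object();
    private boolean lockRespProc = true; // true until mapping and sending have finished
    private final Deque<String> delayedRess = new ConcurrentLinkedDeque<>();
    final List<String> processed = new ArrayList<>();

    /** Assumed onResponse behaviour: defer while locked, otherwise process immediately. */
    void onResponse(String res) {
        synchronized (mux) {
            if (lockRespProc) {
                delayedRess.offer(res);
                return;
            }
        }

        process(res);
    }

    /** Mirrors the end of body(): unlock under mux, then drain the queue. */
    void finishMapping() {
        synchronized (mux) {
            lockRespProc = false;
        }

        processDelayedResponses();
    }

    private void processDelayedResponses() {
        String res;

        // poll() returns null once the deque is empty, which ends the loop.
        while ((res = delayedRess.poll()) != null)
            process(res);
    }

    private void process(String res) {
        synchronized (processed) {
            processed.add(res);
        }
    }
}
```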


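The call chain and its return types:

IgniteComputeImpl -> call(IgniteCallable<R> job): returns the result R
IgniteComputeImpl -> callAsync0(IgniteCallable<R> job): returns IgniteInternalFuture<R>
GridClosureProcessor -> callAsync: returns ComputeTaskInternalFuture
GridTaskProcessor -> execute: returns ComputeTaskInternalFuture<R>
GridTaskProcessor -> startTask: returns ComputeTaskInternalFuture. This is the final return, and inside it taskWorker.run() is executed.

In other words, when startTask runs, the result produced later is put into the ComputeTaskInternalFuture, and the caller waits for it asynchronously.

When the GridTaskWorker is constructed, the ComputeTaskInternalFuture instance is passed into it (the fut argument).

The get method of ComputeTaskInternalFuture checks the future's state. When the state is null or is not an error, get returns the state, and that state is the result.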
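The call chain above can be modeled with CompletableFuture standing in for ComputeTaskInternalFuture. In this model, startTask creates the future, hands it to the worker and returns at once, while the synchronous call() waits on it. This is a sketch with a hypothetical name (FutureFlowSketch) that assumes a single-thread executor runs the job. In Ignite, the result arrives through job responses instead.

```java
import java.util.concurrent.*;

class FutureFlowSketch {
    // Stand-in for whatever executes jobs and delivers their results.
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    /** Like startTask: create the future, let the "worker" dispatch the job, return at once. */
    <R> CompletableFuture<R> startTask(Callable<R> job) {
        CompletableFuture<R> fut = new CompletableFuture<>();
        runWorker(job, fut); // plays the role of taskWorker.run(); the future is handed to the worker
        return fut;          // returned before the job has necessarily finished
    }

    private <R> void runWorker(Callable<R> job, CompletableFuture<R> fut) {
        pool.execute(() -> {
            try {
                fut.complete(job.call());     // the result becomes the future's state
            }
            catch (Exception e) {
                fut.completeExceptionally(e); // an error becomes the future's state instead
            }
        });
    }

    /** Like call(): the synchronous API blocks on the asynchronous one. */
    <R> R call(Callable<R> job) {
        return startTask(job).join(); // join() rethrows failures wrapped in CompletionException
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

Building the synchronous API on top of the asynchronous one means there is only one execution path to maintain.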



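When a job response is handled, the job result is obtained as follows. A local result is used as-is, while a remote result is unmarshalled with the resolved class loader:

    Object res0 = loc ? res.getJobResult() : U.unmarshal(marsh, res.getJobResultBytes(),
        U.resolveClassLoader(clsLdr, ctx.config()));

In the previous article (https://mp.csdn.net/postedit/89333644), the "Cluster group is empty" error occurred for this reason: the list in this returned result showed 9 entries, but only one was actually present and the rest were empty.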


