Run Duration

一些处理器支持配置运行持续时间(Run Duration)。此设置告诉处理器在单个任务中继续使用同一task尽可能多地来处理来自传入队列的的FlowFiles(或成批的流文件)。
对于处理单个任务本身非常快并且FlowFile数量也很大的处理器来说,这是一个理想的选择。

在上面的示例中,将完全相同的FlowFiles传递到这两个处理器,这些处理器被配置为执行相同的Attribute更新。两者在过去5分钟内处理了相同数量的FlowFiles;但是,配置为运行持续时间的处理器消耗的总体CPU时间更少。并非所有处理器都支持设置Run Duration。处理器功能的性质,使用的方法或使用的客户端库可能决定了不支持此功能。这样的话你将无法在此类处理器上设置Run Duration

工作原理叙述

  1. 处理器已为其任务分配了线程。处理器从传入连接的Active queue中获取最高优先级的FlowFile(或一批FlowFile)。如果对FlowFile的处理未超过配置的运行持续时间,则会从Active queue中拉出另一个FlowFile(或一批FlowFile)。此过程将在同一线程下继续进行所有操作,直到达到Run Duration时间或Active queue为空。届时,会话完成,所有处理过的FlowFiles都立即提交给适当的关系。

  2. 由于直到整个运行完成才提交所有的FlowFiles,因此在FlowFiles上导致了一些延迟。你配置的Run Duration决定了至少要发生多少延迟(Active queue不为空的时候)。

  3. 如果针对FlowFile执行处理器所需的时间比配置的Run Duration更长,那么调整此配置没有任何其他好处。

这对于堆使用意味着什么

  1. 由于它仅处理Active queue中的传入FlowFiles,因此此处没有增加堆压力。(Active queue中的FlowFiles已经在堆空间中,关于Active queue请看深入理解Apache NIFI Connection)。

  2. 新生成的FlowFiles(如果有的话,取决于处理器功能)全部保留在堆中,直到最终提交为止。这可能会带来一些额外的堆压力,因为所有新生成的FlowFiles都将保留在堆中,直到在运行时间结束时将它们全部提交给输出关系为止(尤其是新FlowFile的content,还没有刷到repository)。

实现

使用SupportsBatching注解标注的Processor是支持Run Duration的,如果一个处理器使用了这个注释,那么它就允许框架对ProcessSession进行批处理的提交,以及允许框架从后续对ProcessSessionFactory.createSession() 的调用中多次返回相同的ProcessSession

比如UpdateAttribute

@EventDriven
@SideEffectFree
@SupportsBatching
...
public class UpdateAttribute extends AbstractProcessor implements Searchable {

重点看在哪里处理了这个SupportsBatching注解,在(深入解析Apache NIFI的调度策略)[./9NIFI调度.md]一文中,我们在讲解Timer driven的时候有提到ConnectableTask.invoke方法,是线程执行调度具体Processor的ontrigger方法前的处理(里面有检测Processor是否有工作可做),下面我们看一下这个方法:

public InvocationResult invoke() {//任务终止if (scheduleState.isTerminated()) {logger.debug("Will not trigger {} because task is terminated", connectable);return InvocationResult.DO_NOT_YIELD;}···//查看Processor是否有工作可做if (!isWorkToDo()) {logger.debug("Yielding {} because it has no work to do", connectable);return InvocationResult.yield("No work to do");}//背压机制if (numRelationships > 0) {final int requiredNumberOfAvailableRelationships = connectable.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;if (!repositoryContext.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) {logger.debug("Yielding {} because Backpressure is Applied", connectable);return InvocationResult.yield("Backpressure Applied");}}//可以运行logger.debug("Triggering {}", connectable);//获取 Run Duration的配置final long batchNanos = connectable.getRunDuration(TimeUnit.NANOSECONDS);final ProcessSessionFactory sessionFactory;final StandardProcessSession rawSession;final boolean batch;//处理SupportsBatching注解if (connectable.isSessionBatchingSupported() && batchNanos > 0L) {rawSession = new StandardProcessSession(repositoryContext, scheduleState::isTerminated);sessionFactory = new BatchingSessionFactory(rawSession);batch = true;} else {rawSession = null;sessionFactory = new StandardProcessSessionFactory(repositoryContext, scheduleState::isTerminated);batch = false;}final ActiveProcessSessionFactory activeSessionFactory = new WeakHashMapProcessSessionFactory(sessionFactory);scheduleState.incrementActiveThreadCount(activeSessionFactory);final long startNanos = System.nanoTime();final long finishIfBackpressureEngaged = startNanos + (batchNanos / 25L);final long finishNanos = startNanos + batchNanos;int invocationCount = 0;final String originalThreadName = Thread.currentThread().getName();try {try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), connectable.getRunnableComponent().getClass(), connectable.getIdentifier())) {boolean shouldRun = connectable.getScheduledState() == ScheduledState.RUNNING;while (shouldRun) {//循环onTrigger处理 直到Run Duration时间到了或者Processor没有工作可做或者触发背压机制了invocationCount++;connectable.onTrigger(processContext, activeSessionFactory);if (!batch) {return InvocationResult.DO_NOT_YIELD;}final long nanoTime = System.nanoTime();if (nanoTime > finishNanos) {return InvocationResult.DO_NOT_YIELD;}if (nanoTime > finishIfBackpressureEngaged && isBackPressureEngaged()) {return InvocationResult.DO_NOT_YIELD;}if (connectable.getScheduledState() != ScheduledState.RUNNING) {break;}if (!isWorkToDo()) {break;}if (isYielded()) {break;}if (numRelationships > 0) {final int requiredNumberOfAvailableRelationships = connectable.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;shouldRun = repositoryContext.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships);}}} catch (final TerminatedTaskException tte) {final ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());procLog.info("Failed to process session due to task being terminated", new Object[] {tte});} catch (final ProcessException pe) {final ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());procLog.error("Failed to process session due to {}", new Object[] {pe});} catch (final Throwable t) {// Use ComponentLog to log the event so that a bulletin will be created for this processorfinal ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());procLog.error("{} failed to process session due to {}; Processor Administratively Yielded for {}",new Object[] {connectable.getRunnableComponent(), t, schedulingAgent.getAdministrativeYieldDuration()}, t);logger.warn("Administratively Yielding {} due to uncaught Exception: {}", connectable.getRunnableComponent(), t.toString(), t);connectable.yield(schedulingAgent.getAdministrativeYieldDuration(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);}} finally {try {//批量提交if (batch) {try {rawSession.commit();} catch (final Throwable t) {final ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());procLog.error("Failed to commit session {} due to {}; rolling back", new Object[] { rawSession, t.toString() }, t);try {rawSession.rollback(true);} catch (final Exception e1) {procLog.error("Failed to roll back session {} due to {}", new Object[] { rawSession, t.toString() }, t);}}}···return InvocationResult.DO_NOT_YIELD;}

通过这个方法我们看到

  1. 设置了SupportsBatching注解的Processor并且配置了Run Duration时,传到onTrigger方法的ProcessSessionFactory sessionFactory是不一样的。
  2. 批量对应传入的是BatchingSessionFactory,这个类的commit方法可以简单理解为并没有实际干提交事务的事儿,只是做了一些check
  3. 批量的最后对应的是rawSession.commit()

所以,如果你自定义的组件想要支持批处理并且符合批处理的特征(简单说就是任务执行快并且FlowFile数量也很大),只要加一个SupportsBatching注解就可以了。

注意

理论分析:对于一些源组件来说(source 一个流程的源),然后是需要记录状态的(比如说记录一个增量值到state,再比如是从别的地方取数据或者接受数据,拿到数据后告诉对方数据已到手),正常来说Processor的实现都是先session.commit再干记录状态那些事,但如果是批量处理配置Run Duration,通过上面的代码分析发现,processor.onTrigger里我们写的session.commit其实并没有提交,而是等到批处理结束后再提交,如果这个任务是依赖记录状态来获取数据的,其实是不保证后面的commit一定执行的(NIFI shutdown了,NIFI宕了),最终没有commit但是状态已经记录,那么这次批处理的数据是丢失的。

场景模拟描述:现有一个Rest服务,提供类似于kafka的功能,消费者可以来注册获取数据,服务端记录客户端消费的offset,然后使用InvokeHttp批处理的去到这个服务获取数据,那么就有概率发生上面说的情况。

公众号

关注公众号 得到第一手文章/文档更新推送。

深入理解Apache NIFI Run Duration相关推荐

  1. 深入解析Apache NIFI的调度策略

    简介:本文主要讲解Apache NIFI的调度策略,对象主要是针对Processor组件.本文假定读者已经对Apache NIFI有了一定的了解和使用经验,同时作者也尽可能的去讲解的更透彻,使得本文尽 ...

  2. Apache NiFi用户指南

    Apache NiFi用户指南 介绍 Apache NiFi是基于流程编程概念的数据流系统.它支持强大且可扩展的数据路由,转换和系统中介逻辑的有向图.NiFi具有基于Web的用户界面,用于设计,控制, ...

  3. Apache NiFi系统管理员指南 [ 三 ]

    基本群集设置 故障排除 State管理 配置状态提供程序 嵌入式ZooKeeper服务器 ZooKeeper访问控制 ZooKeeper安全 ZooKeeper Migrator Bootstrap属 ...

  4. Apache nifi 集群安装

    原文地址:https://pierrevillard.com/2016/08/13/apache-nifi-1-0-0-cluster-setup/ 文章写的很好了,步骤性的英文写得也比较易懂,原样搬 ...

  5. Apache NIFI入门(读完即入门)

    Apache NIFI入门(读完即入门) 编辑人(全网同名):酷酷的诚 邮箱:zhangchengk@foxmail.com 我将在本文中介绍: 什么是ApacheNIFI,应在什么情况下使用它,理解 ...

  6. Apache NiFi系统管理员指南 [ 一 ]

    如何安装和启动NiFi 端口配置 NiFi 嵌入式Zookeeper 配置最佳实践 安全配置 TLS生成工具包 用户认证 轻量级目录访问协议(LDAP) Kerberos的 OpenId Connec ...

  7. Apache NiFi系统管理员指南 [ 四 ]

    系统属性 核心属性 State管理 H2设置 FlowFile存储库 交换管理(Swap Management) 内容存储库 (Content Repository) 文件系统内容存储库属性 (Fil ...

  8. Apache NiFi远程代码执行-RCE

    目录 一. 漏洞简介 二. 影响版本 三. docker-compose进行漏洞环境搭建 四. 漏洞复现 五. 漏洞挖掘 六. 漏洞修复 一. 漏洞简介 Apache NiFi 是一个易于使用.功能强 ...

  9. Apache NIFI 安装 ● 操作 ● 文件同步 ● oracle 数据库增量同步实例讲解

    nifi简介 nifi背景 NiFi之前是在美国国家安全局(NSA)开发和使用了8年的一个可视化.可定制的数据集成产品.2014年NSA将其贡献给了Apache开源社区,2015年7月成功成为Apac ...

  10. Apache NiFi深度扩展

    介绍 该高级文档旨在深入了解NiFi的实施和设计决策.它假设读者已经阅读了足够的其他文档来了解NiFi的基础知识. FlowFiles是NiFi的核心,也是基于流程的设计.FlowFile是一种数据记 ...

最新文章

  1. Django源码分析1:创建项目和应用分析
  2. jaca和mysql外卖系统_【项目实战】太强大了,Java外卖点餐初级系统【附源码】...
  3. python numpy 子数组_Python利用Numpy数组进行数据处理(一)
  4. SAP Spartacus里的localStorage用法
  5. 【牛客 - NC93】设计LRU缓存结构(模拟)
  6. 怎么查看父子级目录linux,如何查找linux中特定父目录的所有文件?
  7. gflags.lib(gflags.obj) : error LNK2001: 无法解析的外部符号 __imp_PathMatchSpecA
  8. jQuery的回调管理机制(二)
  9. 最新基于高德地图的android进阶开发(3)GPS地图定位
  10. Word高效指南 - 如何批量删除空格空白行
  11. 众觅,让支付宝『到位』全国到位
  12. RK3066和AML8726-MX方案对比 频率与功耗 / 性能 / 方案成本
  13. 原创超简单代码(1.19)
  14. Android系统设置settings应用学习(二)--源代码解析
  15. 开启snapshot的操作失败 如何解决——两种办法
  16. stc89c51单片机音乐盒系统设计_基于单片机数字音乐盒的设计与实现(附PCB,电路图,程序)...
  17. centos7无盘启动_从无盘启动看Linux启动原理
  18. 远程服务器 上传公钥,ssh-keygen教程第5章:copy公钥要服务端
  19. bzoj2150 部落战争
  20. 2019数据结构考研(一)

热门文章

  1. DBeaver 安装及配置离线驱动
  2. 曾被网友疯狂恶搞的「蚂蚁呀嘿」项目开源上过GitHub热榜
  3. oracle length
  4. 【Jquery】文本框校验练习
  5. 深度模型训练之learning rate
  6. iphone,ipad,android图片尺寸
  7. ios开发学习--按钮(Button)效果源码分享
  8. Minecraft mod制作简易教程目录
  9. AES加密报错Given final block not properly padded
  10. XOM版本1.2.5