前言

com.hazelcast.jet.core.processor.Processors(简称P)
这个类实现了核心的P,这里的P对应的是Jet引擎内部的DAG图的节点。
简单来说,这些P处理的是聚合(SUM/AVG/MAX/MIN),更复杂的情况是,可能是在任意的Key上做聚合,甚至是基于事件发生的时间窗上的聚合。因此Jet开发了两类P节点:

  • 单阶段P
  • 两阶段P
    注意:从下面接口文档可以看出,无论是单阶段还是两阶段,都只适用于跑批的场景,并且重启job会丢失数据。
    Jet内部默认传递给P的函数都是无状态的。

Jet的Streaming和batch

Jet区分steaming和batch非常简单,仅仅在P的complete方法上,返回false就是steaming的,返回true就是batch的。

单阶段聚合

所有的入站数据最好是预先分好区,groupby好,这样才不会在Jet集群中产生大量跨节点的数据传输。这种方式最省内存。

             -----------------| upstream vertex |-----------------|| partitioned-distributedV-----------| aggregate |-----------

两阶段聚合

第一个阶段只做accumulate,第二个阶段才做combine和finish动作。第一阶段完全在单节点本地,数据的分区,groupby只发生在第二阶段,因为第一阶段已经对数据做了聚合,到第二阶段数据传输量极小。

            -----------------| upstream vertex |-----------------|| partitioned-localV------------| accumulate |------------|| partitioned-distributedV----------------| combine/finish |----------------

在Jet中,单阶段聚合是两阶段聚合的特例

没有分区,groupby的情况下,第一阶段数据就在节点本地accumulate,然后第二阶段汇集到一个单一combine/finish节点上。

            -----------------| upstream vertex |-----------------|| local, non-partitionedV------------| accumulate |------------|| distributed, all-to-oneV----------------| combine/finish | localParallelism = 1----------------

Processors.java方法详解

public static <A,R> SupplierEx<Processor> aggregateP(@NonnullAggregateOperation<A,R> aggrOp)Type Parameters:
A - type of accumulator returned from aggrOp.createAccumulatorFn()
R - type of the finished result returned from aggrOp.finishAccumulationFn()
Parameters:
aggrOp - the aggregate operation to perform

消费入站数据,返回结果R,如果R是null,这个P就表示没有任何动作。
不写快照,重启任务会丢失数据,只适合做批处理。
——————————————————————————————————————

public static <A,R> SupplierEx<Processor> accumulateP(@NonnullAggregateOperation<A,R> aggrOp)

消费入站数据,返回结果A。
不写快照,重启任务会丢失数据,只适合做批处理。
——————————————————————————————————————

public static <A,R> SupplierEx<Processor> combineP(@NonnullAggregateOperation<A,R> aggrOp)

配合accumulateP使用,消费入站数据,返回结果R,如果R是null,这个P就表示没有任何动作。
不写快照,重启任务会丢失数据,只适合做批处理。
——————————————————————————————————————

public static <K,A,R,OUT> SupplierEx<Processor> aggregateByKeyP(List<FunctionEx<?,? extends K>> keyFns,AggregateOperation<A,R> aggrOp,BiFunctionEx<? super K,? super R,OUT> mapToOutputFn)
Type Parameters:
K - type of key
A - type of accumulator returned from aggrOp.createAccumulatorFn()
R - type of the result returned from aggrOp.finishAccumulationFn()
OUT - type of the item to emit
Parameters:
keyFns - functions that compute the grouping key
aggrOp - the aggregate operation
mapToOutputFn - function that takes the key and the aggregation result and returns the output item

可支持多路入站数据,每路的类型必须不同,每路要有自己的key抽取函数。aggOP里面的accumulate函数每路数据都要提供。
不写快照,重启任务会丢失数据,只适合做批处理。
————————————————————————————————————

public static <K,A> SupplierEx<Processor> accumulateByKeyP(List<FunctionEx<?,? extends K>> getKeyFns,AggregateOperation<A,?> aggrOp)

两阶段P的第一阶段P,根据分区,groupby应用accumulate,每个groupby的key输出一个Map.Entry<K, A>。
可支持多路入站数据,每路的类型必须不同,每路要有自己的key抽取函数。aggOP里面的accumulate函数每路数据都要提供。
不写快照,重启任务会丢失数据,只适合做批处理。
————————————————————————————————————

public static <K,A,R,OUT> SupplierEx<Processor> combineByKeyP(AggregateOperation<A,R> aggrOp,BiFunctionEx<? super K,? super R,OUT> mapToOutputFn)

两阶段P的第二阶段P,从多路accumulateByKeyP接收数据并应用combine,应用函数mapToOutputFn,每个key算出一个R。
不写快照,重启任务会丢失数据,只适合做批处理。
————————————————————————————————————

public static <K,A,R,OUT> SupplierEx<Processor> aggregateToSlidingWindowP(List<FunctionEx<?,? extends K>> keyFns,List<ToLongFunctionEx<?>> timestampFns,TimestampKind timestampKind,SlidingWindowPolicy winPolicy,long earlyResultsPeriod,AggregateOperation<A,? extends R> aggrOp,KeyedWindowResultFunction<? super K,? super R,? extends OUT> mapToOutputFn)

暂时用不到
————————————————————————————————————

public static <K,A> SupplierEx<Processor> accumulateByFrameP(List<FunctionEx<?,? extends K>> keyFns,List<ToLongFunctionEx<?>> timestampFns,TimestampKind timestampKind,SlidingWindowPolicy winPolicy,AggregateOperation<A,?> aggrOp)

暂时用不到
—————————————————————————————————————

public static <K,A,R,OUT> SupplierEx<Processor> combineToSlidingWindowP(SlidingWindowPolicy winPolicy,AggregateOperation<A,? extends R> aggrOp,KeyedWindowResultFunction<? super K,? super R,? extends OUT> mapToOutputFn)

暂时用不到
—————————————————————————————————————

public static <K,A,R,OUT> SupplierEx<Processor> aggregateToSessionWindowP(long sessionTimeout,long earlyResultsPeriod,List<ToLongFunctionEx<?>> timestampFns,List<FunctionEx<?,? extends K>> keyFns,AggregateOperation<A,? extends R> aggrOp,KeyedWindowResultFunction<? super K,? super R,? extends OUT> mapToOutputFn)

暂时用不到
————————————————————————————————————

public static <T> SupplierEx<Processor> insertWatermarksP(@NonnullEventTimePolicy<? super T> eventTimePolicy)

流处理的API来了!!!
在流里面插入watermark,wartermark由eventTimePolicy给出。
这个P会丢弃迟到的(late)数据。
最晚emit水印的数据值会被写入快照。重启所有节点,会从快照继续往后执行。
听起来视乎破坏了单调性原则,但是Jet会合并watermark,可以保证从快照继续往后执行。副作用就是,重启前被认为是迟到的(late)的event,重启后不会被认为是迟到的了。
——————————————————————————————————————

public static <T,R> SupplierEx<Processor> mapP(@NonnullFunctionEx<? super T,? extends R> mapFn)
Type Parameters:
T - type of received item
R - type of emitted item
Parameters:
mapFn - a stateless mapping function

这个函数是无状态的,效果等同于java流式计算中的filter
————————————————————————————————————————

public static <T> SupplierEx<Processor> filterP(@NonnullPredicateEx<? super T> filterFn)

这个函数是无状态的,根据Predicate条件过滤数据。
————————————————————————————————————————

public static <T,R> SupplierEx<Processor> flatMapP(FunctionEx<? super T,? extends Traverser<? extends R>> flatMapFn)

这个函数是无状态的,把数据转换成Traverser。
Traverser不能为null,可以是empty。
————————————————————————————————————————

public static <T,K,S,R> SupplierEx<Processor> mapStatefulP(long ttl,FunctionEx<? super T,? extends K> keyFn,ToLongFunctionEx<? super T> timestampFn,Supplier<? extends S> createFn,TriFunction<? super S,? super K,? super T,? extends R> statefulMapFn,TriFunction<? super S,? super K,? super Long,? extends R> onEvictFn)

有状态的映射,createFn函数返回状态对象s,每个group key就有一个对应的对象s,s对象会有快照,重启不会丢。所以对象s必须可序列化。如果MapFn把输入映射为了null,相当于过滤掉了这个输入数据。对象s的timestamp和watermark两个时间参数(watermark - ttl ??? timestamp)可以控制对象s是否要被丢失或者用watermark更新timestamp。
————————————————————————————————

public static <T,K,S,R> SupplierEx<Processor> flatMapStatefulP(long ttl,FunctionEx<? super T,? extends K> keyFn,ToLongFunctionEx<? super T> timestampFn,Supplier<? extends S> createFn,TriFunction<? super S,? super K,? super T,? extends Traverser<R>> statefulFlatMapFn,TriFunction<? super S,? super K,? super Long,? extends Traverser<R>> onEvictFn)

有状态的映射,createFn函数返回状态对象s,每个group key就有一个对应的对象s,s对象会有快照,重启不会丢。所以对象s必须可序列化。对象s的timestamp和watermark两个时间参数(watermark - ttl ??? timestamp)可以控制对象s是否要被丢失或者用watermark更新timestamp。
注意这个函数跟上一个函数的区别,请跟无状态的两个对应函数对比。
————————————————————————————————————

public static <C,S,T,R> ProcessorSupplier mapUsingServiceP(ServiceFactory<C,S> serviceFactory,BiFunctionEx<? super S,? super T,? extends R> mapFn)
Type Parameters:
C - type of context object
S - type of service object
T - type of received item
R - type of emitted item

对每个输入数据应用无状态函数mapFn,其中参数S是serviceFactory生成的。
————————————————————————————————————

public static <C,S,T,K,R> ProcessorSupplier mapUsingServiceAsyncP(ServiceFactory<C,S> serviceFactory,int maxConcurrentOps,boolean preserveOrder,FunctionEx<T,K> extractKeyFn,BiFunctionEx<? super S,? super T,CompletableFuture<R>> mapAsyncFn)
Type Parameters:
C - type of context object
S - type of service object
T - type of received item
K - type of key
R - type of result item
Parameters:
serviceFactory - the service factory
maxConcurrentOps - maximum number of concurrent async operations per processor
preserveOrder - whether the async responses are ordered or not
extractKeyFn - a function to extract snapshot keys. Used only if preserveOrder==false
mapAsyncFn - a stateless mapping function

future可以为null,future不为null它里面的结果也可以为null,如果是null代码此数据被过滤掉了。extractKeyFn是用来生产快照key的。
快照key的限制:如果接收到的数据跨越了分片,需要保证key相同,如果是round-robin那就随意了,可以用Object::hashCode
——————————————————————————————————

public static <C,S,T> ProcessorSupplier filterUsingServiceP(ServiceFactory<C,S> serviceFactory,BiPredicateEx<? super S,? super T> filterFn)

serviceFactory生成一个S,然后传递给Predicate函数,过滤数据。
————————————————————————————————————

public static <C,S,T,R> ProcessorSupplier flatMapUsingServiceP(ServiceFactory<C,S> serviceFactory,BiFunctionEx<? super S,? super T,? extends Traverser<R>> flatMapFn)

把输入数据转换成Traverser对象,Traverser对象结尾不为null,serviceFactory生成一个S,传递给flatMapFn。
转换过程中有S对象可以用本地存储,所以虽然有快照,但是不能保证一致性,可能出异常情况。
————————————————————————————————————

public static <T> SupplierEx<Processor> sortP(Comparator<T> comparator)

把输入数据按PriorityQueue顺序排序,并在complete阶段发射出去。
————————————————————————————————————

public static SupplierEx<Processor> noopP()

不用解释了吧,有进没出的P

无状态transforms

没有中间状态存储,所有item跟其他item无关。比如下面的算子:
map
filter
flatMap
merge
mapUsingIMap
mapUsingRelicateMap
mapUsingService
mapUsingServiceAsync
mapUsingServiceAsyncBatched
mapUsingPython
hashJoin

有状态transforms

有状态的transforms累计数据,因此输出结果依赖之前的item。
在Jet中窗口只能用于流式数据(带时间),一次性的聚合操作只适合batch。
aggregate
聚合是分布式流式数据处理的基石。它对流式的item数据执行sum或者avg等算子。AggregateOperations提供了一系列的聚合方法,比如:

  • averagingLong()
  • averageDouble()
  • counting()
  • summingLong()
  • summingDouble()
  • maxBy()
  • minBy()
  • toList()
  • bottomN()
  • topN()
  • linearTrend()
  • allOf()

groupingKey

rollingAggregate
不等接收所有的item,每接收一个item就输出一个结果。所以它可以在流式处理中使用。
window
把一个无界的数据流分成一块块。
tumblingWindow

slidingWindow

sessionWindow

Early Results

distinct
略过重复
sort
只支持batch
mapStateful
是map transform的扩展,它可以保存一个可变的中间状态。可以根据这个基础算子写出任意复杂的检查模式的状态机。
比如有个流中有TRANSACTION_START和TRANSACTION_END两个item数据。这种情况用window不行,因为不确定window大小。sessionWindow也不行,因为它默认是等待到最后一次性emit所有items。

public class TransactionEvent {long timestamp();String transactionId();EventType type();
}public enum EventType {TRANSACTION_START,TRANSACTION_END
}

可以这么写代码:

p.readFrom(KafkaSources.kafka(.., "transaction-events")).withNativeTimestamps(0).groupingKey(event -> event.getTransactionId()).mapStateful(MINUTES.toMillis(10),() -> new TransactionEvent[2],(state, id, event) -> {if (event.type() == TRANSACTION_START) {state[0] = event;} else if (event.type() == TRANSACTION_END) {state[1] = event;}if (state[0] != null && state[1] != null) {// we have both start and end eventslong duration = state[1].timestamp() - state[0].timestamp();return MapUtil.entry(event.transactionId(), duration);}// we don't have both events, do nothing for now.return null;},(state, id, currentWatermark) ->// if we have not received both events after 10 minutes,// we will emit a timeout entry(state[0] == null || state[1] == null)? MapUtil.entry(id, TIMED_OUT): null).writeTo(Sinks.logger());

Hazelcast Jet Processor相关推荐

  1. Hazelcast Jet Pipeline详解

    前言 pipeline只有两种stage:stream和batch,主要看它的数据源是哪种,如果是StreamSource那就用StreamStage,如果是BatchSource那就用BatchSt ...

  2. Hazelcast发布开源流处理引擎Jet

    Hazelcast主要以开源缓存和内存数据网格技术(通常称为Hazelcast IMDG,或者只是Hazelcast)为人所熟知.然而过去的两年中,他们一直致力于一个新的.重要的开源项目Hazelca ...

  3. Hazelcast更换CEO,承诺继续造福开源社区

    Hazelcast是一家以开发人员为中心的基础设施组件开发商,其产品包括一个内存数据网格(IMDG)和一个流式处理引擎(Hazelcast Jet).近日,Hazelcast经历了一场管理层人员变动. ...

  4. Hazelcast IMDG参考中文版手册-第七章-分布式数据结构

    如概述部分所述,Hazelcast提供Java接口的分布式实现.以下是这些实现的列表,其中包含指向本手册中相应部分的链接. 标准实用程序集合 Map是分布式实现的java.util.Map.它可以让你 ...

  5. Hazelcast IMDGJet详解

    前言 Hazelcast已经有十多年的发展历史了,是基于Java的分布式内存网格IMDG,对标Redis:后来又引入了流式数据处理引擎Jet,对标Flink.因此从Hazelcast软件功能来说,它是 ...

  6. Hazelcast IMDG参考中文版手册-第十一章-分布式查询

    分布式查询访问存储在相同或不同成员上的多个数据源的数据. Hazelcast对您的数据进行分区,并将其分布到成员集群中.您可以迭代映射条目并查找您感兴趣的某些条目(由谓词指定).但是,这不是非常有效, ...

  7. Streaming流式框架汇总

    原文:https://github.com/InterestingLab/awesome-streaming#online-machine-learning A curated list of awe ...

  8. Apache Beam 是什么,它为什么比其他选择更受欢迎?

    1. 概述 在本教程中,我们将介绍 Apache Beam 并探讨其基本概念.我们将首先演示使用 Apache Beam 的用例和好处,然后介绍基本概念和术语.之后,我们将通过一个简单的例子来说明 A ...

  9. Java大数据处理的流行框架

    大数据挑战 在公司需要处理不断增长的数据量的各个领域中,对大数据的概念有不同的理解. 在大多数这些情况下,需要以某种方式设计所考虑的系统,以便能够处理该数据,而不会随着数据大小的增加而牺牲吞吐量. 从 ...

最新文章

  1. 深度学习最常用的10个激活函数
  2. 零散的MySQL基础总是记不住?看这一篇就够了!
  3. ​多分类下的ROC曲线和AUC​
  4. JavaScript对SEO的影响及解决之道
  5. 菜鸟教程 之 JavaScript 教程
  6. Integral Channel Features-论文整理
  7. c++获取子类窗口句柄位置_干货分享:用一百行代码做一个C/C++表白小程序,程序员的浪漫!...
  8. Python的PyDBG调试器的用法
  9. 李向阳教授谈中科大AI承继与挑战,IT校友影响力惊人
  10. Vijos P1596 加法表【迭代】
  11. ADT(Android)—Eclipse开发AndroidManifest.xml在哪里
  12. 计算机网络误区——源目IP和源目MAC变化问题
  13. 关闭计算机端口的命令行,关闭端口命令,小编教你如何关闭电脑80端口
  14. matlab xls转csv,使用python或Matlab将csv文件中的数据转换为csv文件
  15. 基于matlab的数字图像处理---图像的锐化与边缘提取
  16. 国二office计算机基础知识,国二office计算机基础知识选择题
  17. 小爱同学电脑版安装教程
  18. 0开始学py爬虫(学习笔记)(Scrapy框架)(爬取职友集招聘信息)
  19. linux判断文件类型是否存在脚本,shell脚本中的逻辑判断 文件目录属性判断 if特殊用法 case判断...
  20. 存储系统基础知识介绍

热门文章

  1. Linux通过df命令查看显示磁盘空间满,但实际未占用问题
  2. Echarts 贵州地图(增加贵安新区)
  3. i.MX6ULL驱动开发 | 04-Linux设备树基本语法与实例解析
  4. apdl与传统计算机语言,ANSYS经典APDL语言详解及ANSYS二次开发
  5. 【ANSYS APDL】如何将变量、矩阵等数据导出到TXT文件?
  6. 2014.03.31_一年很快过去了
  7. Socket:由于系统缓冲区空间不足或队列已满,不能执行套接字上的操作
  8. Echarts之饼图制作
  9. java正则匹配ip_正则表达式 - 匹配 IP 地址
  10. 【英语】 英语的重音怎么读