我正在建造一条从卡夫卡读取的阿帕奇光束管道

KafkaIO

但我不知道如何解决序列化问题。

如何使用Kafkaio:

this.pipeline

.apply("ReadFromKafka",

KafkaIO

.read()

.withConsumerFactoryFn(input -> {

this.updateKafkaConsumerProperties(this.kafkaConsumerConfig, input);

return new KafkaConsumer<>(input);

})

.withBootstrapServers(kafkaConsumerConfig.getBootstrapServer())

.withTopic(this.pipelineSourceKafkaConfiguration.getOnboardingTopic())

.withKeyDeserializer(ByteArrayDeserializer.class)

.withValueDeserializer(ByteArrayDeserializer.class))

.apply("WindowTheData", Window.into(FixedWindows.of(Duration.standardSeconds(5))))

...

但我的驱动程序无法启动,引发了以下问题:

java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@65bd19bf

at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:57)

at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:107)

at org.apache.beam.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:86)

at org.apache.beam.sdk.io.Read$Unbounded.(Read.java:137)

at org.apache.beam.sdk.io.Read$Unbounded.(Read.java:132)

at org.apache.beam.sdk.io.Read.from(Read.java:55)

at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:665)

at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:277)

at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)

at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)

at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)

at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:188)

at com.company.lib.pipelines.DataPersistencePipeline.execute(DataPersistencePipeline.java:64)

at com.company.app.MainApp.registerPipelineEndpoints(MainApp.java:102)

at com.company.app.MainApp.run(MainApp.java:81)

at com.company.app.MainApp.run(MainApp.java:44)

at io.dropwizard.cli.EnvironmentCommand.run(EnvironmentCommand.java:43)

at io.dropwizard.cli.ConfiguredCommand.run(ConfiguredCommand.java:87)

at io.dropwizard.cli.Cli.run(Cli.java:78)

at io.dropwizard.Application.run(Application.java:93)

at com.company.app.MainApp.main(MainApp.java:51)

Caused by: java.io.NotSerializableException: com.company.lib.pipelines.DataPersistencePipeline

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)

at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)

at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)

at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)

... 20 more

这个例外抱怨

org.apache.beam.sdk.io.kafka.KafkaUnboundedSource

对象不可序列化。

这个类来自Apache Beam SDK,它实际上实现了

Serializable

接口。不知道我哪里做错了。

kafka java代码横杠_Apache Beam Kafkaio获取java.lang.illegalargumentException:无法序列化KafkaunBoundedSource...相关推荐

  1. java包间通信,诊断Java代码: 消除包间的耦合关联[Java编程]

    赞助商链接 本文"诊断Java代码: 消除包间的耦合关联[Java编程]"是由七道奇为您精心收集,来源于网络转载,文章版权归文章作者所有,本站不对其观点以及内容做任何评价,请读者自 ...

  2. 冒泡排序java代码_数据结构与算法—冒泡排序(Java实现)

    [toc] 冒泡排序 程序代码 package com.uplooking.bigdata.datastructure; import java.util.Arrays; public class B ...

  3. amd cpu不能在cmd环境下运行java代码_如何在Windows10中配置java的JDK环境

    今天给大家分享一下如何配置java的JDK环境.操作步骤如下: 1.下载好 jdk 的安装文件,我下载的是 jdk-10.0.1_windows-x64_bin.exe 这个版本的安装文件: 2.使用 ...

  4. java代码中的缓存类怎么找,JAVA缓存的实现 - dreamcloudz的个人空间 - OSCHINA - 中文开源技术交流社区...

    缓存可分为二大类: 一.通过文件缓存,顾名思义文件缓存是指把数据存储在磁盘上,不管你是以XML格式,序列化文件DAT格式还是其它文件格式: 二.内存缓存,也就是实现一个类中静态Map,对这个Map进行 ...

  5. 希尔排序java代码_希尔排序及希尔排序java代码

    由上图可看到希尔排序先约定一个间隔(图中是4),然后对0.4.8这个三个位置的数据进行插入排序,然后向右移一位对位置1.5.9进行插入排序按照此规律直到全部参与了排序.然后将间隔约定为4-1=3,然后 ...

  6. 关于equals的Java代码_与equals方法相关的Java代码

    t3.equals(t3)是唯一具有与方法签名public boolean equals (Test testje)匹配的正确参数的行,因此它是程序中唯一实际调用该print语句的行. 这个问题旨在教 ...

  7. 有趣的java代码_【有趣】这段java代码太古怪

    首先呢,来一段java代码来开点胃.等等等等,耍我呢,这是java代码? \u0070\u0075\u0062\u006c\u0069\u0063\u0020\u0063\u006c\u0061\u0 ...

  8. 基础贴吧java代码_原来你是这样的JAVA[01]-基础一瞥

    1.Java是什么呢? Java不仅是一门语言,而且是一个完整的平台,有一个庞大的库,其中包含了很多可重用的代码和一个提供安全性.跨操作系统的可移植性以及自动垃圾收集等服务的执行环境. 2.JDK 和 ...

  9. java时间中间加横杠方法_知识点:java一些方法会有横线?以Date 过期方法为例...

    原因:他们的开发者在升级方法后,添加了@Deprecated注释, 目的是为了提醒我们,这个方法现在已经有新的方法了,不建议继续使用! 比如: JAVA中Date的tolocalstring为什么不建 ...

最新文章

  1. Solr 使用Facet分组过程中与分词的矛盾解决办法
  2. 详解/etc/profile、/etc/bash.bahsrc、~/.profile、~/.bashrc的用途
  3. 怎样学java软件编程6_月光软件站 - 编程文档 - Java - 我学习使用java的一点体会(6)...
  4. 笔试算法复习——数组去重
  5. oracle透明网关 中文,Oracle透明网关的一些文章
  6. 【转载】要有梦想-创造卓越的职业生涯
  7. CSS学习笔记(更新中...)
  8. 【Python实例第13讲】识别手写数字
  9. App 抓包-Fiddler简单使用教程
  10. 微信小程序实现腾讯地图定位功能修改地址功能
  11. Windows下的Rsync(cwRsync)
  12. Cocos2dx基础手册
  13. 通过添加css样式cursor属性,改变鼠标的外形,变成放大镜
  14. 优秀开源云原生工具推荐——系列3
  15. 手机屏幕常见故障_手机屏幕失灵怎么回事 手机屏幕失灵解决办法
  16. 叁拾伍- Django Websocket 绝望之旅(dwebsocket 以及 channels)
  17. 【无标题】文档转成二维码添加到公众号文章(Word、Excel、PPT、PDF等)
  18. 无烦恼厨房android版,无烦恼厨房安卓版
  19. Activity启动模式singleInstance
  20. Outlook Express常见问答

热门文章

  1. sql 优化笔记(不全的,暂时存盘,不建议看,关闭留言,想喷也喷不了)
  2. 单端信号和差分信号区别
  3. DCS系统 采集OPC ua协议自动保存入数据库方案
  4. Minecraft 1.12.2 生化8 模组 1.9版本改版大更新
  5. 东秦计算机组成课设,东秦组成原理课设.docx
  6. SQL Server 智能感知插件SQL Prompt 4 智能提示
  7. OpenMP学习笔记
  8. Java项目经验——程序员成长的钥匙
  9. 年前最后一趟车,来年要少开车、慎开车
  10. 数组05-Contain、Find、Remove方法