kafka java代码横杠_Apache Beam Kafkaio获取java.lang.illegalargumentException:无法序列化KafkaunBoundedSource...
我正在建造一条从卡夫卡读取的阿帕奇光束管道
KafkaIO
但我不知道如何解决序列化问题。
如何使用Kafkaio:
this.pipeline
.apply("ReadFromKafka",
KafkaIO
.read()
.withConsumerFactoryFn(input -> {
this.updateKafkaConsumerProperties(this.kafkaConsumerConfig, input);
return new KafkaConsumer<>(input);
})
.withBootstrapServers(kafkaConsumerConfig.getBootstrapServer())
.withTopic(this.pipelineSourceKafkaConfiguration.getOnboardingTopic())
.withKeyDeserializer(ByteArrayDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class))
.apply("WindowTheData", Window.into(FixedWindows.of(Duration.standardSeconds(5))))
...
但我的驱动程序无法启动,引发了以下问题:
java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@65bd19bf
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:57)
at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:107)
at org.apache.beam.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:86)
at org.apache.beam.sdk.io.Read$Unbounded.(Read.java:137)
at org.apache.beam.sdk.io.Read$Unbounded.(Read.java:132)
at org.apache.beam.sdk.io.Read.from(Read.java:55)
at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:665)
at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:277)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:188)
at com.company.lib.pipelines.DataPersistencePipeline.execute(DataPersistencePipeline.java:64)
at com.company.app.MainApp.registerPipelineEndpoints(MainApp.java:102)
at com.company.app.MainApp.run(MainApp.java:81)
at com.company.app.MainApp.run(MainApp.java:44)
at io.dropwizard.cli.EnvironmentCommand.run(EnvironmentCommand.java:43)
at io.dropwizard.cli.ConfiguredCommand.run(ConfiguredCommand.java:87)
at io.dropwizard.cli.Cli.run(Cli.java:78)
at io.dropwizard.Application.run(Application.java:93)
at com.company.app.MainApp.main(MainApp.java:51)
Caused by: java.io.NotSerializableException: com.company.lib.pipelines.DataPersistencePipeline
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
... 20 more
这个例外抱怨
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
对象不可序列化。
这个类来自Apache Beam SDK,它实际上实现了
Serializable
接口。不知道我哪里做错了。
kafka java代码横杠_Apache Beam Kafkaio获取java.lang.illegalargumentException:无法序列化KafkaunBoundedSource...相关推荐
- java包间通信,诊断Java代码: 消除包间的耦合关联[Java编程]
赞助商链接 本文"诊断Java代码: 消除包间的耦合关联[Java编程]"是由七道奇为您精心收集,来源于网络转载,文章版权归文章作者所有,本站不对其观点以及内容做任何评价,请读者自 ...
- 冒泡排序java代码_数据结构与算法—冒泡排序(Java实现)
[toc] 冒泡排序 程序代码 package com.uplooking.bigdata.datastructure; import java.util.Arrays; public class B ...
- amd cpu不能在cmd环境下运行java代码_如何在Windows10中配置java的JDK环境
今天给大家分享一下如何配置java的JDK环境.操作步骤如下: 1.下载好 jdk 的安装文件,我下载的是 jdk-10.0.1_windows-x64_bin.exe 这个版本的安装文件: 2.使用 ...
- java代码中的缓存类怎么找,JAVA缓存的实现 - dreamcloudz的个人空间 - OSCHINA - 中文开源技术交流社区...
缓存可分为二大类: 一.通过文件缓存,顾名思义文件缓存是指把数据存储在磁盘上,不管你是以XML格式,序列化文件DAT格式还是其它文件格式: 二.内存缓存,也就是实现一个类中静态Map,对这个Map进行 ...
- 希尔排序java代码_希尔排序及希尔排序java代码
由上图可看到希尔排序先约定一个间隔(图中是4),然后对0.4.8这个三个位置的数据进行插入排序,然后向右移一位对位置1.5.9进行插入排序按照此规律直到全部参与了排序.然后将间隔约定为4-1=3,然后 ...
- 关于equals的Java代码_与equals方法相关的Java代码
t3.equals(t3)是唯一具有与方法签名public boolean equals (Test testje)匹配的正确参数的行,因此它是程序中唯一实际调用该print语句的行. 这个问题旨在教 ...
- 有趣的java代码_【有趣】这段java代码太古怪
首先呢,来一段java代码来开点胃.等等等等,耍我呢,这是java代码? \u0070\u0075\u0062\u006c\u0069\u0063\u0020\u0063\u006c\u0061\u0 ...
- 基础贴吧java代码_原来你是这样的JAVA[01]-基础一瞥
1.Java是什么呢? Java不仅是一门语言,而且是一个完整的平台,有一个庞大的库,其中包含了很多可重用的代码和一个提供安全性.跨操作系统的可移植性以及自动垃圾收集等服务的执行环境. 2.JDK 和 ...
- java时间中间加横杠方法_知识点:java一些方法会有横线?以Date 过期方法为例...
原因:他们的开发者在升级方法后,添加了@Deprecated注释, 目的是为了提醒我们,这个方法现在已经有新的方法了,不建议继续使用! 比如: JAVA中Date的tolocalstring为什么不建 ...
最新文章
- Solr 使用Facet分组过程中与分词的矛盾解决办法
- 详解/etc/profile、/etc/bash.bahsrc、~/.profile、~/.bashrc的用途
- 怎样学java软件编程6_月光软件站 - 编程文档 - Java - 我学习使用java的一点体会(6)...
- 笔试算法复习——数组去重
- oracle透明网关 中文,Oracle透明网关的一些文章
- 【转载】要有梦想-创造卓越的职业生涯
- CSS学习笔记(更新中...)
- 【Python实例第13讲】识别手写数字
- App 抓包-Fiddler简单使用教程
- 微信小程序实现腾讯地图定位功能修改地址功能
- Windows下的Rsync(cwRsync)
- Cocos2dx基础手册
- 通过添加css样式cursor属性,改变鼠标的外形,变成放大镜
- 优秀开源云原生工具推荐——系列3
- 手机屏幕常见故障_手机屏幕失灵怎么回事 手机屏幕失灵解决办法
- 叁拾伍- Django Websocket 绝望之旅(dwebsocket 以及 channels)
- 【无标题】文档转成二维码添加到公众号文章(Word、Excel、PPT、PDF等)
- 无烦恼厨房android版,无烦恼厨房安卓版
- Activity启动模式singleInstance
- Outlook Express常见问答