1,主代码:

package application.storm;import entry.ConsumerDeserializationSchema;
import entry.TopicInfoTest;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.slf4j.LoggerFactory;import java.util.Properties;public class FlinkTest {private static org.slf4j.Logger logger = LoggerFactory.getLogger(FlinkTest.class);public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();args = new String[]{"--input-topic", "dianyou_wxgz3", "--bootstrap.servers", "node2.hadoop:9091,node3.hadoop:9091,node2.hadoop:9092,node3.hadoop:9092","--zookeeper.connect", "node1.hadoop:2181,node2.hadoop:2181,node3.hadoop:2181", "--group.id", "cc1"};logger.error("************进入了程序**************");ParameterTool parameterTool = ParameterTool.fromArgs(args);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);Properties pros = parameterTool.getProperties();//todo 指定输入数据为kafka topicDataStream<TopicInfoTest> kafkaDstream_test = env.addSource(new FlinkKafkaConsumer010<TopicInfoTest>("dianyou_wxgz3",new ConsumerDeserializationSchema(TopicInfoTest.class),
//                new MySchema(),pros).setStartFromGroupOffsets()).setParallelism(4);kafkaDstream_test.print();env.execute("startExecute");}}

2,实体类与自定义格式

package entry;import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;public class ConsumerDeserializationSchema implements DeserializationSchema<TopicInfoTest> {private Class<TopicInfoTest> clazz;public ConsumerDeserializationSchema(Class<entry.TopicInfoTest> clazz) {this.clazz = clazz;}@Overridepublic TopicInfoTest deserialize(byte[] message) throws IOException {ByteBuffer buffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN);String mess = byteBuffertoString(buffer);//封装为POJO类System.out.println("mess = " + mess);return new TopicInfoTest(mess);}@Overridepublic boolean isEndOfStream(TopicInfoTest t) {return false;}@Overridepublic TypeInformation<TopicInfoTest> getProducedType() {return TypeExtractor.getForClass(clazz);}public static String byteBuffertoString(ByteBuffer buffer) {Charset charset = null;CharsetDecoder decoder = null;CharBuffer charBuffer = null;try {charset = Charset.forName("UTF-8");decoder = charset.newDecoder();// charBuffer = decoder.decode(buffer);//用这个的话,只能输出来一次结果,第二次显示为空charBuffer = decoder.decode(buffer.asReadOnlyBuffer());return charBuffer.toString();} catch (Exception ex) {ex.printStackTrace();return "";}}
}
package entry;import java.io.Serializable;public class TopicInfoTest implements Serializable {private String topicName;public String getTopicName() {return topicName;}public void setTopicName(String topicName) {this.topicName = topicName;}public TopicInfoTest(String topicName) {this.topicName = topicName;}@Overridepublic String toString() {return "TopicInfoTest{" +"topicName='" + topicName + '\'' +'}';}
}

3,如果写成泛型:

package entry;import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;import java.io.IOException;public class ConsumerDeserializationSchema<T> implements DeserializationSchema<T> {private Class<T> clazz;public ConsumerDeserializationSchema(Class<T> clazz) {this.clazz = clazz;}@Overridepublic T deserialize(byte[] bytes) throws IOException {//确保 new String(bytes) 是json 格式,如果不是,请自行解析System.out.println("bytes = " + new String(bytes));System.out.println("json-parser"+JSON.parseObject(new String(bytes), clazz));return JSON.parseObject(new String(bytes), clazz);}@Overridepublic boolean isEndOfStream(T t) {return false;}@Overridepublic TypeInformation<T> getProducedType() {return TypeExtractor.getForClass(clazz);}
}

测试数据为:

{"topicName":"1"}

打印结果:

Flink kafka 数据转成自己需要的实体类相关推荐

  1. Storm 消费Kafka数据及相关异常解决

    Storm 消费Kafka数据及相关异常解决 问题 KafkaTopoDemo类 bolt类 问题 storm报错:Exception in thread "main" java. ...

  2. (一)EasyExcel的使用(读取数据到实体类即绑定实体类)

    最近遇到了一个excel简单的导入导出的需求,因此就对easyexcel第三方插件的使用做一点总结,大家可以看一看,可能会对你有点帮助. 目录 前言: 1.引入easyexcel相关依赖 2.创建对应 ...

  3. 2017-4-20实体类,数据访问类.字符串攻击.防攻击

    程序分三层:界面层.业务逻辑层.数据访问层 这里主要操作的是数据访问层,数据访问层又分为: 1.实体类 2.数据访问类 存放位置:APP_Code文件中 一.实体类 最简单的封装 把数据库的表名变成类 ...

  4. 《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

    <!-- more --> 前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的 ...

  5. kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ

    前言 之前有文章 <从0到1学习Flink>-- Flink 写入数据到 Kafka 写过 Flink 将处理后的数据后发到 Kafka 消息队列中去,当然我们常用的消息队列可不止这一种, ...

  6. 实时数仓 大数据 Hadoop flink kafka

    ⼀.实时数仓建设背景 实时需求⽇趋迫切 ⽬前各⼤公司的产品需求和内部决策对于数据实时性的要求越来越迫切,需要实时数仓的能⼒来赋能.传统离 线数仓的数据时效性是 T+1,调度频率以天为单位,⽆法⽀撑实时 ...

  7. 写入mysql_《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

    前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的东西当时是写死的,不能够通用,最近知识星 ...

  8. flink 写入到es_《从0到1学习Flink》—— Flink 写入数据到 Kafka

    前言 之前文章 <从0到1学习Flink>-- Flink 写入数据到 ElasticSearch 写了如何将 Kafka 中的数据存储到 ElasticSearch 中,里面其实就已经用 ...

  9. flink读不到kafka数据问题

    [1]搭建集群 [2]测试kafka集群没问题后, 根据[3]中开始用flink消费kafka数据,发现啥也么有 调试办法如下: ①去$KAFKA/logs下面看了下,是有相关主题的文件夹生成的. ② ...

最新文章

  1. modelsim的destbench模型1
  2. 计算机网络实验三:网络层和链路层网络协议分析
  3. 最小熵原理系列:词向量的维度应该怎么选择?
  4. IOS成长之路-检测耳机插入/拔出
  5. Postgre合并多行数据为一行
  6. WordPress插件-WBOLT热门关键词推荐插件v1.3.0 Pro绿色版
  7. 如何把GIT项目push到公司review服务器
  8. tensorflow数据读取机制
  9. redis 获取不到_redis系列之——缓存穿透、缓存击穿、缓存雪崩
  10. android按键精灵 释放内存,【院刊】-【201408期】内存用完?院刊教你如何释放系统内存...
  11. 年后跳槽高峰期,字节跳动面试题拿走不谢(含答案)
  12. php处理excel里面的重复数据,表格中删除重复项怎么操作
  13. python sdk是什么_SDK 和 API 的区别是什么?
  14. FIR滤波器窗口设计法和频率采样设计法
  15. python之NetworkX的使用
  16. C语言如何判断数独是否正确,会数独的大佬请进。这是个判断九宫格数独是否正确的程序。...
  17. WorldFirst靠谱吗?跨境收款工具万里汇WorldFirst介绍!
  18. 下列选项中属于c语言合法变量名的是,计算机二级考试C语言模拟练习
  19. 通过js动态创建标签,并设置属性
  20. 2022建筑焊工(建筑特殊工种)考试题库及在线模拟考试

热门文章

  1. matlab:平稳随机过程的自相关函数及功率谱密度
  2. 有了开源ROS,机器人就能自由行走?
  3. IAT hook与inline hook的区别
  4. 云速汉字、组词、拼音、造句、用法、音标采集软件
  5. NSX-T 系列:第 18部分 - 配置内联负载均衡服务(LB)
  6. matlab从一加到一百,从1加到100的快速方法(如何快速算出从1加到100)
  7. 【因特网】网络杂谈(3)之你真的了解因特网吗?
  8. 2020 KALI 设置壁纸
  9. Google的技术剖析:创始人Sergey Brin 和 Lawrence Page的研究论文 (转)
  10. HDU4676 Sum Of Gcd