mac启动rabbitmq_从0到1学习Flink—— Flink 读取 Kafka 数据写入到 RabbitMQ
公众号(zhisheng)里回复 面经、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章!
前言
之前有文章 《从0到1学习Flink》—— Flink 写入数据到 Kafka 写过 Flink 将处理后的数据后发到 Kafka 消息队列中去,当然我们常用的消息队列可不止这一种,还有 RocketMQ、RabbitMQ 等,刚好 Flink 也支持将数据写入到 RabbitMQ,所以今天我们就来写篇文章讲讲如何将 Flink 处理后的数据写入到 RabbitMQ。
前提准备
安装 RabbitMQ
这里我直接用 docker 命令安装吧,先把 docker 在 mac 上启动起来。
在命令行中执行下面的命令:
docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management
![](/assets/blank.gif)
对这个命令不懂的童鞋可以看看我以前的文章:http://www.54tianzhisheng.cn/2018/01/26/SpringBoot-RabbitMQ/
登录用户名和密码分别是:admin / admin ,登录进去是这个样子就代表安装成功了:
![](/assets/blank.gif)
依赖
pom.xml 中添加 Flink connector rabbitmq 的依赖如下:
<dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-connector-rabbitmq_${scala.binary.version}artifactId> <version>${flink.version}version>dependency>
生产者
这里我们依旧自己写一个工具类一直的往 RabbitMQ 中的某个 queue 中发数据,然后由 Flink 去消费这些数据。
注意按照我的步骤来一步步操作,否则可能会出现一些错误!
RabbitMQProducerUtil.java
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQProducerUtil { public final static String QUEUE_NAME = "zhisheng";
public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息 factory.setHost("localhost"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setPort(5672);
//创建一个新的连接 Connection connection = factory.newConnection();
//创建一个通道 Channel channel = connection.createChannel();
// 声明一个队列// channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送消息到队列中 String message = "Hello zhisheng";
//我们这里演示发送一千条数据 for (int i = 0; i 1000; i++) { channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes("UTF-8")); System.out.println("Producer Send +'" + message + i); }
//关闭通道和连接 channel.close(); connection.close(); }}
Flink 主程序
import com.zhisheng.common.utils.ExecutionEnvUtil;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
/** * 从 rabbitmq 读取数据 */public class Main { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL;
//这些配置建议可以放在配置文件中,然后通过 parameterTool 来获取对应的参数值 final RMQConnectionConfig connectionConfig = new RMQConnectionConfig .Builder().setHost("localhost").setVirtualHost("/") .setPort(5672).setUserName("admin").setPassword("admin") .build();
DataStreamSource zhisheng = env.addSource(new RMQSource<>(connectionConfig,"zhisheng",true,new SimpleStringSchema())) .setParallelism(1); zhisheng.print();//如果想保证 exactly-once 或 at-least-once 需要把 checkpoint 开启// env.enableCheckpointing(10000); env.execute("flink learning connectors rabbitmq"); }}
运行 RabbitMQProducerUtil 类,再运行 Main 类!注意⚠️:
1、RMQConnectionConfig 中设置的用户名和密码要设置成 admin/admin,如果你换成是 guest/guest,其实是在 RabbitMQ 里面是没有这个用户名和密码的,所以就会报这个错误:
nested exception is com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
不出意外的话应该你运行 RabbitMQProducerUtil 类后,立马两个运行的结果都会出来,速度还是很快的。
![](/assets/blank.gif)
2、如果你在 RabbitMQProducerUtil 工具类中把注释的那行代码打开的话:
// 声明一个队列// channel.queueDeclare(QUEUE_NAME, false, false, false, null);
就会出现这种错误:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'zhisheng' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
这是因为你打开那个注释的话,一旦你运行了该类就会创建一个叫做 zhisheng
的 Queue,当你再运行 Main 类中的时候,它又会创建这样一个叫 zhisheng
的 Queue,然后因为已经有同名的 Queue 了,所以就有了冲突,解决方法就是把那行代码注释就好了。
3、该 connector(连接器)中提供了 RMQSource 类去消费 RabbitMQ queue 中的消息和确认 checkpoints 上的消息,它提供了三种不一样的保证:
Exactly-once(只消费一次): 前提条件有,1 是要开启 checkpoint,因为只有在 checkpoint 完成后,才会返回确认消息给 RabbitMQ(这时,消息才会在 RabbitMQ 队列中删除);2 是要使用 Correlation ID,在将消息发往 RabbitMQ 时,必须在消息属性中设置 Correlation ID。数据源根据 Correlation ID 把从 checkpoint 恢复的数据进行去重;3 是数据源不能并行,这种限制主要是由于 RabbitMQ 将消息从单个队列分派给多个消费者。
At-least-once(至少消费一次): 开启了 checkpoint,但未使用相 Correlation ID 或 数据源是并行的时候,那么就只能保证数据至少消费一次了
No guarantees(无法保证): Flink 接收到数据就返回确认消息给 RabbitMQ
Sink 数据到 RabbitMQ
RabbitMQ 除了可以作为数据源,也可以当作下游,Flink 消费数据做了一些处理之后也能把数据发往 RabbitMQ,下面演示下 Flink 消费 Kafka 数据后写入到 RabbitMQ。
public class Main1 { public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args); StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool); DataStreamSource data = KafkaConfigUtil.buildSource(env);final RMQConnectionConfig connectionConfig = new RMQConnectionConfig .Builder().setHost("localhost").setVirtualHost("/") .setPort(5672).setUserName("admin").setPassword("admin") .build();//注意,换一个新的 queue,否则也会报错 data.addSink(new RMQSink<>(connectionConfig, "zhisheng001", new MetricSchema())); env.execute("flink learning connectors rabbitmq"); }}
是不是很简单?但是需要注意的是,要换一个之前不存在的 queue,否则是会报错的。
不出意外的话,你可以看到 RabbitMQ 的监控页面会出现新的一个 queue 出来,如下图:
![](/assets/blank.gif)
总结
本文先把 RabbitMQ 作为数据源,写了个 Flink 消费 RabbitMQ 队列里面的数据进行打印出来,然后又写了个 Flink 消费 Kafka 数据后写入到 RabbitMQ 的例子!
本文原创地址是: http://www.54tianzhisheng.cn/2019/01/20/Flink-RabbitMQ-sink/ , 未经允许禁止转载。
关注我
微信公众号:zhisheng
另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以加我的微信:zhisheng_tian,然后回复关键字:Flink 即可无条件获取到。
更多私密资料请加入知识星球!
Github 代码仓库
https://github.com/zhisheng17/flink-learning/
以后这个项目的所有代码都将放在这个仓库里,包含了自己学习 flink 的一些 demo 和博客。
本文的项目代码在 https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-connectors/flink-learning-connectors-rabbitmq
相关文章
1、《从0到1学习Flink》—— Apache Flink 介绍
2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门
3、《从0到1学习Flink》—— Flink 配置文件详解
4、《从0到1学习Flink》—— Data Source 介绍
5、《从0到1学习Flink》—— 如何自定义 Data Source ?
6、《从0到1学习Flink》—— Data Sink 介绍
7、《从0到1学习Flink》—— 如何自定义 Data Sink ?
8、《从0到1学习Flink》—— Flink Data transformation(转换)
9、《从0到1学习Flink》—— 介绍Flink中的Stream Windows
10、《从0到1学习Flink》—— Flink 中的几种 Time 详解
11、《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch
12、《从0到1学习Flink》—— Flink 项目如何运行?
13、《从0到1学习Flink》—— Flink 写入数据到 Kafka
14、《从0到1学习Flink》—— Flink JobManager 高可用性配置
15、《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍
16、《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL
17、Blink 真香
18、《从0到1学习Flink》—— 你上传的 jar 包藏到哪里去了?
mac启动rabbitmq_从0到1学习Flink—— Flink 读取 Kafka 数据写入到 RabbitMQ相关推荐
- kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ
前言 之前有文章 <从0到1学习Flink>-- Flink 写入数据到 Kafka 写过 Flink 将处理后的数据后发到 Kafka 消息队列中去,当然我们常用的消息队列可不止这一种, ...
- 写入mysql_《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL
前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的东西当时是写死的,不能够通用,最近知识星 ...
- 《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL
<!-- more --> 前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的 ...
- mac r 导出csv文件_每天学习一点R:8.数据的导入和输出
数据的导入 在应用R进行数据分析之前,首先要做的一步工作就是将数据导入R工作环境. R所识别的数据通常为"X·Y"型的多变量数据,格式为txt或csv格式,不同数据间以制表符(Ta ...
- flink sql设置并行度_《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍
前言 之所以写这个是因为前段时间自己的项目出现过这样的一个问题: Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Acto ...
- Flink 从0到1学习—— 分享四本 Flink 国外的书和二十多篇 Paper 论文
前言 之前也分享了不少自己的文章,但是对于 Flink 来说,还是有不少新入门的朋友,这里给大家分享点 Flink 相关的资料(国外数据 pdf 和流处理相关的 Paper),期望可以帮你更好的理解 ...
- flink 自定义 窗口_《从0到1学习Flink》—— Flink Data transformation(转换)
前言 在第一篇介绍 Flink 的文章 <<从0到1学习Flink>-- Apache Flink 介绍> 中就说过 Flink 程序的结构 Flink 应用程序结构就是如上图 ...
- flink 写入到es_《从0到1学习Flink》—— Flink 写入数据到 Kafka
前言 之前文章 <从0到1学习Flink>-- Flink 写入数据到 ElasticSearch 写了如何将 Kafka 中的数据存储到 ElasticSearch 中,里面其实就已经用 ...
- centos7安装flink集群_《从0到1学习Flink》—— Flink 写入数据到 Kafka
前言 之前文章 <从0到1学习Flink>-- Flink 写入数据到 ElasticSearch 写了如何将 Kafka 中的数据存储到 ElasticSearch 中,里面其实就已经用 ...
- 大数据技术之_19_Spark学习_07_Spark 性能调优 + 数据倾斜调优 + 运行资源调优 + 程序开发调优 + Shuffle 调优 + GC 调优 + Spark 企业应用案例
大数据技术之_19_Spark学习_07 第1章 Spark 性能优化 1.1 调优基本原则 1.1.1 基本概念和原则 1.1.2 性能监控方式 1.1.3 调优要点 1.2 数据倾斜优化 1.2. ...
最新文章
- 如何为回归问题选择最合适的机器学习方法?
- 分块的单点修改查询区间和_树状数组的区间修改与单点查询与区间查询
- 修改sms_def的MOF文件收集网络共享信息
- 超详细!各种内部排序算法的比较
- java8避免null_在 Java 8 中避免 Null 检查
- linux镜像文件不要大于4g,Systemback制作大于4G的Ubuntu系统镜像
- [SCOI 2010]传送带
- html常用标签(form标签)
- 惯性矩和偏心距描述器
- 转行人工智能,不得不温故的数学基础知识
- matplotlib绘制K线图
- 如何在浏览器上安装 VueDevtools工具
- 在mysql数据库中创建Oracle数据库中的scott用户表
- java序列化的接口为什么是空的?
- java 图片去水印_如何用java去除图片水印?
- html相对定位 不占位置,CSS position 相对定位和绝对定位
- 《区块链革命》读书笔记1可信的协议 引导未来:区块链经济七大设计原则
- 写给文奇的阿里云建站教程
- python极简应用_30 个极简Python代码,拿走即用(真干货)
- 找不到模块“@/....”或其相应的类型声明。
热门文章
- Java设计模式——建造者模式
- 【九度OJ1348】|【剑指offer36】数组中的逆序对
- IBM存储扩展柜磁盘在线扩容(一)
- 继续开源还是走向封闭?谷歌未来
- 技术人频道的一个问题——“程序员言”
- Windows下运行linux桌面程序
- exawear能运行java_VirSCAN.org-多引擎在线病毒扫描网 v1.02,当前支持 47 款杀毒引擎...
- 无机金属专业里有计算机课吗,无机非金属材料工程专业课程有不少
- eclipse git拉取失败_收藏!工作中Git使用实践和常用命令流程合集
- 复习webpack4之PWA打包配置