志宇-RocketMQ学习
RocketMQ
- RocketMQ安装
- RocketMQ-console安装
- RocketMQ简单使用
- RabbitMQ核心概念
- 消息发送状态(返回对象中的枚举类型有4种)
- 重试次数
- RocketMQ发送消息三种方式
- RocketMQ延迟消息设置
- 消息顺序消费
- 消费端配置参数详情
- tag标签
- 消费模式
- Offset和CommitLog
- 事务使用
- 集群部署
RocketMQ安装
官方安装方法
先后安装Name Server(起到路由功能)
和 Broker(RocketMQ服务)
然后测试下发送和接收可用
内存不够在/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/bin
目录下修改runbroker.sh
tools.sh
runserver.sh
这三个文件中JAVA_OPT
参数,修改完如下
#仅仅修改已经存在的配置即可,将4g换成256m或者128m
#runbroker.sh broker占用的内存大小
#tools.sh 测试发送和接收工具的内存大小
#runserver.sh 路由的内存和大小
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
RocketMQ-console安装
- 下载
方式一、git下载,执行如下命令
git clone https://github.com/apache/rocketmq-externals.git方式二、直接下载,访问如下地址即可
https://github.com/apache/rocketmq-externals/archive/master.zip
- 修改配置
找到rocketmq-console/src/main/resources/application.properties
修改配置
#console端口
server.port=17890
#name server地址
rocketmq.config.namesrvAddr=localhost:9876
找到rocketmq-console
目录下的pom.xml
文件,修改配置
<rocketmq.version>4.4.0XXX</rocketmq.version>
4.4.0XXX 修改为 4.4.0(你的RocketMQ版本)
<rocketmq.version>4.4.0</rocketmq.version>
- 编译
进入rocketmq-console
目录,编译打包 mvn clean package -Dmaven.test.skip=true
进入target目录 ,启动 java -jar rocketmq-console-ng-1.0.0.jar
守护进程方式启动 nohup java -jar rocketmq-console-ng-1.0.0.jar &
RocketMQ简单使用
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
MQClientException: No route info of this topic, TopicTest1
解决办法
来到/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0
目录下
查看已经配置信息sh bin/mqbroker -m
配置信息如下
2021-02-18 16\:37\:49 INFO main - namesrvAddr=
2021-02-18 16\:37\:49 INFO main - brokerIP1=172.18.0.1
2021-02-18 16\:37\:49 INFO main - brokerName=iZ2ze8twmjge9w7qf1yhyyZ
2021-02-18 16\:37\:49 INFO main - brokerClusterName=DefaultCluster
2021-02-18 16\:37\:49 INFO main - brokerId=0
2021-02-18 16\:37\:49 INFO main - autoCreateTopicEnable=true
2021-02-18 16\:37\:49 INFO main - autoCreateSubscriptionGroup=true
2021-02-18 16\:37\:49 INFO main - msgTraceTopicName=RMQ_SYS_TRACE_TOPIC
2021-02-18 16\:37\:49 INFO main - traceTopicEnable=false
2021-02-18 16\:37\:49 INFO main - rejectTransactionMessage=false
2021-02-18 16\:37\:49 INFO main - fetchNamesrvAddrByAddressServer=false
2021-02-18 16\:37\:49 INFO main - transactionTimeOut=6000
2021-02-18 16\:37\:49 INFO main - transactionCheckMax=15
2021-02-18 16\:37\:49 INFO main - transactionCheckInterval=60000
2021-02-18 16\:37\:49 INFO main - aclEnable=false
2021-02-18 16\:37\:49 INFO main - storePathRootDir=/root/store
2021-02-18 16\:37\:49 INFO main - storePathCommitLog=/root/store/commitlog
2021-02-18 16\:37\:49 INFO main - flushIntervalCommitLog=500
2021-02-18 16\:37\:49 INFO main - commitIntervalCommitLog=200
2021-02-18 16\:37\:49 INFO main - flushCommitLogTimed=false
2021-02-18 16\:37\:49 INFO main - deleteWhen=04
2021-02-18 16\:37\:49 INFO main - fileReservedTime=72
2021-02-18 16\:37\:49 INFO main - maxTransferBytesOnMessageInMemory=262144
2021-02-18 16\:37\:49 INFO main - maxTransferCountOnMessageInMemory=32
2021-02-18 16\:37\:49 INFO main - maxTransferBytesOnMessageInDisk=65536
2021-02-18 16\:37\:49 INFO main - maxTransferCountOnMessageInDisk=8
2021-02-18 16\:37\:49 INFO main - accessMessageInMemoryMaxRatio=40
2021-02-18 16\:37\:49 INFO main - messageIndexEnable=true
2021-02-18 16\:37\:49 INFO main - messageIndexSafe=false
2021-02-18 16\:37\:49 INFO main - haMasterAddress=
2021-02-18 16\:37\:49 INFO main - brokerRole=ASYNC_MASTER
2021-02-18 16\:37\:49 INFO main - flushDiskType=ASYNC_FLUSH
2021-02-18 16\:37\:49 INFO main - cleanFileForciblyEnable=true
2021-02-18 16\:37\:49 INFO main - transientStorePoolEnable=false
compressMsgBodyOverHowmuch :消息超过默认字节4096后进行压缩
retryTimesWhenSendFailed : 失败重发次数
maxMessageSize : 最大消息配置,默认128k
topicQueueNums : 主题下面的队列数量,默认是4
autoCreateTopicEnable : 是否自动创建主题Topic, 开发建议为true,生产要为false
defaultTopicQueueNums : 自动创建服务器不存在的topic,默认创建的队列数
autoCreateSubscriptionGroup: 是否允许 Broker 自动创建订阅组,建议线下开发开启,线上关闭
brokerClusterName : 集群名称
brokerId : 0表示Master主节点 大于0表示从节点
brokerIP1 : Broker服务地址
brokerRole : broker角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE
deleteWhen : 每天执行删除过期文件的时间,默认每天凌晨4点
flushDiskType :刷盘策略, 默认为 ASYNC_FLUSH(异步刷盘), 另外是SYNC_FLUSH(同步刷盘)
listenPort : Broker监听的端口号
mapedFileSizeCommitLog : 单个conmmitlog文件大小,默认是1GB
mapedFileSizeConsumeQueue:ConsumeQueue每个文件默认存30W条,可以根据项目调整
storePathRootDir : 存储消息以及一些配置信息的根目录 默认为用户的 ${HOME}/store
storePathCommitLog:commitlog存储目录默认为${storePathRootDir}/commitlog
storePathIndex: 消息索引存储路径
syncFlushTimeout : 同步刷盘超时时间
diskMaxUsedSpaceRatio : 检测可用的磁盘空间大小,超过后会写入报错(磁盘没有满却写入不进RabbitMQ是就有可能是因为这个参数,这个参数就是当磁盘还剩余百分之多少时就不允许在写入消息了)
常见错误三
控制台查看不了数据,提示连接 10909
错误,因为RocketMQ
自带VIP虚拟ip技术,这时要防火墙要开放10909
端口,才能使用
RabbitMQ核心概念
消息发送状态(返回对象中的枚举类型有4种)
生产者发送完信息返回的类型(返回类型SendResult
中的SendStatus
成员变量)
FLUSH_DISK_TIMEOUT
在指定时间没有将消息同步到磁盘中(刷盘策略需要为SYNC_FLUSH
才会出这个错误)
例如:在cpu爆满时候导致没有刷盘成功
FLUSH_SLAVE_TIMEOUT
主从模式下,broker是SYNC_MASTER
, 没有在规定时间内完成主从同步
例如:网路原因等,导致主从同步没有成功,如果是异步复制则不会出现这个问题
SLAVE_NOT_AVAILABLE
从模式下,broker是SYNC_MASTER
, 但是没有找到被配置成Slave的Broker
例如:主从模式下,所有从节点宕机,如果是异步复制则不会出现这个问题
SEND_OK
发送成功,没有发生上面的三种问题
重试次数
生产者重试
:RocketMQ中默认次数是2次,一般不是跨国调用不用修改重试次数了
重试次数设置方法如下
//设置方法如下
private DefaultMQProducer producer;
//设置同步调用重试次数
producer.setRetryTimesWhenSendFailed(0);
//设置异步调用重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
异步发送不会重试,需自己书写代码重试
//这里会开启一个线程异步发送消息
produceProxy.getProducet().send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {//处理消息}@Overridepublic void onException(Throwable e) {//进行重试}
});
消费者重试
:默认次数16次,当网络中断、消费者报错、ack确认失败导致重试
手动设置重试次数方法如下
//1、广播方式不支持重试机制
//2、无论发送多少遍key都不变
//3、重试间隔时间默认为:`messageDelayLevel`=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h//设置为广播模式,如果是广播模式则重试次数失效//consumer.setMessageModel(MessageModel.BROADCASTING);//设置为集群模式,默认是集群模式consumer.setMessageModel(MessageModel.CLUSTERING);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);//不会变这个keys,是发送过来的String keys = messageExt.getKeys();//重试次数int reconsumeTimes = messageExt.getReconsumeTimes();try{//TODO 业务逻辑处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}catch (Exception e){if (reconsumeTimes>=3) {//TODO 记录数据库或者发送邮件给运维人员return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}});
RocketMQ发送消息三种方式
同步发送: 发送验证码,邮件通知中使用,速快快,当前线程反馈,可靠
异步发送:注册发送优惠券中使用,速快快,非前线程反馈,可靠
OneWay: 日志采集中使用,速度最快,没有反馈,相对不可靠
RocketMQ延迟消息设置
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
Message message = new Message("topicName","TagA","sendMessage");
//0等级代表不延迟
message.setDelayTimeLevel(0)
//1代表延迟一秒钟
message.setDelayTimeLevel(1)
//2代表延迟5秒钟
//1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 以此类推
message.setDelayTimeLevel(2)
消息顺序消费
全局顺序消费
:例如比特币交易过程中人民币换成美元时候按照从低到高的顺序
局部顺序消费
:例如 订单状态的消息提醒
局部顺序的使用说明:
1、RocketMQ上默认一个topic上有4个queue,顺序消费要将消息投递到同一个topic对应的同一个队列中(根据业务id取模然后投递到同一个队列上,通过MessageQueueSelector
实现类实现)
2、顺序消息暂不支持广播模式
、异步发送
方式
3、顺序消费支持多个消费者进行消费(消费者消费前会对消费的队列加锁)
4、消费者部署的节点数要小于此topic
对应的queue
的数量(消费数量会均等分给消费者,不然有的消费者收不到消息)
5、消费者单线程处理,使用MessageListenerOrderly
替代MessageListenerConcurrently
发送消息伪代码如下
//假设是订单号
Integer orderId = 1;
Message message = new Message(RocketMQConfig.orderTopicName, "TagB", ("sendMessage").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult send = produceProxy.getOrderProducer().send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {//传递过来的 orderId (唯一标识)Long queueNum = (Long) arg;//此topic上有几个queueint size = mqs.size();//判断使用哪个keylong selectNo = queueNum % size;//选择的queue数量必须小于配置的,否则会出错//返回选择第几个queuereturn mqs.get((int) selectNo);}
}, orderId);
send.getSendStatus();
接收消息伪代码如下
//这里使用MessageListenerOrderly替代MessageListenerConcurrently(使用单线程消费)
consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {MessageExt messageExt = msgs.get(0);String body = new String(messageExt.getBody(), "utf-8");System.out.println(body);return ConsumeOrderlyStatus.SUCCESS;}}
);
消费端配置参数详情
//设置组 一个项目中一个组对应一个消费者
consumer = new DefaultMQPushConsumer(RocketMQConfig.groupName);
//设置地址
consumer.setNamesrvAddr(RocketMQConfig.serverAddresses);
//设置订阅对应的topic
consumer.subscribe(RocketMQConfig.topicName, "*");
//设置默认消费队列中最后一个,默认也是这个配置
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//消费者均等消费策略 ,默认也是这个配置
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
//设置存储在本地还是远程,默认广播存储在消费者本地、其他存储在远程
//consumer.setOffsetStore(new RemoteBrokerOffsetStore());
//设置消费者最大线程数
consumer.setConsumeThreadMax(100);
//设置消费者最小线程数
consumer.setConsumeThreadMin(5);
//消费者一次从mq中拉取多少条数据
consumer.setPullBatchSize(32);
//拉取后每次消费多少条
consumer.setConsumeMessageBatchMaxSize(1);
//设置为广播模式,如果是广播模式则重试次数失效
//consumer.setMessageModel(MessageModel.BROADCASTING);
//设置为集群模式,默认是集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
tag标签
消费者可以选择消费 某个Group 中的某个 topic 中的指定Tag
1、消费者手动过滤Tag (没有用到的tag也进行传输浪费资源)
2、RocketMQ选择发送给消费者
消费模式
1、Broker获得消息然后将消息发送给消费者(消费者压力大)
2、消费者间隔去向Broker拉取(没有消息也会去拉取、间隔时间不好设置)
3、Broker和消费者之间每15秒发起一次长连接(默认)
Offset和CommitLog
CommitLog
用于存储发送的消息内容
Offset
用于存储消息存储在队列中的下标
CommitLog
默认存储位置 根目录下store/consumequeue/{topicName}/{queueid}/fileName
事务使用
集群部署
推荐同步双写、异步刷盘
志宇-RocketMQ学习相关推荐
- 志宇-RabbitMQ学习
RabbitMQ RabbitMQ安装 RabbitMQ使用 RabbitMQ发送消息步骤图 公平消费和消息可靠性传递 防止重复消费 有序消费 消息堆积怎么处理 spring集成RabbitMQ使用 ...
- 志宇-Nginx学习
Nginx nginx如何去处理一个请求 Nginx可用性探测 Nginx搭建静态资源服务器 Nginx配置https服务 Nginx流量统计 Nginx黑白名单拦截 异常兜底返回 OpenResty ...
- 志宇-Jenkins学习
Jenkins 部署Jenkins 访问Jenkins Jenkins安装插件 Jenkins配置 配置JDK 配置MAVEN 配置Git 配置邮箱 Jenkins配置GitHub Jenkins授权 ...
- 志宇-nexus学习
nexus /ˈneksəs/ 环境安装 nexus安装 nexus 使用 登录nexus 创建一个公司的私有仓库,然后添加到私服的中央仓库中 私服代理配置成阿里云镜像 maven加载setting文 ...
- 志宇-gitlib学习
gitlib gitlib是什么 gitlib的部署 1.配置yum源 2.更新本地yum缓存 3.安装GitLab社区版 4.更新配置信息 1 修改内存配置 2 修改端口配置 3 修改邮箱配置 5. ...
- RocketMQ学习笔记(7)----RocketMQ的整体架构
1. RocketMQ主要的9个模块,如图: 2. 模块介绍 1. rocketmq-common:通用的常量枚举,基类方法或者数据结构,按描述的目标来分包,通俗易懂.报名有admin,consume ...
- 2020年,RocketMQ面试题 -面试题驱动RocketMQ学习
本文是<从 0 开始带你成为消息中间件实战高手>内容总结,版权问题,特此声明 本篇文章持续更新,大概有上百道题,用这些题来驱动RocketMQ学习,在面试中也会脱颖而出!! 15 解决订单 ...
- 机器学习(周志华)学习笔记(一)
目录 学习教材 学习内容 一.绪论 1.1 基本术语 1.2 假设空间 1.3 归纳偏好 二. 模型评估与选择 2.1 经验误差 2.2 评估方法 2.3 性能度量 2.4 方差与偏差 学习时间 学习 ...
- 一周年感谢信 | 黑萤科技赖志宇:志同者将道合一处
一周年感谢信 | 黑萤科技赖志宇:志同者将道合一处 赖志宇 IPFS黑萤科技 1周前 亲爱的萤火虫: 你们好! 自2018年4月30日公司的营业执照下来,黑萤科技已满一周年. 这一年,我们经历过高光时 ...
最新文章
- 工程项目管理丁士昭第二版_2021年软考系统集成项目管理工程师知识点预习第十四章第二节...
- nginx学习七 高级数据结构之动态数组ngx_array_t
- Vim与clang-format
- C语言数据结构与算法
- Maven 连接私服资源库配置
- 剑指offer之反向打印链表值
- 牛客网笔记之JAVA运算符
- mongodb $ifNull
- mongodb 持久化 mysql_最详细的python爬虫指南(四):持久化操作(mongoDB、mysql)...
- webpack1.x环境配置与打包基础【附带各种 “坑“ 与解决方案!持续更新中...】
- 小度智能音箱维修点_小度智能音箱——联通智慧生活语音服务入口
- spark架构设计编程模型 02
- 图像相似度对比分析软件,图像相似度计算方法
- 介绍一个可以轻松下载病毒样本的数据库
- 面试相关(技术汇总)
- 这个拥有中国血统的黑客,曾将美国搅得天翻地覆
- 推荐收藏 | 掌握这些步骤,机器学习模型问题药到病除
- opencv滤镜-素描
- MATLAB房价,MATLAB实现波士顿房价预测使用BP神经网络
- Excel TEXT函数怎么把数值转换成文本