RocketMQ

  • RocketMQ安装
  • RocketMQ-console安装
  • RocketMQ简单使用
  • RabbitMQ核心概念
    • 消息发送状态(返回对象中的枚举类型有4种)
    • 重试次数
    • RocketMQ发送消息三种方式
    • RocketMQ延迟消息设置
    • 消息顺序消费
    • 消费端配置参数详情
    • tag标签
    • 消费模式
    • Offset和CommitLog
  • 事务使用
  • 集群部署

RocketMQ安装

官方安装方法
先后安装Name Server(起到路由功能)Broker(RocketMQ服务) 然后测试下发送和接收可用

内存不够在/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/bin目录下修改runbroker.sh tools.sh runserver.sh这三个文件中JAVA_OPT参数,修改完如下

#仅仅修改已经存在的配置即可,将4g换成256m或者128m
#runbroker.sh   broker占用的内存大小
#tools.sh       测试发送和接收工具的内存大小
#runserver.sh   路由的内存和大小
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"

RocketMQ-console安装

  1. 下载
方式一、git下载,执行如下命令
git clone https://github.com/apache/rocketmq-externals.git方式二、直接下载,访问如下地址即可
https://github.com/apache/rocketmq-externals/archive/master.zip
  1. 修改配置

找到rocketmq-console/src/main/resources/application.properties 修改配置

#console端口
server.port=17890
#name server地址
rocketmq.config.namesrvAddr=localhost:9876

找到rocketmq-console目录下的pom.xml文件,修改配置

<rocketmq.version>4.4.0XXX</rocketmq.version>
4.4.0XXX 修改为 4.4.0(你的RocketMQ版本)
<rocketmq.version>4.4.0</rocketmq.version>
  1. 编译

进入rocketmq-console目录,编译打包 mvn clean package -Dmaven.test.skip=true
进入target目录 ,启动 java -jar rocketmq-console-ng-1.0.0.jar
守护进程方式启动 nohup java -jar rocketmq-console-ng-1.0.0.jar &

RocketMQ简单使用

常见错误一

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

解决办法
因为服务器中可能有多块网卡,rocketmq要指定公网ip
在配置文件rocketmq-all-4.8.0-source-release/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/conf/broker.conf中添加
brokerIP1=39.107.109.94此ip为公网ip
然后重新启动 ,启动命令如下:
jps 找到启动进程,kill -9 关闭进程,然后已守护进程启动
nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &

常见错误二

MQClientException: No route info of this topic, TopicTest1

解决办法
来到/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0目录下
查看已经配置信息sh bin/mqbroker -m 配置信息如下

2021-02-18 16\:37\:49 INFO main - namesrvAddr=
2021-02-18 16\:37\:49 INFO main - brokerIP1=172.18.0.1
2021-02-18 16\:37\:49 INFO main - brokerName=iZ2ze8twmjge9w7qf1yhyyZ
2021-02-18 16\:37\:49 INFO main - brokerClusterName=DefaultCluster
2021-02-18 16\:37\:49 INFO main - brokerId=0
2021-02-18 16\:37\:49 INFO main - autoCreateTopicEnable=true
2021-02-18 16\:37\:49 INFO main - autoCreateSubscriptionGroup=true
2021-02-18 16\:37\:49 INFO main - msgTraceTopicName=RMQ_SYS_TRACE_TOPIC
2021-02-18 16\:37\:49 INFO main - traceTopicEnable=false
2021-02-18 16\:37\:49 INFO main - rejectTransactionMessage=false
2021-02-18 16\:37\:49 INFO main - fetchNamesrvAddrByAddressServer=false
2021-02-18 16\:37\:49 INFO main - transactionTimeOut=6000
2021-02-18 16\:37\:49 INFO main - transactionCheckMax=15
2021-02-18 16\:37\:49 INFO main - transactionCheckInterval=60000
2021-02-18 16\:37\:49 INFO main - aclEnable=false
2021-02-18 16\:37\:49 INFO main - storePathRootDir=/root/store
2021-02-18 16\:37\:49 INFO main - storePathCommitLog=/root/store/commitlog
2021-02-18 16\:37\:49 INFO main - flushIntervalCommitLog=500
2021-02-18 16\:37\:49 INFO main - commitIntervalCommitLog=200
2021-02-18 16\:37\:49 INFO main - flushCommitLogTimed=false
2021-02-18 16\:37\:49 INFO main - deleteWhen=04
2021-02-18 16\:37\:49 INFO main - fileReservedTime=72
2021-02-18 16\:37\:49 INFO main - maxTransferBytesOnMessageInMemory=262144
2021-02-18 16\:37\:49 INFO main - maxTransferCountOnMessageInMemory=32
2021-02-18 16\:37\:49 INFO main - maxTransferBytesOnMessageInDisk=65536
2021-02-18 16\:37\:49 INFO main - maxTransferCountOnMessageInDisk=8
2021-02-18 16\:37\:49 INFO main - accessMessageInMemoryMaxRatio=40
2021-02-18 16\:37\:49 INFO main - messageIndexEnable=true
2021-02-18 16\:37\:49 INFO main - messageIndexSafe=false
2021-02-18 16\:37\:49 INFO main - haMasterAddress=
2021-02-18 16\:37\:49 INFO main - brokerRole=ASYNC_MASTER
2021-02-18 16\:37\:49 INFO main - flushDiskType=ASYNC_FLUSH
2021-02-18 16\:37\:49 INFO main - cleanFileForciblyEnable=true
2021-02-18 16\:37\:49 INFO main - transientStorePoolEnable=false

修改配置文件distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/conf/broker.confautoCreateTopicEnable=true,true 则代表可以自动创建topic,生产上要关闭,如果这个参数是true还创建不了topic可能是因为程序引入jar包的版本和RabbitMQ版本不同导致的

常用配置说明

compressMsgBodyOverHowmuch :消息超过默认字节4096后进行压缩
retryTimesWhenSendFailed : 失败重发次数
maxMessageSize : 最大消息配置,默认128k
topicQueueNums : 主题下面的队列数量,默认是4
autoCreateTopicEnable : 是否自动创建主题Topic, 开发建议为true,生产要为false
defaultTopicQueueNums : 自动创建服务器不存在的topic,默认创建的队列数
autoCreateSubscriptionGroup: 是否允许 Broker 自动创建订阅组,建议线下开发开启,线上关闭
brokerClusterName : 集群名称
brokerId : 0表示Master主节点 大于0表示从节点
brokerIP1 : Broker服务地址
brokerRole : broker角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE
deleteWhen : 每天执行删除过期文件的时间,默认每天凌晨4点
flushDiskType :刷盘策略, 默认为 ASYNC_FLUSH(异步刷盘), 另外是SYNC_FLUSH(同步刷盘)
listenPort : Broker监听的端口号
mapedFileSizeCommitLog : 单个conmmitlog文件大小,默认是1GB
mapedFileSizeConsumeQueue:ConsumeQueue每个文件默认存30W条,可以根据项目调整
storePathRootDir : 存储消息以及一些配置信息的根目录 默认为用户的 ${HOME}/store
storePathCommitLog:commitlog存储目录默认为${storePathRootDir}/commitlog
storePathIndex: 消息索引存储路径
syncFlushTimeout : 同步刷盘超时时间
diskMaxUsedSpaceRatio : 检测可用的磁盘空间大小,超过后会写入报错(磁盘没有满却写入不进RabbitMQ是就有可能是因为这个参数,这个参数就是当磁盘还剩余百分之多少时就不允许在写入消息了)

常见错误三
控制台查看不了数据,提示连接 10909错误,因为RocketMQ自带VIP虚拟ip技术,这时要防火墙要开放10909端口,才能使用

RabbitMQ核心概念

消息发送状态(返回对象中的枚举类型有4种)

生产者发送完信息返回的类型(返回类型SendResult中的SendStatus成员变量)
FLUSH_DISK_TIMEOUT
在指定时间没有将消息同步到磁盘中(刷盘策略需要为SYNC_FLUSH 才会出这个错误)
例如:在cpu爆满时候导致没有刷盘成功

FLUSH_SLAVE_TIMEOUT
主从模式下,broker是SYNC_MASTER, 没有在规定时间内完成主从同步
例如:网路原因等,导致主从同步没有成功,如果是异步复制则不会出现这个问题

SLAVE_NOT_AVAILABLE
从模式下,broker是SYNC_MASTER, 但是没有找到被配置成Slave的Broker
例如:主从模式下,所有从节点宕机,如果是异步复制则不会出现这个问题

SEND_OK
发送成功,没有发生上面的三种问题

重试次数

生产者重试:RocketMQ中默认次数是2次,一般不是跨国调用不用修改重试次数了
重试次数设置方法如下

//设置方法如下
private DefaultMQProducer producer;
//设置同步调用重试次数
producer.setRetryTimesWhenSendFailed(0);
//设置异步调用重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);

异步发送不会重试,需自己书写代码重试

 //这里会开启一个线程异步发送消息
produceProxy.getProducet().send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {//处理消息}@Overridepublic void onException(Throwable e) {//进行重试}
});

消费者重试:默认次数16次,当网络中断、消费者报错、ack确认失败导致重试
手动设置重试次数方法如下

//1、广播方式不支持重试机制
//2、无论发送多少遍key都不变
//3、重试间隔时间默认为:`messageDelayLevel`=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h//设置为广播模式,如果是广播模式则重试次数失效//consumer.setMessageModel(MessageModel.BROADCASTING);//设置为集群模式,默认是集群模式consumer.setMessageModel(MessageModel.CLUSTERING);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);//不会变这个keys,是发送过来的String keys = messageExt.getKeys();//重试次数int reconsumeTimes = messageExt.getReconsumeTimes();try{//TODO 业务逻辑处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}catch (Exception e){if (reconsumeTimes>=3) {//TODO 记录数据库或者发送邮件给运维人员return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}});

RocketMQ发送消息三种方式

同步发送: 发送验证码,邮件通知中使用,速快快,当前线程反馈,可靠
异步发送:注册发送优惠券中使用,速快快,非前线程反馈,可靠
OneWay: 日志采集中使用,速度最快,没有反馈,相对不可靠

RocketMQ延迟消息设置

开源版本不支持定时发送,只支持固定的发送时间
在源码中rocketmq-store 项目> MessageStoreConfig.java >的成员变量 messageDelayLevel中有固定的延迟时间

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

发送消息时设定延迟等级

 Message message  = new Message("topicName","TagA","sendMessage");
//0等级代表不延迟
message.setDelayTimeLevel(0)
//1代表延迟一秒钟
message.setDelayTimeLevel(1)
//2代表延迟5秒钟
//1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h  以此类推
message.setDelayTimeLevel(2)

消息顺序消费

全局顺序消费:例如比特币交易过程中人民币换成美元时候按照从低到高的顺序
局部顺序消费:例如 订单状态的消息提醒
局部顺序的使用说明:
1、RocketMQ上默认一个topic上有4个queue,顺序消费要将消息投递到同一个topic对应的同一个队列中(根据业务id取模然后投递到同一个队列上,通过MessageQueueSelector实现类实现)
2、顺序消息暂不支持广播模式异步发送方式
3、顺序消费支持多个消费者进行消费(消费者消费前会对消费的队列加锁)
4、消费者部署的节点数要小于此topic对应的queue的数量(消费数量会均等分给消费者,不然有的消费者收不到消息)
5、消费者单线程处理,使用MessageListenerOrderly替代MessageListenerConcurrently
发送消息伪代码如下

//假设是订单号
Integer orderId = 1;
Message message = new Message(RocketMQConfig.orderTopicName, "TagB", ("sendMessage").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult send = produceProxy.getOrderProducer().send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {//传递过来的 orderId (唯一标识)Long queueNum = (Long) arg;//此topic上有几个queueint size = mqs.size();//判断使用哪个keylong selectNo = queueNum % size;//选择的queue数量必须小于配置的,否则会出错//返回选择第几个queuereturn mqs.get((int) selectNo);}
}, orderId);
send.getSendStatus();

接收消息伪代码如下

//这里使用MessageListenerOrderly替代MessageListenerConcurrently(使用单线程消费)
consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {MessageExt messageExt = msgs.get(0);String body = new String(messageExt.getBody(), "utf-8");System.out.println(body);return ConsumeOrderlyStatus.SUCCESS;}}
);

消费端配置参数详情

//设置组 一个项目中一个组对应一个消费者
consumer = new DefaultMQPushConsumer(RocketMQConfig.groupName);
//设置地址
consumer.setNamesrvAddr(RocketMQConfig.serverAddresses);
//设置订阅对应的topic
consumer.subscribe(RocketMQConfig.topicName, "*");
//设置默认消费队列中最后一个,默认也是这个配置
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//消费者均等消费策略 ,默认也是这个配置
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
//设置存储在本地还是远程,默认广播存储在消费者本地、其他存储在远程
//consumer.setOffsetStore(new RemoteBrokerOffsetStore());
//设置消费者最大线程数
consumer.setConsumeThreadMax(100);
//设置消费者最小线程数
consumer.setConsumeThreadMin(5);
//消费者一次从mq中拉取多少条数据
consumer.setPullBatchSize(32);
//拉取后每次消费多少条
consumer.setConsumeMessageBatchMaxSize(1);
//设置为广播模式,如果是广播模式则重试次数失效
//consumer.setMessageModel(MessageModel.BROADCASTING);
//设置为集群模式,默认是集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);

tag标签

消费者可以选择消费 某个Group 中的某个 topic 中的指定Tag
1、消费者手动过滤Tag (没有用到的tag也进行传输浪费资源)
2、RocketMQ选择发送给消费者

消费模式

1、Broker获得消息然后将消息发送给消费者(消费者压力大)
2、消费者间隔去向Broker拉取(没有消息也会去拉取、间隔时间不好设置)
3、Broker和消费者之间每15秒发起一次长连接(默认)

Offset和CommitLog

CommitLog用于存储发送的消息内容
Offset 用于存储消息存储在队列中的下标
CommitLog默认存储位置 根目录下store/consumequeue/{topicName}/{queueid}/fileName

事务使用

集群部署

推荐同步双写、异步刷盘

志宇-RocketMQ学习相关推荐

  1. 志宇-RabbitMQ学习

    RabbitMQ RabbitMQ安装 RabbitMQ使用 RabbitMQ发送消息步骤图 公平消费和消息可靠性传递 防止重复消费 有序消费 消息堆积怎么处理 spring集成RabbitMQ使用 ...

  2. 志宇-Nginx学习

    Nginx nginx如何去处理一个请求 Nginx可用性探测 Nginx搭建静态资源服务器 Nginx配置https服务 Nginx流量统计 Nginx黑白名单拦截 异常兜底返回 OpenResty ...

  3. 志宇-Jenkins学习

    Jenkins 部署Jenkins 访问Jenkins Jenkins安装插件 Jenkins配置 配置JDK 配置MAVEN 配置Git 配置邮箱 Jenkins配置GitHub Jenkins授权 ...

  4. 志宇-nexus学习

    nexus /ˈneksəs/ 环境安装 nexus安装 nexus 使用 登录nexus 创建一个公司的私有仓库,然后添加到私服的中央仓库中 私服代理配置成阿里云镜像 maven加载setting文 ...

  5. 志宇-gitlib学习

    gitlib gitlib是什么 gitlib的部署 1.配置yum源 2.更新本地yum缓存 3.安装GitLab社区版 4.更新配置信息 1 修改内存配置 2 修改端口配置 3 修改邮箱配置 5. ...

  6. RocketMQ学习笔记(7)----RocketMQ的整体架构

    1. RocketMQ主要的9个模块,如图: 2. 模块介绍 1. rocketmq-common:通用的常量枚举,基类方法或者数据结构,按描述的目标来分包,通俗易懂.报名有admin,consume ...

  7. 2020年,RocketMQ面试题 -面试题驱动RocketMQ学习

    本文是<从 0 开始带你成为消息中间件实战高手>内容总结,版权问题,特此声明 本篇文章持续更新,大概有上百道题,用这些题来驱动RocketMQ学习,在面试中也会脱颖而出!! 15 解决订单 ...

  8. 机器学习(周志华)学习笔记(一)

    目录 学习教材 学习内容 一.绪论 1.1 基本术语 1.2 假设空间 1.3 归纳偏好 二. 模型评估与选择 2.1 经验误差 2.2 评估方法 2.3 性能度量 2.4 方差与偏差 学习时间 学习 ...

  9. 一周年感谢信 | 黑萤科技赖志宇:志同者将道合一处

    一周年感谢信 | 黑萤科技赖志宇:志同者将道合一处 赖志宇 IPFS黑萤科技 1周前 亲爱的萤火虫: 你们好! 自2018年4月30日公司的营业执照下来,黑萤科技已满一周年. 这一年,我们经历过高光时 ...

最新文章

  1. 工程项目管理丁士昭第二版_2021年软考系统集成项目管理工程师知识点预习第十四章第二节...
  2. nginx学习七 高级数据结构之动态数组ngx_array_t
  3. Vim与clang-format
  4. C语言数据结构与算法
  5. Maven 连接私服资源库配置
  6. 剑指offer之反向打印链表值
  7. 牛客网笔记之JAVA运算符
  8. mongodb $ifNull
  9. mongodb 持久化 mysql_最详细的python爬虫指南(四):持久化操作(mongoDB、mysql)...
  10. webpack1.x环境配置与打包基础【附带各种 “坑“ 与解决方案!持续更新中...】
  11. 小度智能音箱维修点_小度智能音箱——联通智慧生活语音服务入口
  12. spark架构设计编程模型 02
  13. 图像相似度对比分析软件,图像相似度计算方法
  14. 介绍一个可以轻松下载病毒样本的数据库
  15. 面试相关(技术汇总)
  16. 这个拥有中国血统的黑客,曾将美国搅得天翻地覆
  17. 推荐收藏 | 掌握这些步骤,机器学习模型问题药到病除
  18. opencv滤镜-素描
  19. MATLAB房价,MATLAB实现波士顿房价预测使用BP神经网络
  20. Excel TEXT函数怎么把数值转换成文本

热门文章

  1. 【持续更新】阿里云盘扩容码!不限速!博主已经扩容超过3TB!
  2. 计算机录音机应用程序在哪,录音机有什么作用?
  3. 我的工具箱-Web2.0
  4. 分享一些videoaudio格式
  5. 链式解决冲突散列表计算asl方法
  6. TP 结构和材料之基本概念
  7. http断点续传java_java中http断点续传的原理
  8. 网络设备应急响应指南
  9. 使用while语句与do...while语句计算多个整数的和
  10. Jakarta Commons Logging(JCL)开发手记