部署: 搭建 Apache RocketMQ 单机环境与Rocketmq-console
环境需求:
64位操作系统,建议使用Linux / Unix /
CentOs7.3
64bit JDK 1.8+
Maven 3.2.x
一、安装Maven
参考链接:
二、安装RocketMQ
1、关闭防火墙
systemctl stop firewalld.service
2、下载和构建
wget http://mirrors.hust.edu.cn/apache/rocketmq/4.2.0/rocketmq-all-4.2.0-source-release.zip
unzip rocketmq-all-4.2.0-source-release.zip
cd rocketmq-all-4.2.0
mvn -Prelease-all -DskipTests clean install -U
mv distribution/target/apache-rocketmq /usr/local/apache-rocketmq
cd /usr/local/apache-rocketmq/
编译成功的响应
3、配置rocketmq的环境变量,在/etc/profile最后添加
export ROCKETMQ_HOME=/usr/local/apache-rocketmq
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH
4、使rocketmq的环境变量生效
source /etc/profile
三、内存问题:
注意:
启动NameServer 和Broker的时候可能会出现错误,请留意对应的日志文件。在测试环境中常见的错误是内存不足的错误,这时候可以修改NameSever和Broker的启动脚本。Xms\Mmx不小于1g。
另外:
#mqbroker.xml和mqnamesrv.xml的内存不要超过runbroker.sh 和runserver.sh的内存,不然会引起内存不够导致奔溃。
bin/mqnamesrv.xml
bin/mqbroker.xml
四、启动Name Server
1、修改runserver,默认 RocketMQ Server 内存需要很大的
vim bin/runserver.sh
--------------------------------------------------------------
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
2、启动 Name Server
#nohup来启动
nohup sh bin/mqnamesrv >/dev/null 2>&1 &
#查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log
启动成功信息如下
The Name Server boot success. serializeType=JSON
五、启动broker
1、修改runbroker,默认 RocketMQ Broker 内存需要很大的。
vim bin/runbroker.sh
--------------------------------------------------------------
JAVA_OPT="${JAVA_OPT} -server -Xms11g -Xmx1g -Xmn512m"
2、启动Broker
#nohup来启动
nohup sh bin/mqbroker -n localhost:9876 >/dev/null 2>&1 &
#查看日志
tail -f ~/logs/rocketmqlogs/broker.log
启动成功信息如下
2021-07-07 15:26:06 INFO main - Set user specified name server address: 192.168.133.116:9876
2021-07-07 15:26:06 INFO PullRequestHoldService - PullRequestHoldService service started
2021-07-07 15:26:06 INFO main - register broker to name server 192.168.133.116:9876 OK
2021-07-07 15:26:06 INFO main - The broker[env1, 192.168.133.116:10911] boot success. serializeType=JSON and name server is 192.168.133.116:9876
六、查看进程
[root@rich apache-rocketmq]# jps
3441 BrokerStartup
3606 Jps
3383 NamesrvStartup
[root@rich apache-rocketmq]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:111 0.0.0.0:* LISTEN 1/systemd
tcp 0 0 192.168.122.1:53 0.0.0.0:* LISTEN 1270/dnsmasq
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 991/sshd
tcp 0 0 127.0.0.1:631 0.0.0.0:* LISTEN 993/cupsd
tcp 0 0 0.0.0.0:3306 0.0.0.0:* LISTEN 1126/mysqld
tcp6 0 0 :::111 :::* LISTEN 1/systemd
tcp6 0 0 :::9876 :::* LISTEN 3383/java
tcp6 0 0 :::22 :::* LISTEN 991/sshd
tcp6 0 0 ::1:631 :::* LISTEN 993/cupsd
tcp6 0 0 :::10909 :::* LISTEN 3441/java
tcp6 0 0 :::10911 :::* LISTEN 3441/java
tcp6 0 0 :::10912 :::* LISTEN 3441/java
七、发送和收取消息
在发送和收取消息之前,我们需要告诉客户端Name Server的位置。RocketMQ有多种办法来实现,在这里我们使用最简单的环境变量 NAMESRV_ADDR
。
export NAMESRV_ADDR=localhost:9876
#生产者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
#消费者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
八、关闭服务
#关闭nameserver
[root@rich bin]# ./mqshutdown namesrv#关闭broker
[root@rich bin]# ./mqshutdown broker
九、生产者与消费者代码
(1)同步发送消息
可靠的同步传输应用于广泛的场景,如重要通知消息、短信通知、短信营销系统等。
public class SyncProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("192.168.133.117:9876");//Launch the instance.producer.start();for (int i = 0; i < 100; i++) {//Create a message instance, specifying topic, tag and message body.Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);//Call send message to deliver message to one of brokers.SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}
(2)异步发送消息
异步传输一般用于响应时间敏感的业务场景
public class AsyncProducer {public static void main(String[] args) throws Exception {//使用生产者组名称进行实例化。DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 指定服务器地址。producer.setNamesrvAddr("192.168.133.117:9876");//启动实例。producer.start();producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount = 100;final CountDownLatch countDownLatch = new CountDownLatch(messageCount);for (int i = 0; i < messageCount; i++) {try {final int index = i;Message msg = new Message("Jodie_topic_1023","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}countDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();}
}
(3)单向发送消息
单向传输用于需要中等可靠性的情况,例如日志收集。
public class OnewayProducer {public static void main(String[] args) throws Exception{//Instantiate with a producer group name.DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("192.168.133.117:9876");//Launch the instance.producer.start();for (int i = 0; i < 100; i++) {//Create a message instance, specifying topic, tag and message body.Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("wu ----- Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);//Call send message to deliver message to one of brokers.producer.sendOneway(msg);}//Wait for sending to completeThread.sleep(5000);producer.shutdown();}
}
(4)消费消息
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Specify name server addresses.consumer.setNamesrvAddr("192.168.133.117:9876");// Subscribe one more more topics to consume.consumer.subscribe("TopicTest", "*");// Register callback to execute on arrival of messages fetched from brokers.consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);byte[] bt = msgs.get(0).getBody();try {System.out.println("-------------" + new String(bt,"UTF-8")+ "------------");} catch (UnsupportedEncodingException e) {e.printStackTrace();}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//Launch the consumer instance.consumer.start();System.out.printf("Consumer Started.%n");}
}
以上代码来自rocketmq官网。
十、Rocketmq-console可视化的监控管理平台
安装流程如下:
(1)在NameServer的机器中选择一台,然后在里面执行如下命令拉取RocketMQ运维工作台的源码:
git clone https://github.com/apache/rocketmq-externals.git
(2)然后进入rocketmq-console的目录:
cd rocketmq-externals/rocketmq-console/
(3)然后进入target目录下,可以看到一个jar包,接着执行下面的命令启动工作台:
java -jar target/rocketmq-console-ng-2.0.0.jar
或者
java -jar target/rocketmq-console-ng-2.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=127.0.0.1:9876
这里的2.0.0版本号不是固定的,第一次执行 mvn 打包操作应该是 1.0.0 ,然后多次打包版本号会有提升。
另外,如果配置文件有执行namesrvAddr的ip地址,就使用第一种执行方式,否则就选择第二种显式的去指定namesrvAddr所在的服务器地址。
这里在设置NameServer的地址时,如果有多个地址可以用分号隔开,接着就会看到工作台启动了,然后就通过浏览器访问那台机器的8080端口就可以了。
(4)布局说明
默认全英文页面,右上角有一个按钮是“ChangeLanguage” , 可以支持切换语言的,选择切换成简体中文就行了。
在这个界面里可以让你看到Broker的答题消息复制,还有各个Topic的消息负载,另外还可以选择日期要看哪一天的监控数据,都可以看到。
点击导航栏里的“集群” , 就会进入集群的一个监控界面。
这里可以看到很多的信息,包括可以看到各个Broker的分组,哪些是Master,哪些是Slave,他们各自的机器地址和端口号,还有版本号。
还有每台机器的生产小学TPS和消费消息TPS,还有消息总数。
通过这个TPS统计,就是每秒写入或者被消费的消息数量,就可以看出RocketMQ集群的TPS和并发访问量。
此外还有“消费者”、“生产者”、“消息” 等界面,都有各自的作用,这里不做赘述。
(5)注意事项
rocketmq-console是访问的服务器10911端口,所以需要每个namesrvAddr所在服务器都开发该端口,或者关闭防火墙,不然会运维平台会请求失败,拿不到MQ的Broker数据。
部署: 搭建 Apache RocketMQ 单机环境与Rocketmq-console相关推荐
- RocketMQ单机环境搭建
大家好,我是冰河~~ 今天,带来一篇搭建RocketMQ单机环境的文章,为后面的分布式事务专栏做准备.RocketMQ是阿里巴巴开源的一款高性能分布式消息中间件,有关RocketMQ的详细讲解,后面会 ...
- 在Centos 7下搭建Apache + PHP运行环境
之前都是在windows server2008下搭建PHP运行环境,用IIS+PHP或者phpstudy,或wamp.今天试了试,在linux下搭建apache+php运行环境.感觉还不错. 在Cen ...
- 阿里云apache配置php mysql_阿里云CentOS7搭建Apache+PHP+MySQL环境
最近要搭建一个阿里云的LMAP环境,选了CentOS7来做搭建. 1.Apache Centos7默认已经安装httpd服务,只是没有启动. 如果你需要全新安装,可以yum install -y ht ...
- RocketMQ单机环境搭建测试+springboot整合
1.资源下载 官网:下载 | RocketMQ 这里选择使用编译后可以直接用的 下载后解压:略 2.更改配置 主要是更改 conf/broker.conf 的配置,记得添加上下面这几行,否则消息发送失 ...
- 亿万流量消息中间件RocketMQ单机环境安装
1.环境要求 64bit OS, Linux/Unix/Mac is recommended; 64bit JDK 1.8 ; Maven 3.2.x; Git; 4g free disk for B ...
- 使用WAMP5搭建Apache+MySQL+PHP环境
目前有不少AMP(Apache\MySQL\PHP)的集成软件,可以让我们一次安装并设置好.这对于不熟悉AMP的用户来说,好处多多. 一.使用AMP集成软件的优点: 1.可避免由于缺乏AMP的知识,而 ...
- win10下搭建Apache+Mysql+PHP环境
之前在本地都是使用wampserver集成包,一键安装稍微配置下就可以了.今天到了新公司,使用公司的新电脑搭建环境,想自己分别安装Apache.Mysql和PHP,不使用集成包,于是百度查询资料.下面 ...
- windows 2012 apache php mysql_Windows Server 2012 R2搭建 Apache+PHP+MYSQL环境
环境说明: 操作系统:Windows Server 2012 R2 PHP版本:php-5.6.35-Win32-VC11-x64 下载地址:https://windows.php.net/downl ...
- windows2003 apache php mysql_Windows 2003搭建Apache PHP MySQL环境经验分享
本文所述是在windows2003上搭建,软件版本为Apache2.2.6(Win32),PHP5.2.4,MySQL5.0.45,phpMyAdmin2.11.1.Apache和MySQL需要安装, ...
最新文章
- 将 instance 连接到 first_local_net - 每天5分钟玩转 OpenStack(82)
- 我明明只是在努力工作,却被同事说成是“卷王”!!!
- 查看linux中的TCP连接数
- 成功解决Module Not Found Error : No module named mglearn
- 金融运营智能化搞不定?百度智能云有妙方
- bat判断文件是否存在_BAT面试必问题系列:JVM判断对象是否已死和四种垃圾回收算法总结...
- Gartner:2022年全球IT支出将超4万亿美元,软件增速最高
- JS判断两个日期的差或者判断两个日期的大小
- Ubuntu搭建嵌入式开发(交叉编译)环境-转
- Node.js连接mysql报加密方式错误解决方案
- HDU 5009 Paint Pearls
- sqlserver入门
- 怎么给计算机管理设置密码,如何给电脑设置密码
- 服务器系统分辨率调不了,win10系统分辨率调整显示灰色_网站服务器运行维护
- 活动效果评价模型--原理和实现(基于python)
- 什么是5G LAN 5G LAN商用爆发推动5G创新应用 提速数字转型新引擎
- python 如何提取 word 内的图片
- 基于51的MPX4115压力检测仿真
- QQ使用的一个小问题
- 关于 curl: (52) Empty reply from server 问题的一种解决方案