1,新建消息处理中心

package com.smartnest.ability.module.RabbitMq;import java.util.concurrent.ArrayBlockingQueue;/*** @Author LS* @Description 消息处理中心 Broker* @Date 17:00 2022/9/21 0021* @Param* @return**/
public class Broker {// 队列存储消息的最大数量private final static int MAX_SIZE = 5;// 保存消息数据的容器private static ArrayBlockingQueue messageQueue = new ArrayBlockingQueue(MAX_SIZE);/*** @Author LS* @Description 生产消息* @Date 17:02 2022/9/21 0021* @Param* @return**/public static void produce(String msg) {if (messageQueue.offer(msg)) {System.out.println("成功向消息处理中心投递消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());} else {System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");}System.out.println("=======================");}/*** @Author LS* @Description 消费消息* @Date 17:01 2022/9/21 0021* @Param* @return**/public static String consume() {String msg = (String) messageQueue.poll();if (msg != null) {// 消费条件满足情况,从消息容器中取出一条消息System.out.println("已经消费消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());} else {System.out.println("消息处理中心内没有消息可供消费!");}System.out.println("=======================");return msg;}}

2,建立消息处理中心服务 BrokerServer

package com.smartnest.ability.module.RabbitMq;import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;/*** @Author LS* @Description 消息处理中心服务 BrokerServer* @用于启动消息处理中心* @Date 17:13 2022/9/21 0021* @Param* @return**/
public class BrokerServer  implements Runnable {public static int SERVICE_PORT = 9999;private final Socket socket;public BrokerServer(Socket socket) {this.socket = socket;}/*** @Author LS* @Description 消息服务* @Date 17:47 2022/9/21 0021* @Param []* @return void**/@Overridepublic void run() {try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream())) {while (true) {String str = in.readLine();if (str == null) {continue;}System.out.println("接收到原始数据:" + str);if (str.equals("CONSUME")) { //CONSUME 表示要消费一条消息//从消息队列中消费一条消息String message = Broker.consume();out.println(message);out.flush();} else if (str.contains("SEND:")){//接受到的请求包含SEND:字符串 表示生产消息放到消息队列中Broker.produce(str);} else {System.out.println("原始数据:"+str+"没有遵循协议,不提供相关服务");}}} catch (Exception e) {e.printStackTrace();}}/*** @Author LS* @Description 启动服务 建立 链接* @Date 17:54 2022/9/21 0021* @Param [args]* @return void**/public static void main(String[] args) throws Exception {ServerSocket server = new ServerSocket(SERVICE_PORT);while (true) {BrokerServer brokerServer = new BrokerServer(server.accept());new Thread(brokerServer).start();}}}

3,消费者 - 消费消息 代码

package com.smartnest.ability.module.RabbitMq;/*** @Author LS* @Description 消费者* @Date 17:16 2022/9/21 0021* @Param* @return**/
public class ConsumeClient {public static void main(String[] args) throws Exception {System.out.println("---------------------------消费====消息----------------------------");for (int i = 0; i < 7; i++) {String message = MqClient.consume();System.out.println("消费者 获取的消息 = :" + message);}}
}

4,客户端 MqClient

package com.smartnest.ability.module.RabbitMq;import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;/*** @Author LS* @Description 客户端 MqClient* @访问消息队列的客户端* @Date 17:14 2022/9/21 0021* @Param* @return**/
public class MqClient {//生产消息public static void produce(String message) throws Exception {//本地的的BrokerServer.SERVICE_PORT 创建SOCKETSocket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);try (PrintWriter out = new PrintWriter(socket.getOutputStream())) {System.out.println("生产者 发布的消息 = : " + message);out.println(message);out.flush();}}//消费消息public static String consume() throws Exception {Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream())) {//先向消息队列发送命令out.println("CONSUME");out.flush();//再从消息队列获取一条消息String message = in.readLine();return message;}}}

5,生产者

package com.smartnest.ability.module.RabbitMq;/*** @Author LS* @Description 生产者* @Date 17:15 2022/9/21 0021* @Param* @return**/
public class ProduceClient {public static void main(String[] args) throws Exception {//SEND:  表示通道, 可以是不同的通道//生产  消息MqClient.produce("SEND:Hello World");MqClient.produce("SEND:Hello World1");MqClient.produce("SEND:Hello World2");MqClient.produce("SEND:Hello World3");MqClient.produce("SEND:Hello World4");MqClient.produce("SEND:Hello World5");MqClient.produce("SEND:Hello World6");}}

需要先启动:

BrokerServer

然后启动:生成消息服务

ProduceClient

最后测试运行:

ConsumeClient

控制台输出:

Connected to the target VM, address: '127.0.0.1:62545', transport: 'socket'
接收到原始数据:SEND:Hello World1
接收到原始数据:SEND:Hello World2
接收到原始数据:SEND:Hello World3
接收到原始数据:SEND:Hello World
接收到原始数据:SEND:Hello World5
接收到原始数据:SEND:Hello World6
接收到原始数据:SEND:Hello World4
成功向消息处理中心投递消息:SEND:Hello World1,当前暂存的消息数量是:3
=======================
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
=======================
成功向消息处理中心投递消息:SEND:Hello World3,当前暂存的消息数量是:3
=======================
成功向消息处理中心投递消息:SEND:Hello World5,当前暂存的消息数量是:3
=======================
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
=======================
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
=======================
=======================
Disconnected from the target VM, address: '127.0.0.1:62545', transport: 'socket'

MQ实现DEMO-入门相关推荐

  1. RabbitMQ快速入门,5个MQ的Demo示例

    RabbitMQ部署 我们在Centos7虚拟机中使用Docker来安装. 在线拉取 docker pull rabbitmq:3-management 解压 docker load -i mq.ta ...

  2. moco工具第一个Demo入门

    moco工具第一个Demo入门 1.moco概述 moco工具是在github开源的一个项目,github地址:https://github.com/dreamhead/moco. moco使用场景: ...

  3. Linux上的WebSphere MQ开发快速入门

    本文将通过用于向队列发送消息和从队列接收消息的示例应用程序说明如何在 Linux 上着手安装和配置 WebSphere MQ,以及如何为 MQ 环境开发 Java 应用程序. 引言 本文的目标是让您轻 ...

  4. 【python OCR】PaddleOCR简单demo入门

    文章目录 一.配置环境 二.下载预训练模型 三.简单代码入门 3.1 代码: 3.2 识别结果 3.3 关于识别结果的说明 PaddleOCR开源地址: https://github.com/Padd ...

  5. 用Unity做一个小Demo入门Unity

    文章目录 前言 个人介绍 一.准备工作 1. unity下载安装 2. 个人许可证激活 3. 素材下载 二.项目准备 1. 项目创建 2. 素材导入 三.开始项目 1. 将素材变为精灵 2. 将精灵放 ...

  6. Android 自定义相机Demo 入门学习

    Android 自定义相机Demo 本文是参考网上一些自定义相机示例,再结合自己对相机的功能需求写的,基本上包含了很多基本功能,比如相机对焦.闪光灯,以及在手机预览界面上绘制自己想要绘制的图案. 话不 ...

  7. python框架实例_Python之Flask框架项目Demo入门

    Flask是微型web框架,框架本身十分精简,微型并不代表其功能弱,核心代码基于Werkzeug, Jinja 2 这两个库,它以插件形式的进行功能扩展,且插件易于安装与使用,并且可以自行开发扩展插件 ...

  8. 微信小游戏_2、demo入门

    创建过程 使用微信开发工具创建微信小程序项目,在创建界面先择小游戏,输入自己的id或者使用体验id,如下图: 项目架构 小游戏使用javascript语言作为主要逻辑语言进行开发 game.js 游戏 ...

  9. MQ(二)RabbitMQ快速入门

    一.RabbitMQ 概述和安装 RabbitMQ 是基于 Erlang 语言开发的开源消息通信中间件 1. RabbitMQ的结构和概念 RabbitMQ中的几个概念: (1)channel:操作M ...

  10. ibatis轻松入门

    近日,由于公司项目应用开发的逻辑层使用的是iBatis.上网查了些资料,自己写了点demo入门.感觉良好.iBatis实在是比Hibernate很容易入门,贡献出来与各路菜鸟分享(后文附源码),希望得 ...

最新文章

  1. GitHub上README.md教程
  2. python连接字符串的join
  3. 谈谈大家对PHP框架的各种误解
  4. Docker安装和helloworld
  5. 动态规划之----最长公共子序列
  6. 大数据平台容量评估_大数据平台
  7. LeetCode 5. 最长回文子串(动态规划)
  8. Java随机生成长宽的矩形_java – 将正方形或矩形分解为大量随机大小的正方形或矩形...
  9. python coding utf-8_【转】怎么在Python里使用UTF-8编码
  10. oracle数据库创建回滚,如何重建Oracle数据库的回滚段?
  11. php 图片 byte数组,php – 将图像存储在PostgreSQL数据库的bytea字段中
  12. Docker基础学习笔记02:Docker基本操作
  13. obs多推流地址_什么都比不上动手能力,OBS 推流实践小记
  14. linux6.5禁用防火墙,Centos6.5,Centos7分别关闭selinux和防火墙
  15. linux串口导致死机,Linux系统死机情况分析与处理方案介绍
  16. 使用ExtendSim进行物流、供应链和运输建模仿真
  17. TI DSP 28335 自学之路,到此止步
  18. 【PMP】关键路径法
  19. 四季电台应用项目源码
  20. Leetcode 387. 字符串中的第一个唯一字符

热门文章

  1. 云服务器怎么搬砖,逆水寒,预约五个新区 , 新区如何搬砖,来看看你该如何搞...
  2. Mac上微信、QQ的聊天图片和记录保存在哪?如何清理或导出?
  3. matlab xk-xk-1,数学符号中conj是什么意思
  4. 计算机组成原理——输入输出设备(Input Output Equip-ment)
  5. 计算机组成原理——输入输出设备(I/O设备)
  6. Tobii5的反复无常
  7. 站长说说之SEO中跳出率是多少才正常呢
  8. jmeter实现多个请求并行执行,验证线程安全
  9. 坚持,一种可以养成的习惯
  10. 大周教你做个游戏博主,做自媒体游戏视频怎么剪辑?没时间拍视频