最近在研究 ZeroMQ 库的使用,所以在这里总结一下各种模式,以便日后拿来使用。

关于 ZeroMQ 库,我就不多介绍了,大家可以参考下面一些文章,以及他的官网、使用指南、API 参考、项目仓库等内容。

开源点评:ZeroMQ简介

ZeroMQ的学习和研究

ZeroMQ 的模式

ZeroMQ 的目标是成为 OSI 模型的传输层(Transport Layer)的标准协议,所以他支持各种操作系统,支持多达30种以上的语言,是跨平台、跨语言的传输层库,而且性能是其绝对优势。所以对于进程间通信、节点间通信均可以使用他,可以用他直接替代 socket 的操作。而且用过之后就知道,他用起来非常的简单,学习成本很低,我只用了 1 天时间就把他的 3 种常用模式用 Python 实现了,代码在这里,大家可以参考一下,接下来准备用 C++ 再实现一遍。

ZeroMQ 总结的通信模式如下:

  • PUB and SUB
  • REQ and REP
  • REQ and ROUTER
  • DEALER and REP
  • DEALER and ROUTER
  • DEALER and DEALER
  • ROUTER and ROUTER
  • PUSH and PULL
  • PAIR and PAIR

ZeroMQ 总结的应用模式如下:

  • Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
  • Publish-subscribe, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.
  • Pipeline, which connects nodes in a fan-out / fan-in pattern that can have multiple steps, and loops. This is a parallel task distribution and collection pattern.

当然,实际使用中还得了解一些解决具体问题的模式,所以下面把使用指南中的一些模式整理如下,方便自己日后拿来使用。

最常用的三种模式:

1. Request-Reply

服务器端代码:

package zmq20140508;import org.zeromq.ZMQ;
/*** * @author lijianhua**/public class ZMQServer {public static void main(String[] args) {Client client =new Client();client.start();}
public static class Client extends Thread{private String url_worker="inproc://workers";private String url_client="tcp://127.0.0.1:6666";private ZMQ.Poller poller;private ZMQ.Context context;private ZMQ.Socket clients;private ZMQ.Socket workers;@SuppressWarnings("deprecation")public void run(){context = ZMQ.context(1);clients = context.socket(ZMQ.ROUTER);clients.bind(url_client);workers = context.socket(ZMQ.DEALER);workers.bind(url_worker);for(int i=0;i<10;i++){new Worker(context,url_worker).start();}ZMQ.device(ZMQ.QUEUE, clients, workers);//开始因这个犯错,搞了半天poller=new ZMQ.Poller(2);//创建一个大小为2的poller//分别将上述的pull注册到poller上,注册的事件是读ZMQ.PollItem citem = new ZMQ.PollItem(clients, ZMQ.Poller.POLLIN);ZMQ.PollItem witem = new ZMQ.PollItem(workers, ZMQ.Poller.POLLIN);poller.register(citem);poller.register(witem);boolean ok =true;while(ok){poller.poll(); if(poller.getItem(0).isReadable()){System.out.println("当前发送:clients");byte[] recv = clients.recv();workers.send(recv);}else{System.out.println("当前发送:workers");byte[] recv = workers.recv();clients.send(recv);}}closeAll();}public void closeAll(){clients.close();workers.close();context.term();}
}public static class Worker extends Thread{private String url_worker1;private ZMQ.Context context1;public Worker(ZMQ.Context context,String url_worker) {this.url_worker1=url_worker;this.context1=context;}public void run() {ZMQ.Socket socket = context1.socket(ZMQ.REP);socket.connect(url_worker1);ZMQ.Poller poller=new ZMQ.Poller(1);ZMQ.PollItem item = new ZMQ.PollItem(socket, ZMQ.Poller.POLLIN);poller.register(item);while (true){if(poller.poll()==ZMQ.Poller.POLLIN){byte[] recv = socket.recv();System.out.println("fix========"+new String(recv));try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}socket.send("world========");}}}}
}

客服端代码

package zmq20140508;import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;public class Request {public static void main(String[] args) {System.out.println("客户端开始.........");Context context = ZMQ.context(1);Socket resSocket = context.socket(ZMQ.REQ);//客服端发送消息
//      resSocket.connect("tcp://115.236.73.253:5570");resSocket.connect("tcp://127.0.0.1:6666");int i=0;while(i<5){i++;try {resSocket.send("hello");byte[] msg = resSocket.recv();String outputStr = new String(msg);System.out.println("#### Client Receive:" + outputStr);} catch (Exception e) {e.printStackTrace();}}}}

2. Publish-Subscribe

3. Parallel Pipeline

其他模式:

其他模式只是上面三种模式的加强而已,所以也是分为三大类。

A. Request-Reply Patterns

1. The Load-balancing Pattern

代码示例:

package zmq20140505;
import java.util.LinkedList;import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;public class Balance {public static class Client {public void start() {new Thread(new Runnable(){public void run() {// TODO Auto-generated method stubZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(ZMQ.REQ);socket.connect("ipc://127.0.0.1:5555");  //连接router,想起发送请求while (!Thread.currentThread().isInterrupted()) {socket.send("hello".getBytes(), 0);  //发送hello请求String bb = new String(socket.recv());  //获取返回的数据System.out.println("recv Worker : "+bb); }socket.close();context.term();}}).start();}}public static class Worker {public void start() {new Thread(new Runnable(){public void run() {// TODO Auto-generated method stubZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(ZMQ.REQ);socket.connect("ipc://127.0.0.1:6666");  //连接,用于获取要处理的请求,并发送回去处理结果socket.send("ready".getBytes());  //发送ready,表示当前可用while (!Thread.currentThread().isInterrupted()) {ZMsg msg = ZMsg.recvMsg(socket);  //获取需要处理的请求,其实这里msg最外面的标志frame是router对分配给client的标志frameZFrame request = msg.removeLast();   //最后一个frame其实保存的就是实际的请求数据,这里将其移除,待会用新的frame代替String now = new String(request.getData());System.out.println("recv Client : " + now);ZFrame frame = new ZFrame("hello fjs".getBytes());  msg.addLast(frame);  //将刚刚创建的frame放到msg的最后,worker将会收到msg.send(socket);  //将数据发送回去}socket.close();context.term();}}).start();}}public static class Middle {private LinkedList<ZFrame> workers;private LinkedList<ZMsg> requests;private ZMQ.Context context;private ZMQ.Poller poller;public Middle() {this.workers = new LinkedList<ZFrame>();this.requests = new LinkedList<ZMsg>();this.context = ZMQ.context(1);this.poller = new ZMQ.Poller(2);}public void start() {ZMQ.Socket fronted = this.context.socket(ZMQ.ROUTER);  //创建一个router,用于接收client发送过来的请求,以及向client发送处理结果ZMQ.Socket backend = this.context.socket(ZMQ.ROUTER);  //创建一个router,用于向后面的worker发送数据,然后接收处理的结果fronted.bind("ipc://127.0.0.1:5555");  //监听,等待client的连接backend.bind("ipc://127.0.0.1:6666");  //监听,等待worker连接//创建pollItemZMQ.PollItem fitem = new ZMQ.PollItem(fronted, ZMQ.Poller.POLLIN);  ZMQ.PollItem bitem = new ZMQ.PollItem(backend, ZMQ.Poller.POLLIN);this.poller.register(fitem);  //注册pollItemthis.poller.register(bitem);while (!Thread.currentThread().isInterrupted()) {this.poller.poll();if (fitem.isReadable()) {  //表示前面有请求发过来了ZMsg msg = ZMsg.recvMsg(fitem.getSocket());  //获取client发送过来的请求,这里router会在实际请求上面套一个连接的标志frame/*** //以下3行查看中间层客服端发过来的值*/ZFrame request = msg.getLast();String now = new String(request.getData());System.out.println("fix===="+now);this.requests.addLast(msg);   //将其挂到请求队列}if (bitem.isReadable()) {  //这里表示worker发送数据过来了ZMsg msg = ZMsg.recvMsg(bitem.getSocket());  //获取msg,这里也会在实际发送的数据前面包装一个连接的标志frame//这里需要注意,这里返回的是最外面的那个frame,另外它还会将后面的接着的空的标志frame都去掉ZFrame workerID = msg.unwrap();  //把外面那层包装取下来,也就是router对连接的标志framethis.workers.addLast(workerID);  //将当前的worker的标志frame放到worker队列里面,表示这个worker可以用了ZFrame readyOrAddress = msg.getFirst(); //这里获取标志frame后面的数据,如果worker刚刚启动,那么应该是发送过来的ready,if (new String(readyOrAddress.getData()).equals("ready")) {  //表示是worker刚刚启动,发过来的readymsg.destroy();} else {msg.send(fronted);  //表示是worker处理完的返回结果,那么返回给客户端}}while (this.workers.size() > 0 && this.requests.size() > 0) {ZMsg request = this.requests.removeFirst();ZFrame worker = this.workers.removeFirst();request.wrap(worker);  //在request前面包装一层,把可以用的worker的标志frame包装上,这样router就会发给相应的worker的连接request.send(backend);  //将这个包装过的消息发送出去}}fronted.close();backend.close();this.context.term();}}public static void main(String args[]) {Worker worker = new Worker();worker.start();Client client = new Client();client.start();Middle middle = new Middle();middle.start();}
}

2. The Asynchronous Client-Server Pattern

3. Client-side Reliability (Lazy Pirate Pattern)

4. Basic Reliable Queuing (Simple Pirate Pattern)

5. Robust Reliable Queuing (Paranoid Pirate Pattern)

6. Service-Oriented Reliable Queuing (Majordomo Pattern)

7. Disconnected Reliability (Titanic Pattern)

8. High-availability Pair (Binary Star Pattern)

9. Brokerless Reliability (Freelance Pattern)

B. Publish-Subscribe Patterns

1. Pub-sub Tracing (Espresso Pattern)

2. Slow Subscriber Detection (Suicidal Snail Pattern)

3. High-speed Subscribers (Black Box Pattern)

4. Reliable Publish-Subscribe (Clone Pattern)

C. Parallel Pipeline Patterns

传输层的各种模式——ZeroMQ 库的使用 .相关推荐

  1. 计算机网络题库--第五单元传输层

    主要选取谢希仁第八版(板块一),学校复习资料 (一) 1. 试说明运输层在协议栈中的地位和作用,运输层的通信和网络层的通信有什么重要区别?为什么运输层是必不可少的? 答: 2.网络层提供数据报或虚电路 ...

  2. ensp大型网络环境设计与实现_mongodb内核源码设计实现、性能优化、最佳运维系列-网络传输层模块源码实现三...

    1. 说明 在之前的<<Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计>>和<<mongodb内核源码设计实现.性能优化.最佳运维系列-tran ...

  3. Windows网络驱动、NDIS驱动(微端口驱动、中间层驱动、协议驱动)、TDI驱动(网络传输层过滤)、WFP(Windows Filtering Platfrom))

    catalog 0.引言 1.Windows 2000网络结构和OSI模型 2.NDIS驱动 3.NDIS微端口驱动编程实例 4.NDIS中间层驱动编程实例 5.TDI驱动 6.TDI驱动 7.TDI ...

  4. linux内核设计与实现 epub_mongodb内核源码设计实现、性能优化、最佳运维系列-网络传输层模块源码实现四...

    1. 说明 本文分析网络传输层模块中的最后一个子模块:service_executor服务运行子模块,即线程模型子模块.在阅读该文章前,请提前阅读下<<Mongodb网络传输处理源码实现及 ...

  5. TSL 传输层安全性协议

    一. 介绍 传输层安全性协议 Transport Layer Security,TLS 及其前身安全套接层 Secure Sockets Layer,SSL是一种安全协议,目的是为互联网通信提供安全及 ...

  6. 计算机网络传输层课件,计算机网络技术,传输层协议课件

    计算机网络技术,传输层协议课件 lufei1108@ 阿迪达斯三条纹标志是由阿迪达斯的创办人阿迪·达斯勒设计的,三条纹的阿迪达斯标志代表山区,指出实现挑战.成就未来和不断达成目标的愿望. 第4章 传输 ...

  7. 传输层安全---SSL

    传输层安全 在传输层之上实现数据的安全传输是另一种安全解决方案.一般SSL都是可执行协议软件包的一部分,从而对应用是透明的,可以也.嵌入到特殊软件包中(IE浏览器都配置了SSL). SSL / TLS ...

  8. 网络编程传输层——UDP通信

    何为传输层?         在物理层.数据链路层.网络层解决了主机和主机之间能够发送接收数据,但是在计算机网络中,主机的通信主体还是进程,而传输层则解决应用进程的通信,所谓传输层协议也是端对端协议. ...

  9. 传输层安全协议(TLS)1.2版

    1.介绍 TLS协议的主要目标是在两个通信应用之间提供私密性和数据完整性.这个协议由两层组成:TLS记录协议和TLS握手协议.最低层是基于一些可靠传输协议(如TCP)的TLS记录协议.TLS记录协议提 ...

最新文章

  1. linux里面查看llvm的版本,linux llvm安装
  2. 【嵌入式】C语言中volatile关键字
  3. 通过8个技巧让你成为一个超强的Linux终端用户
  4. 130701基础练习-first
  5. WebMagic写的网络爬虫优秀文章
  6. MySQL关于Table cache设置,看这一篇就够了
  7. sprint会议记录
  8. 010 Editor逆向分析文档
  9. 软件测试入门理论基础
  10. 关于opencv打开摄像头黑屏的问题
  11. Windows64位操作环境下,eclipse使用32位JDK
  12. Win7下安装Ubuntu16.04成双系统
  13. 《动态规划》— 动态规划分类
  14. Linux下的clk学习
  15. Google 黑客搜索技巧
  16. AVPlayer 本地、网络视频播放相关
  17. 初学mysql 建表:71个案例
  18. java h264 sps解码,H.264(H264)解码SPS获取分辨率和帧率
  19. Java学习-SpringBoot
  20. Java游戏开发组件LGame简易测试版发布(版本号 0 1 5)

热门文章

  1. 如何让matlab全速运行,提高matlab代码运行效率
  2. 陈为浙江大学计算机学院,浙大陈为
  3. .NET CORE 关于void返回类型的坑
  4. 两个一阶节的级联型_数字信号处理-第五章数字滤波器的基本结构(new).ppt
  5. rackspace_Rackspace专家指导的OpenStack基础知识
  6. javaweb中验证码验证实现
  7. [Hcia]No.11 OSPF协议(一)
  8. 雪花算法snowflake分布式id生成原理详解,以及对解决时钟回拨问题几种方案讨论
  9. 极域电子教室的应用功能有多少-电子教室
  10. python爬虫图片加速_Python爬虫加速神器的小试