传输层的各种模式——ZeroMQ 库的使用 .
最近在研究 ZeroMQ 库的使用,所以在这里总结一下各种模式,以便日后拿来使用。
关于 ZeroMQ 库,我就不多介绍了,大家可以参考下面一些文章,以及他的官网、使用指南、API 参考、项目仓库等内容。
开源点评:ZeroMQ简介
ZeroMQ的学习和研究
ZeroMQ 的模式
ZeroMQ 的目标是成为 OSI 模型的传输层(Transport Layer)的标准协议,所以他支持各种操作系统,支持多达30种以上的语言,是跨平台、跨语言的传输层库,而且性能是其绝对优势。所以对于进程间通信、节点间通信均可以使用他,可以用他直接替代 socket 的操作。而且用过之后就知道,他用起来非常的简单,学习成本很低,我只用了 1 天时间就把他的 3 种常用模式用 Python 实现了,代码在这里,大家可以参考一下,接下来准备用 C++ 再实现一遍。
ZeroMQ 总结的通信模式如下:
- PUB and SUB
- REQ and REP
- REQ and ROUTER
- DEALER and REP
- DEALER and ROUTER
- DEALER and DEALER
- ROUTER and ROUTER
- PUSH and PULL
- PAIR and PAIR
ZeroMQ 总结的应用模式如下:
- Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
- Publish-subscribe, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.
- Pipeline, which connects nodes in a fan-out / fan-in pattern that can have multiple steps, and loops. This is a parallel task distribution and collection pattern.
当然,实际使用中还得了解一些解决具体问题的模式,所以下面把使用指南中的一些模式整理如下,方便自己日后拿来使用。
最常用的三种模式:
1. Request-Reply
服务器端代码:
package zmq20140508;import org.zeromq.ZMQ;
/*** * @author lijianhua**/public class ZMQServer {public static void main(String[] args) {Client client =new Client();client.start();}
public static class Client extends Thread{private String url_worker="inproc://workers";private String url_client="tcp://127.0.0.1:6666";private ZMQ.Poller poller;private ZMQ.Context context;private ZMQ.Socket clients;private ZMQ.Socket workers;@SuppressWarnings("deprecation")public void run(){context = ZMQ.context(1);clients = context.socket(ZMQ.ROUTER);clients.bind(url_client);workers = context.socket(ZMQ.DEALER);workers.bind(url_worker);for(int i=0;i<10;i++){new Worker(context,url_worker).start();}ZMQ.device(ZMQ.QUEUE, clients, workers);//开始因这个犯错,搞了半天poller=new ZMQ.Poller(2);//创建一个大小为2的poller//分别将上述的pull注册到poller上,注册的事件是读ZMQ.PollItem citem = new ZMQ.PollItem(clients, ZMQ.Poller.POLLIN);ZMQ.PollItem witem = new ZMQ.PollItem(workers, ZMQ.Poller.POLLIN);poller.register(citem);poller.register(witem);boolean ok =true;while(ok){poller.poll(); if(poller.getItem(0).isReadable()){System.out.println("当前发送:clients");byte[] recv = clients.recv();workers.send(recv);}else{System.out.println("当前发送:workers");byte[] recv = workers.recv();clients.send(recv);}}closeAll();}public void closeAll(){clients.close();workers.close();context.term();}
}public static class Worker extends Thread{private String url_worker1;private ZMQ.Context context1;public Worker(ZMQ.Context context,String url_worker) {this.url_worker1=url_worker;this.context1=context;}public void run() {ZMQ.Socket socket = context1.socket(ZMQ.REP);socket.connect(url_worker1);ZMQ.Poller poller=new ZMQ.Poller(1);ZMQ.PollItem item = new ZMQ.PollItem(socket, ZMQ.Poller.POLLIN);poller.register(item);while (true){if(poller.poll()==ZMQ.Poller.POLLIN){byte[] recv = socket.recv();System.out.println("fix========"+new String(recv));try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}socket.send("world========");}}}}
}
客服端代码
package zmq20140508;import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;public class Request {public static void main(String[] args) {System.out.println("客户端开始.........");Context context = ZMQ.context(1);Socket resSocket = context.socket(ZMQ.REQ);//客服端发送消息
// resSocket.connect("tcp://115.236.73.253:5570");resSocket.connect("tcp://127.0.0.1:6666");int i=0;while(i<5){i++;try {resSocket.send("hello");byte[] msg = resSocket.recv();String outputStr = new String(msg);System.out.println("#### Client Receive:" + outputStr);} catch (Exception e) {e.printStackTrace();}}}}
2. Publish-Subscribe
3. Parallel Pipeline
其他模式:
A. Request-Reply Patterns
1. The Load-balancing Pattern
代码示例:
package zmq20140505;
import java.util.LinkedList;import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;public class Balance {public static class Client {public void start() {new Thread(new Runnable(){public void run() {// TODO Auto-generated method stubZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(ZMQ.REQ);socket.connect("ipc://127.0.0.1:5555"); //连接router,想起发送请求while (!Thread.currentThread().isInterrupted()) {socket.send("hello".getBytes(), 0); //发送hello请求String bb = new String(socket.recv()); //获取返回的数据System.out.println("recv Worker : "+bb); }socket.close();context.term();}}).start();}}public static class Worker {public void start() {new Thread(new Runnable(){public void run() {// TODO Auto-generated method stubZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(ZMQ.REQ);socket.connect("ipc://127.0.0.1:6666"); //连接,用于获取要处理的请求,并发送回去处理结果socket.send("ready".getBytes()); //发送ready,表示当前可用while (!Thread.currentThread().isInterrupted()) {ZMsg msg = ZMsg.recvMsg(socket); //获取需要处理的请求,其实这里msg最外面的标志frame是router对分配给client的标志frameZFrame request = msg.removeLast(); //最后一个frame其实保存的就是实际的请求数据,这里将其移除,待会用新的frame代替String now = new String(request.getData());System.out.println("recv Client : " + now);ZFrame frame = new ZFrame("hello fjs".getBytes()); msg.addLast(frame); //将刚刚创建的frame放到msg的最后,worker将会收到msg.send(socket); //将数据发送回去}socket.close();context.term();}}).start();}}public static class Middle {private LinkedList<ZFrame> workers;private LinkedList<ZMsg> requests;private ZMQ.Context context;private ZMQ.Poller poller;public Middle() {this.workers = new LinkedList<ZFrame>();this.requests = new LinkedList<ZMsg>();this.context = ZMQ.context(1);this.poller = new ZMQ.Poller(2);}public void start() {ZMQ.Socket fronted = this.context.socket(ZMQ.ROUTER); //创建一个router,用于接收client发送过来的请求,以及向client发送处理结果ZMQ.Socket backend = this.context.socket(ZMQ.ROUTER); //创建一个router,用于向后面的worker发送数据,然后接收处理的结果fronted.bind("ipc://127.0.0.1:5555"); //监听,等待client的连接backend.bind("ipc://127.0.0.1:6666"); //监听,等待worker连接//创建pollItemZMQ.PollItem fitem = new ZMQ.PollItem(fronted, ZMQ.Poller.POLLIN); ZMQ.PollItem bitem = new ZMQ.PollItem(backend, ZMQ.Poller.POLLIN);this.poller.register(fitem); //注册pollItemthis.poller.register(bitem);while (!Thread.currentThread().isInterrupted()) {this.poller.poll();if (fitem.isReadable()) { //表示前面有请求发过来了ZMsg msg = ZMsg.recvMsg(fitem.getSocket()); //获取client发送过来的请求,这里router会在实际请求上面套一个连接的标志frame/*** //以下3行查看中间层客服端发过来的值*/ZFrame request = msg.getLast();String now = new String(request.getData());System.out.println("fix===="+now);this.requests.addLast(msg); //将其挂到请求队列}if (bitem.isReadable()) { //这里表示worker发送数据过来了ZMsg msg = ZMsg.recvMsg(bitem.getSocket()); //获取msg,这里也会在实际发送的数据前面包装一个连接的标志frame//这里需要注意,这里返回的是最外面的那个frame,另外它还会将后面的接着的空的标志frame都去掉ZFrame workerID = msg.unwrap(); //把外面那层包装取下来,也就是router对连接的标志framethis.workers.addLast(workerID); //将当前的worker的标志frame放到worker队列里面,表示这个worker可以用了ZFrame readyOrAddress = msg.getFirst(); //这里获取标志frame后面的数据,如果worker刚刚启动,那么应该是发送过来的ready,if (new String(readyOrAddress.getData()).equals("ready")) { //表示是worker刚刚启动,发过来的readymsg.destroy();} else {msg.send(fronted); //表示是worker处理完的返回结果,那么返回给客户端}}while (this.workers.size() > 0 && this.requests.size() > 0) {ZMsg request = this.requests.removeFirst();ZFrame worker = this.workers.removeFirst();request.wrap(worker); //在request前面包装一层,把可以用的worker的标志frame包装上,这样router就会发给相应的worker的连接request.send(backend); //将这个包装过的消息发送出去}}fronted.close();backend.close();this.context.term();}}public static void main(String args[]) {Worker worker = new Worker();worker.start();Client client = new Client();client.start();Middle middle = new Middle();middle.start();}
}
2. The Asynchronous Client-Server Pattern
3. Client-side Reliability (Lazy Pirate Pattern)
4. Basic Reliable Queuing (Simple Pirate Pattern)
5. Robust Reliable Queuing (Paranoid Pirate Pattern)
6. Service-Oriented Reliable Queuing (Majordomo Pattern)
7. Disconnected Reliability (Titanic Pattern)
8. High-availability Pair (Binary Star Pattern)
9. Brokerless Reliability (Freelance Pattern)
B. Publish-Subscribe Patterns
1. Pub-sub Tracing (Espresso Pattern)
2. Slow Subscriber Detection (Suicidal Snail Pattern)
3. High-speed Subscribers (Black Box Pattern)
4. Reliable Publish-Subscribe (Clone Pattern)
C. Parallel Pipeline Patterns
传输层的各种模式——ZeroMQ 库的使用 .相关推荐
- 计算机网络题库--第五单元传输层
主要选取谢希仁第八版(板块一),学校复习资料 (一) 1. 试说明运输层在协议栈中的地位和作用,运输层的通信和网络层的通信有什么重要区别?为什么运输层是必不可少的? 答: 2.网络层提供数据报或虚电路 ...
- ensp大型网络环境设计与实现_mongodb内核源码设计实现、性能优化、最佳运维系列-网络传输层模块源码实现三...
1. 说明 在之前的<<Mongodb网络传输处理源码实现及性能调优-体验内核性能极致设计>>和<<mongodb内核源码设计实现.性能优化.最佳运维系列-tran ...
- Windows网络驱动、NDIS驱动(微端口驱动、中间层驱动、协议驱动)、TDI驱动(网络传输层过滤)、WFP(Windows Filtering Platfrom))
catalog 0.引言 1.Windows 2000网络结构和OSI模型 2.NDIS驱动 3.NDIS微端口驱动编程实例 4.NDIS中间层驱动编程实例 5.TDI驱动 6.TDI驱动 7.TDI ...
- linux内核设计与实现 epub_mongodb内核源码设计实现、性能优化、最佳运维系列-网络传输层模块源码实现四...
1. 说明 本文分析网络传输层模块中的最后一个子模块:service_executor服务运行子模块,即线程模型子模块.在阅读该文章前,请提前阅读下<<Mongodb网络传输处理源码实现及 ...
- TSL 传输层安全性协议
一. 介绍 传输层安全性协议 Transport Layer Security,TLS 及其前身安全套接层 Secure Sockets Layer,SSL是一种安全协议,目的是为互联网通信提供安全及 ...
- 计算机网络传输层课件,计算机网络技术,传输层协议课件
计算机网络技术,传输层协议课件 lufei1108@ 阿迪达斯三条纹标志是由阿迪达斯的创办人阿迪·达斯勒设计的,三条纹的阿迪达斯标志代表山区,指出实现挑战.成就未来和不断达成目标的愿望. 第4章 传输 ...
- 传输层安全---SSL
传输层安全 在传输层之上实现数据的安全传输是另一种安全解决方案.一般SSL都是可执行协议软件包的一部分,从而对应用是透明的,可以也.嵌入到特殊软件包中(IE浏览器都配置了SSL). SSL / TLS ...
- 网络编程传输层——UDP通信
何为传输层? 在物理层.数据链路层.网络层解决了主机和主机之间能够发送接收数据,但是在计算机网络中,主机的通信主体还是进程,而传输层则解决应用进程的通信,所谓传输层协议也是端对端协议. ...
- 传输层安全协议(TLS)1.2版
1.介绍 TLS协议的主要目标是在两个通信应用之间提供私密性和数据完整性.这个协议由两层组成:TLS记录协议和TLS握手协议.最低层是基于一些可靠传输协议(如TCP)的TLS记录协议.TLS记录协议提 ...
最新文章
- linux里面查看llvm的版本,linux llvm安装
- 【嵌入式】C语言中volatile关键字
- 通过8个技巧让你成为一个超强的Linux终端用户
- 130701基础练习-first
- WebMagic写的网络爬虫优秀文章
- MySQL关于Table cache设置,看这一篇就够了
- sprint会议记录
- 010 Editor逆向分析文档
- 软件测试入门理论基础
- 关于opencv打开摄像头黑屏的问题
- Windows64位操作环境下,eclipse使用32位JDK
- Win7下安装Ubuntu16.04成双系统
- 《动态规划》— 动态规划分类
- Linux下的clk学习
- Google 黑客搜索技巧
- AVPlayer 本地、网络视频播放相关
- 初学mysql 建表:71个案例
- java h264 sps解码,H.264(H264)解码SPS获取分辨率和帧率
- Java学习-SpringBoot
- Java游戏开发组件LGame简易测试版发布(版本号 0 1 5)
热门文章
- 如何让matlab全速运行,提高matlab代码运行效率
- 陈为浙江大学计算机学院,浙大陈为
- .NET CORE 关于void返回类型的坑
- 两个一阶节的级联型_数字信号处理-第五章数字滤波器的基本结构(new).ppt
- rackspace_Rackspace专家指导的OpenStack基础知识
- javaweb中验证码验证实现
- [Hcia]No.11 OSPF协议(一)
- 雪花算法snowflake分布式id生成原理详解,以及对解决时钟回拨问题几种方案讨论
- 极域电子教室的应用功能有多少-电子教室
- python爬虫图片加速_Python爬虫加速神器的小试