轻量级进程间通信ZMQ详解(CPYTHON)
目录
一:ZMQ简介
二:ZMQ的request-reply模式
三:ZMQ的pub-sub模式
一:ZMQ简介
官方介绍:ZeroMQ (also known as ØMQ, 0MQ, or zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fan-out, pub-sub, task distribution, and request-reply. It’s fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems.
大概的意思就是:zmq看上去像是一个lib库,我们调用lib库里的函数来实现socket通信,但是其支持的功能远不止如此。可利用zmq进行消息的传递,无论消息是进程间的,程序内的传递,还是以TCP,多播方式传递。你可以使用套接字构建多对多的连接模式,如扇出、发布-订阅、任务分发、请求-应答等。ZMQ的快速足以胜任集群应用产品。它的异步I/O机制让你能够构建多核应用程序,完成异步消息处理任务。ZMQ有着多语言支持,并能在几乎所有的操作系统上运行。
为什么会接触到ZMQ,工作中有个需求就是有套用C语言写的测试工具,其在后台会在运行过程中产生一系列的测试数据,现需要将测试数据用图表动态的展示出来。用的是FLASK框架,则会产生一个需求,如何将测试工具产生的数据传递到FLASK框架里去给到前端去显示。FLASK用的是PYTHON代码,最开始想到的是用RABBITMQ,虽然其功能丰富,但是RABBITMQ对于此需求有点大材小用,于是想寻求一个轻量级的消息传递组件,ZMQ正好符合此需求,事实上在开发的过程中,其大大简化了socket编程,仅仅使用几个其提供的API即可完成进程间的通信。
下面会根据上述需求给出穿刺代码
二:ZMQ的request-reply模式
使用REQ-REP套接字发送和接受消息是需要遵循一定规律的。客户端首先使用zmq_send()发送消息,再用zmq_recv()接收,如此循环(这里所谓的客户端就是发消息的一方,服务端就是接收消息回响应的一方)。此模式支持阻塞式的和非阻塞式的。非阻塞式的,客户端可以不管服务端有没有接收到消息,一直不断的发。本次需求采用非阻塞式的,因为工具测试的时候可不能因为服务端没有收到消息,不回响应而停在那。
下面同时给出C语言版本和python版本的zmq代码
C语言:
服务端
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>int main(void)
{void * context = zmq_ctx_new();void * socket = zmq_socket(context, ZMQ_REP);zmq_connect(socket, "tcp://localhost:5555");char buffer[100] = {0};while(1){int bytes = zmq_recv(socket, buffer, 100, 0);printf("recv message from client:%s\n",buffer);sleep(1);const char * replyMsg = "I am xiaoming";bytes = zmq_send(socket, replyMsg, strlen(replyMsg), 0);}zmq_close(socket);zmq_ctx_destroy(context);return 0;
}
客户端:
#include <zmq.h>
#include <stdio.h>
#include <string.h>static void s_send(void *socket, char *string)
{// 初始化一个zmq_msg_t对象, 分配的大小为string的大小zmq_msg_t msg;zmq_msg_init_size(&msg, strlen(string));memcpy(zmq_msg_data(&msg), string, strlen(string));// 发送数据//printf("send data! %s\n",msg);int rc = zmq_msg_send(&msg, socket, ZMQ_DONTWAIT);if(rc == -1){printf("send dptData faild!");}// 关闭zmq_msg_t对象zmq_msg_close(&msg);
}int main (void)
{// Socket to talk to clientsvoid *context = zmq_ctx_new ();void *requester = zmq_socket (context, ZMQ_REQ);int rc = zmq_bind (requester, "tcp://*:5555");char input[100] = {0};char buffer[100] = {0};while (1) {snprintf(input,100,"who are you?");s_send(requester,input);zmq_recv (requester, buffer, 100, 0);printf("recv message from server:%s\n",buffer);memset(input,0,100);memset(buffer,0,100);}return 0;
}
编译:
gcc zmqclient.c -o zmqclient -lzmq
gcc zmqserver.c -o zmqserver -lzmq
先起客户端,其打印如下:
send dptData faild!recv message from server:
send dptData faild!recv message from server:
send dptData faild!recv message from server:
send dptData faild!recv message from server:
send dptData faild!recv message from server:
send dptData faild!recv message from server:
recv message from server:I am xiaoming
recv message from server:I am xiaoming
recv message from server:I am xiaoming
recv message from server:I am xiaoming
recv message from server:I am xiaoming
可以看到在非阻塞的方式下,客户端起来后,也不管服务端有没有连上,先发了再说,也不会阻塞在那。
起服务端,其打印如下:
recv message from client:who are you?
recv message from client:who are you?
recv message from client:who are you?
recv message from client:who are you?
recv message from client:who are you?
recv message from client:who are you?
recv message from client:who are you?
需要特别注意的是以下两个函数,消息收发函数
int zmq_recv (void *socket, void *buf, size_t len, int flags);
int zmq_send (void *socket, void *buf, size_t len, int flags);
zmq_recv 和 zmq_send 默认都是阻塞的,可以通过flags=ZMQ_DONTWAIT参数来设置为非阻塞模式。 buf 和 len都是靠应用程序来保证的。
对于阻塞模式,zmq_recv的返回值是接收到的字节数,注意如果超过 len,后面的数据将会被截断,但返回值的长度却是原本没有被截掉的长度。 如果错误,或者在非阻塞模式下没有消息,返回-1,并设置 errno。
另外对于zmq发送消息的理解可参考以下文章:https://dongshao.blog.csdn.net/article/details/105991716
下面给出Python代码,就不做详细解释了
客户端:
import zmqcontext = zmq.Context()
socket = context.socket(zmq.REP)
socket.connect('tcp://127.0.0.1:5555')while True:message = socket.recv()print(message)response = 'server response!'socket.send(response.encode())
服务端:
import zmqcontext = zmq.Context()
socket = context.socket(zmq.REQ)
socket.bind('tcp://*:5555')while True:data = input('input your data:')socket.send(data.encode())response = socket.recv()print(response)
三:ZMQ的pub-sub模式
PUB-SUB套接字组合是异步的。客户端在一个循环体中使用recv ()接收消息,如果向SUB套接字发送消息则会报错;类似地,服务端可以不断地使用send ()发送消息,但不能再PUB套接字上使用recv ()
下面给出一个用C代码写的PUB端,即主动产生消息的一方,PYTHON写的SUB端,取消息的一方
消息生产方PUB
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <time.h>
#include <zmq.h>// 随机生成0...num-1的随机数
#define randof(num) (int) ((float) (num) * random () / (RAND_MAX + 1.0))// 将string消息格式化为zmq_meg_t对象, 然后发往socket套接字上
static int s_send(void *socket, char *string);int main()
{// 1.初始化上下文void *context = zmq_ctx_new();// 2.创建、绑定套接字void *publisher = zmq_socket(context, ZMQ_PUB);assert(publisher != NULL);// 此处我们将发布者绑定到一个tcp节点上和一个ipc节点上, 但是本案例我们只使用tcp, ipc那个只是演示说明zmq的套接字可以绑定到多个节点上int rc = zmq_bind(publisher, "tcp://*:5555");assert(rc == 0);// 3.初始化随机数发生器srandom((unsigned)time(NULL));// 4.循环发送数据while(1){// 5.随机生成邮政编码、温度、适度int zipcode, temperature, relhumidity;zipcode = randof(100000);temperature = randof(215) - 80;relhumidity = randof(50) + 10;// 6.将消息发送给所有的订阅者char update[20];sprintf(update, "%05d %d %d", zipcode, temperature, relhumidity);rc = s_send(publisher, update);assert(rc);sleep(1);}// 7.关闭套接字、销毁上下文zmq_close(publisher);zmq_ctx_destroy(context);return 0;
}static int s_send(void *socket, char *string)
{// 初始化一个zmq_msg_t对象, 分配的大小为string的大小zmq_msg_t msg;zmq_msg_init_size(&msg, strlen(string));memcpy(zmq_msg_data(&msg), string, strlen(string));// 发送数据printf("send data! %s\n",msg);int rc = zmq_msg_send(&msg, socket, 0);// 关闭zmq_msg_t对象zmq_msg_close(&msg);return rc;
}
消息消费端SUB
import zmqcontext = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://127.0.0.1:5555')
socket.setsockopt(zmq.SUBSCRIBE, ''.encode())while True:data = socket.recv()print(data)
需要注意的是:
1,这个模式是单向的,就是说 pub只能发, sub只能收,
2,sub可以注册多个pub,并且多个pub上的消息会公平的过来。
3,如果pub没有任何sub,那么消息将会被丢弃。
4,如果sub消费得比较慢,消息就会堆积在pub端,在v3.x版本里面,tcp 或 ipc的过滤是发生在publisher,而在低版本里面,所有的过滤都是发生在subscriber,这样就比较浪费流量和资源
sub 除了要创建 ZMQ_SUB 类型的socket,并连接之外,还要 调用 zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter, strlen (filter)); 来进行注册,才有效果。 其中filter用来匹配消息开头的字符串,如果匹配则接受下来,否则丢弃;但如果filter = NULL,并且长度为0的话,则表示所有的消息都接收。
publisher的第一个包经常是会被丢掉的,即便 sub端先起来,然后启动push 发送消息,刚开始的消息也有可能丢的 。因为即便是再快的网络,建立连接都是需要一些时间的,比如几个毫秒,而用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的),再结合之前说的如果publisher没有任何subscriber连上来的话消息会被丢弃。官网给了两个解决方案;1, 让发布者先不发数据,而是等订阅者真正连上之后,再发数据; 2,就是发布是永不停止的,没有开始与尽头的概念。
轻量级进程间通信ZMQ详解(CPYTHON)相关推荐
- java代码轻量级锁_Java轻量级锁原理详解(Lightweight Locking)
转自http://www.cnblogs.com/redcreen/archive/2011/03/29/1998801.html 大家知道,Java的多线程安全是基于Lock机制实现的,而Lock的 ...
- Java 轻量级锁原理详解(Lightweight Locking)
2019独角兽企业重金招聘Python工程师标准>>> 大家知道,Java的多线程安全是基于Lock机制实现的,而Lock的性能往往不如人意. 原因是,monitorenter与mo ...
- Android进程间通信 Messenger详解
1. 概念 Messenger,即进程间通信的信使.它是基于Message的进程间通信,我们可以像在线程间利用Handler.send(Message)一样. Messenger是一种轻量级的IPC方 ...
- linux程序间管道通信,linux进程间通信——管道 详解
管道是Linux中很重要的一种通信方式,是把一个程序的输出直接连接到另一个程序的输入.常说的管道多是指无名管道, 无名管道只能用于具有亲缘关系的进程之间,这是它与有名管道的最大区别. 有名管道叫nam ...
- 进程间通信(IPC)详解
目录 进程间通信的目的: 进程间通信发展 进程间通信分类 管道: 命名管道 system V 共享内存 前言: 什么是进程间通信?顾名思义,就是进程之间的通信. 进程间通信介绍: 进程间通信的目的: ...
- 4种进程间通信方式详解
进程间通信有4种方式,以下从简单到复杂的方式出场: 1.管道(pipe) 管道是一种具有两个端点的通信通道,一个管道实际上就是只存在在内存中的文件,对这个文件操作需要两个已经打开文件进行,他们 ...
- Linux_进程间通信(详解)
目录 知识点1[无名管道](了解) 1.管道 2.无名管道 没有名字标记的管道 3.无名管道的特性 4.创建无名管道 5.案例1:父进程发 子进程收 6.无名管道的特点 知识点2[文件描述符复制--- ...
- android 之Fragment(轻量级的Activity)详解
创建一个Fragment: Fragment的几个子类. (1)对话框:DialogFragment (2)列表:ListFragment (3)非选项设置:PreferenceFragment (4 ...
- 【Linux】进程间通信-信号量详解及编程实例
前面一篇文章线程同步之信号量同步 讲的是线程之间的信号量,这篇讲的更加具有通用性,能够实现进程之间的同步. 信号量概述 信号量定义: 它是一个特殊变量,只允许对它进行等待和发送信号这两种操作. P(信 ...
最新文章
- 关于博客园与CSDN博客同步的说明
- BILIBILI 高并发实时弹幕系统的实战之路 | 架构师实践日
- 【转】高性能前端3-高性能javascript
- 安卓按键精灵_[按键精灵教程]学了这个你也能做出稳定的脚本
- jvm性能调优实战 -60 线上系统的JVM参数优化、GC问题定位排查、OOM分析解决
- Visual C++ 时尚编程百例005(菜单)
- 征战蓝桥 —— 2015年第六届 —— C/C++A组第10题——灾后重建
- 【theano-windows】学习笔记二十——LSTM理论及实现
- 实现div可以调整高度(div实现resize)
- lua 调用文件中的函数调用_深入Lua:调用相关的指令
- hdu 2553(N皇后)
- Javascript交互式金融股票基金图表JavaScript Stock Chart
- 苹果开发——设置iTunesnbsp;Connec…
- swagger常用注解
- 极大似然估计法的理解
- python代码求圆锥体积_计算圆锥体积的c++程序
- PS 宏使用方法记录
- 微信聊天软件测试用例设计,微信页面测试用例_20140819
- 了解利用API接口通过网格策略的增长模式
- Tracup Talk:如何制作项目管理的甘特图?
热门文章
- 管理咨询的甲方和乙方(转)
- mysql front 图表,MySQL图表数据统计常用方法
- 【转】使用DevExpress的WebChartControl控件绘制图表(柱状图、折线图、饼图)
- a problem occurred with this webpage so it was reloaded
- 深入浅出内存管理-- 伙伴系统(buddy system)
- 三路测径仪同时测量三根铜丝的外径
- 瑞银:2030年Waymo将占自动驾驶出租车市场60% 汽车私有化可能会受到冲击
- 90后男生全款4万买房移居鹤岗
- PowerBuilder 文本控件显示提示内容
- 织梦Dedecms SEO优化技巧