1 应用场景

当前要做的一个内容是通过好几个pub 客户端向一个 sub 服务器publish数据信息,sub服务器用于将所得到的信息解析存储到数据库,但是出现了中间丢掉一些数据包的问题,如下所示,我总记发送类了4w的数据 丢了大概8个包.

2 推测可能的原因

The downside is that we actually need all of these if we want to do reliable multicast. The ZeroMQ pub-sub pattern will lose messages arbitrarily when a subscriber is connecting, when a network failure occurs, or just if the subscriber or network can't keep up with the publisher.

(1)slower subscriber : 我设定的高水位是非常 非常的高

(2)我也没有出现 网络的faiure

(3)在建立链接到publishers的时候,连接握手成功之前就已经发布数据的情况,为了避免这个问题,我们采用了req-rep的同步的方案

(4)目前唯一的可能性就是network can't keep up with the publisher.

(思考,我的测试都是采用本机地址的tcp//127.0.0.1:6001, 有可能既要发送,又要接收,出现的网卡不能忙过来的情况.?后面进行验证)

3 中间的验证过程

(1) publisher

9个进程同时运行,向sub中发布数据....我们使用zeromq 是使用的是3.x的版本,也就是根据官方手册,其对于pub/sub 的模式来说,对于消息的过滤是发生在发送端(对于2.x版本是发生在接收端的过滤,查看zmq的版本的方式python语言的 zmq.zmq_version_info())

具体测试的代码的逻辑就是publisher 通过TCP接收数据,然后通过publisher 将数据下subscriber.

(为什么不采用更加可靠的push/pull 这种阻塞的模型,因为我们的publisher是要通过TCP接收数据的,如果说subscriber 没能及时处理我们的当前的数据的情况下,会造成publisher的tcp的部分的阻塞,造成没有办法及时接收tcp的数据包....)

如下所示,为我的测试的代码中的一个,其他的8个与此类似,其中不同进程的速度各自不尽相同,所以,就不一一贴出来.

'''
子系统自身信息:
IP:192.168.127.10
slave:10
port:5001子系统需要检测的信息
电源电压采样 value1:10 03 07 02  data crc1 crc2----registerid=07   datatype=int
电源电流采样 value1:10 03 08 04 data crc1 crc2----registerid=08   datatype=float'''import threading
import zmq
import time
import socket
import datetimeHWM_VAL = 100000*60*31*5def zmq_recv(context, url):socket = context.socket(zmq.SUB)# socket = context.socket(zmq.REP)socket.connect(url)socket.setsockopt(zmq.SUBSCRIBE, ''.encode('utf-8'))  # 接收所有消息zhanbao = 0buzhanbao = 0start_time = time.clock()while True:b = socket.recv();# socket.send(b'1')# print(b)end_time = time.clock()if len(b) == 1:# print('总计耗时',end_time-start_time)breaksize = len(b)# print(size)# if end_time-start_time > 10:#     pass#     breakif size > 10:zhanbao = zhanbao + 1else:buzhanbao = buzhanbao + 1print('接收不粘包', buzhanbao)print('接收粘包', zhanbao)def set_keepalive_linux(sock, after_idle_sec=1, interval_sec=3, max_fails=5):"""Set TCP keepalive on an open socket.It activates after 1 second (after_idle_sec) of idleness,then sends a keepalive ping once every 3 seconds (interval_sec),and closes the connection after 5 failed ping (max_fails), or 15 seconds"""sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, after_idle_sec)sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_sec)sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, max_fails)return sockdef tcp_recv_zmq_send(context, sub_server_addr, syncaddr, down_computer_addr, port):# socketzmq = context.socket(zmq.PUB)# socketzmq.bind("tcp://115.156.162.76:6000")socketzmq = context.socket(zmq.PUB)socketzmq.set_hwm(HWM_VAL)socketzmq.connect(sub_server_addr)# ## #为了等待远端的电脑的sub的内容全部都连接上来。进行的延迟# time.sleep(3)# # 保证同步的另r外的一种方案就是采用req-rep的同步sync_client = context.socket(zmq.REQ)sync_client.connect(syncaddr)## 发送同步信号sync_client.send(b'')# 等待同步回应,完成同步sync_client.recv()# 为了定义一个对象线程# 创建一个socket:s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)s=set_keepalive_linux(s)# 建立连接:s.connect((down_computer_addr, port))# s.connect(('192.168.127.5', 5001))print('we have connected to the tcp data send server!---port is :', port)packagenum = 0zhanbao = 0buzhanbao = 0start_time_perf = time.perf_counter()start_time_process = time.process_time()count = 0# 实际上应当启用的市多线程来做这些事情的# 每一个线程要做的事情就是接收对应的内容# 我想epics里面做的也是基本想同样的事情  ---最后写一个自动化的脚本多线程while True:# s.setsockopt(SO_KEEPALIVE=1)#解决这个问题的方案就是通过补全那些不同长度长度数据b = s.recv(10)# print('we are receiving ', b)# print(b)# print(b)if b[5] == 115:  ##最后一个接收的数据包时ssssssssssprint('ready to exit')socketzmq.send(b)passbreak# print(len(b))# print(b)# s.send(b'i')# packagenum = packagenum + 1# print(b)size = len(b)count = count + 1# if count==10000:#     break# r.set('name',b)# f.write(str(b)+'\n')if size == 10:zhanbao = zhanbao + 1# print(size)else:buzhanbao = buzhanbao + 1timestample = str(datetime.datetime.now()).encode()b = b + timestample# print(len(b))socketzmq.send(b)  # 显然,zeromq 这句话几乎消耗了很多很多的时间# x=socketzmq.recv()print(packagenum)end_time_perf = time.perf_counter()end_time_process = time.process_time()print('the port is: ', port)# print('程序的clock time消耗: ', end_time_clock - start_time_clock)# print('程序_process', end_time_process - start_time_process)  # process time 不包含time sleep 的# print('程序执行perf_count', end_time_perf - start_time_perf)  #print('tcp接收不粘包', buzhanbao)print('tcp接收粘包', zhanbao)print('tcp接收包个数', count)socketzmq.close()s.close()if __name__ == '__main__':print('Kaishile ')context = zmq.Context()  # 这个上下文是真的迷,到底什么情况下要用共同的上下文,什么时候用单独的上下文,找时间测试清楚sub_server_addr = "tcp://115.156.162.123:6000"syncaddr = "tcp://115.156.162.76:5555"down_computer_addr = '115.156.163.107'down_computer_addr = '192.168.127.10'down_computer_addr = '127.0.0.1'sub_server_addr = "tcp://127.0.0.1:6001"# sub_server_addr = "tcp://192.168.127.100:6001"syncaddr = "tcp://127.0.0.1:5555"port = [5001, 5002, 5003, 5004, 5005, 5006, 5007, 5008, 5009, 5010]tcp_recv_zmq_send(context,sub_server_addr,syncaddr,down_computer_addr,5010)# for i in port:#     t2 = threading.Thread(target=tcp_recv_zmq_send,#                           args=(context, sub_server_addr, syncaddr, down_computer_addr, port))#     t2.start()

(2) subscriber 订阅所有的数据.

单个进程运行,收到数据然后进行记录

2020/04/27

--------------------------------------时间较晚,先记录到这-------------------

当前,我是希望subscriber 能够对我所得到的数据进行一定解析,并存储,但是现在目前还停留在数据包的接收的角度下,所以如下所示的是我代理部分的脚本,他的逻辑如下:

a 首先进行本地的worker(用来解析数据与存储的部分的逻辑)的同步,当前采用的是push 与 pull 的方案

b 然后进行与远端的publisher 的消息的订阅,以确保本身已经订阅了所有的publisher的部分

c 进行数据的接收与计数

代码如下

import zmqlocal_NUM_SUBSCRIBERS_EXPECTED = 1
remote_NUM_PUBLISHERS_EXPECTED = 3HWM_VAL = 100000*60*31*5def broker_proxy():#创建本进程使用的上下文context = zmq.Context()#建立sub 套接字以供远端的多个不同子系统的pub 进行链接使用# url =  "tcp://115.156.162.76:6000"url =  "tcp://127.0.0.1:6001"# url =  "ipc://sub_server_proxy"socketsub = context.socket(zmq.SUB)socketsub.set_hwm(HWM_VAL)socketsub.bind(url)#订阅内容设定为所有的套接字的所有的消息都要订阅socketsub.setsockopt(zmq.SUBSCRIBE,''.encode('utf-8'))#建立自身的分发系统,采用的是进程间的通信的机制,或者采用的是线程间的通信的机制socketpub = context.socket(zmq.PUB)socketpub.set_hwm(HWM_VAL)urlzmq = "tcp://127.0.0.1:6006"# urlzmq = "ipc://main"socketpub.bind(urlzmq)#connect同步自身子系统不同寄存器的订阅者,只有当自身子系统的所有的订阅者都已经发出订阅同步信号的情况下,才算完成订阅# sync_addr ='ipc://main_sync_server'# syncservice = context.socket(zmq.REP)# syncservice.bind(sync_addr)##   # Get synchronization from subscribers# subscribers = 0# while subscribers < local_NUM_SUBSCRIBERS_EXPECTED:#     # wait for synchronization request#     msg = syncservice.recv()#     # send synchronization reply#     syncservice.send(b'')#     subscribers += 1#     print("+1 subscriber (%i/%i)" % (subscribers, local_NUM_SUBSCRIBERS_EXPECTED))## print('同步了本地的订阅者')## #开始同步远端的pub端,目前是仅仅有一个同步端,实际上未来可能有很多的pub端# # ,需要我们进行同步只有当他们准备好了,也就是,我们得到了我们需要订阅的数量# #然后对面才能进行信息的发布,否则我们这边的第一层的sub就会丢失一定的消息# syncaddr = "tcp://115.156.162.76:5555"# sync_server = context.socket(zmq.REP)# sync_server.bind(syncaddr)#接收同步信号# sync_server.recv()#发送已经接收到同步信号的回应,完成同步# sync_server.send(b'')# publishers = 0# while publishers < remote_NUM_PUBLISHERS_EXPECTED:#     # wait for synchronization request#     msg = sync_server.recv()#     # send synchronization reply#     sync_server.send(b'')#     publishers += 1#     print("+1 Publisher (%i/%i)" % (publishers, remote_NUM_PUBLISHERS_EXPECTED))### print('同步了远端')import  timenumpage=0while True:response = socketsub.recv()# time.sleep(1)# response=b'hello world'numpage = numpage  + 1print(numpage)# socketpub.send(response)def broker_proxy_push():#创建本进程使用的上下文context = zmq.Context()#建立sub 套接字以供远端的多个不同子系统的pub 进行链接使用# url =  "tcp://115.156.162.76:6000"url =  "tcp://127.0.0.1:6001"# url =  "tcp://192.168.127.101:6001"# url =  "ipc://sub_server_proxy"socketsub = context.socket(zmq.SUB)socketsub.set_hwm(HWM_VAL)socketsub.bind(url)#订阅内容设定为所有的套接字的所有的消息都要订阅socketsub.setsockopt(zmq.SUBSCRIBE,''.encode('utf-8'))#建立自身的分发系统,采用的是进程间的通信的机制,或者采用的是线程间的通信的机制sender = context.socket(zmq.PUSH)sender.set_hwm(HWM_VAL)# url =  "tcp://127.0.0.1:6006"url =  "ipc://main"sender.bind(url)#connect同步自身子系统不同寄存器的订阅者,只有当自身子系统的所有的订阅者都已经发出订阅同步信号的情况下,才算完成订阅sync_addr ='ipc://main_sync_server'syncservice = context.socket(zmq.REP)syncservice.bind(sync_addr)# Get synchronization from subscriberssubscribers = 0while subscribers < local_NUM_SUBSCRIBERS_EXPECTED:# wait for synchronization requestmsg = syncservice.recv()# send synchronization replysyncservice.send(b'')subscribers += 1print("+1 subscriber (%i/%i)" % (subscribers, local_NUM_SUBSCRIBERS_EXPECTED))## print('同步了本地的订阅者')## #开始同步远端的pub端,目前是仅仅有一个同步端,实际上未来可能有很多的pub端# # ,需要我们进行同步只有当他们准备好了,也就是,我们得到了我们需要订阅的数量# #然后对面才能进行信息的发布,否则我们这边的第一层的sub就会丢失一定的消息# syncaddr = "tcp://115.156.162.76:5555"syncaddr = "tcp://127.0.0.1:5555"sync_server = context.socket(zmq.REP)sync_server.bind(syncaddr)# #接收同步信号# sync_server.recv()## #发送已经接收到同步信号的回应,完成同步# sync_server.send(b'')print("we are syncing the remote punlisher")publishers = 0while publishers < remote_NUM_PUBLISHERS_EXPECTED:# wait for synchronization requestmsg = sync_server.recv()# send synchronization replysync_server.send(b'')publishers += 1print("+1 Publisher (%i/%i)" % (publishers, remote_NUM_PUBLISHERS_EXPECTED))print('同步了远端')import  timenumpage=0while True:response = socketsub.recv()# time.sleep(0.00001)# response=b'hello world'numpage = numpage  + 1# if numpage>26000:print(numpage)sender.send(response)'''这个地方为什么要做一个中介因为,如果不中介的话,1、我的subscribe 端 就要订阅所有的publisher 端2、同时,同步的情况也是相对比较麻烦----------所谓同步,是表示,publish 和 sub端平衡起来如果有中介解决1、可以解决同步的问题......对时间时间信息的处理2、其两端的部分,都可以各自变化,不会彼此受到影响
'''
if __name__=='__main__':broker_proxy_push() #不增加同步的方案,容易造成数据接收的丢失# broker_proxy()

4 验证的结果

如果我们采用单个tcp发送---->-tcp接收___publish------>subscribe,

发送速度延时间隔0.0000001s,这个延时是绝对精确的,,,总共publisher模拟发送200001个数据包,然后起能够完整的接收到数据包.

测试结果:----发送速度实际上是100k/s的速度

左上窗口TCP发送---右上的TCP接收,publisher 发布
左下subscriber- 计数------------右下pull worker处理

(2)采用进程同时进行100k/s的速度的发送因为进程的发送一个循环就要发送2个数据包,所以总计的综合的速度,400k/s的速度的接收验证...但是为了解决这种缓存的问题,我们进行了高水位的设定,能够足以满足数据的接收,.

结果如下:

验证其能够完成正常的所有的数据的接收并且不会出现数据包的丢失的情况

(3) 当采用三个进程进行数据发送的时候,(tcp发送三个进程,tcp接收_publisher 三个进程,proxy代理三个进程,作为四核的电脑,很有可能没有办法完成如此大量的cpu的并行,导致数据的接收,此处仅仅做一定的假设,我们随后进行结果的验证)

我们三个进程分别发送1000 2000 4000 个数据包,总计7003个数据包(每个进程具有1个停止数据包),结果丢失了大概7个数据包. 请注意,此时的我的tcp的每个进程的发送的速度仅仅有1k/s的速度个数.

当前怀疑是由于本机的电脑cpu的核心数不足以支撑三个进程数据发送,接收,发布,订阅,因此分散算力,分成两台电脑进行

(4)  tcp的发送,tcp的接收与publisher的发布 在电脑A,  订阅部分在电脑b(采用三发送与三接收的情况)

本地的tcp的发送与接收都为出现数据包丢失的情况的,而远端的订阅部分会出现丢失几个数据包的情况.

tcp发送--tcp接收-publisher

subsriber订阅部分

(5) 开始怀疑是不是三个发布者比较多了,subscriber跟不上,开始验证,远端发送两个订阅者的情况,这个速度两个进程分别是1k/s,分别是发送2个和4个数据,这样也即是8k/s 的数据的,此时导致的测试结果如下;

发送端

总计发送6000个数据包,加上两个停止数据包,总计6002个数据包,另外一台电脑的接收的情况是

5 \最终的结论

出现丢书数据包的原因通过上面的总结可以有以下两个:

(1) 单纯的 订阅的客户端没有办法很快的处理两个数据同时到哒的情况,也就是同一个时刻多个进程同时发布数据的时候,也就是容易出现数据丢失的情况---------------跟不上发布者的速度

(2) 由于网络的原因就是会丢失一部分的数据,在不深入底层的情况下没有办法解决.....但是在某些情况下,可以接受,我们当前的应用场景即使是1/1000的丢失率,并不会受到很大的影响......

如下图,是别人的测试结构,在100k/s 也就是大概是10us的的延时的情况下,发送32bit,大概的丢包率0.003% ,相对比较低。

------------update on 2020-04-29------------------------------------

6 解决的方案

实际上,对于这样的结果似乎基本没有解决的方案,既想要高速的多个数据快速接收,又想保证数据可靠的稳定的到达,是鱼和熊掌的关系.

我们当前的采用的架构是

(publishers)N:1(subscriber) 的方案,这样方案的灵活性在于多个publisher 可以连接(connect) 订阅者(subscriber0 , 如果系统的节点是经常动态性的变化的情况,这个时候,单个subscriber 采用bind 就可以很容易应对这种情况的出现.....但是你就不得不对高速且可靠地数据传输说再见

但是如果你的系统是相对节点不容易变化的情况,那么可以采用我说的下面的解决的方案

6.1 方案1:  1:1的方案--套接字的使用

回忆一下,为什么上面的subscriber会出现丢失数据的原因,原因就是由于有多个publisher 同时发送了消息,所以,对于这种情况的出现,我们只需要保证每个时刻都只有一个消息到达 subscriber的方案.

(1)选择1 可以扩充subscriber 的个数,来实现此方案-------------------------------->评价:这样和仅仅使用TCP的传输也就都没有区别了

(2)选择2  可以采用router 和dealer套接字,来实现,N个1:1 的方案--------->评价:这样子和TCP的方案也同样没有什么区别了

6.2  方案2 考虑系统的特点,采用额外的协调线程,对系统的数据传输进行协调,

--------但是仔细考虑下,这样的方案,,

zmq之pub/sub会出现丢包的问题的解决方案相关推荐

  1. linux 系统 UDP 丢包问题分析思路

    转自:http://cizixs.com/2018/01/13/linux-udp-packet-drop-debug?hmsr=toutiao.io&utm_medium=toutiao.i ...

  2. 云网络丢包故障定位全景指南

    作者简介:冯荣,腾讯云网络高级工程师,腾讯云网络核心开发人员. 万字长文  建议收藏 引言 本期分享一个比较常见的⽹络问题--丢包.例如我们去ping⼀个⽹站,如果能ping通,且⽹站返回信息全⾯,则 ...

  3. 深度好文:云网络丢包故障定位,看这一篇就够了~

    深度好文:云网络丢包故障定位,看这一篇就够了~ https://mp.weixin.qq.com/s/-Q1AkxUr9xzGKwUMV-FQhQ Alex 高效运维 今天 来源:本文经授权转自公众号 ...

  4. Linux系统 UDP 丢包问题分析思路和修改网卡缓存

    最近工作中遇到某个服务器应用程序 UDP 丢包,在排查过程中查阅了很多资料,我在排查过程中基本都是通过使用 tcpdump 在出现问题的各个环节上进行抓包.分析在那个环节出现问题.针对性去排查解决问题 ...

  5. Linux下网络丢包故障定位

    Linux下网络丢包故障定位 | syxdevcode博客转载: 云网络丢包故障定位全景指南 硬件网卡丢包Ring Buffer溢出 如图所示,物理介质上的数据帧到达后首先由NIC(网络适配器)读取, ...

  6. Linux 丢包分析

    最近工作中遇到某个服务器应用程序 UDP 丢包,在排查过程中查阅了很多资料,总结出来这篇文章,供更多人参考. 在开始之前,我们先用一张图解释 linux 系统接收网络报文的过程. 首先网络报文通过物理 ...

  7. UDP主要丢包原因及具体问题分析

    转载:https://www.cnblogs.com/Zhaols/p/6105926.html 一.主要丢包原因 1.接收端处理时间过长导致丢包:调用recv方法接收端收到数据后,处理数据花了一些时 ...

  8. 网络编程之网络丢包故障如何定位?如何解决?

    引言 本期分享一个比较常见的网络问题--丢包.例如我们去ping一个网站,如果能ping通,且网站返回信息全面,则说明与网站服务器的通信是畅通的,如果ping不通,或者网站返回的信息不全等,则很可能是 ...

  9. ethtool 原理介绍和解决网卡丢包排查思路(附ethtool源码下载)

    Table of Contents 1. 了解接收数据包的流程 将网卡收到的数据包转移到主机内存(NIC 与驱动交互) 通知系统内核处理(驱动与 Linux 内核交互) 2. ifconfig 解释 ...

最新文章

  1. stringbuilder 拼接语句缺失右括号_Leetcode No.22 括号生成
  2. linux入门教程(二)
  3. [caffe]深度学习之图像分类模型AlexNet解读
  4. linux怎么查端口是否被占用,LINUX中如何查看某个端口是否被占用
  5. 淘系的音视频编辑方案:非线性编辑引擎
  6. WeihanLi.Npoi 支持 ShadowProperty 了
  7. 用URLGather来管理和保存你的页面
  8. 李代数(Lie algebra)有哪些应用
  9. HTTPS协议详解:TLS/SSL握手过程
  10. Spring+SpringMVC+mybatis+Quartz整合
  11. mybatis的mysql分页_使用MyBatis+Mysql实现分页的插件PageInfo使用介绍
  12. mysql:字符串拼接
  13. 机器学习UCI数据库说明
  14. 鼎立测试软件能锁网吗,路测软件鼎利通信Pioneer操作培训PPT(ET128).ppt
  15. java做橡皮擦效果_Android自定义橡皮擦效果
  16. GPGPU数学基础教程
  17. qiankun加载react子应用报错[import-html-entry]: error occurs while executing normal script
  18. 关于C与C++的区别
  19. Target: x86_64-linux-gnu下实现gcc -m32
  20. C语言克鲁斯卡尔算法的实现

热门文章

  1. android类加载
  2. 随机森林的随机性体现和优缺点
  3. 齿轮函数怎么用c语言打出来,C语言 齿轮基本参数的计算程序
  4. Dmc雷赛板卡仿写(一):exe新建和库文件的添加
  5. HTML服务器发送事件
  6. Python爬虫之如何爬取抖音小姐姐的视频
  7. debian 开发版 移植_那些PC市场里取得成功的游戏,却在移植手游时吃瘪了
  8. 【一牛网科大讯飞】AIUI评估板免费赠?讯飞评测活动告诉你:有戏!
  9. 七牛云申请空间及配置CDN加速
  10. c语言程序设计基础课后习题答案,2011级C语言程序设计基础教程课后习题答案