原文转载自「刘悦的技术博客」https://v3u.cn/a_id_202

“表达欲”是人类成长史上的强大“源动力”,恩格斯早就直截了当地指出,处在蒙昧时代即低级阶段的人类,“以果实、坚果、根作为食物;音节清晰的语言的产生是这一时期的主要成就”。而在网络时代人们的表达欲往往更容易被满足,因为有聊天软件的存在。通常意义上,聊天大抵都基于两种形式:群聊和单聊。群聊或者群组聊天我们可以理解为聊天室,可以有人数上限,而单聊则可以认为是上限为2个人的特殊聊天室。

为了开发高质量的聊天系统,开发者应该具备客户机和服务器如何通信的基本知识。在聊天系统中,客户端可以是移动应用程序(C端)或web应用程序(B端)。客户端之间不直接通信。相反,每个客户端都连接到一个聊天服务,该服务支撑双方通信的功能。所以该服务在业务上必须支持的最基本功能:

1.能够实时接收来自其他客户端的信息。

2.能够将每条信息实时推送给收件人。

当客户端打算启动聊天时,它会使用一个或多个网络协议连接聊天服务。对于聊天服务,网络协议的选择至关重要,这里,我们选择Tornado框架内置Websocket协议的接口,简单而又方便,安装tornado6.1

pip3 install tornado==6.1

随后编写程序启动文件main.py:

import tornado.httpserver
import tornado.websocket  import tornado.ioloop  import tornado.web  import redis  import threading  import asyncio  # 用户列表
users = []  # websocket协议
class WB(tornado.websocket.WebSocketHandler):  # 跨域支持  def check_origin(self,origin):  return True  # 开启链接  def open(self):  users.append(self)  # 接收消息  def on_message(self,message):  self.write_message(message['data'])  # 断开  def on_close(self):  users.remove(self)# 建立torando实例  app = tornado.web.Application(  [  (r'/wb/',WB)  ],debug=True  )  if __name__ == '__main__':  # 声明服务器  http_server_1 = tornado.httpserver.HTTPServer(app)  # 监听端口  http_server_1.listen(8000)  # 开启事件循环  tornado.ioloop.IOLoop.instance().start()

如此,就在短时间搭建起了一套websocket协议服务,每一次有客户端发起websocket连接请求,我们都会将它添加到用户列表中,等待用户的推送或者接收信息的动作。

下面我们需要通过某种形式将消息的发送方和接收方联系起来,以达到“聊天”的目的,这里选择Redis的发布订阅模式(pubsub),以一个demo来实例说明,server.py

import redis  r = redis.Redis()
r.publish("test",'hello')

随后编写 client.py:

import redis
r = redis.Redis()
ps = r.pubsub()
ps.subscribe('test')
for item in ps.listen():   if item['type'] == 'message':  print(item['data'])

可以这么理解:订阅者(listener)负责订阅频道(channel);发送者(publisher)负责向频道(channel)发送二进制的字符串消息,然后频道收到消息时,推送给订阅者。

频道不仅可以联系发布者和订阅者,同时,也可以利用频道进行“消息隔离”,即不同频道的消息只会给订阅该频道的用户进行推送:

根据发布者订阅者逻辑,改写main.py:

import tornado.httpserver
import tornado.websocket  import tornado.ioloop  import tornado.web  import redis  import threading  import asyncio  # 用户列表
users = []  # 频道列表
channels = ["channel_1","channel_2"]  # websocket协议
class WB(tornado.websocket.WebSocketHandler):  # 跨域支持  def check_origin(self,origin):  return True  # 开启链接  def open(self):  users.append(self)  # 接收消息  def on_message(self,message):  self.write_message(message['data'])  # 断开  def on_close(self):  users.remove(self)  # 基于redis监听发布者发布消息
def redis_listener(loop):  asyncio.set_event_loop(loop)  async def listen():   r = redis.Redis(decode_responses=True)  # 声明pubsb实例  ps = r.pubsub()  # 订阅聊天室频道  ps.subscribe(["channel_1","channel_2"])  # 监听消息  for message in ps.listen():  print(message)  # 遍历链接上的用户  for user in users:  print(user)  if message["type"] == "message" and message["channel"] == user.get_cookie("channel"):  user.write_message(message["data"])  future = asyncio.gather(listen())  loop.run_until_complete(future)  # 接口  发布信息
class Msg(tornado.web.RequestHandler):  # 重写父类方法  def set_default_headers(self):  # 设置请求头信息  print("开始设置")  # 域名信息  self.set_header("Access-Control-Allow-Origin","*")  # 请求信息  self.set_header("Access-Control-Allow-Headers","x-requested-with")  # 请求方式  self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE")  # 发布信息  async def post(self):  data = self.get_argument("data",None)  channel = self.get_argument("channel","channel_1")  print(data)  # 发布  r = redis.Redis()  r.publish(channel,data)  return self.write("ok")  # 建立torando实例  app = tornado.web.Application(  [  (r'/send/',Msg),  (r'/wb/',WB)  ],debug=True  )  if __name__ == '__main__':  loop = asyncio.new_event_loop()  # 单线程启动订阅者服务  threading.Thread(target=redis_listener,args=(loop,)).start()  # 声明服务器  http_server_1 = tornado.httpserver.HTTPServer(app)  # 监听端口  http_server_1.listen(8000)  # 开启事件循环  tornado.ioloop.IOLoop.instance().start()

这里假设默认有两个频道,逻辑是这样的:由前端控制websocket链接用户选择将消息发布到那个频道上,同时每个用户通过前端cookie的设置具备频道属性,当具备频道属性的用户对该频道发布了一条消息之后,所有其他具备该频道属性的用户通过redis进行订阅后主动推送刚刚发布的消息,而频道的推送只匹配订阅该频道的用户,达到消息隔离的目的。

需要注意的一点是,通过线程启动redis订阅服务时,需要将当前的loop实例传递给协程对象,否则在订阅方法内将会获取不到websocket实例,报这个错误:

IOLoop.current() doesn't work in non-main

这是因为Tornado底层基于事件循环ioloop,而同步框架模式的Django或者Flask则没有这个问题。

下面编写前端代码,这里我们使用时下最流行的vue3.0框架,编写chat.vue:

<template>  <div>  <h1>聊天窗口</h1>  <van-tabs v-model:active="active" @click="change_channel">  <van-tab title="客服1号">  <table>  <tr v-for="item,index in msglist" :key="index">  {{ item }}  </tr>  </table>  </van-tab>  <van-tab title="客服2号">  <table>  <tr v-for="item,index in msglist" :key="index">  {{ item }}  </tr>  </table>  </van-tab>  </van-tabs>  <van-field label="聊天信息" v-model="msg" />  <van-button color="gray" @click="commit">发送</van-button>  </div>
</template>  <script>  export default {  data() {  return {  auditlist:[],  //聊天记录  msglist:[],  msg:"",  websock: null, //建立的连接  lockReconnect: false, //是否真正建立连接  timeout: 3 * 1000, //30秒一次心跳  timeoutObj: null, //外层心跳倒计时  serverTimeoutObj: null, //内层心跳检测  timeoutnum: null, //断开 重连倒计时  active:0,  channel:"channel_1"  }  },  methods:{  //切换频道  change_channel:function(){  if(this.active === 0){  this.channel = "channel_1";  var name = "channel";  var value = "channel_1";  }else{  this.channel = "channel_2";  var name = "channel";  var value = "channel_2";  }  //清空聊天记录  this.msglist = [];  var d = new Date();  d.setTime(d.getTime() + (24 * 60 * 60 * 1000));  var expires = "expires=" + d.toGMTString();  document.cookie = name + "=" + value + "; " + expires;  this.reconnect();  },  initWebSocket() {  //初始化weosocket  const wsuri = "ws://localhost:8000/wb/";  this.websock = new WebSocket(wsuri);  this.websock.onopen = this.websocketonopen;  this.websock.onmessage = this.websocketonmessage;  this.websock.onerror = this.websocketonerror;  this.websock.onclose = this.websocketclose;  },  reconnect() {  //重新连接  var that = this;  if (that.lockReconnect) {  // 是否真正建立连接  return;  }  that.lockReconnect = true;  //没连接上会一直重连,设置延迟避免请求过多  that.timeoutnum && clearTimeout(that.timeoutnum);  // 如果到了这里断开重连的倒计时还有值的话就清除掉  that.timeoutnum = setTimeout(function() {  //然后新连接  that.initWebSocket();  that.lockReconnect = false;  }, 5000);  },  reset() {  //重置心跳  var that = this;  //清除时间(清除内外两个心跳计时)  clearTimeout(that.timeoutObj);  clearTimeout(that.serverTimeoutObj);  //重启心跳  that.start();  },  start() {  //开启心跳  var self = this;  self.timeoutObj && clearTimeout(self.timeoutObj);  // 如果外层心跳倒计时存在的话,清除掉  self.serverTimeoutObj && clearTimeout(self.serverTimeoutObj);  // 如果内层心跳检测倒计时存在的话,清除掉  self.timeoutObj = setTimeout(function() {  // 重新赋值重新发送 进行心跳检测  //这里发送一个心跳,后端收到后,返回一个心跳消息,  if (self.websock.readyState == 1) {  //如果连接正常  // self.websock.send("heartCheck");  } else {  //否则重连  self.reconnect();  }  self.serverTimeoutObj = setTimeout(function() {  // 在三秒一次的心跳检测中如果某个值3秒没响应就关掉这次连接  //超时关闭  // self.websock.close();  }, self.timeout);  }, self.timeout);  // 3s一次  },  websocketonopen(e) {  //连接建立之后执行send方法发送数据  console.log("成功");  // this.websock.send("123");  // this.websocketsend(JSON.stringify(actions));  },  websocketonerror() {  //连接建立失败重连  console.log("失败");  this.initWebSocket();  },  websocketonmessage(e) {  console.log(e);  //数据接收  //const redata = JSON.parse(e.data);  const redata = e.data;  //累加  this.msglist.push(redata);  console.log(redata);  },  websocketsend(Data) {  //数据发送  this.websock.send(Data);  },  websocketclose(e) {  //关闭  this.reconnect()  console.log("断开连接", e);  },  //提交表单  commit:function(){  //发送请求  this.myaxios("http://localhost:8000/send/","post",{"data":this.msg,channel:this.channel}).then(data =>{  console.log(data);  });  },  },  mounted(){  //连接后端websocket服务  this.initWebSocket();  var d = new Date();  d.setTime(d.getTime() + (24 * 60 * 60 * 1000));  var expires = "expires=" + d.toGMTString();  document.cookie = "channel" + "=" + "channel_1" + "; " + expires;  }  }
</script>  <style scoped>  @import url("../assets/style.css");  .chatbox{  color:black;  }  .mymsg{  background-color:green;  }  </style>

这里前端在线客户端定期向状态服务器发送心跳事件。如果服务端在特定时间内(例如x秒)从客户端接收到心跳事件,则认为用户处于联机状态。否则,它将处于脱机状态,脱机后在阈值时间内可以进行重新连接的动作。同时利用vant框架的标签页可以同步切换频道,切换后将频道标识写入cookie,便于后端服务识别后匹配推送。

效果是这样的:

诚然,功能业已实现,但是如果我们处在一个高并发场景之下呢?试想一下如果一个频道有10万人同时在线,每秒有100条新消息,那么后台tornado的websocket服务推送频率是100w*10/s = 1000w/s 。

这样的系统架构如果不做负载均衡的话,很难抗住压力,那么瓶颈在哪里呢?没错,就是数据库redis,这里我们需要异步redis库aioredis的帮助:

pip3 install aioredis

aioredis通过协程异步操作redis读写,避免了io阻塞问题,使消息的发布和订阅操作非阻塞。

此时,可以新建一个异步订阅服务文件main_with_aioredis.py:

import asyncio
import aioredis
from tornado import web, websocket
from tornado.ioloop import IOLoop
import tornado.httpserver
import async_timeout

之后主要的修改逻辑是,通过aioredis异步建立redis链接,并且异步订阅多个频道,随后通过原生协程的asyncio.create_task方法(也可以使用asyncio.ensure_future)注册订阅消费的异步任务reader:

async def setup():  r = await aioredis.from_url("redis://localhost", decode_responses=True)  pubsub = r.pubsub()  print(pubsub)  await pubsub.subscribe("channel_1","channel_2")  #asyncio.ensure_future(reader(pubsub))  asyncio.create_task(reader(pubsub))

在订阅消费方法中,异步监听所订阅频道中的发布信息,同时和之前的同步方法一样,比对用户的频道属性并且进行按频道推送:

async def reader(channel: aioredis.client.PubSub):  while True:  try:  async with async_timeout.timeout(1):  message = await channel.get_message(ignore_subscribe_messages=True)  if message is not None:  print(f"(Reader) Message Received: {message}")  for user in users:  if user.get_cookie("channel") == message["channel"]:  user.write_message(message["data"])  await asyncio.sleep(0.01)  except asyncio.TimeoutError:  pass

最后,利用tornado事件循环IOLoop传递中执行回调方法,将setup方法加入到事件回调中:

if __name__ == '__main__':  # 监听端口  application.listen(8000)  loop = IOLoop.current()  loop.add_callback(setup)  loop.start()

完整的异步消息发布、订阅、推送服务改造 main_aioredis.py:

import asyncio
import aioredis
from tornado import web, websocket
from tornado.ioloop import IOLoop
import tornado.httpserver
import async_timeout  users = []  # websocket协议
class WB(tornado.websocket.WebSocketHandler):  # 跨域支持  def check_origin(self,origin):  return True  # 开启链接  def open(self):  users.append(self)  # 接收消息  def on_message(self,message):  self.write_message(message['data'])  # 断开  def on_close(self):  users.remove(self)  class Msg(web.RequestHandler):  # 重写父类方法  def set_default_headers(self):  # 设置请求头信息  print("开始设置")  # 域名信息  self.set_header("Access-Control-Allow-Origin","*")  # 请求信息  self.set_header("Access-Control-Allow-Headers","x-requested-with")  # 请求方式  self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE")  # 发布信息  async def post(self):  data = self.get_argument("data",None)  channel = self.get_argument("channel","channel_1")  print(data)  # 发布  r = await aioredis.from_url("redis://localhost", decode_responses=True)  await r.publish(channel,data)  return self.write("ok")  async def reader(channel: aioredis.client.PubSub):  while True:  try:  async with async_timeout.timeout(1):  message = await channel.get_message(ignore_subscribe_messages=True)  if message is not None:  print(f"(Reader) Message Received: {message}")  for user in users:  if user.get_cookie("channel") == message["channel"]:  user.write_message(message["data"])  await asyncio.sleep(0.01)  except asyncio.TimeoutError:  pass  async def setup():  r = await aioredis.from_url("redis://localhost", decode_responses=True)  pubsub = r.pubsub()  print(pubsub)  await pubsub.subscribe("channel_1","channel_2")  #asyncio.ensure_future(reader(pubsub))  asyncio.create_task(reader(pubsub))  application = web.Application([  (r'/send/',Msg),  (r'/wb/', WB),
],debug=True)      if __name__ == '__main__':  # 监听端口  application.listen(8000)  loop = IOLoop.current()  loop.add_callback(setup)  loop.start()

从程序设计角度上讲,充分利用了协程的异步执行思想,更加地丝滑流畅。

结语:实践操作来看,Redis发布订阅模式,非常契合这种实时(websocket)通信聊天系统的场景,但是发布的消息如果没有对应的频道或者消费者,消息则会被丢弃,假如我们在生产环境在消费的时候,突然断网,导致其中一个订阅者挂掉了一段时间,那么当它重新连接上的时候,中间这一段时间产生的消息也将不会存在,所以如果想要保证系统的健壮性,还需要其他服务来设计高可用的实时存储方案,不过那就是另外一个故事了,最后奉上项目地址,与众乡亲同飨:https://github.com/zcxey2911/tornado_redis_vue3_chatroom

原文转载自「刘悦的技术博客」 https://v3u.cn/a_id_202

把酒言欢话聊天,基于Vue3.0+Tornado6.1+Redis发布订阅(pubsub)模式打造异步非阻塞(aioredis)实时(websocket)通信聊天系统相关推荐

  1. 基于vue3.0简单的页面使用

    基于vue3.0简单的页面使用 项目效果图 项目文件图 package.json main.js App.vue views/Tutorial.vue views/TS.vue views/Docs. ...

  2. 【Vue3 造轮子项目 ------ kaite-ui】基于vue3.0 + vite + TypeScript 实现一个UI框架 - kaiteUI

    基于vue3.0 + vite + TypeScript 实现一个UI框架 - kaiteUI 前言 前段时间笔者一直忙于学习Vue3方面新知识,比如如何从vue2.0版本过渡到vue3.0,如何理解 ...

  3. 基于MFC的socket编程(异步非阻塞通信)

    对于许多初学者来说,网络通信程序的开发,普遍的一个现象就是觉得难以入手.许多概念,诸如:同步(Sync)/异步(Async),阻塞(Block)/非阻塞(Unblock)等,初学者往往迷惑不清,只知其 ...

  4. IMI 基于 Swoole 开发的协程 PHP 开发框架 常驻内存、协程异步非阻塞

    介绍 IMI 是基于 Swoole 开发的协程 PHP 开发框架,拥有常驻内存.协程异步非阻塞IO等优点. IMI 框架文档丰富,上手容易,致力于让开发者跟使用传统 MVC 框架一样顺手. IMI 框 ...

  5. 本地使用 Docker Compose 与 Nestjs 快速构建基于 Dapr 的 Redis 发布/订阅分布式应用...

    Dapr(分布式应用程序运行时)介绍 Dapr 是一个可移植的.事件驱动的运行时,它使任何开发人员能够轻松构建出弹性的.无状态和有状态的应用程序,并可运行在云平台或边缘计算中,它同时也支持多种编程语言 ...

  6. google的api key调用次数是多少_Sprint Boot如何基于Redis发布订阅实现异步消息系统的同步调用?...

    前言 在很多互联网应用系统中,请求处理异步化是提升系统性能一种常用的手段,而基于消息系统的异步处理由于具备高可靠性.高吞吐量的特点,因而在并发请求量比较高的互联网系统中被广泛应用.与此同时,这种方案也 ...

  7. mars3d基于vue3.0的widget使用

    mars3d在vue3.0生态上开发了两个gis相关开源项目 mars3d-vue-example 和 mars3d-vue-project,在这两个项目中widget都是非常重要的一个模块.通过wi ...

  8. 基于Vue3.0+ElementPlus后台精简模板

    前言 欢迎使用VUE3.0 + ElementPlus 后台管理模板 UI库文档: https://element-plus.gitee.io/#/zh-CN 该项目基于Vuecli 使用Vue3 + ...

  9. 基于Vue3.0+Springboot在线购物商城网站设计

    开发技术环境: Idea + Vscode + Mysql + Springboot + vue3.0 基于vue的购物商城网站分为前台功能和后台管理功能,前台功能主要包括基础功能模块.订单管理模块. ...

  10. 奥拉星插件flash下载手机版下载安装_终于等到你!安卓微信7.0.13内测版发布 支持夜间模式 附下载地址!...

    3月22日,iOS版微信迎来了7.0.12正式版更新,最大的亮点在于为iOS13设备加入了"深色模式"功能,虽然没有独立的控制开关,但可以跟随系统开启或关闭夜间模式.此外,iOS版 ...

最新文章

  1. Linux下matlab断点调试
  2. SQL SERVER 存储过程执行带输出参数的SQL语句拼接
  3. sysbench测试mysql性能(TPS、QPS、IOPS)(重要)
  4. TCollector
  5. HackBrowserData 一键导出 浏览器保存的登录密码、历史记录、Cookies、书签
  6. 4G换5G关口,智能手机如何抢回“失去的一个月”
  7. html中取消li的点击事件,jquery设置html li点击click事件为什么无法赋值到表单input value中呢?...
  8. android 歌词的显示不出来,网易云音乐歌词不显示怎么办 网易云显示不出歌词的解决方法...
  9. php获取银行logo,依据银行卡号获取银行信息php代码
  10. 人脸识别算法一:特征脸方法(Eigenface)
  11. 纯css绘制简易对话气泡
  12. js 中日期 转换成时间戳 例如2020-12-19 转换为时间戳
  13. php当月1号怎么获取,php获取下月1号和月底最后一天的时间
  14. 三极管驱动和MOS管驱动的区别
  15. grabcut图像分割的原理简单介绍
  16. 2020-12-15:【黑盒测试用例设计】测试方法之边界值分析法
  17. 【必看】分析各大招聘网站
  18. app store服务器网站,app store 游戏服务器
  19. Visual Studio Code插件整理大全
  20. 鸿蒙系统控制LED的实现方法之经典

热门文章

  1. 计算机网络名怎么设置,网络ssid怎么设置
  2. 计算机国培目标与计划,2018国培个人研修计划
  3. windows10 查看.theme文件
  4. qgc地面站如何导入离线地图_离线地图
  5. php datedif,datedif是什么函数
  6. JAVA正则表达式校验中国大陆手机号段【2022年2月】
  7. LORD MicroStrain 惯性传感系统
  8. 利用多线程爬点dianying回家慢慢看【python爬虫入门进阶】(05)
  9. 如何安全删除电脑上的个人信息和隐私资料
  10. 物联网关键技术————传感器技术