把酒言欢话聊天,基于Vue3.0+Tornado6.1+Redis发布订阅(pubsub)模式打造异步非阻塞(aioredis)实时(websocket)通信聊天系统
原文转载自「刘悦的技术博客」https://v3u.cn/a_id_202
“表达欲”是人类成长史上的强大“源动力”,恩格斯早就直截了当地指出,处在蒙昧时代即低级阶段的人类,“以果实、坚果、根作为食物;音节清晰的语言的产生是这一时期的主要成就”。而在网络时代人们的表达欲往往更容易被满足,因为有聊天软件的存在。通常意义上,聊天大抵都基于两种形式:群聊和单聊。群聊或者群组聊天我们可以理解为聊天室,可以有人数上限,而单聊则可以认为是上限为2个人的特殊聊天室。
为了开发高质量的聊天系统,开发者应该具备客户机和服务器如何通信的基本知识。在聊天系统中,客户端可以是移动应用程序(C端)或web应用程序(B端)。客户端之间不直接通信。相反,每个客户端都连接到一个聊天服务,该服务支撑双方通信的功能。所以该服务在业务上必须支持的最基本功能:
1.能够实时接收来自其他客户端的信息。
2.能够将每条信息实时推送给收件人。
当客户端打算启动聊天时,它会使用一个或多个网络协议连接聊天服务。对于聊天服务,网络协议的选择至关重要,这里,我们选择Tornado框架内置Websocket协议的接口,简单而又方便,安装tornado6.1
pip3 install tornado==6.1
随后编写程序启动文件main.py:
import tornado.httpserver
import tornado.websocket import tornado.ioloop import tornado.web import redis import threading import asyncio # 用户列表
users = [] # websocket协议
class WB(tornado.websocket.WebSocketHandler): # 跨域支持 def check_origin(self,origin): return True # 开启链接 def open(self): users.append(self) # 接收消息 def on_message(self,message): self.write_message(message['data']) # 断开 def on_close(self): users.remove(self)# 建立torando实例 app = tornado.web.Application( [ (r'/wb/',WB) ],debug=True ) if __name__ == '__main__': # 声明服务器 http_server_1 = tornado.httpserver.HTTPServer(app) # 监听端口 http_server_1.listen(8000) # 开启事件循环 tornado.ioloop.IOLoop.instance().start()
如此,就在短时间搭建起了一套websocket协议服务,每一次有客户端发起websocket连接请求,我们都会将它添加到用户列表中,等待用户的推送或者接收信息的动作。
下面我们需要通过某种形式将消息的发送方和接收方联系起来,以达到“聊天”的目的,这里选择Redis的发布订阅模式(pubsub),以一个demo来实例说明,server.py
import redis r = redis.Redis()
r.publish("test",'hello')
随后编写 client.py:
import redis
r = redis.Redis()
ps = r.pubsub()
ps.subscribe('test')
for item in ps.listen(): if item['type'] == 'message': print(item['data'])
可以这么理解:订阅者(listener)负责订阅频道(channel);发送者(publisher)负责向频道(channel)发送二进制的字符串消息,然后频道收到消息时,推送给订阅者。
频道不仅可以联系发布者和订阅者,同时,也可以利用频道进行“消息隔离”,即不同频道的消息只会给订阅该频道的用户进行推送:
根据发布者订阅者逻辑,改写main.py:
import tornado.httpserver
import tornado.websocket import tornado.ioloop import tornado.web import redis import threading import asyncio # 用户列表
users = [] # 频道列表
channels = ["channel_1","channel_2"] # websocket协议
class WB(tornado.websocket.WebSocketHandler): # 跨域支持 def check_origin(self,origin): return True # 开启链接 def open(self): users.append(self) # 接收消息 def on_message(self,message): self.write_message(message['data']) # 断开 def on_close(self): users.remove(self) # 基于redis监听发布者发布消息
def redis_listener(loop): asyncio.set_event_loop(loop) async def listen(): r = redis.Redis(decode_responses=True) # 声明pubsb实例 ps = r.pubsub() # 订阅聊天室频道 ps.subscribe(["channel_1","channel_2"]) # 监听消息 for message in ps.listen(): print(message) # 遍历链接上的用户 for user in users: print(user) if message["type"] == "message" and message["channel"] == user.get_cookie("channel"): user.write_message(message["data"]) future = asyncio.gather(listen()) loop.run_until_complete(future) # 接口 发布信息
class Msg(tornado.web.RequestHandler): # 重写父类方法 def set_default_headers(self): # 设置请求头信息 print("开始设置") # 域名信息 self.set_header("Access-Control-Allow-Origin","*") # 请求信息 self.set_header("Access-Control-Allow-Headers","x-requested-with") # 请求方式 self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE") # 发布信息 async def post(self): data = self.get_argument("data",None) channel = self.get_argument("channel","channel_1") print(data) # 发布 r = redis.Redis() r.publish(channel,data) return self.write("ok") # 建立torando实例 app = tornado.web.Application( [ (r'/send/',Msg), (r'/wb/',WB) ],debug=True ) if __name__ == '__main__': loop = asyncio.new_event_loop() # 单线程启动订阅者服务 threading.Thread(target=redis_listener,args=(loop,)).start() # 声明服务器 http_server_1 = tornado.httpserver.HTTPServer(app) # 监听端口 http_server_1.listen(8000) # 开启事件循环 tornado.ioloop.IOLoop.instance().start()
这里假设默认有两个频道,逻辑是这样的:由前端控制websocket链接用户选择将消息发布到那个频道上,同时每个用户通过前端cookie的设置具备频道属性,当具备频道属性的用户对该频道发布了一条消息之后,所有其他具备该频道属性的用户通过redis进行订阅后主动推送刚刚发布的消息,而频道的推送只匹配订阅该频道的用户,达到消息隔离的目的。
需要注意的一点是,通过线程启动redis订阅服务时,需要将当前的loop实例传递给协程对象,否则在订阅方法内将会获取不到websocket实例,报这个错误:
IOLoop.current() doesn't work in non-main
这是因为Tornado底层基于事件循环ioloop,而同步框架模式的Django或者Flask则没有这个问题。
下面编写前端代码,这里我们使用时下最流行的vue3.0框架,编写chat.vue:
<template> <div> <h1>聊天窗口</h1> <van-tabs v-model:active="active" @click="change_channel"> <van-tab title="客服1号"> <table> <tr v-for="item,index in msglist" :key="index"> {{ item }} </tr> </table> </van-tab> <van-tab title="客服2号"> <table> <tr v-for="item,index in msglist" :key="index"> {{ item }} </tr> </table> </van-tab> </van-tabs> <van-field label="聊天信息" v-model="msg" /> <van-button color="gray" @click="commit">发送</van-button> </div>
</template> <script> export default { data() { return { auditlist:[], //聊天记录 msglist:[], msg:"", websock: null, //建立的连接 lockReconnect: false, //是否真正建立连接 timeout: 3 * 1000, //30秒一次心跳 timeoutObj: null, //外层心跳倒计时 serverTimeoutObj: null, //内层心跳检测 timeoutnum: null, //断开 重连倒计时 active:0, channel:"channel_1" } }, methods:{ //切换频道 change_channel:function(){ if(this.active === 0){ this.channel = "channel_1"; var name = "channel"; var value = "channel_1"; }else{ this.channel = "channel_2"; var name = "channel"; var value = "channel_2"; } //清空聊天记录 this.msglist = []; var d = new Date(); d.setTime(d.getTime() + (24 * 60 * 60 * 1000)); var expires = "expires=" + d.toGMTString(); document.cookie = name + "=" + value + "; " + expires; this.reconnect(); }, initWebSocket() { //初始化weosocket const wsuri = "ws://localhost:8000/wb/"; this.websock = new WebSocket(wsuri); this.websock.onopen = this.websocketonopen; this.websock.onmessage = this.websocketonmessage; this.websock.onerror = this.websocketonerror; this.websock.onclose = this.websocketclose; }, reconnect() { //重新连接 var that = this; if (that.lockReconnect) { // 是否真正建立连接 return; } that.lockReconnect = true; //没连接上会一直重连,设置延迟避免请求过多 that.timeoutnum && clearTimeout(that.timeoutnum); // 如果到了这里断开重连的倒计时还有值的话就清除掉 that.timeoutnum = setTimeout(function() { //然后新连接 that.initWebSocket(); that.lockReconnect = false; }, 5000); }, reset() { //重置心跳 var that = this; //清除时间(清除内外两个心跳计时) clearTimeout(that.timeoutObj); clearTimeout(that.serverTimeoutObj); //重启心跳 that.start(); }, start() { //开启心跳 var self = this; self.timeoutObj && clearTimeout(self.timeoutObj); // 如果外层心跳倒计时存在的话,清除掉 self.serverTimeoutObj && clearTimeout(self.serverTimeoutObj); // 如果内层心跳检测倒计时存在的话,清除掉 self.timeoutObj = setTimeout(function() { // 重新赋值重新发送 进行心跳检测 //这里发送一个心跳,后端收到后,返回一个心跳消息, if (self.websock.readyState == 1) { //如果连接正常 // self.websock.send("heartCheck"); } else { //否则重连 self.reconnect(); } self.serverTimeoutObj = setTimeout(function() { // 在三秒一次的心跳检测中如果某个值3秒没响应就关掉这次连接 //超时关闭 // self.websock.close(); }, self.timeout); }, self.timeout); // 3s一次 }, websocketonopen(e) { //连接建立之后执行send方法发送数据 console.log("成功"); // this.websock.send("123"); // this.websocketsend(JSON.stringify(actions)); }, websocketonerror() { //连接建立失败重连 console.log("失败"); this.initWebSocket(); }, websocketonmessage(e) { console.log(e); //数据接收 //const redata = JSON.parse(e.data); const redata = e.data; //累加 this.msglist.push(redata); console.log(redata); }, websocketsend(Data) { //数据发送 this.websock.send(Data); }, websocketclose(e) { //关闭 this.reconnect() console.log("断开连接", e); }, //提交表单 commit:function(){ //发送请求 this.myaxios("http://localhost:8000/send/","post",{"data":this.msg,channel:this.channel}).then(data =>{ console.log(data); }); }, }, mounted(){ //连接后端websocket服务 this.initWebSocket(); var d = new Date(); d.setTime(d.getTime() + (24 * 60 * 60 * 1000)); var expires = "expires=" + d.toGMTString(); document.cookie = "channel" + "=" + "channel_1" + "; " + expires; } }
</script> <style scoped> @import url("../assets/style.css"); .chatbox{ color:black; } .mymsg{ background-color:green; } </style>
这里前端在线客户端定期向状态服务器发送心跳事件。如果服务端在特定时间内(例如x秒)从客户端接收到心跳事件,则认为用户处于联机状态。否则,它将处于脱机状态,脱机后在阈值时间内可以进行重新连接的动作。同时利用vant框架的标签页可以同步切换频道,切换后将频道标识写入cookie,便于后端服务识别后匹配推送。
效果是这样的:
诚然,功能业已实现,但是如果我们处在一个高并发场景之下呢?试想一下如果一个频道有10万人同时在线,每秒有100条新消息,那么后台tornado的websocket服务推送频率是100w*10/s = 1000w/s 。
这样的系统架构如果不做负载均衡的话,很难抗住压力,那么瓶颈在哪里呢?没错,就是数据库redis,这里我们需要异步redis库aioredis的帮助:
pip3 install aioredis
aioredis通过协程异步操作redis读写,避免了io阻塞问题,使消息的发布和订阅操作非阻塞。
此时,可以新建一个异步订阅服务文件main_with_aioredis.py:
import asyncio
import aioredis
from tornado import web, websocket
from tornado.ioloop import IOLoop
import tornado.httpserver
import async_timeout
之后主要的修改逻辑是,通过aioredis异步建立redis链接,并且异步订阅多个频道,随后通过原生协程的asyncio.create_task方法(也可以使用asyncio.ensure_future)注册订阅消费的异步任务reader:
async def setup(): r = await aioredis.from_url("redis://localhost", decode_responses=True) pubsub = r.pubsub() print(pubsub) await pubsub.subscribe("channel_1","channel_2") #asyncio.ensure_future(reader(pubsub)) asyncio.create_task(reader(pubsub))
在订阅消费方法中,异步监听所订阅频道中的发布信息,同时和之前的同步方法一样,比对用户的频道属性并且进行按频道推送:
async def reader(channel: aioredis.client.PubSub): while True: try: async with async_timeout.timeout(1): message = await channel.get_message(ignore_subscribe_messages=True) if message is not None: print(f"(Reader) Message Received: {message}") for user in users: if user.get_cookie("channel") == message["channel"]: user.write_message(message["data"]) await asyncio.sleep(0.01) except asyncio.TimeoutError: pass
最后,利用tornado事件循环IOLoop传递中执行回调方法,将setup方法加入到事件回调中:
if __name__ == '__main__': # 监听端口 application.listen(8000) loop = IOLoop.current() loop.add_callback(setup) loop.start()
完整的异步消息发布、订阅、推送服务改造 main_aioredis.py:
import asyncio
import aioredis
from tornado import web, websocket
from tornado.ioloop import IOLoop
import tornado.httpserver
import async_timeout users = [] # websocket协议
class WB(tornado.websocket.WebSocketHandler): # 跨域支持 def check_origin(self,origin): return True # 开启链接 def open(self): users.append(self) # 接收消息 def on_message(self,message): self.write_message(message['data']) # 断开 def on_close(self): users.remove(self) class Msg(web.RequestHandler): # 重写父类方法 def set_default_headers(self): # 设置请求头信息 print("开始设置") # 域名信息 self.set_header("Access-Control-Allow-Origin","*") # 请求信息 self.set_header("Access-Control-Allow-Headers","x-requested-with") # 请求方式 self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE") # 发布信息 async def post(self): data = self.get_argument("data",None) channel = self.get_argument("channel","channel_1") print(data) # 发布 r = await aioredis.from_url("redis://localhost", decode_responses=True) await r.publish(channel,data) return self.write("ok") async def reader(channel: aioredis.client.PubSub): while True: try: async with async_timeout.timeout(1): message = await channel.get_message(ignore_subscribe_messages=True) if message is not None: print(f"(Reader) Message Received: {message}") for user in users: if user.get_cookie("channel") == message["channel"]: user.write_message(message["data"]) await asyncio.sleep(0.01) except asyncio.TimeoutError: pass async def setup(): r = await aioredis.from_url("redis://localhost", decode_responses=True) pubsub = r.pubsub() print(pubsub) await pubsub.subscribe("channel_1","channel_2") #asyncio.ensure_future(reader(pubsub)) asyncio.create_task(reader(pubsub)) application = web.Application([ (r'/send/',Msg), (r'/wb/', WB),
],debug=True) if __name__ == '__main__': # 监听端口 application.listen(8000) loop = IOLoop.current() loop.add_callback(setup) loop.start()
从程序设计角度上讲,充分利用了协程的异步执行思想,更加地丝滑流畅。
结语:实践操作来看,Redis发布订阅模式,非常契合这种实时(websocket)通信聊天系统的场景,但是发布的消息如果没有对应的频道或者消费者,消息则会被丢弃,假如我们在生产环境在消费的时候,突然断网,导致其中一个订阅者挂掉了一段时间,那么当它重新连接上的时候,中间这一段时间产生的消息也将不会存在,所以如果想要保证系统的健壮性,还需要其他服务来设计高可用的实时存储方案,不过那就是另外一个故事了,最后奉上项目地址,与众乡亲同飨:https://github.com/zcxey2911/tornado_redis_vue3_chatroom
原文转载自「刘悦的技术博客」 https://v3u.cn/a_id_202
把酒言欢话聊天,基于Vue3.0+Tornado6.1+Redis发布订阅(pubsub)模式打造异步非阻塞(aioredis)实时(websocket)通信聊天系统相关推荐
- 基于vue3.0简单的页面使用
基于vue3.0简单的页面使用 项目效果图 项目文件图 package.json main.js App.vue views/Tutorial.vue views/TS.vue views/Docs. ...
- 【Vue3 造轮子项目 ------ kaite-ui】基于vue3.0 + vite + TypeScript 实现一个UI框架 - kaiteUI
基于vue3.0 + vite + TypeScript 实现一个UI框架 - kaiteUI 前言 前段时间笔者一直忙于学习Vue3方面新知识,比如如何从vue2.0版本过渡到vue3.0,如何理解 ...
- 基于MFC的socket编程(异步非阻塞通信)
对于许多初学者来说,网络通信程序的开发,普遍的一个现象就是觉得难以入手.许多概念,诸如:同步(Sync)/异步(Async),阻塞(Block)/非阻塞(Unblock)等,初学者往往迷惑不清,只知其 ...
- IMI 基于 Swoole 开发的协程 PHP 开发框架 常驻内存、协程异步非阻塞
介绍 IMI 是基于 Swoole 开发的协程 PHP 开发框架,拥有常驻内存.协程异步非阻塞IO等优点. IMI 框架文档丰富,上手容易,致力于让开发者跟使用传统 MVC 框架一样顺手. IMI 框 ...
- 本地使用 Docker Compose 与 Nestjs 快速构建基于 Dapr 的 Redis 发布/订阅分布式应用...
Dapr(分布式应用程序运行时)介绍 Dapr 是一个可移植的.事件驱动的运行时,它使任何开发人员能够轻松构建出弹性的.无状态和有状态的应用程序,并可运行在云平台或边缘计算中,它同时也支持多种编程语言 ...
- google的api key调用次数是多少_Sprint Boot如何基于Redis发布订阅实现异步消息系统的同步调用?...
前言 在很多互联网应用系统中,请求处理异步化是提升系统性能一种常用的手段,而基于消息系统的异步处理由于具备高可靠性.高吞吐量的特点,因而在并发请求量比较高的互联网系统中被广泛应用.与此同时,这种方案也 ...
- mars3d基于vue3.0的widget使用
mars3d在vue3.0生态上开发了两个gis相关开源项目 mars3d-vue-example 和 mars3d-vue-project,在这两个项目中widget都是非常重要的一个模块.通过wi ...
- 基于Vue3.0+ElementPlus后台精简模板
前言 欢迎使用VUE3.0 + ElementPlus 后台管理模板 UI库文档: https://element-plus.gitee.io/#/zh-CN 该项目基于Vuecli 使用Vue3 + ...
- 基于Vue3.0+Springboot在线购物商城网站设计
开发技术环境: Idea + Vscode + Mysql + Springboot + vue3.0 基于vue的购物商城网站分为前台功能和后台管理功能,前台功能主要包括基础功能模块.订单管理模块. ...
- 奥拉星插件flash下载手机版下载安装_终于等到你!安卓微信7.0.13内测版发布 支持夜间模式 附下载地址!...
3月22日,iOS版微信迎来了7.0.12正式版更新,最大的亮点在于为iOS13设备加入了"深色模式"功能,虽然没有独立的控制开关,但可以跟随系统开启或关闭夜间模式.此外,iOS版 ...
最新文章
- Linux下matlab断点调试
- SQL SERVER 存储过程执行带输出参数的SQL语句拼接
- sysbench测试mysql性能(TPS、QPS、IOPS)(重要)
- TCollector
- HackBrowserData 一键导出 浏览器保存的登录密码、历史记录、Cookies、书签
- 4G换5G关口,智能手机如何抢回“失去的一个月”
- html中取消li的点击事件,jquery设置html li点击click事件为什么无法赋值到表单input value中呢?...
- android 歌词的显示不出来,网易云音乐歌词不显示怎么办 网易云显示不出歌词的解决方法...
- php获取银行logo,依据银行卡号获取银行信息php代码
- 人脸识别算法一:特征脸方法(Eigenface)
- 纯css绘制简易对话气泡
- js 中日期 转换成时间戳 例如2020-12-19 转换为时间戳
- php当月1号怎么获取,php获取下月1号和月底最后一天的时间
- 三极管驱动和MOS管驱动的区别
- grabcut图像分割的原理简单介绍
- 2020-12-15:【黑盒测试用例设计】测试方法之边界值分析法
- 【必看】分析各大招聘网站
- app store服务器网站,app store 游戏服务器
- Visual Studio Code插件整理大全
- 鸿蒙系统控制LED的实现方法之经典