python项目开发:用RabbitMQ实现异步RPC
程序要求:
1. 用Rabbit MQ实现RPC
1. 可以异步地执行多条命令
2. 可以对一次性对多个机器执行命令
程序效果:
---》run dir host1 host2 。。。。
---》get task_id
---》taskId:xxxx host: xxxxxx
---》check task_id
--->打印结果
程序分析:
为了达到异步地效果,可以使用多线程或协程,即每执行一条命令就启动一条线程或协程。客户端发送命令到队列、从返回队列接收结果分离,不能写到一起。
业务逻辑:
代码实现:
README
![](/assets/blank.gif)
![](/assets/blank.gif)
#author:Wu zhiHao#博客地址:https://www.cnblogs.com/BUPT-MrWu/p/10364619.html#程序目录框架:|--RPC|--RPC_server #服务端|--bin|--start.py #程序入口|--core|--RpcServer.py #服务端主要逻辑|--RPC_client #客户端|--bin|--start.py #程序入口|--core|--main.py #程序主要逻辑|--modules|--RpcClient.py #客户端主要逻辑|--conf|--settings.py #配置文件|--READ_ME#命令格式:1. run command host1 host2..... #执行命令2. all_task #获取全部task_id3. check task_id #获取命令结果
View Code
RPC_server\\bin\\start.py
![](/assets/blank.gif)
![](/assets/blank.gif)
import sys,os BASE_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) sys.path.append(BASE_dir) from core import RpcServer if __name__ == '__main__':obj = RpcServer.RpcServer()obj.channel.start_consuming()
View Code
RPC_server\\core\\RpcServer.py
![](/assets/blank.gif)
![](/assets/blank.gif)
import pika import os import socket from conf import settings class RpcServer(object):def __init__(self):self.credentials = pika.PlainCredentials(settings.RabbitMq_name,settings.RabbitMq_password) #RabbiMQ用户认证self.connection = pika.BlockingConnection(pika.ConnectionParameters(settings.RabbitMq_ip,settings.RabbitMq_port,"/",self.credentials,))self.My_Ip = self.get_ip() #获取服务端IP地址self.channel = self.connection.channel()self.result = self.channel.queue_declare(exclusive=True)self.queue_name = self.result.method.queueself.channel.exchange_declare(exchange="Rpc",exchange_type="direct",)self.channel.queue_bind(exchange="Rpc",queue=self.queue_name,routing_key=self.My_Ip,)self.channel.basic_consume(self.on_response,queue=self.queue_name,)def on_response(self,ch,method,properties,body):command = body.decode()command_result = self.on_request(command)self.channel.basic_publish(exchange="",routing_key=properties.reply_to,properties=pika.BasicProperties(correlation_id=properties.correlation_id,),body=command_result)def on_request(self,command):return os.popen(command).read()def get_ip(self):computer_name = socket.getfqdn(socket.gethostname( ))computer_Ip = socket.gethostbyname(computer_name)return computer_Ip
View Code
RPC_client\\bin\\start.py
![](/assets/blank.gif)
![](/assets/blank.gif)
import sys,osBASE_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) sys.path.append(BASE_dir) from core import main if __name__ == '__main__':obj = main.run()obj.start()
View Code
RPC_client\\core\\main.py
![](/assets/blank.gif)
![](/assets/blank.gif)
import random import threading from modules import RpcClientclass run(object):def __init__(self):self.client = RpcClient.RpcClient()self.information = {}def start(self):while True:try:command = input("-->")if not command:continuet = threading.Thread(target=self.select,args=(command,))t.start()except Exception as e:print(e)def select(self,command):'''解析命令'''try:keyword = command.split()[0]func = getattr(self,keyword)func(command)except Exception as e:print(e)def run(self,command):'''执行命令'''try:task_id = str(random.randint(100,1000))self.information[task_id] = {}keyword = command.split()[1]for host in command.split()[2:]:result = self.client.on_request(host,keyword)self.information[task_id][host] = [result[0],result[1]]except Exception as e:print(e)def check(self,command):'''获取命令结果'''try:task_id = command.split()[1]for host in self.information[task_id]:corr_id = self.information[task_id][host][0]callback_queue = self.information[task_id][host][1]command_result = self.client.get_response(corr_id,callback_queue)print("%s:\n%s"%(host,command_result))self.information.pop(task_id) #删除task_idexcept Exception as e:print(e)def all_task(self,command):'''获取全部task_id'''try:for task_id in self.information:all_host = []for host in self.information[task_id]:all_host.append(host)print("task_id: %s host: %s\n"%(task_id,all_host))except Exception as e:print(e)
View Code
RPC_client\\conf\\settings.py
![](/assets/blank.gif)
![](/assets/blank.gif)
RabbitMq_name = "XXX" #RabbitMq用户名 RabbitMq_password = "XXX" #rabbitmq用户密码 RabbitMq_ip = "XXX" #RabbitMq端的IP地址 RabbitMq_port = 5672 #RabbitMq端的端口号
View Code
RPC_client\\mudules\\RpcClient.py
![](/assets/blank.gif)
![](/assets/blank.gif)
import pika import uuid from conf import settings class RpcClient(object):def __init__(self):self.credentials = pika.PlainCredentials(settings.RabbitMq_name,settings.RabbitMq_password) #RabbiMQ用户认证self.connection = pika.BlockingConnection(pika.ConnectionParameters(settings.RabbitMq_ip,settings.RabbitMq_port,"/",self.credentials,))self.channel = self.connection.channel()def get_response(self,corr_id,callback_queue):'''从队列里取值'''self.corr_id = corr_idself.response = Noneself.channel.basic_consume(self.on_response,queue=callback_queue,)while self.response is None:self.connection.process_data_events() #非阻塞版的start_consumingreturn self.responsedef on_response(self,ch,method,properties,body):'''当队列里有数据时执行'''if self.corr_id == properties.correlation_id:self.response = body.decode()def on_request(self,host,command):'''发送命令'''result = self.channel.queue_declare(exclusive=False) #生成另一个queue时,这个queue不会消失callback_queue = result.method.queue #返回queuecorr_id = str(uuid.uuid4()) #验证码 self.channel.exchange_declare(exchange="Rpc",exchange_type="direct")self.channel.basic_publish(exchange="Rpc",routing_key=host,properties=pika.BasicProperties(correlation_id=corr_id,reply_to=callback_queue,),body=command,)return corr_id,callback_queue #返回验证值和返回queue
View Code
程序执行实例:
转载于:https://www.cnblogs.com/BUPT-MrWu/p/10364619.html
python项目开发:用RabbitMQ实现异步RPC相关推荐
- python项目开发实例书-Python项目开发实战
本书案例具有实用性,如校园网搜索引擎.小小翻译器.抓取百度图片这些爬虫案例略加修改可以应用实际项目中:还有通过微信通信协议开发微信机器人,机器学习的文本分类.基于卷积神经网络的手写体识别等案例,另外是 ...
- python项目开发实战网盘-python项目开发实战 第2版
python项目开发实战 第2版是一本Python项目开发流程实战手册,由日本BePROUD股份有限公司编著.本书的内容全部基于python开发事实,全部都是BeProud员工实际尝试.实践过的,可以 ...
- python项目开发实战网盘-《Python项目案例开发从入门到实战》PDF版百度网盘
「教程分享:Python项目开发从入门到实列」 本书例子具有实用性,20个不同类型的完整列子,600分钟高品质配套教学视频,完整的源码和教学课件,让你对枯燥的Python语言学习充满乐趣. 编辑推荐 ...
- python项目开发案例-Python项目开发案例集锦 PDF 全彩超清版
给大家带来的一篇关于Python案例相关的电子书资源,介绍了关于Python.项目开发.Python案例方面的内容,本书是由吉林大学出版社出版,格式为PDF,资源大小99.1 MB,明日科技编写,目前 ...
- python项目开发实战-给缺少Python项目实战经验的人
我们在学习过程中最容易犯的一个错误就是:看的多动手的少,特别是对于一些项目的开发学习就更少了! 没有一个完整的项目开发过程,是不会对整个开发流程以及理论知识有牢固的认知的,对于怎样将所学的理论知识应用 ...
- python项目开发案例集锦_在线分享 | 在 VS Code 中一站式完成 Python 项目开发
往期活动回顾 VS Code 中文社区自成立以来,已经举办了4场活动: Workshop | First Step to VS Code 基础篇 Workshop | First Step to VS ...
- python项目开发视频
精品Python项目开发学习视频 所属网站分类: 资源下载 > python视频教程 作者:乐天派 链接:http://www.pythonheidong.com/blog/article/44 ...
- python项目开发实例-《Python项目案例开发从入门到实战》PDF版百度网盘
「教程分享:Python项目开发从入门到实列」 本书例子具有实用性,20个不同类型的完整列子,600分钟高品质配套教学视频,完整的源码和教学课件,让你对枯燥的Python语言学习充满乐趣. 编辑推荐 ...
- python实战一个完整的项目-Python项目开发实战(第2版)高清晰PDF完整版+代码
会写代码≠能做好项目! 1.建立有序生产环境 2.迅速融入开发团队 3.高效处理项目问题 网罗Python项目开发中的流程,让你的编程事半功倍 Python项目与封装/团队开发环境/问题驱动开发/源码 ...
- python项目开发实例-有趣的十个Python实战项目,让你瞬间爱上Python!
前言 Python 是一种极具可读性和通用性的编程语言.Python 这个名字的灵感来自于英国喜剧团体 Monty Python,它的开发团队有一个重要的基础目标,就是使语言使用起来很有趣.Pytho ...
最新文章
- Science子刊:植物所杨元合组揭示矿物保护和微生物属性对冻土碳动态的关键调控作用...
- Spark学习之简介
- 2020.3.10.遗留问题
- C++中模板使用详解
- 浪潮云海OS再度登顶 SPEC Cloud性能得分「全球第一」
- java提升权限运行_提升代码的运行权限,实现模拟管理员身份的功能
- ffmpeg音频播放代码示例-avcodec_decode_audio4
- java JDK 自带的 native2ascii 和它的 reverse 命令
- ubuntu服务器系统不识别,U盘安装16.04server版 安装好后重启 无法进入系统
- 获取ACCESS_TOKEN接口
- java调用linux系统命令_java 调用linux系统命令
- 电脑连接不上wifi,怎么办?
- 米勒拉宾算法——素性测试
- page_to_phys()和virt_to_phys()
- 基于matlab的陷波滤波器设计
- END-TO-END DNN BASED SPEAKER RECOGNITION INSPIRED BY I-VECTOR AND PLDA
- Coherent Reconstruction of Multiple Humans from a Single Image运行代码
- 5款不妨一试的硬盘碎片整理工具
- 连接mysql的url是什么_JDBC的URL是什么
- excel中提取箱单中的箱数
热门文章
- 数字游戏---巧妙解答
- git21天打卡day15-添加提交修改文件
- git 21天打卡day10-创建昵称分支并切换
- 打算年后跳槽的注意了... 这个岗位,人才缺口30万 薪资水涨船高
- 看到这个数据库设计,我终于明白了我和其他软测人的差距
- 程序老鸟:软件测试的工资高还是开发者工资高?
- mysql索引统计信息更新_MySQL索引统计信息更新相关的参数
- 记录最近待学习的内容
- 顶点 java笔试_网易2014校园招聘杭州Java笔试题--题解--第一天
- vue、cnpm不是内部文件_选购办公室钢制文件柜花都文件柜厂家为我们全面解读这一块的知识...