玩转locust
1.当实现分布式压测时,可在init回调中设置中止测试的情况,也可在init中实现注册master端的消息和worker的消息,可用

environment.runner.send_message(注册的消息类型, 发送的数据)进行数据传输,实现master和worker进行传输信息

2.

'''''''''
#创建集合点,当locust实例产生完成时触发
'''
all_locusts_spawned = Semaphore()
#上锁
all_locusts_spawned.acquire()

上面步骤进上锁,当我们在test_start事件回调上进行释放锁时,可实现在开始测试前等待所有用户加载完成在执行locust的task

3.LoadTestShape,不可与设置用户数和设置增长用户速率一起使用,会覆盖!

这个类在locust启动的时候,会检测,有这个类,locust会一直根据这个类下面的

tick函数返回的元组(int,folat),来进行用户的速率变化,比如你想实现在某段时间内增加用户数量,某段时间内减少用户数量,只要控制tick的返回就行了(用户总数,速率/个)
from locust.runners import MasterRunner, WorkerRunner,STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP,LocalRunner
from locust import HttpUser, TaskSet, task, events, LoadTestShape
from gevent._semaphore import Semaphore
import json
import traceback
import gevent
import time'''请求成功时触发'''
def on_request_success(request_type, name, response_time, response_length):print( 'success  Type: {}, Name: {}, Time: {}ms, length: {}'.format(request_type, name, response_time,response_length))'''请求失败时触发'''
def on_request_failure(request_type, name, response_time, response_length,exception):print('failure  Type: {}, Name: {}, Time: {}ms, exception: {}, response_length:{}'.format(request_type, name, response_time,exception,response_length))'''在执行locust类内发生异常时触发'''
def on_locust_error(locust_instance, exception, tb):print("error  locust_instance: {}, exception: {}, traceback: {}".format(locust_instance, exception,traceback.format_tb(tb)))#退出进程时回调
def on_quitting(environment,**kwargs):print("Test quit")'''
#停止测试的时候客户端会调这个方法发送数据到主节点这边
'''
def on_test_stop(environment,**kwargs):if not isinstance(environment.runner, MasterRunner):print("Cleaning up test data")#节点往主节点发送的数据environment.runner.send_message('acknowledge_users', f"Thanks for the Cleaning up test data users!")else:users = [{"name": "User1"},{"name": "User2"},{"name": "User3"},]environment.runner.send_message('test_users', users)'''
#定义worker节点注册的消息
# Fired when the worker receives a message of type 'test_users'
'''
def setup_test_users(environment, msg, **kwargs):for user in msg.data:print(f"User {user['name']} received")environment.runner.send_message('acknowledge_users', f"Thanks for the {len(msg.data)} users!")'''
#定义matser节点注册的消息
#Fired when the master receives a message of type 'acknowledge_users!!!!
'''
def on_acknowledge(msg, **kwargs):print("recv worker data :{}".format(msg.data))'''
#主动退出进程时:environment.process_exit_code = 0
#判断错误率大于多少N主动退出进程
#判断响应时间大于多少N主动退出进程
#判断响应时间
'''
def checker(environment):while not environment.runner.state in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:time.sleep(1)if environment.stats.total.fail_ratio > 0.01:print("Test failed due to failure ratio > 1%,code=1")environment.process_exit_code = 1'''这个语句是退出'''environment.runner.quit()elif environment.stats.total.avg_response_time > 200:print("Test failed due to average response time ratio > 200 ms,code=1")environment.process_exit_code = 1elif environment.stats.total.get_response_time_percentile(0.95) > 300:print("Test failed due to 95th percentile response time > 800 ms,code=1")environment.process_exit_code = 1else:environment.process_exit_code = 0print("Test Normal task exit code=0")
'''
#初始化时绑定的重写方法,该类中进行了worler和master节点的消息注册
'''
def on_locust_init(environment, **kwargs):if not isinstance(environment.runner, MasterRunner):'''#初始化的时候注册消息,客户端的消息,往客户端的往这个类型发就行'''environment.runner.register_message('test_users', setup_test_users)if not isinstance(environment.runner, WorkerRunner):'''#初始化的时候注册消息,服务端的消息,往后服务端往这个类型发就行'''environment.runner.register_message('acknowledge_users', on_acknowledge)if isinstance(environment.runner, MasterRunner) or isinstance(environment.runner, LocalRunner):gevent.spawn(checker, environment)def on_test_start(environment, **kwargs):'''如果运行环境不是主节点'''if not isinstance(environment.runner, MasterRunner):users = [{"name": "User1"},{"name": "User2"},{"name": "User3"},]environment.runner.send_message('test_users', users)'''''''''
#创建集合点,当locust实例产生完成时触发
'''
all_locusts_spawned = Semaphore()
#上锁
all_locusts_spawned.acquire()
'''
#生成所有locust用户时触发
'''
def on_hatch_complete(**kwargs):#释放锁all_locusts_spawned.release()'''
#事件回调绑定
'''
events.spawning_complete.add_listener(on_hatch_complete)
events.request_success.add_listener(on_request_success)
events.request_failure.add_listener(on_request_failure)
events.user_error.add_listener(on_locust_error)
events.quitting.add_listener(on_quitting)
events.init.add_listener(on_locust_init)'''
#主节点才触发
'''
events.test_start.add_listener(on_test_start)
events.test_stop.add_listener(on_test_stop)header={"Content-Type": "application/json;charset=UTF-8"}
class WebsiteTasks(TaskSet):'''#进行初始化的工作,每个Locust用户开始做的第一件事'''def on_start(self):payload = {"username": "13592945579","password": "5632721",}# self.client属性使用Python request库的所有方法,调用和使用方法和requests完全一致;with self.client.post("/user/signin",data=json.dumps(payload),catch_response=True,headers=header) as response:if response.status_code==200:response.success()else:response.failure("on_start请求失败!")'''#通过@task()装饰的方法为一个事务,方法的参数用于指定该行为的执行权重,参数越大每次被虚拟用户执行的概率越高,默认为1'''@task(1)def index(self):payload = {"username": "13592945579","password": "5632721",}# self.client.request_name = "/blog?id=[id]" #自定义locust报告中Statistics的Name名称,进行分组显示with self.client.post("/user/signin",data=json.dumps(payload),catch_response=True,headers=header) as respone:#self.client属性使用Python request库的所有方法,调用和使用方法和requests完全一致;if respone.status_code == 200:respone.success()else:respone.failure("index请求失败!")@task(2)def test01(self):payload = {"username": "13592945579","password": "5632721",}with self.client.post("/user/signin",data=json.dumps(payload),catch_response=True,headers=header) as respone:#self.client属性使用Python request库的所有方法,调用和使用方法和requests完全一致;try:if respone.status_code == 200:respone.catch_response = Trueelse:respone.failure("test01请求失败!")except json.JSONDecodeError:respone.failure("Response could not be decoded as JSON!")except KeyError:  # 字典类型的数据,操作的变量没有该键时会报这个错respone.failure("Response did not contain expected key 'greeting'")# @task(1)# def about(self):#     self.client.get("/about/")class WebsiteUser(HttpUser):# host     = "172.168.8.113:5000" #被测系统的host,在终端中启动locust时没有指定--host参数时才会用到host = "http://127.0.0.1:5000"tasks = [WebsiteTasks]          #TaskSet类,该类定义用户任务信息,必填。这里就是:WebsiteTasks类名,因为该类继承TaskSet;min_wait = 5000  #每个用户执行两个任务间隔时间的上下限(毫秒),具体数值在上下限中随机取值,若不指定默认间隔时间固定为1秒max_wait = 15000# def stop(self, force=False):#     self.stop(True)'''
#定义长期跑的时候,用户增长量和下滑量和停止时间,这个模式会覆盖config配置的模式
'''
# class MyCustomShape(LoadTestShape):
#     time_limit = 10
#     time_avg=70
#     spawn_rate = 1
#     user_count=0
#     count_avg=20
#     end_time=90
#
#     def tick(self):
#         run_time = self.get_run_time()
#         '''
#         #如果运行时间小于设定的时间就以最大用户数下滑
#         '''
#
#         if run_time < self.time_limit and run_time <self.time_avg:
#
#             self.user_count=self.user_count+self.spawn_rate
#             if self.user_count<self.count_avg:
#                 print("run_time< self.time_limit:",self.user_count,"run_time:{}".format(run_time))
#                 return (self.user_count,self.spawn_rate)
#             elif self.user_count>self.count_avg:
#                 return (self.count_avg,self.spawn_rate)
#
#         elif run_time > self.time_limit and run_time<self.time_avg:
#             '''
#             #如果运行时间大于设定的时间就以最大用户数下滑
#             '''
#             if self.user_count<self.count_avg:
#                 self.user_count = self.user_count+self.spawn_rate
#                 return (self.user_count, (self.spawn_rate))
#             elif self.user_count>self.count_avg and run_time<self.time_avg:
#                 # self.user_count=self.user_count-self.spawn_rate
#                 print("run_time > self.time_limit:",self.user_count,"run_time:{}".format(run_time))
#                 return (self.count_avg, (self.spawn_rate))
#             elif self.user_count>=self.count_avg and run_time>self.time_avg:
#                 # self.user_count=self.user_count-self.spawn_rate
#                 return (self.count_avg, -(self.spawn_rate))
#         elif run_time>=self.time_avg and run_time<=self.end_time:
#             print("run_time = self.time_limit:", self.user_count, "run_time:{}".format(run_time))
#             return (self.count_avg, (self.spawn_rate))
#
#         elif run_time>self.end_time:
#             self.user_count = self.user_count - self.spawn_rate
#             return (self.count_avg, -(self.spawn_rate))if __name__ == '__main__':import os# os.system("locust -f G:\\script_xlink\\locust_py\\script_WebsiteTasks.py  --host=http://127.0.0.1:5000")# os.system("locust -f G:\\script_xlink\\locust_py\\script_WebsiteTasks.py --master --host=http://172.168.8.113:5000")# os.system("locust -f G:\\script_xlink\\locust_py\\script_WebsiteTasks.py --master" )# os.system("locust -f G:\\script_xlink\\locust_py\\script_WebsiteTasks.py --worker")os.system("locust --config=G:\script_xlink\locust_py\config\config.conf")# os.system("locust -f G:\\script_xlink\\locust_py\\script_WebsiteTasks.py --master --expect-workers=1 --headless" )

config:

locust分布式压测相关推荐

  1. python locust 能压测数据库_深入浅出 Locust 实现

    写在前面 本文将尝试通过一篇文章讲清楚开源压测工具--Locust的原理和实现过程,聚焦在实现上,不拘泥在一堆源码中(本身Locust的源码就比较简单).本人并不是Locust铁粉,只是对它的实现方式 ...

  2. python locust 能压测数据库_python locust 性能测试:HOOKS钩子方法

    为locust中不同类型的事件,提供的钩子方法: from locust import TaskSet, task, events, Locust from locust.clients import ...

  3. 案例 | 荔枝微课基于 kubernetes 搭建分布式压测系统

    王诚强,荔枝微课基础架构负责人.热衷于基础技术研发推广,致力于提供稳定高效的基础架构,推进了荔枝微课集群化从0到1的发展,云原生架构持续演进的实践者. 本文根据2021年4月10日深圳站举办的[腾讯云 ...

  4. 接口测试学习——jmeter分布式压测

    分布式压测我理解的就是有一台主控机和几台压力机.主控机通过远程控制压力机启动测试,来实现系统不同级别访问量情况下的性能验证.操作步骤如下: 1.启动jmeter自动化工具,界面显示如下图所示. 2.在 ...

  5. jmeter分布式压测原理简介1

    1.什么叫分布式压测? 分布式压测:模拟多台机器向目标机器产生压力,模拟几万用户并发访问 2.分布式压测原理:如下 3.更多补充.....待添加 转载于:https://www.cnblogs.com ...

  6. 分布式压测系列之Jmeter4.0第一季

    1)Jmeter4.0介绍 jmeter是个纯java编写的开源压测工具,apache旗下的开源软件,一开始是设计为web测试的软件,由于发展迅猛,现在可以压测许多协议比如:http.https.so ...

  7. 分布式压测系列之Jmeter4.0

    1)Jmeter4.0介绍 jmeter是个纯java编写的开源压测工具,apache旗下的开源软件,一开始是设计为web测试的软件,由于发展迅猛,现在可以压测许多协议比如:http.https.so ...

  8. Jeecgboot Feign、分布式压测、分布式任务调度

    分布式压测 需求场景 一些关键接口需要压测到很高的QPS需要设置更多的线程去模拟虚拟用户去请求接口假如需要模拟2万个用户因为jemeter使用java语言开发每创建一个线程jvm默认会为每个线程分配1 ...

  9. java模拟数据库压测_Jeecgboot Feign、分布式压测、分布式任务调度

    分布式压测 需求场景 一些关键接口需要压测到很高的QPS 需要设置更多的线程去模拟虚拟用户去请求接口 假如需要模拟2万个用户 因为jemeter使用java语言开发 每创建一个线程 jvm默认会为每个 ...

最新文章

  1. mysql讀取sql_SQL 2008连接读取mysql数据的方法
  2. idea 报错javax/xml/bind/DatatypeConverter
  3. python程序员专用壁纸_代码没写完,哪里有脸睡觉!17 张程序员壁纸推荐
  4. aws mfa 认证_如何为您的AWS账户设置多因素身份验证(MFA)
  5. Gvim 字体大小设置和FencView插件安装
  6. 第十天:SwiftGoodAsOldPhones
  7. 循环的中断操作(C#基础回顾03)
  8. CoreData整理(二)——多线程方案
  9. np.random.RandomState、np.random.rand、np.random.random、np.random_sample
  10. java二手书交易系统_基于Java的二手图书交易系统后台设计与实现.doc
  11. android office转pdf插件,office word转pdf插件-Office自带Word转PDF插件下载__飞翔下载
  12. 单片微机计算机原理与接口技术高峰,单片微机原理与接口技术
  13. html 的title中显示网页logo
  14. python文本编辑器怎么运行_python程序编辑和运行的几种方法
  15. deepin升级到20.2,开机显示deepin LOGO之后屏幕黑屏解决办法
  16. 软著申请所需资料整理(软件著作权)
  17. 【LaTeX排版】西电硕\博士论文LaTeX模版<一>
  18. b500k电位器引脚接法_电位器引脚含义是什么
  19. HttpUrlConnection与HttpClient的认识(六) -实际应用之刷网络流量
  20. c语言程序设计:现代方法 勘误,C语言程序设计基础教程----勘误记录

热门文章

  1. 关于python课程深度学习
  2. 物联网传感器_基于传感器的物联网预测性维护,为什么必须对机器进行数字信号处理...
  3. 阿里云天池龙珠计划SQL训练营Task04:集合运算-表的加减法和join等
  4. 2012自学CCNP路由与交换之三网络设备造型及验收
  5. Jsp连接数据库大全(ZT)
  6. 哈希码相同,值一定相同?
  7. 2019智能家居博览会-今日优选展会
  8. PyTorch 17. GPU并发
  9. 编译Rockeylinux-8.5内核代码
  10. 支付宝对刷脸支付精心布局步步推进