Celery-分布式任务队列
一、介绍
官方文档:http://docs.celeryproject.org/en/latest/index.html
pip3 install celery
Celery是一个专注于实时处理和任务调度的分布式任务队列,通过它可以轻松的实现任务的异步处理。
使用Celery的常见场景:
- Web应用。当用户触发的一个操作需要较长时间才能执行完成时,可以把它作为任务交给Celery去异步执行,执行完再返回给用户。这段时间用户不需要等待,提高了网站的整体吞吐量和响应时间。
- 定时任务。生产环境经常会跑一些定时任务。假如你有上千台的服务器、上千种任务,定时任务的管理很困难,Celery可以帮助我们快速在不同的机器设定不同种任务。
- 同步完成的附加工作都可以异步完成。比如发送短信/邮件、推送消息、清理/设置缓存等。
Celery包含如下组件:
- Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
- Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
- Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。
- Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
- Result Backend:任务处理完后保存状态信息和结果,以供查询。
二、简单示例
创建一个tasks.py:
from celery import Celeryapp = Celery("tasks", broker="amqp://pd:123456@localhost:5672//",backend="redis://:123456@localhost:6379/0")@app.task def add(x, y):return x+y
启动Celery Worker来开始监听并执行任务:
celery -A tasks worker -l info
更多有关命令:
celery worker --help
再打开一个终端, 进行命令行模式,调用任务:
>>> from tasks import add >>> relt = add.delay(10, 10) >>> relt.ready() # 检查任务是否已经完成 True >>> relt.get() # 获取任务结果,可设置timeout超时 20 >>> relt <AsyncResult: 470d5f45-42eb-4b0c-bd38-06b85fa5599b> >>> relt.id '470d5f45-42eb-4b0c-bd38-06b85fa5599b' >>> relt.result 20 >>> relt.status 'SUCCESS'
![](/assets/blank.gif)
![](/assets/blank.gif)
from celery import Celery from celery.result import AsyncResultapp = Celery("tasks", broker="amqp://pd:123456@localhost:5672/pdvhost",backend="redis://:123456@localhost:6379/0")result = AsyncResult(id="470d5f45-42eb-4b0c-bd38-06b85fa5599b", app=app) print(result.get()) # 20
View Code
三、配置
官方文档,配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html#configuration
像上面简单示例中,要想添加配置,则可以直接在应用程序设置配置:
app.conf.task_serializer = "json"
如果您一次配置多个设置,则:
app.conf.update(task_serializer="json",accept_content=["json"],result_serializer="json",timezone="Europe/Oslo",enable_utc=True,
)
对于大型项目,建议使用专用配置模块。因为项目复杂,最好做到程序的解耦,所以将配置保存在集中位置是一个非常好的选择,一般默认 celeryconfig.py 模块是用来保存配置的,你也可以使用自己定义的名字,然后通过调用 app.config_from_object() 方法告诉 Celery 实例使用配置模块:
app.config_from_object("celeryconfig")
# 或者
from . import celeryconfig
app.config_from_object(celeryconfig)
四、在项目中使用Celery
项目布局:
方案选择:
- RabbitMQ作为消息代理。不选Redis是因为如果Redis发生意外,会造成数据丢失等后果。
- Msgpack做序列化。Msgpack是一个二进制的类json的序列化方案,它比json的数据结构更小,传输更快。
- Redis做结果存储。
pip3 install msgpack
![](/assets/blank.gif)
![](/assets/blank.gif)
########## celeryapp.py ########## from celery import Celery from . import celeryconfigapp = Celery("proj.celeryapp", include=["proj.tasks"]) app.config_from_object(celeryconfig)if __name__ == "__main__":app.start()########## tasks.py ########## from .celeryapp import app@app.task def add(x, y):return x+y@app.task def mul(x, y):return x*y########## celeryconfig.py ########## # 使用RabbitMQ作为消息代理 broker_url = "amqp://pd:123456@114.116.50.214:5672//" # # 把任务结果存在了Redis result_backend = "redis://:123456@114.116.50.214:6379/0" # 任务序列化和反序列化使用msgpack方案 task_serializer = "msgpack" # 读取任务结果一般性能要求不高,所以使用了可读性更好的json result_serializer = "json" # 任务过期时间 result_expires = 60*60*24 # 指定接受的内容类型 accept_content = ["json", "msgpack"]
代码示例
五、在后台运行worker
在生产中,我们需要在后台运行worker,官方文档daemonization教程中有详细描述。
守护程序脚本使用celery multi命令在后台启动一个或多个worker:
# 启动worker后台运行 celery multi start w1 -A proj.celeryapp -l info celery multi start w2 -A proj.celeryapp -l info PS:如果使用的是默认的celery.py,那么直接proj即可# 重启 celery multi restart w1 -A proj -l info# 停止 celery multi stop w1 -A proj -l info# 确保退出之前完成所有当前正在执行的任务 celery multi stopwait w1 -A proj -l info
默认情况下,它会在当前目录下创建的pid和日志文件,为了防止多个worker在彼此之上启动,最好将这些文件放在专用目录中:
mkdir /var/run/celery mkdir /var/log/celery celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log
六、指定队列传送任务
官方文档:https://celery.readthedocs.io/en/latest/userguide/routing.html#guide-routing
在 celeryconfig.py 中加入以下配置:
# 路由键以 task. 开头的消息都进default队列 # 路由键以 web. 开头的消息都进web_tasks队列 task_queues = (Queue("default", routing_key="task.#"),Queue("web_tasks", routing_key="web.#"), ) # 默认的交换机名字为tasks task_default_exchange = "tasks" # 设置默认交换类型为topic task_default_exchange_type = "topic" # 默认的路由键是 task.default task_default_routing_key = "task.default" # 要将任务路由到web_tasks队列,可以在task_routes设置中添加条目 task_routes = {# tasks.add的消息会进入web_tasks队列"proj.tasks.add": {"queue": "web_tasks","routing_key": "web.add",}, }
其他代码与上面 四 中的相同。
启动worker,指定该worker工作于哪个队列:
# 该worker只会执行web_tasks队列中的任务
celery -A proj.celeryapp worker -Q web_tasks -l info
七、定时任务
官方文档:https://celery.readthedocs.io/en/latest/userguide/periodic-tasks.html
Celery支持定时任务,设定好任务的执行时间,Celery就会定时自动帮你执行, 这个定时任务模块叫 celery beat。
函数版tasks.py:
![](/assets/blank.gif)
![](/assets/blank.gif)
from celery import Celery from celery.schedules import crontabapp = Celery("tasks", broker="amqp://pd:123456@localhost:5672//", backend="redis://:123456@localhost:6379/0") app.conf.timezone = "Asia/Shanghai"@app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs):# 每5秒执行一次 test("Hello")sender.add_periodic_task(5.0, test.s("Hello"), name="every-5s")# 每10秒执行一次 test("World")sender.add_periodic_task(10.0, test.s("World"), name="every-10s", expires=5)# 每周一早上 7:30 执行一次 test("Happy Mondays!") sender.add_periodic_task(crontab(hour=7, minute=30, day_of_week=1),test.s("Happy Mondays!"),)@app.task def test(arg):print(arg)
View Code
celery -A tasks worker -l info
celery -A tasks beat -l info
配置版:
![](/assets/blank.gif)
![](/assets/blank.gif)
########## celeryapp.py ########## from celery import Celery from . import celeryconfigapp = Celery("proj.celeryapp", include=["proj.tasks"]) app.config_from_object(celeryconfig)if __name__ == "__main__":app.start()########## celeryconfig.py ########## broker_url = "amqp://pd:123456@114.116.50.214:5672//" result_backend = "redis://:123456@114.116.50.214:6379/0" task_serializer = "msgpack" result_serializer = "json" result_expires = 60*60*24 accept_content = ["json", "msgpack"] timezone = "Asia/Shanghai"from celery.schedules import crontab beat_schedule = {"every-10s": {"task": "proj.tasks.add","schedule": 10.0,"args": (10, 10)},"every-monday-morning-7:30": {"task": "proj.tasks.mul","schedule": crontab(hour=7, minute=30, day_of_week=1),"args": (10, 10)} }########## tasks.py ########## from .celeryapp import app@app.task def add(x, y):return x+y@app.task def mul(x, y):return x*y
View Code
celery -A proj.celeryapp worker -l info
celery -A proj.celeryapp beat -l info
八、在Django中使用celery
发布任务
https://celery.readthedocs.io/en/latest/django/first-steps-with-django.html#extensions
项目布局:
![](/assets/blank.gif)
![](/assets/blank.gif)
import os from celery import Celeryos.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings") app = Celery("mysite") app.config_from_object("django.conf:settings", namespace="CELERY") app.autodiscover_tasks()@app.task(bind=True) def debug_task(self):print("Request: {0!r}".format(self.request))
celeryapp.py
![](/assets/blank.gif)
![](/assets/blank.gif)
from .celeryapp import app as celery_app __all__ = ["celery_app"]
__init__.py
settings.py,更多设置参考:https://celery.readthedocs.io/en/latest/userguide/configuration.html
#for celery CELERY_BROKER_URL = "amqp://pd:123456@114.116.50.214:5672//" CELERY_RESULT_BACKEND = "redis://:123456@114.116.50.214:6379/0"
在app里的tasks.py里编写任务:
from celery import shared_task@shared_task def add(x, y):return x+y@shared_task def mul(x, y):return x*y
在views里调用celery task:
from django.shortcuts import HttpResponse from app01 import tasksdef test(request):result = tasks.add.delay(100, 100)return HttpResponse(result.get())
定时任务
https://celery.readthedocs.io/en/latest/userguide/periodic-tasks.html#using-custom-scheduler-classes
1、安装 django-celery-beat
pip3 install django-celery-beat
2、在settings.py中设置
INSTALLED_APPS = [...,'django_celery_beat',
]
3、进行数据库迁移,以便创建定时任务所需的表
python3 manage.py migrate
4、开始监测定时任务
celery -A mysite.celeryapp beat -l info -S django
5、在django-admin界面设置定时任务
转载于:https://www.cnblogs.com/believepd/p/10643392.html
Celery-分布式任务队列相关推荐
- python 分布式队列_〖Python〗-- Celery分布式任务队列
[Celery分布式任务队列] 一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步 ...
- Celery 分布式任务队列快速入门
Celery 分布式任务队列快速入门 本节内容 Celery介绍和基本使用 在项目中如何使用celery 启用多个workers Celery 定时任务 与django结合 通过django配置cel ...
- Celery分布式任务队列的认识和基本操作
一.简单认识 Celery是由Python开发.简单.灵活.可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务.Celery侧重于实时操作,但对调度支持也很好 ...
- Python定时任务库Celery——分布式任务队列
文章目录 定时任务库对比 简介 安装 初试 进阶 项目结构 配置文件 Celery实例化 实时任务 定时任务 调用任务 启动定时任务 任务状态跟踪 递归调用 Celery配置 命令行参数 分布式集群部 ...
- python Celery 分布式任务队列快速入门
本节内容 Celery介绍和基本使用 在项目中如何使用celery 启用多个workers Celery 定时任务 与django结合 通过django配置celery periodic task 一 ...
- Python 第三方库之 Celery 分布式任务队列
一.Celery介绍和使用: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, ...
- Celery分布式任务队列学习记录
安装与环境配置 环境:Ubuntu18.04 安装celery root@ubuntu:/home/toohoo/learncelery# pip install celery==3.1.25 Col ...
- 分布式任务队列 Celery — 深入 Task
目录 文章目录 目录 前文列表 前言 Task 的实例化 任务的名字 任务的绑定 任务的重试 任务的请求上下文 任务的继承 前文列表 分布式任务队列 Celery 分布式任务队列 Celery -- ...
- 分布式任务队列 Celery — 应用基础
目录 文章目录 目录 前文列表 前言 Celery 的周期(定时)任务 Celery 的同步调用 Celery 结果储存 Celery 的监控 Celery 的调试 前文列表 分布式任务队列 Cele ...
- python任务队列 http_基于Python开发的分布式任务队列:Celery
Celery (芹菜)是基于Python开发的分布式任务队列.它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度. 架构设计. Celery的架构由三部分组成,消息中间件(message ...
最新文章
- R语言 、Excel哪个更能胜任数据分析?
- java web调用c_Java调用C/C++程序
- java并发编程线程安全
- 《大话设计模式》读书笔记-建造者模式
- 太厉害了!3小时34分!53岁院士施一公完成人生首马
- hdu1247 字典树
- paip.基于urlrewrite的反向代理以及内容改写
- 点线面的意义_如何运用设计中的点线面?
- JS将16进制颜色转为rgba格式
- oracle创建一个永久性表空间,oracle创建表空间
- Mac mini M1使用简单体验(编程、游戏、深度学习)
- VC++域名转IP(网络直接连接域名)
- 出生年月缩写c语言,如何用C语言编写输入出生年月?
- 实验(1)信号的采样
- 嵌入式系统的数据结构与算法
- ❤️❤️❤️Unity废柴看过来,手把手教你做植物大战僵尸(二)—— 序列帧动画
- jQuery学习第二天——jQuery的常见效果(上)
- Neo4j调用APOC算法库
- 洛谷P2341(受欢迎的牛)题解
- C语言关键字之C89、C99、C11
热门文章
- 设置ListView每条数据之间的间隔
- 舍不得孩子套不着狼,早就应该换SSD硬盘了!
- 如何在查询分析器中执行dos命令
- 内存映射文件(File Mapping)API
- 【转】Android编码规范建议18条
- PO、BO、VO、DTO、POJO、DAO的区别
- docker: 解决centos7下cgroup.procs: no such device的错误
- spark(1.1) mllib 源代码分析
- 【html 及 HTML5所有标签汇总】★★★
- Web开发学习点滴(持续更新)