一、介绍

官方文档:http://docs.celeryproject.org/en/latest/index.html

pip3 install celery

Celery是一个专注于实时处理和任务调度的分布式任务队列,通过它可以轻松的实现任务的异步处理。

使用Celery的常见场景:

  • Web应用。当用户触发的一个操作需要较长时间才能执行完成时,可以把它作为任务交给Celery去异步执行,执行完再返回给用户。这段时间用户不需要等待,提高了网站的整体吞吐量和响应时间。
  • 定时任务。生产环境经常会跑一些定时任务。假如你有上千台的服务器、上千种任务,定时任务的管理很困难,Celery可以帮助我们快速在不同的机器设定不同种任务。
  • 同步完成的附加工作都可以异步完成。比如发送短信/邮件、推送消息、清理/设置缓存等。

Celery包含如下组件:

  • Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
  • Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
  • Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。
  • Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
  • Result Backend:任务处理完后保存状态信息和结果,以供查询。

二、简单示例

创建一个tasks.py:

from celery import Celeryapp = Celery("tasks", broker="amqp://pd:123456@localhost:5672//",backend="redis://:123456@localhost:6379/0")@app.task
def add(x, y):return x+y

启动Celery Worker来开始监听并执行任务:

celery -A tasks worker -l info

更多有关命令:

celery worker --help

再打开一个终端, 进行命令行模式,调用任务:

>>> from tasks import add
>>> relt = add.delay(10, 10)
>>> relt.ready()  # 检查任务是否已经完成
True
>>> relt.get()    # 获取任务结果,可设置timeout超时
20
>>> relt
<AsyncResult: 470d5f45-42eb-4b0c-bd38-06b85fa5599b>
>>> relt.id
'470d5f45-42eb-4b0c-bd38-06b85fa5599b'
>>> relt.result
20
>>> relt.status
'SUCCESS'

from celery import Celery
from celery.result import AsyncResultapp = Celery("tasks", broker="amqp://pd:123456@localhost:5672/pdvhost",backend="redis://:123456@localhost:6379/0")result = AsyncResult(id="470d5f45-42eb-4b0c-bd38-06b85fa5599b", app=app)
print(result.get())  # 20

View Code

三、配置

官方文档,配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html#configuration

像上面简单示例中,要想添加配置,则可以直接在应用程序设置配置:

app.conf.task_serializer = "json"

如果您一次配置多个设置,则:

app.conf.update(task_serializer="json",accept_content=["json"],result_serializer="json",timezone="Europe/Oslo",enable_utc=True,
)

对于大型项目,建议使用专用配置模块。因为项目复杂,最好做到程序的解耦,所以将配置保存在集中位置是一个非常好的选择,一般默认 celeryconfig.py 模块是用来保存配置的,你也可以使用自己定义的名字,然后通过调用 app.config_from_object() 方法告诉 Celery 实例使用配置模块:

app.config_from_object("celeryconfig")
# 或者
from . import celeryconfig
app.config_from_object(celeryconfig)

四、在项目中使用Celery

项目布局:

方案选择:

  1. RabbitMQ作为消息代理。不选Redis是因为如果Redis发生意外,会造成数据丢失等后果。
  2. Msgpack做序列化。Msgpack是一个二进制的类json的序列化方案,它比json的数据结构更小,传输更快。
  3. Redis做结果存储。
pip3 install msgpack

########## celeryapp.py ##########
from celery import Celery
from . import celeryconfigapp = Celery("proj.celeryapp", include=["proj.tasks"])
app.config_from_object(celeryconfig)if __name__ == "__main__":app.start()########## tasks.py ##########
from .celeryapp import app@app.task
def add(x, y):return x+y@app.task
def mul(x, y):return x*y########## celeryconfig.py ##########
# 使用RabbitMQ作为消息代理
broker_url = "amqp://pd:123456@114.116.50.214:5672//"
# # 把任务结果存在了Redis
result_backend = "redis://:123456@114.116.50.214:6379/0"
# 任务序列化和反序列化使用msgpack方案
task_serializer = "msgpack"
# 读取任务结果一般性能要求不高,所以使用了可读性更好的json
result_serializer = "json"
# 任务过期时间
result_expires = 60*60*24
# 指定接受的内容类型
accept_content = ["json", "msgpack"]

代码示例

五、在后台运行worker

在生产中,我们需要在后台运行worker,官方文档daemonization教程中有详细描述。

守护程序脚本使用celery multi命令在后台启动一个或多个worker:

# 启动worker后台运行
celery multi start w1 -A proj.celeryapp -l info
celery multi start w2 -A proj.celeryapp -l info
PS:如果使用的是默认的celery.py,那么直接proj即可# 重启
celery multi restart w1 -A proj -l info# 停止
celery multi stop w1 -A proj -l info# 确保退出之前完成所有当前正在执行的任务
celery multi stopwait w1 -A proj -l info

默认情况下,它会在当前目录下创建的pid和日志文件,为了防止多个worker在彼此之上启动,最好将这些文件放在专用目录中:

mkdir /var/run/celery
mkdir /var/log/celery
celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log

六、指定队列传送任务

官方文档:https://celery.readthedocs.io/en/latest/userguide/routing.html#guide-routing

在 celeryconfig.py 中加入以下配置:

# 路由键以 task. 开头的消息都进default队列
# 路由键以 web. 开头的消息都进web_tasks队列
task_queues = (Queue("default", routing_key="task.#"),Queue("web_tasks", routing_key="web.#"),
)
# 默认的交换机名字为tasks
task_default_exchange = "tasks"
# 设置默认交换类型为topic
task_default_exchange_type = "topic"
# 默认的路由键是 task.default
task_default_routing_key = "task.default"
# 要将任务路由到web_tasks队列,可以在task_routes设置中添加条目
task_routes = {# tasks.add的消息会进入web_tasks队列"proj.tasks.add": {"queue": "web_tasks","routing_key": "web.add",},
}

其他代码与上面 四 中的相同。

启动worker,指定该worker工作于哪个队列:

# 该worker只会执行web_tasks队列中的任务
celery -A proj.celeryapp worker -Q web_tasks -l info

七、定时任务

官方文档:https://celery.readthedocs.io/en/latest/userguide/periodic-tasks.html

Celery支持定时任务,设定好任务的执行时间,Celery就会定时自动帮你执行, 这个定时任务模块叫 celery beat。

函数版tasks.py:

from celery import Celery
from celery.schedules import crontabapp = Celery("tasks", broker="amqp://pd:123456@localhost:5672//", backend="redis://:123456@localhost:6379/0")
app.conf.timezone = "Asia/Shanghai"@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):# 每5秒执行一次 test("Hello")sender.add_periodic_task(5.0, test.s("Hello"), name="every-5s")# 每10秒执行一次 test("World")sender.add_periodic_task(10.0, test.s("World"), name="every-10s", expires=5)# 每周一早上 7:30 执行一次 test("Happy Mondays!")
    sender.add_periodic_task(crontab(hour=7, minute=30, day_of_week=1),test.s("Happy Mondays!"),)@app.task
def test(arg):print(arg)

View Code

celery -A tasks worker -l info
celery -A tasks beat -l info

配置版:

########## celeryapp.py ##########
from celery import Celery
from . import celeryconfigapp = Celery("proj.celeryapp", include=["proj.tasks"])
app.config_from_object(celeryconfig)if __name__ == "__main__":app.start()########## celeryconfig.py ##########
broker_url = "amqp://pd:123456@114.116.50.214:5672//"
result_backend = "redis://:123456@114.116.50.214:6379/0"
task_serializer = "msgpack"
result_serializer = "json"
result_expires = 60*60*24
accept_content = ["json", "msgpack"]
timezone = "Asia/Shanghai"from celery.schedules import crontab
beat_schedule = {"every-10s": {"task": "proj.tasks.add","schedule": 10.0,"args": (10, 10)},"every-monday-morning-7:30": {"task": "proj.tasks.mul","schedule": crontab(hour=7, minute=30, day_of_week=1),"args": (10, 10)}
}########## tasks.py ##########
from .celeryapp import app@app.task
def add(x, y):return x+y@app.task
def mul(x, y):return x*y

View Code

celery -A proj.celeryapp worker -l info
celery -A proj.celeryapp beat -l info 

八、在Django中使用celery

发布任务

https://celery.readthedocs.io/en/latest/django/first-steps-with-django.html#extensions

项目布局:

import os
from celery import Celeryos.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
app = Celery("mysite")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()@app.task(bind=True)
def debug_task(self):print("Request: {0!r}".format(self.request))

celeryapp.py

from .celeryapp import app as celery_app
__all__ = ["celery_app"]

__init__.py

settings.py,更多设置参考:https://celery.readthedocs.io/en/latest/userguide/configuration.html

#for celery
CELERY_BROKER_URL = "amqp://pd:123456@114.116.50.214:5672//"
CELERY_RESULT_BACKEND = "redis://:123456@114.116.50.214:6379/0"

在app里的tasks.py里编写任务:

from celery import shared_task@shared_task
def add(x, y):return x+y@shared_task
def mul(x, y):return x*y

在views里调用celery task:

from django.shortcuts import HttpResponse
from app01 import tasksdef test(request):result = tasks.add.delay(100, 100)return HttpResponse(result.get())

定时任务

https://celery.readthedocs.io/en/latest/userguide/periodic-tasks.html#using-custom-scheduler-classes

1、安装 django-celery-beat

pip3 install django-celery-beat

2、在settings.py中设置

INSTALLED_APPS = [...,'django_celery_beat',
]

3、进行数据库迁移,以便创建定时任务所需的表

python3 manage.py migrate

4、开始监测定时任务

celery -A mysite.celeryapp beat -l info -S django

5、在django-admin界面设置定时任务

转载于:https://www.cnblogs.com/believepd/p/10643392.html

Celery-分布式任务队列相关推荐

  1. python 分布式队列_〖Python〗-- Celery分布式任务队列

    [Celery分布式任务队列] 一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步 ...

  2. Celery 分布式任务队列快速入门

    Celery 分布式任务队列快速入门 本节内容 Celery介绍和基本使用 在项目中如何使用celery 启用多个workers Celery 定时任务 与django结合 通过django配置cel ...

  3. Celery分布式任务队列的认识和基本操作

    一.简单认识 Celery是由Python开发.简单.灵活.可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务.Celery侧重于实时操作,但对调度支持也很好 ...

  4. Python定时任务库Celery——分布式任务队列

    文章目录 定时任务库对比 简介 安装 初试 进阶 项目结构 配置文件 Celery实例化 实时任务 定时任务 调用任务 启动定时任务 任务状态跟踪 递归调用 Celery配置 命令行参数 分布式集群部 ...

  5. python Celery 分布式任务队列快速入门

    本节内容 Celery介绍和基本使用 在项目中如何使用celery 启用多个workers Celery 定时任务 与django结合 通过django配置celery periodic task 一 ...

  6. Python 第三方库之 Celery 分布式任务队列

    一.Celery介绍和使用: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, ...

  7. Celery分布式任务队列学习记录

    安装与环境配置 环境:Ubuntu18.04 安装celery root@ubuntu:/home/toohoo/learncelery# pip install celery==3.1.25 Col ...

  8. 分布式任务队列 Celery — 深入 Task

    目录 文章目录 目录 前文列表 前言 Task 的实例化 任务的名字 任务的绑定 任务的重试 任务的请求上下文 任务的继承 前文列表 分布式任务队列 Celery 分布式任务队列 Celery -- ...

  9. 分布式任务队列 Celery — 应用基础

    目录 文章目录 目录 前文列表 前言 Celery 的周期(定时)任务 Celery 的同步调用 Celery 结果储存 Celery 的监控 Celery 的调试 前文列表 分布式任务队列 Cele ...

  10. python任务队列 http_基于Python开发的分布式任务队列:Celery

    Celery (芹菜)是基于Python开发的分布式任务队列.它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度. 架构设计. Celery的架构由三部分组成,消息中间件(message ...

最新文章

  1. R语言 、Excel哪个更能胜任数据分析?
  2. java web调用c_Java调用C/C++程序
  3. java并发编程线程安全
  4. 《大话设计模式》读书笔记-建造者模式
  5. 太厉害了!3小时34分!53岁院士施一公完成人生首马
  6. hdu1247 字典树
  7. paip.基于urlrewrite的反向代理以及内容改写
  8. 点线面的意义_如何运用设计中的点线面?
  9. JS将16进制颜色转为rgba格式
  10. oracle创建一个永久性表空间,oracle创建表空间
  11. Mac mini M1使用简单体验(编程、游戏、深度学习)
  12. VC++域名转IP(网络直接连接域名)
  13. 出生年月缩写c语言,如何用C语言编写输入出生年月?
  14. 实验(1)信号的采样
  15. 嵌入式系统的数据结构与算法
  16. ❤️❤️❤️Unity废柴看过来,手把手教你做植物大战僵尸(二)—— 序列帧动画
  17. jQuery学习第二天——jQuery的常见效果(上)
  18. Neo4j调用APOC算法库
  19. 洛谷P2341(受欢迎的牛)题解
  20. C语言关键字之C89、C99、C11

热门文章

  1. 设置ListView每条数据之间的间隔
  2. 舍不得孩子套不着狼,早就应该换SSD硬盘了!
  3. 如何在查询分析器中执行dos命令
  4. 内存映射文件(File Mapping)API
  5. 【转】Android编码规范建议18条
  6. PO、BO、VO、DTO、POJO、DAO的区别
  7. docker: 解决centos7下cgroup.procs: no such device的错误
  8. spark(1.1) mllib 源代码分析
  9. 【html 及 HTML5所有标签汇总】★★★
  10. Web开发学习点滴(持续更新)