Django 中celery的使用
1、django应用Celery
django框架请求/响应的过程是同步的,框架本身无法实现异步响应。
但是我们在项目过程中会经常会遇到一些耗时的任务, 比如:发送邮件、发送短信、大数据统计等等,这些操作耗时长,同步执行对用户体验非常不友好,那么在这种情况下就需要实现异步执行。
异步执行前端一般使用ajax,后端使用Celery。
2 、项目应用
django项目应用celery,主要有两种任务方式,一是异步任务(发布者任务),一般是web请求,二是定时任务。
celery组成请看celery介绍_宠乖仪的博客-CSDN博客
本文使用的是redis数据库作为消息中间件和结果存储数据库
1.异步任务redis
1.安装库
pip install celery
pip install redis
2.celery.py
在主项目目录下,新建 celery.py 文件:
import os
import django
from celery import Celery
from django.conf import settings# 设置系统环境变量,安装django,必须设置,否则在启动celery时会报错
# celery_study 是当前项目名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_study.settings')
django.setup()celery_app = Celery('celery_study')
celery_app.config_from_object('django.conf:settings')
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
注意:是和settings.py文件同目录,一定不能建立在项目根目录,不然会引起 celery 这个模块名的命名冲突
同时,在主项目的init.py中,添加如下代码:
from .celery import celery_app__all__ = ['celery_app']
3.settings.py
在配置文件中配置对应的redis配置:
# Broker配置,使用Redis作为消息中间件
BROKER_URL = 'redis://127.0.0.1:6379/0' # BACKEND配置,这里使用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 结果序列化方案
CELERY_RESULT_SERIALIZER = 'json' # 任务结果过期时间,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 时区配置
CELERY_TIMEZONE='Asia/Shanghai' # 指定导入的任务模块,可以指定多个
#CELERY_IMPORTS = (
# 'other_dir.tasks',
#)
注意:所有配置的官方文档:Configuration and defaults — Celery 5.2.0b3 documentation
4.tasks.py
在子应用下建立各自对应的任务文件tasks.py(必须是tasks.py这个名字,不允许修改)
from celery import shared_task@shared_task
def add(x, y):return x + y@shared_task
def mul(x, y):return x * y@shared_task
def xsum(numbers):return sum(numbers)
5.调用任务
from .tasks import *
# Create your views here.def task_add_view(request):add.delay(100,200)return HttpResponse(f'调用函数结果')
6.启动celery
pip install eventlet
celery -A celery_study worker -l debug -P eventlet
注意 :celery_study是项目名
使用redis时,有可能会出现如下类似的异常
AttributeError: 'str' object has no attribute 'items'
这是由于版本差异,需要卸载已经安装的python环境中的 redis 库,重新指定安装特定版本(celery4.x以下适用 redis2.10.6, celery4.3以上使用redis3.2.0以上):
xxxxxxxxxx pip install redis==2.10.6
7.获取任务结果
在 views.py 中,通过 AsyncResult.get() 获取结果
from celery import result
def get_result_by_taskid(request):task_id = request.GET.get('task_id')# 异步执行ar = result.AsyncResult(task_id)if ar.ready():return JsonResponse({'status': ar.state, 'result': ar.get()})else:return JsonResponse({'status': ar.state, 'result': ''})
AsyncResult类的常用的属性和方法:
state: 返回任务状态,等同status;
task_id: 返回任务id;
result: 返回任务结果,同get()方法;
ready(): 判断任务是否执行以及有结果,有结果为True,否则False;
info(): 获取任务信息,默认为结果;
wait(t): 等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;
successful(): 判断任务是否成功,成功为True,否则为False;
2.定时任务
在第一步的异步任务的基础上,进行部分修改即可
1.settings.py
from celery.schedules import crontabCELERYBEAT_SCHEDULE = {'mul_every_30_seconds': {# 任务路径'task': 'celery_app.tasks.mul',# 每30秒执行一次'schedule': 5,'args': (14, 5)}
}
说明(更多内容见文档:Periodic Tasks — Celery 5.2.0b3 documentation):
task:任务函数
schedule:执行频率,可以是整型(秒数),也可以是timedelta对象,也可以是crontab对象,也可以是自定义类(继承celery.schedules.schedule)
args:位置参数,列表或元组
kwargs:关键字参数,字典
options:可选参数,字典,任何 apply_async() 支持的参数
relative:默认是False,取相对于beat的开始时间;设置为True,则取设置的timedelta时间
在task.py中设置了日志
from celery import shared_task
import logging
logger = logging.getLogger(__name__))@shared_task
def mul(x, y):logger.info('___mul__'*10)return x * y
2.启动celery
(两个cmd)分别启动worker和beat
celery -A worker celery_study -l debug -P eventlet
celery beat -A celery_study -l debug
3.任务绑定
Celery可通过task绑定到实例获取到task的上下文,这样我们可以在task运行时候获取到task的状态,记录相关日志等
方法:
在装饰器中加入参数 bind=True
在task函数中的第一个参数设置为self
在task.py 里面写
from celery import shared_task
import logging
logger = logging.getLogger(__name__)# 任务绑定
@shared_task(bind=True)
def add(self,x, y):logger.info('add__-----'*10)logger.info('name:',self.name)logger.info('dir(self)',dir(self))return x + y
其中:self对象是celery.app.task.Task的实例,可以用于实现重试等多种功能
from celery import shared_task
import logging
logger = logging.getLogger(__name__)# 任务绑定
@shared_task(bind=True)
def add(self,x, y):try:logger.info('add__-----'*10)logger.info('name:',self.name)logger.info('dir(self)',dir(self))raise Exceptionexcept Exception as e:# 出错每4秒尝试一次,总共尝试4次self.retry(exc=e, countdown=4, max_retries=4) return x + y
启动celery
celery -A worker celery_study -l debug -P eventlet
4.任务钩子
Celery在执行任务时,提供了钩子方法用于在任务执行完成时候进行对应的操作,在Task源码中提供了很多状态钩子函数如:on_success(成功后执行)、on_failure(失败时候执行)、on_retry(任务重试时候执行)、after_return(任务返回时候执行)
方法:通过继承Task类,重写对应方法即可,
from celery import Taskclass MyHookTask(Task):def on_success(self, retval, task_id, args, kwargs):logger.info(f'task id:{task_id} , arg:{args} , successful !')def on_failure(self, exc, task_id, args, kwargs, einfo):logger.info(f'task id:{task_id} , arg:{args} , failed ! erros: {exc}')def on_retry(self, exc, task_id, args, kwargs, einfo):logger.info(f'task id:{task_id} , arg:{args} , retry ! erros: {exc}')# 在对应的task函数的装饰器中,通过 base=MyHookTask 指定
@shared_task(base=MyHookTask, bind=True)
def add(self,x, y):logger.info('add__-----'*10)logger.info('name:',self.name)logger.info('dir(self)',dir(self))return x + y
启动celery
celery -A worker celery_study -l debug -P eventlet
5.任务编排
在很多情况下,一个任务需要由多个子任务或者一个任务需要很多步骤才能完成,Celery也能实现这样的任务,完成这类型的任务通过以下模块完成:
group: 并行调度任务
chain: 链式任务调度
chord: 类似group,但分header和body2个部分,header可以是一个group任务,执行完成后调用body的任务
map: 映射调度,通过输入多个入参来多次调度同一个任务
starmap: 类似map,入参类似*args
chunks: 将任务按照一定数量进行分组
文档:Next Steps — Celery 5.2.0b3 documentation
1.group
urls.py:
path('primitive/', views.test_primitive),
views.py:
from .tasks import *
from celery import groupdef test_primitive(request):# 创建10个并列的任务lazy_group = group(add.s(i, i) for i in range(10))promise = lazy_group()result = promise.get()return JsonResponse({'function': 'test_primitive', 'result': result})
说明:
通过task函数的 s 方法传入参数,启动任务
上面这种方法需要进行等待,如果依然想实现异步的方式,那么就必须在tasks.py中新建一个task方法,调用group,示例如下:
tasks.py:
@shared_task
def group_task(num):return group(add.s(i, i) for i in range(num))().get()
urls.py:
path('first_group/', views.first_group),
views.py:
def first_group(request):ar = tasks.group_task.delay(10)return HttpResponse('返回first_group任务,task_id:' + ar.task_id)
2.chain
默认上一个任务的结果作为下一个任务的第一个参数
def test_primitive(request):# 等同调用 mul(add(add(2, 2), 5), 8)promise = chain(tasks.add.s(2, 2), tasks.add.s(5), tasks.mul.s(8))()# 72result = promise.get() return JsonResponse({'function': 'test_primitive', 'result': result})
3.chord
任务分割,分为header和body两部分,hearder任务执行完在执行body,其中hearder返回结果作为参数传递给body
def test_primitive(request):# header: [3, 12] # body: xsum([3, 12])promise = chord(header=[tasks.add.s(1,2),tasks.mul.s(3,4)],body=tasks.xsum.s())()result = promise.get()return JsonResponse({'function': 'test_primitive', 'result': result})
6、celery管理和监控
celery通过flower组件实现管理和监控功能 ,flower组件不仅仅提供监控功能,还提供HTTP API可实现对woker和task的管理
官网:flower · PyPI
文档:Flower - Celery monitoring tool — Flower 1.0.1 documentation
安装flower
pip install flower
启动flower
flower -A celery_study --port=5555
说明:
-A:项目名
--port: 端口号
访问
在浏览器输入:http://127.0.0.1:5555
通过api操作
curl http://127.0.0.1:5555/api/workers
Django 中celery的使用相关推荐
- 任务队列:celery快速入门及django中celery的用法
文章目录 一.celey的简介 1.1 celery的工作机制 1.2 安装celery(5.2版本) 二.celery快速入门 2.1 选择broker 2.2 celery的简单使用 2.2.1 ...
- Django中Celery简介
初识Celery: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,可将一些耗时的任务放入该消息队列中处理,一些定时任务也可以放入队列中自动执行 ...
- Django中celery配置总结
情景: 用户发起request,并等待response返回.在本些views中,可能需要执行一段耗时的程序,那么用户就会等待很长时间, 造成不好的用户体验,比如发送邮件.手机验证码等. 使用celer ...
- celery 可视化_Django中Celery的实现介绍(一)
Django中Celery的实现 Celery介绍 Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度. 上图展示的是Celery ...
- Django+redis+celery实现异步任务
1 Django中的异步请求 Django Web中从一个http请求发起,到获得响应返回html页面的流程大致如下:http请求发起 -- http handling(request解析) -- u ...
- django中使用celery简单介绍
链客,专为开发者而生,有问必答! 此文章来自区块链技术社区,未经允许拒绝转载. 本章节我们重点在于实现,如何存储任务的结果. 我们将任务函数改为: from celery_demo.celery im ...
- Django 中使用Celery实现异步任务
1.在settings.py 同级目录下,新增一个celery.py的文件 需要注意的是:你的项目目录名要和配置一样 例如我的项目目录名就是 base_django_api 我的目录结构如下: # c ...
- django中的缓存以及跨域
django中的缓存 先来了解以下问题?(面试会问) 如何提高网站的并发量: QPS:Queries Per Second意思是"每秒查询率",是一台服务器每秒能够相应的查询次数, ...
- Django(Celery+日志)
celery文档参考:http://docs.jinkan.org/docs/celery/ 同步请求:所有逻辑处理.数据计算任务在View中处理完毕后返回response.在View处理任务时用户处 ...
最新文章
- 成功解决ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or
- DL之DeepLabv2:DeepLab v2算法的简介(论文介绍)、架构详解、案例应用等配图集合之详细攻略
- python 新闻摘要_每日新闻摘要:运营商承诺他们不再出售您的位置…
- oracle查询本月第一天_oracle获取本月第一天和最后一天及Oracle trunc()函数的用法...
- 织梦cms生成首页html的php文件,织梦DedeCMS定时自动生成首页HTML的实现方法
- leetcode题解538-把二叉搜索树转化为累加树
- Oracle表重命名后索引、约束、权限、同义词的影响
- Redis主从复制和集群配置
- linux下驱动程序数字签名,64位Windows操作系统为驱动程序添加测试数字签名
- LINUX EMOS部署及管理
- SkeyePlayer 超低延迟RTSP/RTMP流媒体播放器插件使用说明
- 问题 J: LZY订单查询
- Android:SQLite数据库学习小结
- 2022年全球颈椎按摩仪市场前景分析及研究报告
- 浅谈“决策引擎”在身份管理的应用
- negroni包和mux包的一点理解
- ROS学习记录9——urdf文件的创建与使用
- 【金字塔原理2】剖析金字塔的内部结构
- [ 前端开发 ] label标签的使用
- 查看电脑连接过的所有无线的密码