一、什么是Celery

1.1、celery是什么

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

另外, Celery还支持不同的并发和序列化的手段

  • 并发:Prefork, Eventlet, gevent, threads/single threaded
  • 序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等

1.2、使用场景

celery是一个强大的 分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。

异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

定时任务:定时执行某件事情,比如每天数据统计

1.3、Celery具有以下优点

Simple(简单)
Celery 使用和维护都非常简单,并且不需要配置文件。Highly Available(高可用)
woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。Fast(快速)
单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)Flexible(灵活)
Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。

1.4、Celery安装

你可以安装Celery通过Python包管理平台(PyPI)或者源码安装
使用pip安装:

$ pip install -U Celery

或着:

$ sudo easy_install Celery

二、Celery执行异步任务

2.1、基本使用

创建项目celerypro

创建异步任务执行文件celery_task:

import celery
import time
backend='redis://127.0.0.1:6379/1'
broker='redis://127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def send_email(name):print("向%s发送邮件..."%name)time.sleep(5)print("向%s发送邮件完成"%name)return "ok"  

创建执行任务文件,produce_task.py:

from celery_task import send_email
result = send_email.delay("yuan")
print(result.id)
result2 = send_email.delay("alex")
print(result2.id)  

注意,异步任务文件命令执行:

celery worker -A celery_app_task -l info

创建py文件:result.py,查看任务执行结果,

from celery.result import AsyncResult
from celery_task import celasync_result=AsyncResult(id="c6ddd5b7-a662-4f0e-93d4-ab69ec2aea5d", app=cel)if async_result.successful():result = async_result.get()print(result)# result.forget() # 将结果删除
elif async_result.failed():print('执行失败')
elif async_result.status == 'PENDING':print('任务等待中被执行')
elif async_result.status == 'RETRY':print('任务异常后正在重试')
elif async_result.status == 'STARTED':print('任务已经开始被执行')

2.1、多任务结构

celery.py:

from celery import Celerycel = Celery('celery_demo',broker='redis://127.0.0.1:6379/1',backend='redis://127.0.0.1:6379/2',# 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类include=['celery_tasks.task01','celery_tasks.task02'])# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

task01.py,task02.py:

#task01
import time
from celery_tasks.celery import cel@cel.task
def send_email(res):time.sleep(5)return "完成向%s发送邮件任务"%res#task02
import time
from celery_tasks.celery import cel
@cel.task
def send_msg(name):time.sleep(5)return "完成向%s发送短信任务"%name

produce_task.py:

from celery_tasks.task01 import send_email
from celery_tasks.task02 import send_msg# 立即告知celery去执行test_celery任务,并传入一个参数
result = send_email.delay('yuan')
print(result.id)
result = send_msg.delay('yuan')
print(result.id)

check_result.py:

from celery.result import AsyncResult
from celery_tasks.celery import celasync_result = AsyncResult(id="562834c6-e4be-46d2-908a-b102adbbf390", app=cel)if async_result.successful():result = async_result.get()print(result)# result.forget() # 将结果删除,执行完成,结果不会自动删除# async.revoke(terminate=True)  # 无论现在是什么时候,都要终止# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
elif async_result.failed():print('执行失败')
elif async_result.status == 'PENDING':print('任务等待中被执行')
elif async_result.status == 'RETRY':print('任务异常后正在重试')
elif async_result.status == 'STARTED':print('任务已经开始被执行')

开启work:celery worker -A celery_task -l info -P eventlet,添加任务(执行produce_task.py),检查任务执行结果(执行check_result.py)

三、Celery执行定时任务

设定时间让celery执行一个定时任务,produce_task.py:

from celery_task import send_email
from datetime import datetime# 方式一
# v1 = datetime(2020, 3, 11, 16, 19, 00)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = send_email.apply_async(args=["egon",], eta=v2)
# print(result.id)# 方式二
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay# 使用apply_async并设定时间
result = send_email.apply_async(args=["egon"], eta=task_time)
print(result.id)

多任务结构中celery.py修改如下:

from datetime import timedelta
from celery import Celery
from celery.schedules import crontabcel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=['celery_tasks.task01','celery_tasks.task02',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = Falsecel.conf.beat_schedule = {# 名字随意命名'add-every-10-seconds': {# 执行tasks1下的test_celery函数'task': 'celery_tasks.task01.send_email',# 每隔2秒执行一次# 'schedule': 1.0,# 'schedule': crontab(minute="*/1"),'schedule': timedelta(seconds=6),# 传递参数'args': ('张三',)},# 'add-every-12-seconds': {#     'task': 'celery_tasks.task01.send_email',#     每年4月11号,8点42分执行#     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),#     'args': ('张三',)# },
} 
# 启动 Beat 程序$ celery beat -A proj
# Celery Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列# 之后启动 worker 进程.$ celery -A proj worker -l info 或者$ celery -B -A proj worker -l info

四、Django中使用celery

项目根目录创建celery包,目录结构如下:

mycelery/
├── config.py
├── __init__.py
├── main.py
└── sms/├── __init__.py├── tasks.py

配置文件config.py:

broker_url = 'redis://127.0.0.1:6379/15'
result_backend = 'redis://127.0.0.1:6379/14'

任务文件tasks.py:

# celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!
from mycelerys.main import app
import timeimport logging
log = logging.getLogger("django")@app.task  # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
def send_sms(mobile):"""发送短信"""print("向手机号%s发送短信成功!"%mobile)time.sleep(5)return "send_sms OK"@app.task  # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
def send_sms2(mobile):print("向手机号%s发送短信成功!" % mobile)time.sleep(5)return "send_sms2 OK"

最后在main.py主程序中对django的配置文件进行加载

# 主程序
import os
from celery import Celery
# 创建celery实例对象
app = Celery("sms")# 把celery和django进行组合,识别和加载django的配置文件
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryPros.settings.dev')# 通过app对象加载配置
app.config_from_object("mycelerys.config")# 加载任务
# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
# app.autodiscover_tasks(["任务1","任务2"])
app.autodiscover_tasks(["mycelerys.sms",])# 启动Celery的命令
# 强烈建议切换目录到mycelery根目录下启动
# celery -A mycelery.main worker --loglevel=info

Django视图调用:

from django.shortcuts import render# Create your views here.from django.shortcuts import render,HttpResponse
from mycelerys.sms.tasks import send_sms,send_sms2
from datetime import timedeltafrom datetime import datetime
def test(request):################################# 异步任务# 1. 声明一个和celery一模一样的任务函数,但是我们可以导包来解决# send_sms.delay("110")# send_sms2.delay("119")# send_sms.delay() 如果调用的任务函数没有参数,则不需要填写任何内容################################# 定时任务# ctime = datetime.now()# # 默认用utc时间# utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())# time_delay = timedelta(seconds=10)# task_time = utc_ctime + time_delay# result = send_sms.apply_async(["911", ], eta=task_time)# print(result.id)return HttpResponse('ok')

牛哄哄的Celery相关推荐

  1. 牛逼哄哄的SLAM技术 即将颠覆哪些领域?

    牛逼哄哄的SLAM技术 即将颠覆哪些领域? 0评论 2016-05-12 21:15:02 来源:雷锋网 作者:宗仁 一般人我不告诉他,绝佳买入机会! 什么是SLAM?机器人在未知环境中,要实现智能化 ...

  2. 牛逼哄哄的SLAM技术即将颠覆哪些领域

    原标题:牛逼哄哄的SLAM技术即将颠覆哪些领域? 什么是SLAM?机器人在未知环境中,要实现智能化需要完成三个任务,第一个是定位(Localization),第二个是建图(Mapping),第三个则是 ...

  3. GitHub 上 6 款牛哄哄的后台模板

    今日推荐 一个基于SpringBoot+Vue的百度网盘高仿项目 一个Github项目搞定微信.QQ.支付宝等第三方登录 推荐 7 个牛哄哄 Spring Cloud 实战项目 一套既美观又方便的后台 ...

  4. java swing 动态生成表格_6 个曾经牛逼哄哄的 Java 技术,你用过吗?

    大家好啊,今天给大家分享下我的开发历程中,我知道的那些被淘汰的技术或者框架,有些我甚至都没有用过,但我知道它曾经风光过. 废话不多说,下面我要开始吹了-- 1.Swing 下面这个是用 swing 开 ...

  5. ios安卓模拟器_IOS全球首款手游模拟器,牛逼哄哄但没有卵用!

    分享IOS和MAC少有人知的软件 100000+果粉都在看    关注 1 黑雷模拟器是什么? 最近看到很多平台都在发布这款苹果模拟器,看了很多文章不得不说都是一些假果粉,很多应该就是为了蹭热点,对这 ...

  6. 【牛哄哄】正版WINDOWS长啥样

    "老板,来瓶茅台!"新年将至,新朋老友来聚会,前天晚上猪哼哼算了一宿的帐,收入颇令自己满意,今天出手自然阔绰起来. "猪先生,请问你是要真货,还是要水货,水货价格很便宜的 ...

  7. 你知道牛X哄哄帕斯卡计算器吗?

    这台机器应该是人类史上第一台能做加减法的机械计算器.好牛X,被冠以第一台头衔. 布莱兹·帕斯卡(Blaise Pascal)这货在科学的历史上是一个牛气冲天,牛X哄哄的人.如何"牛气冲天,牛 ...

  8. 南京理工 大连理工 计算机,北理、大工、华理、华工、南理工,谁家就业牛哄哄?...

    原标题:北理.大工.华理.华工.南理工,谁家就业牛哄哄? 在中国的"一流大学"和"一流学科"建设名单中,理工类大学占据重要地位,属于中国教育高地的重要阵营.同时 ...

  9. 表格下划线怎么加粗_这招高!Excel签名栏的下划线随列宽变化,是不是感觉牛哄哄的?...

    职场牛人的世界总有各种高招,随手蹦出,令人赞叹不已! 今天早上去填一个表格,在输入签名时,突然发现签名处的下划线是随着列宽的变化而自动变化,这绝对是动态的,这是怎么做到的呢? 我特意看了一下这个单元格 ...

最新文章

  1. Netflix的高可用架构建议
  2. linux sort命令
  3. spring2.5+struts2+hibernate+mysql
  4. SAP WebClient UI component模型元数据解析工具
  5. 离线java人脸识别开源算法_Java 离线人脸识别 基于ArcFace 2.0 Demo
  6. 前端学习(3067):vue+element今日头条管理-频道筛选完成
  7. 电路 晶振频率_都说晶振是电路的心脏,你真的了解它吗?
  8. win7系统怎么搭建web服务器,win7系统搭建web服务器的操作方法
  9. 小白用GitHub快速搭建自己的网站,可访问,不用买域名、服务器
  10. Linux expect脚本使用详细说明及示例
  11. 【nodejs学习】0.nodejs学习第一天
  12. docker使用阿里云仓库上传与下拉images
  13. 邱锡鹏nndl学习记录
  14. 千寻位置48小时“复活”伽利略卫星定位系统
  15. IT界的复仇者联盟解读
  16. PostgreSQL使用PgAdmin导入数据
  17. 牛客小白月赛58 B(暴力)C(思维)D(dp滚动数组优化)
  18. 使用AHK减少鼠标和方向键的使用频率,高效编辑
  19. XMind (2022)新版思维导图新增功能介绍
  20. 代理后域名及Https协议向后传递,后端Spring获取不到问题记录及分析

热门文章

  1. 中国工程院院士谭建荣:工业大数据与定制化设计—关键技术与典型应用
  2. scrapy源码8 - contextfactory
  3. Android写入文件到U盘时突然断电,数据丢失
  4. 吉林大学计算机考研资料汇总
  5. VMware 各版本下载教程
  6. 金蝶EAS之编辑界面滚动条设置
  7. 给你安利一款Mac上好用的音频格式转换软件——Music Converter
  8. 力扣123. 买卖股票的最佳时机 III
  9. 2022年医院行业研究报告
  10. 总结几种常见的网络攻击,及解决方案