Celery
1. Environment setup
pip install celery
On Windows you also need to install eventlet:
pip install eventlet
2. Two ways to structure Celery tasks: the package layout is recommended, as it is clearer
# If the Celery object Celery(...) lives in a plain module:
# 1) in a terminal, cd into the folder that contains the module (e.g. scripts)
# 2) start the worker: celery worker -A module_name -l info -P eventlet
# Note: Windows needs eventlet; on Linux and macOS run: celery worker -A module_name -l info
# Note: the module name is arbitrary
# If the Celery object Celery(...) lives in a package:
# 1) the package must contain a file named celery.py, and the Celery(...) object must be created in that file
# 2) start the worker: celery worker -A package_name -l info -P eventlet
# Note: Windows needs eventlet; on Linux and macOS run: celery worker -A package_name -l info
# Note: the package name is arbitrary
Basic structure
# Create a file: celery_app_task.py
from celery import Celery
# broker = 'redis://:123456@127.0.0.1:6379/1'  # broker (task queue), with a password
broker = 'redis://127.0.0.1:6379/1'   # broker (task queue), no password
backend = 'redis://127.0.0.1:6379/2'  # result backend: finished results are stored here
app = Celery(__name__, broker=broker, backend=backend)
app.conf.timezone = 'Asia/Shanghai'  # use China Standard Time
app.conf.enable_utc = False          # disable UTC

@app.task
def x1(x, y):
    return x + y
# Run it from the command line
# Non-Windows:
# celery worker -A module_name -l info  # module name WITHOUT the .py suffix! If -A is not recognized, use the form below
# celery -A module_name worker -l info
# Windows (same as above; if it fails, install eventlet and add it to the command):
# celery -A module_name worker -l info -P eventlet
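The broker and backend URLs above follow the standard redis:// URL format. As a quick stdlib-only illustration (this just parses the URL, it does not connect to anything), here is how the pieces of the password-bearing URL from the example map to Redis settings:

```python
# Illustration only: anatomy of a Celery redis:// URL, parsed with the standard library.
from urllib.parse import urlparse

url = urlparse('redis://:123456@127.0.0.1:6379/1')
print(url.scheme)    # redis
print(url.password)  # 123456  (the part after ':' and before '@')
print(url.hostname)  # 127.0.0.1
print(url.port)      # 6379
print(url.path)      # /1  -> Redis database number 1
```

This is why omitting the password simply drops the `:123456@` part, as in the second example line.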
Package layout (multi-task structure)
project
├── celery_task         # the celery package
│   ├── __init__.py     # package marker
│   ├── celery.py       # celery connection and configuration; the file MUST be named celery.py
│   └── tasks.py        # all task functions
├── add_task.py         # submit tasks
└── get_result.py       # fetch results
3. Basic usage
1) Create the app + tasks
# 2) Start the celery (app) service:
# Non-Windows:
# command: celery worker -A celery_tasks -l info
# Windows:
# pip3 install eventlet
# celery worker -A celery_tasks -l info -P eventlet
# 3) Submit tasks: done by hand -- write your own submit script and run it
# 4) Fetch results: done by hand -- write your own fetch script and run it
celery_tasks.py
from celery import Celery
# broker = 'redis://:123456@127.0.0.1:6379/1'  # broker (task queue), with a password
broker = 'redis://127.0.0.1:6379/1'   # broker (task queue)
backend = 'redis://127.0.0.1:6379/2'  # result backend: finished results are stored here
app = Celery(__name__, broker=broker, backend=backend)

@app.task
def add(x, y):
    return x + y

@app.task
def low(n, m):
    return n - m
add_task.py
from celery_tasks import add, low, app  # low must be imported too, since it is used below

# submit tasks for immediate execution
t1 = add.delay(10, 20)
t2 = low.delay(100, 50)
print(t1.id)
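In Celery, `add.delay(10, 20)` is shorthand for `add.apply_async(args=(10, 20))`: the call is serialized into a message and pushed onto the broker queue, and a task id comes back immediately. The sketch below is a toy model of that flow, not Celery's real internals; the `queue` list stands in for the Redis list the broker would use:

```python
# Toy model (illustration only) of what .delay()/.apply_async() do conceptually:
# serialize the call, push it onto the broker queue, return a task id.
import json
import uuid

queue = []  # stands in for the broker's Redis list

def apply_async(task_name, args):
    message = {'id': str(uuid.uuid4()), 'task': task_name, 'args': args}
    queue.append(json.dumps(message))  # Celery serializes messages (JSON by default)
    return message['id']

def delay(task_name, *args):
    # delay is just sugar: positional args are forwarded to apply_async
    return apply_async(task_name, list(args))

task_id = delay('celery_tasks.add', 10, 20)
print(task_id)                       # a uuid string, like t1.id above
print(json.loads(queue[0])['args'])  # [10, 20]
```

The worker process plays the other half of this model: it pops messages off the queue, looks up the task function by name, and runs it.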
get_result.py
from celery.result import AsyncResult
from celery_tasks import app

id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
    result_obj = AsyncResult(id=id, app=app)
    if result_obj.successful():
        result = result_obj.get()
        print(result)
    elif result_obj.failed():
        print('task failed')
    elif result_obj.status == 'PENDING':
        print('task is waiting to be executed')
    elif result_obj.status == 'RETRY':
        print('task raised an error and is being retried')
    elif result_obj.status == 'STARTED':
        print('task has started executing')
    print(result_obj.status)   # get the status
    print(result_obj.get())    # get the result (blocks until the task finishes)
    # result_obj.revoke()      # revoke the task
    result_obj.forget()        # clear the stored result
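The if/elif chain above is just a dispatch on Celery's built-in state names. A small helper (my own illustration, not part of Celery) collapses it into one lookup table, which is handier if several scripts need the same messages:

```python
# Map Celery's built-in task states to the human-readable messages used above.
# The state names (SUCCESS, FAILURE, PENDING, RETRY, STARTED) are Celery's own.
STATE_MESSAGES = {
    'SUCCESS': 'task succeeded',
    'FAILURE': 'task failed',
    'PENDING': 'task is waiting to be executed',
    'RETRY': 'task raised an error and is being retried',
    'STARTED': 'task has started executing',
}

def describe(status):
    # fall back gracefully for states not listed above (e.g. REVOKED)
    return STATE_MESSAGES.get(status, 'unknown state: %s' % status)

print(describe('PENDING'))  # task is waiting to be executed
print(describe('SUCCESS'))  # task succeeded
```

With an `AsyncResult` in hand you would call `describe(result_obj.status)` instead of the chain.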
4. Advanced usage
First create a package: celery_task
project
├── celery_task             # the celery package
│   ├── __init__.py         # package marker
│   ├── celery.py           # celery connection and configuration; the file MUST be named celery.py
│   └── tasks.py            # all task functions
├── get_result.py           # fetch results
└── t_celery_add_task.py    # submit tasks
celery.py
# 1) Create the app + tasks
# 2) Start the celery (app) service:
# Non-Windows:
# command: celery worker -A celery_task -l info
# command: celery -A celery_task worker -l info
# Windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet
# 3) Submit tasks: they are submitted automatically, so a submitting service (beat) must be started:
# command: celery beat -A celery_task -l info
# 4) Fetch results
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])  # load task modules via include

# Scheduled (beat) tasks
# timezone
app.conf.timezone = 'Asia/Shanghai'
# whether to use UTC
app.conf.enable_utc = False

# schedule configuration
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'update-banner-list': {
        'task': 'celery_task.tasks.add',
        'schedule': timedelta(seconds=10),  # run every 10 seconds
        # 'schedule': crontab(hour=8, day_of_week=1),  # every Monday at 8 am
        'args': (300, 100),  # task arguments
    }
}
# Scheduled tasks require beat to be running; it is what submits them:
# celery beat -A celery_task -l info
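What `'schedule': timedelta(seconds=10)` means in practice: beat wakes up at each interval and submits the task again. A stdlib-only sketch of the due times that schedule produces (the reference time is an arbitrary example):

```python
# Illustration: the due times implied by a timedelta(seconds=10) beat schedule.
from datetime import datetime, timedelta

interval = timedelta(seconds=10)
start = datetime(2024, 1, 1, 8, 0, 0)  # arbitrary reference time for the sketch
due_times = [start + i * interval for i in range(3)]
for t in due_times:
    print(t)  # 08:00:00, 08:00:10, 08:00:20
```

The commented-out `crontab(hour=8, day_of_week=1)` alternative works the same way, except the next due time is computed from calendar fields instead of a fixed interval.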
tasks.py
from .celery import app

@app.task
def add(n, m):
    return n + m

@app.task
def low(n, m):
    return n - m
get_result.py
from celery_task.celery import app
from celery.result import AsyncResult

id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
    # don't name this variable `async` -- it is a reserved keyword in Python 3.7+
    result_obj = AsyncResult(id=id, app=app)
    if result_obj.successful():
        result = result_obj.get()
        print(result)
    elif result_obj.failed():
        print('task failed')
    elif result_obj.status == 'PENDING':
        print('task is waiting to be executed')
    elif result_obj.status == 'RETRY':
        print('task raised an error and is being retried')
    elif result_obj.status == 'STARTED':
        print('task has started executing')
t_celery_add_task.py
# This file can live anywhere, as long as it can import the tasks
from celery_task.tasks import add, low  # note: the module is tasks, not task

# submit a task for immediate execution
ret = add.delay(6, 7)
print(ret)
print(ret.id)

# submit a delayed task -- the eta must be a UTC time!
from datetime import datetime, timedelta
eta = datetime.utcnow() + timedelta(seconds=10)
result = low.apply_async(args=(200, 50), eta=eta)
print(result.id)
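The "must be UTC" warning matters because the worker compares the eta against UTC "now"; an eta built from local time would fire hours early or late. A stdlib sketch of building the same 10-second eta, both the naive form used above and a timezone-aware equivalent (assuming the aware form is acceptable to your Celery version; newer releases handle aware datetimes):

```python
# Building a UTC eta 10 seconds in the future.
from datetime import datetime, timedelta, timezone

# naive UTC timestamp, as in the example above
eta = datetime.utcnow() + timedelta(seconds=10)

# timezone-aware equivalent (utcnow() is deprecated in Python 3.12+)
eta_aware = datetime.now(timezone.utc) + timedelta(seconds=10)

print(eta)
print(eta_aware)
```

Using `datetime.now()` (local time) here would be the classic bug: in a UTC+8 timezone the task would be scheduled roughly 8 hours into the future.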
5. Using celery in Django
Step 1: add the configuration in [project/project/settings.py]
# ######################## Celery configuration ########################
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
Step 2: create celery.py in the directory named after the project, [project/project/celery.py]
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_demo.settings')

app = Celery('django_celery_demo')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# This reads tasks.py from every installed app.
app.autodiscover_tasks()
Step 3: in the __init__.py that sits next to settings.py, add:
from .celery import app as celery_app
__all__ = ('celery_app',)
Step 4: create tasks.py in the app, [project/app_name/tasks.py], and write the tasks there
from celery import shared_task

@shared_task
def add(x, y):
    print("called successfully...", flush=True)
    print(x, flush=True)
    print(y, flush=True)
    print(x + y, flush=True)
    return x + y
Start the worker:
celery -A project_name worker -l info
Then start the Django program.
6. Using celery in Django via django-celery
To make celery more convenient to use in Django, there is the django-celery module.
pip install django-celery
pip install redis==2.10  # if running a task fails with "'str' object has no attribute 'items'", check your redis package version
After that, write the code the way django-celery requires.
Basic usage
django_celery_demo
├── app01
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── migrations
│   ├── models.py
│   ├── tasks.py
│   ├── tests.py
│   └── views.py
├── db.sqlite3
├── django_celery_demo
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── manage.py
├── red.py
└── templates
Add the configuration
INSTALLED_APPS = [
    ...
    'djcelery',
]

# ######################## Celery configuration ########################
import djcelery
djcelery.setup_loader()

BROKER_URL = 'redis://127.0.0.1:6379/1'
ACCEPT_CONTENT = ['json']
RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  # store the schedule in the database
Create celery.py in the directory named after the project
import os
from celery import Celery, platforms
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_demo.settings')

app = Celery('django_celery_demo')  # the name is arbitrary; usually the project name

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
app.config_from_object('django.conf:settings')

# Load task modules from all registered Django app configs.
# This reads tasks.py from every installed app.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

# allow celery to run as the root user
platforms.C_FORCE_ROOT = True
In the __init__.py that sits next to settings.py, add:
from .celery import app as celery_app
__all__ = ('celery_app',)
Create tasks.py in the app and write the tasks there
from celery import shared_task

@shared_task
def add(x, y):
    print("called successfully...", flush=True)
    print(x, flush=True)
    print(y, flush=True)
    print(x + y, flush=True)
    return x + y
Start celery
python manage.py celery worker -c 6 -l info