加载中...

celery


Celery

1.环境搭建

pip install celery
如果是windows,则需要多安装一个eventlet
pip install eventlet

2.两种celery任务结构:提倡用包管理,结构更清晰

# 如果 Celery对象:Celery(...) 是放在一个模块下的
# 1)终端切换到该模块所在文件夹位置:scripts
# 2)执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet
# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
# 注:模块名随意


# 如果 Celery对象:Celery(...) 是放在一个包下的
# 1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中
# 2)执行启动worker的命令:celery worker -A 包名 -l info -P eventlet
# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
# 注:包名随意

基本结构

# 创建py文件:celery_app_task.py
from celery import Celery

# broker = 'redis://:123456@127.0.0.1:6379/1' # broker任务队列 加密码的写法
broker = 'redis://127.0.0.1:6379/1' # broker任务队列 不加密码
backend = 'redis://127.0.0.1:6379/2' # 结构存储,执行完的结果存在这

app = Celery(__name__,broker=broker,backend=backend)
app.conf.timezone='Asia/Shanghai'  # 使用中国时间
app.conf.enable_utc = False		   # 禁用utc时间

@app.task
def x1(x,y):
    return x+y

# 用命令来执行
# 非windows:
# celery worker -A 模块名(不带后缀!!!) -l info  # 如果提示-A未找到用下面的
# celery -A 模块名 worker -l info
# windows(同上,如果报错安装eventlet并在运行时加入):
# celery -A 模块名 worker -l info -P eventlet

包架构封装(多任务结构)

project
    ├── celery_task  	# celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须叫celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py  	# 添加任务
    └── get_result.py   # 获取结果

3.基本使用

1)创建app + 任务

# 2)启动celery(app)服务:
# 非windows
# 命令:celery worker -A celery_tasks -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_tasks -l info -P eventlet

# 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本

# 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本
celery_tasks.py
from celery import Celery

# broker = 'redis://:123456@127.0.0.1:6379/1' # broker任务队列  这是带密码的写法
broker = 'redis://127.0.0.1:6379/1' # broker任务队列
backend = 'redis://127.0.0.1:6379/2' # 结构存储,执行完的结果存在这

app = Celery(__name__,broker=broker,backend=backend)

@app.task
def add(x,y):
    return x+y

@app.task
def low(n, m):
    return n - m
add_task.py
from celery_tasks import add,app

# 添加立即执行任务
t1 = add.delay(10, 20)
t2 = low.delay(100, 50)
print(t1.id)
get_result.py
from celery.result import AsyncResult
from celery_tasks import app

id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'

if __name__ == '__main__':
    result_obj = AsyncResult(id=id, app=app)
    if result_obj.successful():
        result = async.get()
        print(result)
    elif result_obj.failed():
        print('任务失败')
    elif result_obj.status == 'PENDING':
        print('任务等待中被执行')
    elif result_obj.status == 'RETRY':
        print('任务异常后正在重试')
    elif result_obj.status == 'STARTED':
        print('任务已经开始被执行')
    print(result_obj.status)  # 获取状态
	print(result_obj.get())   # 获取结果
	# result_obj.revoke()    # 移除任务
	result_obj.forget()      # 清除数据

4.高级使用

首先建一个包:celery_task

project
    ├── celery_task  	# celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须叫celery.py
    │   └── tasks.py    # 所有任务函数
    ├── get_result.py  # 获取结果
    └── t_celery_add_task.py   # 获取结果
celery.py
# 1)创建app + 任务

# 2)启动celery(app)服务:
# 非windows
# 命令:celery worker -A celery_task -l info
# 命令:celery -A celery_task worker -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet

# 3)添加任务:自动添加任务,所以要启动一个添加任务的服务
# 命令:celery beat -A celery_task -l info

# 4)获取结果


from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])  # 通过include反射加载任务


# 添加定时任务
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'update-banner-list': {
        'task': 'celery_task.tasks.add',
        'schedule': timedelta(seconds=10), # 每10秒执行一下
        # 'schedule':crontab(hour=8,day_of_week=1), # 每周一早八点
        'args': (300,100), # 参数
    }
}

# 定时任务一定要启动beat,用来提交任务
# celery beat -A celery_task -l info
tasks.py
from .celery import app

import time
@app.task
def add(n, m):
    return n + m

@app.task
def low(n, m):
    return n - m
get_result.py
from celery_task.celery import app

from celery.result import AsyncResult

id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
t_celery_add_task.py
# 文件写在任意位置都行,只要能导入add
from celery_task.task import add

# 提交任务
ret = add.delay(6,7)
print(ret)
print(ret.id)

# 添加延迟任务,需要UTC时间!!!
from datetime import datetime, timedelta
eta=datetime.utcnow() + timedelta(seconds=10)
result = low.apply_async(args=(200, 50), eta=eta)
print(result.id)

5.django中应用celery

  • 第一步:【项目/项目/settings.py】添加配置

    # ######################## Celery配置 ########################
    CELERY_BROKER_URL = 'redis://127.0.0.1:6379/1'
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_TIMEZONE = 'Asia/Shanghai'
  • 第二步:【项目/项目/celery.py】在项目同名的目录创建celery.py

    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_demo.settings')
    
    app = Celery('django_celery_demo')
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    # 以后会去每个已注册app中读取tasks.py文件
    app.autodiscover_tasks()
  • 第三步:在settings同级的__init__.py文件中添加

    from .celery import app as celery_app
    __all__ = ('celery_app',)
  • 第四步:【项目/app名称/tasks.py】app中创建tasks.py文件,编写任务即可

    from celery import shared_task
    
    
    @shared_task
    def add(x, y):
        print("调用成功。。。",flush=True)
        print(x,flush=True)
        print(y,flush=True)
        print(x+y,flush=True)
        return x+y
  • 启动worker

    celery -A 项目名 worker -l info
  • 启动django程序

6.django中通过使用django-celery应用celery

在django中应用celery,为了方便使用django-celery模块

pip install django-celery
pip install redis==2.10   # 如果运行后执行任务失败报 'str' object has no attribute 'items',记得看自己的redis版本

之后,需要安装django-celery的要求进行编写代码

  • 基本使用

    django_celery_demo
    ├── app01
    │   ├── __init__.py
    │   ├── admin.py
    │   ├── apps.py
    │   ├── migrations
    │   ├── models.py
    │   ├── tasks.py
    │   ├── tests.py
    │   └── views.py
    ├── db.sqlite3
    ├── django_celery_demo
    │   ├── __init__.py
    │   ├── celery.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── manage.py
    ├── red.py
    └── templates
  • 添加配置

    INSTALLED_APPS = [
    	...
        'djcelery',
    ]
    
    # ######################## Celery配置 ########################
    import djcelery
    djcelery.setup_loader()
    BROKER_URL = 'redis://127.0.0.1:6379/1'
    ACCEPT_CONTENT = ['json']
    RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
    TASK_SERIALIZER = 'json'
    CELERY_TIMEZONE = 'Asia/Shanghai'
    CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 数据库插入模式
  • 在项目同名的目录创建celery.py

    import os
    from celery import Celery,platforms
    from django.conf import settings
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_demo.settings')
    
    app = Celery('django_celery_demo')  # 名随便写的,一般写项目名
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings')
    
    # Load task modules from all registered Django app configs.
    # 以后会去每个已注册app中读取tasks.py文件
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    
    # 允许root 用户运行celery
    platforms.C_FORCE_ROOT = True
  • 在settings同级的__init__.py文件中添加

    from .celery import app as celery_app
    __all__ = ('celery_app',)
  • app中创建tasks.py文件,编写任务即可

    from celery import shared_task
    
    
    @shared_task
    def add(x, y):
        print("调用成功。。。",flush=True)
        print(x,flush=True)
        print(y,flush=True)
        print(x+y,flush=True)
        return x+y
  • 启动celery

    python manage.py celery worker -c 6 -l info

文章作者: 无夜
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 无夜 !
评论
  目录