目录
  • 一、背景
  • 二、Celery动态添加定时任务的官方文档
  • 三、celery简单实用
    • 3.1 基础环境配置
    • 3.2 测试使用Celery应用
  • 四、配置backend存储任务执行结果 
    • 四、优化Celery目录结构
      • 五、开始使用django-celery-beat调度器
        • 六、具体操作演练
          • 6.1 创建基于间隔时间的周期性任务
          • 6.2 创建一个不带参数的周期性间隔任务
          • 6.3 周期性任务的查询、删除操作
        • 总结

          一、背景

          实际工作中会有一些耗时的异步任务需要使用定时调度,比如发送邮件,拉取数据,执行定时脚本

          通过celery 实现调度主要思想是 通过引入中间人redis,启动 worker 进行任务执行 ,celery-beat进行定时任务数据存储

          二、Celery动态添加定时任务的官方文档

          celery文档:https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

          celery 自定义调度类说明: 

          自定义调度器类可以在命令行中指定(–scheduler参数)

          django-celery-beat文档 : https://pypi.org/project/django-celery-beat/

          关于django-celery-beat 插件的说明: 

          此扩展使您能够将定期任务计划存储在数据库中,可以从 Django 管理界面管理周期性任务,您可以在其中创建、编辑和删除周期性任务以及它们应该运行的频率

          三、celery简单实用

          3.1 基础环境配置

          1. 安装最新版本的Django

          pip3 install django #当前我安装的版本是 3.0.6

          2. 创建项目

          django-admin startproject typeidea
          django-admin startapp blog

          3.安装 celery

          pip3 install django-celery
          pip3 install -U Celery 
          pip3 install "celery[librabbitmq,redis,auth,msgpack]" 
          pip3 install django-celery-beat # 用于动态添加定时任务
          pip3 install django-celery-results
          pip3 install redis

          3.2 测试使用Celery应用

          1. 创建blog目录、新建task.py

          首先在Django项目中创建一个blog文件夹,并且在blog文件夹下创建tasks.py模块, 如下:

          Python Celery动态添加定时任务生产实践指南

           tasks.py代码如下: 

          #!/usr/bin/env python
          # -*- coding: UTF-8 -*-
           
          """
          #File: tasks.py
          #Time: 2022/3/30 2:26 下午
          #Author: julius
          """
          from celery import Celery
           
          # 使用redis做为broker
          app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0')
           
          # 创建任务函数
          @app.task
          def my_task():
              print('任务正在执行...')

          Celery第一个参数是给其设定一个名字, 第二参数我们设定一个中间人broker, 在这里我们使用Redis作为中间人。my_task函数是我们编写的一个任务函数, 通过加上装饰器app.task, 将其注册到broker的队列中。

          2. 启动redis、创建worker

          现在我们在创建一个worker, 等待处理队列中的任务。

          进入项目的根目录,执行命令: celery -A celery_tasks.tasks worker -l info

          Python Celery动态添加定时任务生产实践指南

           3. 调用任务

          下面来测试一下功能,创建一个任务,加入任务队列中,提供worker执行。

          进入python终端, 执行如下代码:

          $ python manage.py shell
          >>> from blog.tasks import my_task
          >>> my_task.delay()
          <AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>

          调用一个任务函数,将会返回一个AsyncResult对象,这个对象可以用来检查任务的状态或者获得任务的返回值。

          4. 查看结果

          在worker的终端查看任务执行情况,可以看到已经收到83484dfe-f729-417b-8e51-6c7ae32a1377 任务,并打印了任务执行信息

          Python Celery动态添加定时任务生产实践指南

          5. 存储并查看任务执行状态

          把任务执行结果赋值给ret,然后调用result() 会产生 DisabledBackend 报错,可见没有配置后端存储的时候并不能保存任务执行的状态信息,下一节我们会讲到如何配置backend保存任务执行结果

          $ python manage.py shell
          >>> from blog.tasks import my_task
          >>> ret=my_task.delay()
          >>> ret.result()

          Python Celery动态添加定时任务生产实践指南

          四、配置backend存储任务执行结果 

          如果我们想跟踪任务的状态,Celery需要将结果保存到某个地方。有几种保存的方案可选:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

          1. 添加backend参数

          在本例中我们使用Redis作为存储结果的方案,通过Celery的backend参数来设定任务结果存储地址。我们将tasks模块修改如下:

          from celery import Celery
           
          # 使用redis作为broker以及backend
          app = Celery('celery_tasks.tasks',
                       broker='redis://127.0.0.1:6379/8',
                       backend='redis://127.0.0.1:6379/9')
           
          # 创建任务函数
          @app.task
          def my_task(a, b):
              print("任务函数正在执行....")
              return a + b

          给Celery增加了backend参数,指定redis作为结果存储,并将任务函数修改为两个参数,并且有返回值。

          2. 调用任务/查看任务执行结果

          下面再来执行调用一下这个任务看看。

          $ python manage.py shell
          >>> from blog.tasks import my_task
          >>> res=my_task.delay(10,40)
          >>> res.result
          50
          >>> res.failed()
          False

          再来看看worker的执行情况,如下:

          Python Celery动态添加定时任务生产实践指南

          可以看到celery任务已经执行成功了。

          但是这只是一个开始,下一步要看看如何添加定时的任务。

          四、优化Celery目录结构

          上面直接将Celery的应用创建、配置、tasks任务全部写在了一个文件,这样在后面项目越来越大,也是不方便的。下面来拆分一下,并且添加一些常用的参数。

          基本结构如下

          Python Celery动态添加定时任务生产实践指南

          $ vim typeidea/celery.py (Celery应用文件)

          #!/usr/bin/env python
          # -*- coding: UTF-8 -*-
           
          """
          #File: celery.py
          #Time: 2022/3/30 12:25 下午
          #Author: julius
          """
          import os
          from celery import Celery
          from blog import celeryconfig
          project_name='typeidea'
          # set the default django setting module for the 'celery' program
          os.environ.setdefault('DJANGO_SETTINGS_MODULE','typeidea.settings')
          app = Celery(project_name)
           
          app.config_from_object('django.conf:settings')
           
          app.autodiscover_tasks()

          vim blog/celeryconfig.py (配置Celery的参数文件)

          #!/usr/bin/env python
          # -*- coding: UTF-8 -*-
           
          """
          #File: celeryconfig.py
          #Time: 2022/3/30 2:54 下午
          #Author: julius
          """
          
          # 设置结果存储
          from typeidea import settings
          import os
           
          os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
          CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
          # 设置代理人broker
          BROKER_URL = 'redis://127.0.0.1:6379/1'
          # celery 的启动工作数量设置
          CELERY_WORKER_CONCURRENCY = 20
          # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
          CELERYD_PREFETCH_MULTIPLIER = 20
          # 非常重要,有些情况下可以防止死锁
          CELERYD_FORCE_EXECV = True
          # celery 的 worker 执行多少个任务后进行重启操作
          CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
          # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
          CELERY_DISABLE_RATE_LIMITS = True
           
          CELERY_ENABLE_UTC = False
          CELERY_TIMEZONE = settings.TIME_ZONE
          DJANGO_CELERY_BEAT_TZ_AWARE = False
          CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

          vim blog/tasks.py (tasks 任务文件)

          import time
          from blog.celery import app
           
          # 创建任务函数
          @app.task
          def my_task(a, b, c):
              print('任务正在执行...')
              print('任务1函数休眠10s')
              time.sleep(10)
              return a + b + c

          五、开始使用django-celery-beat调度器

          使用 django-celery-beat 动态添加定时任务  celery 4.x 版本在 django 框架中是使用 django-celery-beat 进行动态添加定时任务的。前面虽然已经安装了这个库,但是还要再说明一下。

          官网的配置说明
          https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

          1. 安装 django-celery-beat

          pip3 install django-celery-beat

          2.在项目的 settings 文件配置 django-celery-beat 

          INSTALLED_APPS = [
              'blog',
              'django_celery_beat',
              ...
          ]
           
          # Django设置时区
          LANGUAGE_CODE = 'zh-hans'  # 使用中国语言
          TIME_ZONE = 'Asia/Shanghai'  # 设置Django使用中国上海时间
          # 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用
          # 如果USE_TZ 设置为False,TIME_ZONE = 'Asia/Shanghai', 则使用上海的UTC时间。
          USE_TZ = False

          3. 创建 django-celery-beat 相关表

          执行Django数据库迁移: python manage.py migrate

          Python Celery动态添加定时任务生产实践指南

          4. 配置Celery使用 django-celery-beat

          配置 celery.py

          import os
           
          from celery import Celery
           
          from blog import celeryconfig
           
          # 为celery 设置环境变量
          os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings")
          # 创建celery app
          app = Celery('blog')
          # 从单独的配置模块中加载配置
          app.config_from_object(celeryconfig)
           
          # 设置app自动加载任务
          app.autodiscover_tasks([
              'blog',
          ])

          配置 celeryconfig.py

          # 设置结果存储
          from typeidea import settings
          import os
           
          os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
          CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
          # 设置代理人broker
          BROKER_URL = 'redis://127.0.0.1:6379/1'
          # celery 的启动工作数量设置
          CELERY_WORKER_CONCURRENCY = 20
          # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
          CELERYD_PREFETCH_MULTIPLIER = 20
          # 非常重要,有些情况下可以防止死锁
          CELERYD_FORCE_EXECV = True
          # celery 的 worker 执行多少个任务后进行重启操作
          CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
          # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
          CELERY_DISABLE_RATE_LIMITS = True
           
          CELERY_ENABLE_UTC = False
          CELERY_TIMEZONE = settings.TIME_ZONE
          DJANGO_CELERY_BEAT_TZ_AWARE = False
          CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
           

          编写任务 tasks.py

          import time
          from celery import Celery
          from blog.celery import app
           
          # 使用redis做为broker
          # app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0',backend='redis://127.0.0.1:6379/1')
           
          # 创建任务函数
          @app.task
          def my_task(a, b, c):
              print('任务正在执行...')
              print('任务1函数休眠10s')
              time.sleep(10)
              return a + b + c
           
          @app.task
          def my_task2():
              print("任务2函数正在执行....")
              print('任务2函数休眠10s')
              time.sleep(10)

          5. 启动定时任务work

          启动定时任务首先需要有一个work执行异步任务,然后再启动一个定时器触发任务。

          启动任务 work

          $ celery -A blog worker -l info 

          Python Celery动态添加定时任务生产实践指南

          启动定时器触发 beat

          celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

          Python Celery动态添加定时任务生产实践指南

          六、具体操作演练

          6.1 创建基于间隔时间的周期性任务

          1. 初始化周期间隔对象interval 对象

          >>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
          >>> schedule, created = IntervalSchedule.objects.get_or_create( 
          ...       every=10, 
          ...       period=IntervalSchedule.SECONDS, 
          ...  )
          >>> IntervalSchedule.objects.all()
          <QuerySet [<IntervalSchedule: every 10 seconds>]>

          2.创建一个无参数的周期性间隔任务

          >>>PeriodicTask.objects.create(interval=schedule,name='my_task2',task='blog.tasks.my_task2',)
          <PeriodicTask: my_task2: every 10 seconds>

          beat 调度服务日志显示如下:

          Python Celery动态添加定时任务生产实践指南

           worker 服务日志显示如下:

          Python Celery动态添加定时任务生产实践指南

          3.创建一个带参数的周期性间隔任务

          >>> PeriodicTask.objects.create(interval=schedule,name='my_task',task='blog.tasks.my_task',args=json.dumps([10,20,30]))
          <PeriodicTask: my_task: every 10 seconds>

          beat 调度服务日志结果:

          Python Celery动态添加定时任务生产实践指南

           worker 服务日志结果:

          Python Celery动态添加定时任务生产实践指南

          4.如何高并发执行任务

          需要并行执行任务的时候,就需要设置多个worker来执行任务。 

          6.2 创建一个不带参数的周期性间隔任务

          1.初始化 crontab 的调度对象

          >>> import pytz
          >>> schedule, _ = CrontabSchedule.objects.get_or_create(
          ... minute='*',
          ... hour='*',
          ... day_of_week='*',
          ... day_of_month='*',
          ... timezone=pytz.timezone('Asia/Shanghai')
          ... )

          2. 创建不带参数的定时任务

          PeriodicTask.objects.create(crontab=schedule,name='my_task2_crontab',task='blog.tasks.my_task2',)

          beat 调度服务执行结果 

          Python Celery动态添加定时任务生产实践指南

           worker 执行服务结果

          Python Celery动态添加定时任务生产实践指南

          6.3 周期性任务的查询、删除操作

          1. 周期性任务的查询

          >>> PeriodicTask.objects.all()
          <ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]>
          >>> PeriodicTask.objects.get(name='my_task2_crontab')
          <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
          >>> for task in PeriodicTask.objects.all():
          ...     print(task.id)
          ... 
          1
          13
          >>> PeriodicTask.objects.get(id=13)
          <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
          >>> PeriodicTask.objects.get(name='my_task2_crontab')
          <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>

           控制台实际操作记录

          Python Celery动态添加定时任务生产实践指南

          2.周期性任务的暂停/启动

          2.1 设置my_taks2_crontab 暂停任务

          >>> my_task2_crontab = PeriodicTask.objects.get(id=13)
          >>> my_task2_crontab.enabled
          True
          >>> my_task2_crontab.enabled=False
          >>> my_task2_crontab.save()

          查看worker输出:

          Python Celery动态添加定时任务生产实践指南

           可以看到worker从19:31以后已经没有输出了,说明已经成功吧my_task2_crontab 任务暂停

          2.2 设置my_task2_crontab 开启任务

          把任务的 enabled 为 True 即可:

          >>> my_task2_crontab.enabled
          False
          >>> my_task2_crontab.enabled=True
          >>> my_task2_crontab.save()

          查看worker输出:

          Python Celery动态添加定时任务生产实践指南

           可以看到worker从19:36开始有输出,说明已把my_task2_crontab 任务重新启动

          3. 周期性任务的删除

          获取到指定的任务后调用delete(),再次查询指定任务会发现已经不存在了

          PeriodicTask.objects.get(name='my_task2_crontab').delete()
          >>> PeriodicTask.objects.get(name='my_task2_crontab')
          Traceback (most recent call last):
            File "<console>", line 1, in <module>
            File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method
              return getattr(self.get_queryset(), name)(*args, **kwargs)
            File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get
              raise self.model.DoesNotExist(
          django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.

          总结

          声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。