威尼斯人线上娱乐

【威尼斯人线上娱乐】分布式职责队列学习笔记,执行异步任务和定时职务

6 4月 , 2019  

一、前言

  Celery是叁个依据python开发的分布式职分队列,借使不明白请阅读笔者上一篇博文Celery入门与进阶,而做python
WEB开发最为盛行的框架莫属Django,可是Django的乞求处理进度都以一块的壹筹莫展达成异步任务,若要完成异步职分处理供给通过其余情势(前端的壹般消除方案是ajax操作),而后台Celery正是不利的抉择。假若三个用户在推行某个操作要求等待很久才回来,那大大降低了网址的吞吐量。上面将讲述Django的伸手处理差不离流程(图片源于互联网):

威尼斯人线上娱乐 1

恳请进程大约表达:浏览器发起呼吁–>请求处理–>请求经过中间件–>路由映射–>视图处理工科作逻辑–>响应请求(template或response)

网上有过多celery +
django完结定时职务的科目,然则它们超越50%是遵照djcelery + celery叁的;
抑或是应用django_celery_beat配置较为麻烦的。

持有演示均根据Django二.0

Celery
是3个简单易行、灵活且有限帮忙的,处理大批量消息的分布式系统,并且提供体贴这么2个种类的必需工具。
它是叁个在意于实时处理的职分队列,同时也支撑任务调度。
如上是celery自身官网的介绍

2、配置利用

  celery很不难集成到Django框架中,当然假设想要完结定时义务的话还亟需安装django-celery-beta插件,前边会表达。供给专注的是Celery4.0只协理Django版本>=一.捌的,假诺是小于一.八版本须求利用Celery3.一。

明明简洁而飞快才是大家最终的言情,而celery四已经不要求格外插件即可与django结合完结定时职分了,原生的celery
beat就能够很好的完成定时任务成效。

celery是贰个依据python开发的大致、灵活且保证的分布式职责队列框架,扶助采用职责队列的办法在分布式的机器/进度/线程上推行职分调度。采取独立的劳动者-消费者模型,主要由三有的构成:

celery的行使场景很广阔

配置

  新建立项目taskproj,目录结构(每一种app下多了个tasks文件,用于定义任务):

taskproj
├── app01
│   ├── __init__.py
│   ├── apps.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py
│   └── views.py
├── manage.py
├── taskproj
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── templates

在类型目录taskproj/taskproj/目录下新建celery.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproj.settings')  # 设置django环境

app = Celery('taskproj')

app.config_from_object('django.conf:settings', namespace='CELERY') #  使用CELERY_ 作为前缀,在settings中写配置

app.autodiscover_tasks()  # 发现任务文件每个app下的task.py

taskproj/taskproj/__init__.py:

from __future__ import absolute_import, unicode_literals

from .celery import app as celery_app

__all__ = ['celery_app']

taskproj/taskproj/settings.py

CELERY_BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

进入项目标taskproj目录运维worker:

celery worker -A taskproj -l debug

本来使用原生方案的同时有几点插件所带来的功利被大家扬弃了:

  • 新闻队列broker:broker实际上正是二个MQ队列服务,能够采纳redis、rabbitmq等作为broker
  • 拍卖职责的买主workers:broker公告worker队列中有职分,worker去队列中取出职分执行,每1个worker就是3个进程
  • 存款和储蓄结果的backend:执行结果存款和储蓄在backend,暗许也会储存在broker使用的MQ队列服务中,也能够单独布置用何种服务做backend
  • 拍卖异步职务
  • 职务调度
  • 处理定时任务
  • 分布式调度

概念与触发职分

  任务定义在每一种tasks文件中,app01/tasks.py:

from __future__ import absolute_import, unicode_literals
from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y

视图中触发职分

from django.http import JsonResponse
from app01 import tasks

# Create your views here.

def index(request,*args,**kwargs):
    res=tasks.add.delay(1,3)
    #任务逻辑
    return JsonResponse({'status':'successful','task_id':res.task_id})

访问

威尼斯人线上娱乐 2

 若想取得职责结果,能够通过task_id使用AsyncResult获取结果,还足以一向通过backend获取:

威尼斯人线上娱乐 3

 

  • 插件提供的定时职分管理将不在可用,当我们只要求任务定期执行而不要求人工资调整度的时候这一点忽略不计。
  • 没辙急忙的管住或追踪定时职务,定时职分的跟踪其实交给日志更合理,可是对职责的改动就未有那么方便了,可是假若不须求平日改变/增减职务的话那点也在可接受范围内。

威尼斯人线上娱乐 4

好处也很多,尤其在动用python创设的行使系统中,无缝过渡,使用一定便宜。

扩展

  除了redis、rabbitmq能做结果存款和储蓄外,还足以行使Django的orm作为结果存款和储蓄,当然须要安装注重插件,这样的功利在于大家得以一向通过django的数据查看到职责情形,同时为能够制定更加多的操作,上面介绍怎样利用orm作为结果存储。

1.安装

pip install django-celery-results

【威尼斯人线上娱乐】分布式职责队列学习笔记,执行异步任务和定时职务。2.配置settings.py,注册app

INSTALLED_APPS = (
    ...,
    'django_celery_results',
)

4.修改backend配置,将redis改为django-db

#CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_BACKEND = 'django-db'  #使用django orm 作为结果存储

伍.改动数据库

python3 manage.py migrate django_celery_results

此刻会师到数据库会多成立:

威尼斯人线上娱乐 5 当然你有时候需求对task表实行操作,以下源码的表结构定义:

class TaskResult(models.Model):
    """Task result/status."""

    task_id = models.CharField(_('task id'), max_length=255, unique=True)
    task_name = models.CharField(_('task name'), null=True, max_length=255)
    task_args = models.TextField(_('task arguments'), null=True)
    task_kwargs = models.TextField(_('task kwargs'), null=True)
    status = models.CharField(_('state'), max_length=50,
                              default=states.PENDING,
                              choices=TASK_STATE_CHOICES
                              )
    content_type = models.CharField(_('content type'), max_length=128)
    content_encoding = models.CharField(_('content encoding'), max_length=64)
    result = models.TextField(null=True, default=None, editable=False)
    date_done = models.DateTimeField(_('done at'), auto_now=True)
    traceback = models.TextField(_('traceback'), blank=True, null=True)
    hidden = models.BooleanField(editable=False, default=False, db_index=True)
    meta = models.TextField(null=True, default=None, editable=False)

    objects = managers.TaskResultManager()

    class Meta:
        """Table information."""

        ordering = ['-date_done']

        verbose_name = _('task result')
        verbose_name_plural = _('task results')

    def as_dict(self):
        return {
            'task_id': self.task_id,
            'task_name': self.task_name,
            'task_args': self.task_args,
            'task_kwargs': self.task_kwargs,
            'status': self.status,
            'result': self.result,
            'date_done': self.date_done,
            'traceback': self.traceback,
            'meta': self.meta,
        }

    def __str__(self):
        return '<Task: {0.task_id} ({0.status})>'.format(self)

 

Celery定时任务陈设

在拓展示公布置前先来看看项目布局:

.├── linux_news│   ├── celery.py│   ├── __init__.py│   ├── settings.py│   ├── urls.py│   └── wsgi.py├── manage.py├── news│   ├── admin.py│   ├── apps.py│   ├── __init__.py│   ├── migrations│   ├── models│   ├── tasks.py│   ├── tests.py│   └── views└── start-celery.sh

里头news是大家的app,用于从壹些rss订阅源获取情报音信,linux_news则是大家的project。咱们需求关心的根本是celery.py,settings.py,tasks.py和start-celery.sh。

第二是celery.py,想让celery执行职务就务须实例化几个celery
app,并把settings.py里的铺排传入app:

import osfrom celery import Celery# set the default Django settings module for the 'celery' program.os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'linux_news.settings')app = Celery('linux_news')# 'django.conf:settings'表示django,conf.settings也就是django项目的配置,celery会根据前面设置的环境变量自动查找并导入# - namespace表示在settings.py中celery配置项的名字的统一前缀,这里是'CELERY_',配置项的名字也需要大写app.config_from_object('django.conf:settings', namespace='CELERY')# Load task modules from all registered Django app configs.app.autodiscover_tasks()

配置正是那般简单,为了能在django里使用这几个app,我们需求在__init__.py中程导弹入它:

from .celery import app as celery_app

接下来我们来看tasks.py,它应当置身你的app目录中,前边大家布置了自行发现,所以celery会自动找到这个tasks,大家的tasks将写在那壹模块中,代码涉及了一部分orm的施用,为了顺应大旨作者做了些不难:

from linux_news.celery import celery_app as appfrom .models import *import timeimport feedparserimport pytzimport html@app.task(ignore_result=True)def fetch_news(origin_name):    """    fetch all news from origin_name    """    origin = get_feeds_origin(origin_name)    feeds = feedparser.parse(origin.feed_link)    for item in feeds['entries']:        entry = NewsEntry()        entry.title = item.title        entry.origin = origin        entry.author = item.author        entry.link = item.link        # add timezone        entry.publish_time = item.time.replace(tzinfo=pytz.utc)        entry.summary = html.escape(item.summary)        entry.save()@app.task(ignore_result=True)def fetch_all_news():    """    这是我们的定时任务    fetch all origins' news to db    """    origins = NewsOrigin.objects.all()    for origin in origins:        fetch_news.delay(origin.origin_name)

tasks里是某个耗费时间操作,比如网络IO或许数据库读写,因为大家不关切职责的重返值,所以使用@app.task(ignore_result=True)将其屏蔽了。

职责布署完结后我们就要安排celery了,大家采用redis作为天职队列,小编强烈建议在生产条件中接纳rabbitmq也许redis作为天职队列或结果缓存后端,而不应该运用关系型数据库:

# redisREDIS_PORT = 6379REDIS_DB = 0# 从环境变量中取得redis服务器地址REDIS_HOST = os.environ.get('REDIS_ADDR', 'redis')# celery settings# 这两项必须设置,否则不能正常启动celery beatCELERY_ENABLE_UTC = TrueCELERY_TIMEZONE = TIME_ZONE# 任务队列配置CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'CELERY_ACCEPT_CONTENT = ['application/json', ]CELERY_RESULT_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'CELERY_TASK_SERIALIZER = 'json'

接下来是大家的定时任务设置:

from celery.schedules import crontabCELERY_BEAT_SCHEDULE={        'fetch_news_every-1-hour': {            'task': 'news.tasks.fetch_all_news',            'schedule': crontab(minute=0, hour='*/1'),        }}

定时任务安顿对象是二个dict,由任务名和陈设项整合,首要布局想如下:

  • task:职责函数所在的模块,模块路径得写全,不然找不到将不恐怕运营该职务
  • schedule:定时事政治策,1般接纳celery.schedules.crontab,上边例子为每时辰的0分执行1回义务,具体写法与linux的crontab类似能够参照文书档案表达
  • args:是个元组,给出任务急需的参数,假若不供给参数也能够不写进配置,就好像例子中的1样
  • 任何配置项较少用,能够参照文书档案
    从那之后,配置celery beat的1对就归西了。

异步任务

Celery

叁、Django中使用定时职务

  假诺想要在django中使用定时职责作用雷同是靠beat完结职责发送功能,当在Django中使用定时任务时,必要安装django-celery-beat插件。以下将介绍使用进度。

启动celery beat

布置完结后只须要运转celery了。

启航以前安顿一下环境。不要用root运维celery!不要用root运维celery!不要用root运转celery!主要的事体说一回。

start-celery.sh:

export REDIS_ADDR=127.0.0.1celery -A linux_news worker -l info -B -f /path/to/log

-A 表示app所在的目录,-B表示运营celery beat运转定时职分。
celery平常运转后就能够透过日记来查阅职分是还是不是正规运作了:

[2018-12-21 13:00:00,022: INFO/MainProcess] Received task: news.tasks.fetch_all_news[e4566ede-2cfa-4c19-b2f3-0c7d6c38690d]  [2018-12-21 13:00:00,046: INFO/MainProcess] Received task: news.tasks.fetch_news[583e96dc-f508-49fa-a24a-331e0c07a86b]  [2018-12-21 13:00:00,051: INFO/ForkPoolWorker-2] Task news.tasks.fetch_all_news[e4566ede-2cfa-4c19-b2f3-0c7d6c38690d] succeeded in 0.02503809699555859s: None[2018-12-21 13:00:00,052: INFO/MainProcess] Received task: news.tasks.fetch_news[c61a3e55-dd3c-4d49-8d6d-ca9b1757db25]  [2018-12-21 13:00:00,449: INFO/ForkPoolWorker-5] Task news.tasks.fetch_news[c61a3e55-dd3c-4d49-8d6d-ca9b1757db25] succeeded in 0.39487219898728654s: None[2018-12-21 13:00:00,606: INFO/ForkPoolWorker-3] Task news.tasks.fetch_news[583e96dc-f508-49fa-a24a-331e0c07a86b] succeeded in 0.5523456179944333s: None

如上正是celery4运转定时职务的始末,如有错误和疏漏,欢迎指正。

自己的异步使用处境为品种上线:前端web上有个上线按钮,点击按钮后发请求给后端,后端执行上线进程要四分钟,后端在接到到请求后把任务放入队列异步执行,同时立时再次来到给前端三个职务执行中的结果。若果未有异步执行会什么啊?同步的动静正是实施进程中前端一贯在等后端重返结果,页面转呀转的就转超时了。

安装

设置配置

一.beat插件装置

pip3 install django-celery-beat

2.注册APP

INSTALLED_APPS = [
    ....   
    'django_celery_beat',
]

3.数据库变更

python3 manage.py migrate django_celery_beat

四.独家运维woker和beta

celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler  #启动beta 调度器使用数据库

celery worker -A taskproj -l info #启动woker

5.配置admin

urls.py

# urls.py
from django.conf.urls import url
from django.contrib import admin

urlpatterns = [
    url(r'^admin/', admin.site.urls),
]

六.开立用户

python3 manage.py createsuperuser 

7.登录admin举办管理(地址

威尼斯人线上娱乐 6

 

 使用示例:

威尼斯人线上娱乐 7

 

 

 

 

威尼斯人线上娱乐 8

 

 

 查看结果:

威尼斯人线上娱乐 9

 

异步职务安插

安装Celery

推荐介绍应用pip安装,倘若你利用的是虚拟环境,请在虚拟环境里安装

$ pip install celery

三遍开发

  django-celery-beat插件本质上是对数据库表变化检查,一旦有数量库表改变,调度器重新读取职责展开调度,所以只要想自身定制的天职页面,只需求操作beat插件的四张表就能够了。当然你还足以友善定义调度器,django-celery-beat插件已经松手了model,只要求开始展览导入便可开始展览orm操作,以下我用django
reset api进行出现说法:

settings.py

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app01.apps.App01Config',
    'django_celery_results',
    'django_celery_beat',
    'rest_framework',
]

urls.py

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^index$', views.index),
    url(r'^res$', views.get_res),
    url(r'^tasks$', views.TaskView.as_view({'get':'list'})),
]

views.py

from django_celery_beat.models import PeriodicTask  #倒入插件model
from rest_framework import serializers
from rest_framework import pagination
from rest_framework.viewsets import ModelViewSet
class Userserializer(serializers.ModelSerializer):
    class Meta:
        model = PeriodicTask
        fields = '__all__'

class Mypagination(pagination.PageNumberPagination):
    """自定义分页"""
    page_size=2
    page_query_param = 'p'
    page_size_query_param='size'
    max_page_size=4

class TaskView(ModelViewSet):
    queryset = PeriodicTask.objects.all()
    serializer_class = Userserializer
    permission_classes = []
    pagination_class = Mypagination

访问

威尼斯人线上娱乐 10

 

一.安装rabbitmq,那里大家使用rabbitmq作为broker,安装到位后默许运营了,也不须要任何任何配置

设置音讯中间件

Celery 协理 RabbitMQ、Redis 甚至其余数据库系统作为其音信代理中间件

您愿意用什么样中间件和后端就请自行设置,壹般都应用redis只怕RabbitMQ

# apt-get install rabbitmq-server

安装Redis

在Ubuntu系统下行使apt-get命令就足以

$ sudo apt-get install redis-server

设若您采用redis作为中间件,还必要安装redis支持包,同样利用pip安装即可

$ pip install redis

能出现以下结果即为成功

redis 127.0.0.1:6379>

其余的redis知识那里不左介绍,要是有趣味,能够活动明白

要是您选用RabbitMQ,也请安装RabbitMQ

2.安装celery

安装RabbitMQ

$ sudo apt-get install rabbitmq-server
# pip3 install celery

使用Celery

三.celery用在django项目中,django项目目录结构(简化)如下

简容易单直接运用

能够在急需的地点直接引进Celery,直接运用即可。最简便易行的方法只要求布置一个任务和中间人即可

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/3')

@app.task
def add(x, y):
    return x + y

本人那边运用了redis作为中间件,那是足以按自个儿的习惯替换的

鉴于暗中同意的布局不是最契合大家的品类实际上须要,1般的话我们都必要按大家友好的渴求铺排部分,
可是由于须求将品种解耦,也好维护,大家最棒利用单独的3个文件编写制定配置。

website/
|-- deploy
|  |-- admin.py
|  |-- apps.py
|  |-- __init__.py
|  |-- models.py
|  |-- tasks.py
|  |-- tests.py
|  |-- urls.py
|  `-- views.py
|-- manage.py
|-- README
`-- website
  |-- celery.py
  |-- __init__.py
  |-- settings.py
  |-- urls.py
  `-- wsgi.py

单身安排配置文件

比上面的有些复杂一点,大家需求创制三个公文,3个为config.py的celery配置文件,在中间填写适合大家项目标配置,在创造贰个tasks.py文件来编排大家的职分。文件的名字能够按您的喜好和谐取名。

config.py内容为:

# coding=utf-8
# 配置文件同一配置celery
BROKER_URL = 'redis://localhost:6379/3'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/4'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

# 把“脏活”路由到专用的队列:
CELERY_ROUTES = {
    'tasks.add': 'low-priority',
}

# 限制任务的速率,这样每分钟只允许处理 10 个该类型的任务:
CELERY_ANNOTATIONS = {
    'tasks.add': {'rate_limit': '10/m'}
}

陈设好之后能够用以下命令检查陈设文件是不是科学(config为布局文件名)

$ python -m config

tasks.py内容为:

# coding=utf-8
from celery import Celery

app = Celery()
# 参数为配置文件的文件名
app.config_from_object('config')

@app.task
def add(x, y):
    return x + y

再有1种同等设置配置的点子,不是很推荐

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

在app使用前先须要用以上办法批量更新配备文件。

4.创建 website/celery.py 主文件

在选择上利用

工程目录结构为

proj/
    __init__.py
    # 存放配置和启动celery代码
    celery.py
    # 存放任务
    tasks.py

celery.py为:

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/3',
             backend='redis://localhost:6379/4',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

tasks.py为:

from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

启航celery只要求在proj同级目录下:

$ celery -A proj worker -l info
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'website.settings')

app = Celery('website')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#  should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# 允许root 用户运行celery
platforms.C_FORCE_ROOT = True

@app.task(bind=True)
def debug_task(self):
  print('Request: {0!r}'.format(self.request))

在django中使用celery

大家的django的类型的目录结构相似如下

proj/
    manage.py
    myapp/
    proj/
        __init__py
        settings.py
        urls.py
        wsgi.py

想要在django项目中使用celery,大家率先供给在django中配置celery

我们必要在与工程名同名的子文件夹中添加celery.py文件
在本例中也便是proj/proj/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
# 第二个参数为工程名.settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

# 括号里的参数为工程名
app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
# 配置文件需要写在setting.py中,并且配置项需要使用`CELERY_`作为前缀
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# 能够自动加载所有在django中注册的app,也就是setting.py中的INSTALLED_APPS
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

下一场大家要求在同级目录下的**init.py文本中布局如下内容 proj/proj/init.py**

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

然后大家就足以把必要的职分放到须要的app下的tasks.py中,未来项目目录结构如下

proj/
    manage.py
    myapp1/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    myapp2/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    proj/
        __init__py
        settings.py
        urls.py
        wsgi.py

恐怕的1个tasks.py文件内容如下:
myapp1/tasks.py为:

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time


@shared_task
def add(x, y):
    # 为了测试是否是异步,特意休眠5s,观察是否会卡主主进程
    time.sleep(5)
    print(x+y)
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

@shared_task修饰器能够让您创立task不需求app实体

在需求的地点调用相关职务即可,例如在myapp1/views.py中调用

from django.shortcuts import render
from .tasks import add


def index(request):
    # 测试celery任务
    add.delay(4,5)
    return render(request,'index.html')

接下来就足以运营项目,celery要求独自运维,所以须求开多个极点,分别

启航web应用服务器

$ python manage.py runserver

启动celery

$ celery -A proj worker -l info

下一场访问浏览器就足以在起步celery的终点中见到输出

威尼斯人线上娱乐 11

测试结果

5.在 website/__init__.py
文件中加进如下内容,确定保证django运行的时候那么些app能够被加载到

扩展

  • 只要您的连串需求在admin中管理调度,请使用django-celery-beat
  1. 使用pip安装django-celery-beat

$ pip install django-celery-beat

决不在利用django-celery,这些体系现已终止更新好好多年。。。。

  1. 在settings.py中丰盛这些app

INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)
  1. 共同一下数据库

$ python manage.py migrate
  1. 设置celery
    beat
    劳动应用django_celery_beat.schedulers:DatabaseScheduler
    scheduler

$ celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

下一场在就可以admin界面看到了。

  • 纵然您想采纳Django-OBMWX五M恐怕Django
    Cache作为后端,必要设置django-celery-results增添(笔者不提议)
  1. 使用pip安装django-celery-results

$ pip install django-celery-results

不用在应用django-celery,这一个类型早就终止更新好好多年。。。。

  1. 在settings.py中添加那几个app

INSTALLED_APPS = (
    ...,
    'django_celery_results',
)
  1. 共同一下数据库

$ python manage.py migrate django_celery_results
  1. 配置后端,在settings.py中布署

# 使用数据库作为结果后端
CELERY_RESULT_BACKEND = 'django-db'

# 使用缓存作为结果后端
CELERY_RESULT_BACKEND = 'django-cache'

骨干选用大约正是上述那些,别的实际布置和利用还需协调查商讨读法定文书档案

注:

  • 上述条件在ubuntu16.04 lts django1.9中搭建测试成功
  • 上述文字皆为私有看法,如有错误或提议请霎时联系本人
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

陆.各使用创制tasks.py文件,那里为 deploy/tasks.py

from __future__ import absolute_import
from celery import shared_task

@shared_task
def add(x, y):
  return x + y

只顾tasks.py必须建在各app的根目录下,且只好叫tasks.py,无法随便命名

七.views.py中引用使用那么些tasks异步处理

from deploy.tasks import add

def post(request):
  result = add.delay(2, 3)


result.ready()
result.get(timeout=1)
result.traceback

8.启动celery

# celery -A website worker -l info

九.如此在调用post那些方法时,里边的add就能够异步处理了

定时职务

定时职分的运用景况就很广阔了,比如本身要求定时发送报告给业主~

定时职分安顿

  1. website/celery.py 文件添加如下配置以协理定时职务crontab
from celery.schedules import crontab

app.conf.update(
  CELERYBEAT_SCHEDULE = {
    'sum-task': {
      'task': 'deploy.tasks.add',
      'schedule': timedelta(seconds=20),
      'args': (5, 6)
    }
    'send-report': {
      'task': 'deploy.tasks.report',
      'schedule': crontab(hour=4, minute=30, day_of_week=1),
    }
  }
)

概念了多个task:

  • 名叫’sum-task’的task,每20秒执行一次add函数,并传了八个参数伍和六
  • 名称为’send-report’的task,每礼拜5上午四:30实施report函数

timedelta是datetime中的2个指标,须求 from datetime import timedelta
引入,有如下多少个参数

  • days
  • seconds
  • microseconds
  • milliseconds
  • minutes
  • hours

crontab的参数有:

month_of_year
day_of_month
day_of_week
hour
minute

  1. deploy/tasks.py 文件添加report方法:
@shared_task
def report():
  return 5

三.起初celery
beat,celery运维了三个beat过程向来在频频的论断是不是有职分急需执行

# celery -A website beat -l info

Tips

一.假使您而且利用了异步职务和安顿义务,有一种更简便易行的开发银行格局
celery -A website worker -b -l info ,可同时开动worker和beat

二.假设采用的不是rabbitmq做队列那么必要在主配置文件中 website/celery.py
配置broker和backend,如下:

# redis做MQ配置
app = Celery('website', backend='redis', broker='redis://localhost')
# rabbitmq做MQ配置
app = Celery('website', backend='amqp', broker='amqp://admin:admin@localhost')

三.celery不可能用root用户运维的话要求在主配置文件中添加
platforms.C_FORCE_ROOT = True

肆.celery在长日子运作后可能出现内部存款和储蓄器泄漏,供给丰盛配置
CELERYD_MAX_TASKS_PER_CHILD = 10威尼斯人线上娱乐
,表示每种worker执行了有个别个职分就死掉

以上正是本文的全体内容,希望对大家的求学抱有帮衬,也盼望我们多多帮助脚本之家。

你恐怕感兴趣的稿子:

  • 异步职务队列Celery在Django中的运用办法
  • Django使用Celery异步职分队列的使用
  • Django中使用celery实现异步职务的言传身教代码


相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图