celery介绍与使用
创始人
2024-03-22 03:51:28
0

一.celery介绍

celery作用

1.celery可以实现异步任务来提高项目的并发量,完成延迟任务、定时任务

2.celery是一个简单、灵活、可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具

celery架构

1.消息中间件:broker 提交的任务(函数)都放到这里,celery本身不提供中间件,需要借助于第三方:redis,rabbitmq

2.任务执行单元:worker,真正执行任务的地方,一个个进程,执行函数

3.结果存储:backend,函数return的结果存储在这里,celery本身不提供结果存储,借助于第三方:redis,数据库,rabbitmq

celery特点

celery是独立的服务

1.可以不依赖任何服务器,通过自身命令,启动

2.celery服务为其他项目服务提供异步解决任务需求的

注意:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

二.celery快速使用

1.安装模块

        官方介绍:Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform(不支持windows,请不要打开与该平台相关的任何问题)

pip install celerypip install eventlet  # Windows系统需安装

2.使用步骤

新建包:celery_task
    -在包先新建一个 celery.py
    -在里面写app的初始化
    -在包里新建app_task.py 编写相关任务 
    -其它程序,提交任务
    -启动worker ---》它可以先启动,在提交任务之前-->包所在的目录下
        celery -A celery_task worker -l info -P eventlet
    -查看任务执行的结果了

'''celery_task/celery.py'''from celery import Celerybackend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'
# 一定不要忘了include
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.home_task','celery_task.user_task'])
'''celery_task/app_task.py'''
from .celery import app
@app.task
def add(a, b):time.sleep(3)print('计算结果是:%s' % (a + b))return a + b
'''add_task.py'''
from celery_task.user_task import send_sms
# 提交了一个发送短信异步任务
res=send_sms.delay('132xxxxxxxx','9999')
print(res)  # 672237ce-c941-415e-9145-f31f90b94627# 任务执行,要启动worker# 查看任务执行的结果

3.启动celery工作服务器

celery -A tasks worker -l info -P eventlet 
或
celery -A tasks worker --loglevel=INFO -P eventlet

4.backend中查看任务执行结果

from tasks import appfrom celery.result import AsyncResulttask_id = '672237ce-c941-415e-9145-f31f90b94627'if __name__ == '__main__':res = AsyncResult(id=task_id, app=app)if res.successful():result = res.get()print(result)# 等同上面代码# if res.state == 'SUCCESS':#     result = res.get()#     print(result)elif res.failed():print('任务失败')# elif res.state == 'FAILURE':#     print('任务失败')elif res.status == 'PENDING':print('任务等待中被执行')elif res.status == 'RETRY':print('任务异常后正在重试')elif res.status == 'STARTED':print('任务已经开始被执行')

AsyncResult下的方法

def failed(self):"""Return :const:`True` if the task failed."""return self.state == states.FAILUREdef successful(self):"""Return :const:`True` if the task executed successfully."""return self.state == states.SUCCESS

3.celery开启定时、延迟任务、异步任务

异步任务

task.delay(*args, **kwargs)

定时任务


app.conf.beat_schedule = {'send_sms_task': {'task': 'celery_task.add_task.send_sms', # 路径'schedule': timedelta(seconds=5), # 每五秒执行一次# 'schedule': crontab(hour=12, day_of_week=1),  # 每周一12点发送验证码'args': ('132xxxxxxxx', '7777'),},
}

延迟任务

task.apply_async(args=[参数,参数],eta=时间对象(utc时间))from datetime import timedelta, datetimeres = add.apply_async(args=(1, 2), eta=(datetime.utcnow() + timedelta(seconds=20)))print(res.task_id)  # c78505e2-614d-4bb2-930c-c73c325af519

三.在django中使用

在包内的celery.py中添加代码

import os
from celery import Celery
from datetime import timedelta
from celery.schedules import crontabos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy.settings.dev')
import django
django.setup()broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery(main=__name__, broker=broker, backend=backend,include=['celery_tasks.home_tasks', 'celery_tasks.user_tasks'])

四.双写一致性

为了提高并发量和访问速度我们把数据存放到redis中

class SlideShowView(GenericViewSet, ListMixinView):queryset = SlideShow.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.SLIDE_SHOW_COUNT]serializer_class = SlideShowSerdef list(self, request, *args, **kwargs):result = cache.get('banner_list')if result:print('走了缓存')return APIResponse(code=1001, result=result)res = super().list(request, *args, **kwargs)result = res.data.get('result')cache.set('banner_list', result)print('走了数据库')return res

celery定时任务实现双写一致性

当把数据存放到redis中时我们修改数据库但是redis中的数据不会改变就会造成数据不一致的情况

- 解决方式一:

  1. 修改mysql数据库,删除缓存 【缓存的修改是在后】
  2. 修改数据库,修改缓存 【缓存的修改是在后】
  3. 定时更新缓存,针对于实时性不是很高的接口适合定时更新.

 - 解决方式二:

开启crlery定时每30分钟朝数据库获取一次数据存放到redis中

#home_tasks.py 首页相关任务
import timefrom .celery import app
from home.models import SlideShow
from django.conf import settings
from home.serializer import SlideShowSer
from django.core.cache import cache@app.task
def update_banner():# 更新缓存queryset = SlideShow.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.SLIDE_SHOW_COUNT]ser = SlideShowSer(instance=queryset, many=True)# print(ser.data)for item in ser.data:item['image'] = settings.HOST_URL + item['image']cache.set('banner_list', ser.data)return True
# celery.py
import os
from celery import Celery
from datetime import timedelta
from celery.schedules import crontabos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy.settings.dev')
import djangodjango.setup()broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery(main=__name__, broker=broker, backend=backend,include=['celery_tasks.home_tasks', 'celery_tasks.user_tasks'])app.conf.beat_schedule = {# 定时任务'update_banner': {'task': 'celery_tasks.home_tasks.update_banner','schedule': timedelta(minutes=30),# 'schedule': crontab(hour=8, day_of_week=1),'args': (),},
}

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
【Ctfer训练计划】——(三... 作者名:Demo不是emo  主页面链接:主页传送门 创作初心ÿ...