Упрощение асинхронных задач с помощью Celery Mongo Database Scheduler

Большинство реальных приложений созданы для обработки большого количества пользователей, сложной обработки данных и вычислений. Все это делается в фоновом режиме, не мешая пользователю. В этой статье я объясню работу Celery MongoDB Scheduler, связь Celery и Redis с MongoDB в проекте Django.

Сельдерей способен справиться с этими обязанностями

Согласно Википедии: Celery - это асинхронная очередь задач с открытым исходным кодом или очередь заданий, основанная на распределенной передаче сообщений. Хотя он поддерживает планирование.

У сельдерея есть две особенности:

  1. Рабочий: у него есть дочерние процессы, которые выполняют задачи.
  2. Beat: периодически отправляет задачи брокеру.

Архитектура

Позвольте мне это объяснить. Здесь наши задачи хранятся в коллекции в MongoDB, Celery Beat периодически выбирает задачи из MongoDB и передает Redis. В Redis есть очереди задач, в которых хранятся задачи. Рабочие процессы выбирают задачи из очереди задач и выполняют задачи. Полученные данные снова отправляются обратно в MongoDB, и эти данные хранятся в отдельной коллекции.

Теперь позвольте мне показать вам, как легко это можно сделать.

Установить все зависимости

1. Сельдерей

Https://docs.celeryproject.org/en/stable/getting-started/introduction.html

pip install celery

2. Celerybeat-mongo.

Для хранения задач в MongoDB: Для хранения задач в MongoDB мы собираемся использовать celerybeat-mongo Этот замечательный проект, своевременно поддерживаемый на GitHub. Https://github.com/zmap/celerybeat-mongo

pip install celerybeat-mongo

3. Redis

Redis - брокер.

pip install redis

Примечание: для Windows также скачайте файл MSI.

4. MongoDB

MongoDB - это база данных NoSQL.

Проверить эту страницу для установки.

5. MongoEngine

Чтобы подключить Django к MongoDB:

pip install mongoengine

Структура каталога проекта

---Blogs
  |---Blogs
    |---__init__.py
    |---settings.py
    |---urls.py
    |---celery.py
  |---UserRegistration
    |---tasks.py
    |---utils.py
  |---BlogTasks
    |---tasks.py
    |---urls.py
    |---views.py 
manage.py

Давайте соединим их в три этапа

1. Подключите Django и MongoDB.

В вашем файле settings.py. DBName, упомянутые здесь, будут созданы, если еще не были созданы.

import mongoengine
mongoengine.connect(db='YourDBName', host='127.0.0.1', port='27017')

2. Подключите сельдерей и Redis.

В вашем файле settings.py.

# CELERY SETTINGS
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

3. Подключите Celery и MongoDB.

В вашем файле settings.py.

# CELERY MONGO SETTINGS
CELERY_RESULT_BACKEND = "mongodb"
CELERY_MONGODB_BACKEND_SETTINGS = {
    "host": "127.0.0.1",
    "port": 27017,
    "database": "jobs",
    "taskmeta_collection": "stock_taskmeta_collection",
}

В вашей БД будет создана коллекция с именем jobs, в которой будут храниться метаданные, сгенерированные при выполнении задач.

celery.py

Теперь нам нужно вставить задачу в нашу MongoDB. Эта задача будет выбрана биткой сельдерея и выполнена рабочим.

Чтобы вставить задачу, нам просто нужно запустить метод, в котором будут все задачи. Посмотрите на пример ниже.

BlogTasks / tasks.py

Запустите create_tasks, и это добавит запись в коллекцию schedule. Здесь вы можете видеть, что мы создали задачу с именем user-account-creation-task. . Метод, выполняемый user-account-creation-task, находится в местоположении UserRegistration.tasks, а метод - user_account_creation_task.

Периодичность user-account-creation-task - восемь часов. Но вы также можете сделать что-то вроде schedule[crontab][‘minute’] = ‘*/5’ and schedule[crontab][‘hour’] = ‘*’, это означает, что задача будет выполняться каждые пять минут. Но если вы не изменили его на восемь часов, ваша задача будет выполняться каждые восемь часов пять минут.

Также обратите внимание, что мы используем проект celerybeat-mongo GitHub, который мы уже установили, для создания задачи, проверки данных (с помощью PeriodicTaskSerializer) и сохранения в MongoDB.

UserRegistration / tasks.py

from celery import task
from UserRegistration.utils import account_creation

@task
def account_creation():
    account_creation()

UserRegistration / utils.py

def account_creation():
    # Apply your logic here
    print('Running account creation task!')

Теперь у нас есть все необходимые библиотеки и код в файлах, которые мы можем запустить celery worker, обыграть и проверить результат.

Запустите сервис mongod

Linux Env

service mongod start

Windows CMD

добавить местоположение mongod в переменные среды

C:\Program Files\MongoDB\Server\3.4\bin\

Запустите службу MongoDB с помощью cmd mongod

Сервис Redis

Linux Env

service redis start

Windows CMD

добавить Redis в переменные среды

C:\Program Files\Redis\

запустить службу Redis, используя redis-cli

Сельдерей

Сначала смените каталог на то, где находится файл manage.py

celery -A Blogs worker -l info

Если вы получаете какие-либо ошибки, установите eventlet и запустите

celery -A Blogs worker -l info -P eventlet

Сельдерея

Сначала смените каталог на то, где находится файл manage.py:

celery -A Blogs beat -S celerybeatmongo.schedulers.MongoScheduler -l info

Здесь мы используем планировщик базы данных, который предоставляется celerybeat-mongo. Его роль - выбирать задачи из базы данных и отправлять их работнику.

Если все сделано правильно, то в командной строке worker вы увидите результат печати Running account creation task!

Почему мы должны использовать планировщик базы данных?

В CloudGain мы предпочитаем использовать планировщик базы данных, потому что все расписания присутствуют в MongoDB. Это означает, что мы можем легко изменять частоту выполнения задач и запускать их без необходимости развертывания. Это упрощает задачу тестирования.

Что дальше?

Сельдерей - очень мощный продукт, который решит большинство ваших проблем, но за него приходится платить, и это утечка памяти. Вы найдете много ошибок и проблем, созданных на GitHub и других местах по этому поводу.

Итак, в моей следующей статье я объясню, как проверять утечки памяти и как с ними бороться.

Если у вас есть предложения, оставьте комментарий.