Упрощение асинхронных задач с помощью Celery Mongo Database Scheduler
Большинство реальных приложений созданы для обработки большого количества пользователей, сложной обработки данных и вычислений. Все это делается в фоновом режиме, не мешая пользователю. В этой статье я объясню работу Celery MongoDB Scheduler, связь Celery и Redis с MongoDB в проекте Django.
Сельдерей способен справиться с этими обязанностями
Согласно Википедии: Celery - это асинхронная очередь задач с открытым исходным кодом или очередь заданий, основанная на распределенной передаче сообщений. Хотя он поддерживает планирование.
У сельдерея есть две особенности:
- Рабочий: у него есть дочерние процессы, которые выполняют задачи.
- Beat: периодически отправляет задачи брокеру.
Архитектура
Позвольте мне это объяснить. Здесь наши задачи хранятся в коллекции в MongoDB, Celery Beat периодически выбирает задачи из MongoDB и передает Redis. В Redis есть очереди задач, в которых хранятся задачи. Рабочие процессы выбирают задачи из очереди задач и выполняют задачи. Полученные данные снова отправляются обратно в MongoDB, и эти данные хранятся в отдельной коллекции.
Теперь позвольте мне показать вам, как легко это можно сделать.
Установить все зависимости
1. Сельдерей
Https://docs.celeryproject.org/en/stable/getting-started/introduction.html
pip install celery
2. Celerybeat-mongo.
Для хранения задач в MongoDB: Для хранения задач в MongoDB мы собираемся использовать celerybeat-mongo
Этот замечательный проект, своевременно поддерживаемый на GitHub. Https://github.com/zmap/celerybeat-mongo
pip install celerybeat-mongo
3. Redis
Redis - брокер.
pip install redis
Примечание: для Windows также скачайте файл MSI.
4. MongoDB
MongoDB - это база данных NoSQL.
Проверить эту страницу для установки.
5. MongoEngine
Чтобы подключить Django к MongoDB:
pip install mongoengine
Структура каталога проекта
---Blogs |---Blogs |---__init__.py |---settings.py |---urls.py |---celery.py |---UserRegistration |---tasks.py |---utils.py |---BlogTasks |---tasks.py |---urls.py |---views.py manage.py
Давайте соединим их в три этапа
1. Подключите Django и MongoDB.
В вашем файле settings.py. DBName
, упомянутые здесь, будут созданы, если еще не были созданы.
import mongoengine mongoengine.connect(db='YourDBName', host='127.0.0.1', port='27017')
2. Подключите сельдерей и Redis.
В вашем файле settings.py.
# CELERY SETTINGS CELERY_BROKER_URL = 'redis://localhost:6379/0' CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json'
3. Подключите Celery и MongoDB.
В вашем файле settings.py.
# CELERY MONGO SETTINGS CELERY_RESULT_BACKEND = "mongodb" CELERY_MONGODB_BACKEND_SETTINGS = { "host": "127.0.0.1", "port": 27017, "database": "jobs", "taskmeta_collection": "stock_taskmeta_collection", }
В вашей БД будет создана коллекция с именем jobs, в которой будут храниться метаданные, сгенерированные при выполнении задач.
celery.py
Теперь нам нужно вставить задачу в нашу MongoDB. Эта задача будет выбрана биткой сельдерея и выполнена рабочим.
Чтобы вставить задачу, нам просто нужно запустить метод, в котором будут все задачи. Посмотрите на пример ниже.
BlogTasks / tasks.py
Запустите create_tasks
, и это добавит запись в коллекцию schedule
. Здесь вы можете видеть, что мы создали задачу с именем user-account-creation-task.
. Метод, выполняемый user-account-creation-task
, находится в местоположении UserRegistration.tasks
, а метод - user_account_creation_task
.
Периодичность user-account-creation-task
- восемь часов. Но вы также можете сделать что-то вроде schedule[crontab][‘minute’] = ‘*/5’ and schedule[crontab][‘hour’] = ‘*’
, это означает, что задача будет выполняться каждые пять минут. Но если вы не изменили его на восемь часов, ваша задача будет выполняться каждые восемь часов пять минут.
Также обратите внимание, что мы используем проект celerybeat-mongo
GitHub, который мы уже установили, для создания задачи, проверки данных (с помощью PeriodicTaskSerializer
) и сохранения в MongoDB.
UserRegistration / tasks.py
from celery import task from UserRegistration.utils import account_creation @task def account_creation(): account_creation()
UserRegistration / utils.py
def account_creation(): # Apply your logic here print('Running account creation task!')
Теперь у нас есть все необходимые библиотеки и код в файлах, которые мы можем запустить celery worker, обыграть и проверить результат.
Запустите сервис mongod
Linux Env
service mongod start
Windows CMD
добавить местоположение mongod в переменные среды
C:\Program Files\MongoDB\Server\3.4\bin\
Запустите службу MongoDB с помощью cmd mongod
Сервис Redis
Linux Env
service redis start
Windows CMD
добавить Redis в переменные среды
C:\Program Files\Redis\
запустить службу Redis, используя redis-cli
Сельдерей
Сначала смените каталог на то, где находится файл manage.py
celery -A Blogs worker -l info
Если вы получаете какие-либо ошибки, установите eventlet
и запустите
celery -A Blogs worker -l info -P eventlet
Сельдерея
Сначала смените каталог на то, где находится файл manage.py:
celery -A Blogs beat -S celerybeatmongo.schedulers.MongoScheduler -l info
Здесь мы используем планировщик базы данных, который предоставляется celerybeat-mongo
. Его роль - выбирать задачи из базы данных и отправлять их работнику.
Если все сделано правильно, то в командной строке worker вы увидите результат печати Running account creation task!
Почему мы должны использовать планировщик базы данных?
В CloudGain мы предпочитаем использовать планировщик базы данных, потому что все расписания присутствуют в MongoDB. Это означает, что мы можем легко изменять частоту выполнения задач и запускать их без необходимости развертывания. Это упрощает задачу тестирования.
Что дальше?
Сельдерей - очень мощный продукт, который решит большинство ваших проблем, но за него приходится платить, и это утечка памяти. Вы найдете много ошибок и проблем, созданных на GitHub и других местах по этому поводу.
Итак, в моей следующей статье я объясню, как проверять утечки памяти и как с ними бороться.
Если у вас есть предложения, оставьте комментарий.