Python Celery
Quoting authors of the project:
- Celery is "an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well. (...) Tasks can execute asynchronously (in the background) or synchronously (wait until ready)."
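The "asynchronously (in the background) or synchronously (wait until ready)" pattern can be sketched with the standard library alone; this is only a conceptual stand-in for what Celery does, not Celery itself:

```python
from concurrent.futures import ThreadPoolExecutor

def add(x, y):
    # Stand-in for a Celery task body.
    return x + y

with ThreadPoolExecutor() as pool:
    future = pool.submit(add, 4, 4)  # asynchronous: runs in the background
    result = future.result()         # synchronous: wait until ready
    print(result)                    # 8
```

Celery applies the same idea across machines: tasks are submitted through a message broker and executed by worker processes.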
Installation
Install the package python-celery. As with most Python-based packages, this provides a version compatible with Python 3.x.
Quoting the Celery documentation: "Celery requires a solution to send and receive messages" - one of the options is RabbitMQ, which can also be installed from the official repositories.
Configuration
Celery
For configuration files, create the directory /etc/celery/ and place in it a configuration file named app.conf, where app is the name of your application. An example configuration file is provided in the Celery documentation.
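As a sketch, such a file could look like the following; the variable names follow Celery's generic init-script configuration, and the values are examples only:

```sh
# /etc/celery/app.conf (example values)
CELERY_APP="app"
CELERYD_NODES="worker1"
CELERYD_OPTS="--loglevel=info"
```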
Start/enable the celery@app.service.
To run Celery in a virtualenv, copy celery@.service to /etc/systemd/system so you can customize it, and change the paths of the celery binary to point at the copy in your virtualenv.
RabbitMQ
RabbitMQ stores its configuration within /etc/rabbitmq/rabbitmq-env.conf
The default configuration:
NODENAME=rabbit@rakieta
NODE_IP_ADDRESS=0.0.0.0
NODE_PORT=5672
LOG_BASE=/var/log/rabbitmq
MNESIA_BASE=/var/lib/rabbitmq/mnesia
You probably want to replace 0.0.0.0 with 127.0.0.1; RabbitMQ does not support Unix sockets.
For simple configurations, you may also want to add HOME=/var/lib/rabbitmq. Read more about environment variables in the RabbitMQ docs.
Start/enable the rabbitmq.service.
rabbitmq.service is started as the rabbitmq user, whose home folder is /var/lib/rabbitmq - make sure the rabbitmq user owns this folder and all its subfolders.
Follow the RabbitMQ documentation and add your user and virtual host:
$ cd /var/lib/rabbitmq
$ su rabbitmq -c 'rabbitmqctl add_user myuser mypassword'
$ su rabbitmq -c 'rabbitmqctl add_vhost myvhost'
$ su rabbitmq -c 'rabbitmqctl set_user_tags myuser mytag'
$ su rabbitmq -c 'rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"'
Read the RabbitMQ admin guide to understand the above; the three ".*" patterns passed to set_permissions grant configure, write, and read permissions on the vhost, respectively.
If issuing su rabbitmq -c "rabbitmqctl status" results in badrpc,nodedown, visit this blog post for more information on how to fix the problem.
Run su rabbitmq -c "erl"; as a result you should get an Erlang prompt with no errors.
Security
You may want to read the security section of the relevant Celery documentation.
Example task
Celery application
Follow the Celery documentation to create a sample Python task:
$ nano test.py
from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost')

@app.task
def add(x, y):
    return x + y
amqp://myuser:mypassword@localhost:5672/myvhost - use the same credentials/vhost you created when configuring RabbitMQ.
backend='amqp' - this parameter is optional; without it, task results are simply not stored. Note that it is distinct from broker, which specifies the message transport - RabbitMQ (AMQP) is the default broker used by Celery.
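The broker URL follows the standard URL syntax, so its parts can be inspected with the standard library; the credentials below are the example ones from this page:

```python
from urllib.parse import urlsplit

url = urlsplit('amqp://myuser:mypassword@localhost:5672/myvhost')
print(url.username)            # myuser
print(url.password)            # mypassword
print(url.hostname, url.port)  # localhost 5672
print(url.path.lstrip('/'))    # myvhost - the RabbitMQ virtual host
```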
Test run
While in the same directory as your test.py
you can run:
$ celery -A test worker --loglevel=info
Then from another console (but within same directory) create:
$ nano call.py
from test import add

add.delay(4, 4)
Run it:
$ python call.py
First, the console running the worker should log some information indicating the task was received:
Received task: test.add[f4aff99a-7477-44db-9f6e-7e0f9342cd4e]
Task test.add[f4aff99a-7477-44db-9f6e-7e0f9342cd4e] succeeded in 0.0007182330009527504s: 8
Prepare module for Celery service
The procedure below differs slightly from what you will find in the Celery documentation.
Create the test_task module:
# mkdir /lib/python3.5/site-packages/test_task
# touch /lib/python3.5/site-packages/test_task/__init__.py
# touch /lib/python3.5/site-packages/test_task/test_task.py
# touch /lib/python3.5/site-packages/test_task/celery.py
# nano /lib/python3.5/site-packages/test_task/celery.py
from __future__ import absolute_import
from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost')

if __name__ == '__main__':
    app.start()
# nano /lib/python3.5/site-packages/test_task/test_task.py
from __future__ import absolute_import
from test_task.celery import app

@app.task
def add(x, y):
    return x + y
At this point, if you start python in your console, you should be able to run the following without any error:
>>> from test_task import celery
In /etc/celery/celery.conf, replace:
CELERY_APP="proj"
with the following line:
CELERY_APP="test_task"
Restart the celery@celery.service.
Run tasks periodically
Tasks can be run periodically through Celery Beat; the basic setup is described in the relevant Celery documentation pages. An example:
If you want to specify CELERYBEAT_SCHEDULE within your celery.py, you need to add the app.conf prefix to make Celery recognise your scheduled tasks. After that, add the --beat --schedule=/var/lib/celery/celerybeat-schedule parameters when you start the celery daemon. Further, the /var/lib/celery directory must exist in the Celery-relevant environment and be owned by the user that runs Celery.
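As a sketch, a schedule for the add task from the test_task module could look like this (the entry name and timing values are illustrative); in celery.py you would assign it with the app.conf prefix, e.g. app.conf.CELERYBEAT_SCHEDULE = CELERYBEAT_SCHEDULE:

```python
# Illustrative beat schedule for the add task defined above.
CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'test_task.test_task.add',  # task name: module path + function
        'schedule': 30.0,                   # run every 30 seconds
        'args': (4, 4),                     # positional arguments for add()
    },
}
```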