Python celery


Quoting the authors of the project:

Celery is "an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well. (...) Tasks can execute asynchronously (in the background) or synchronously (wait until ready)."

Installation

Install the python-celery package. As with most Python-based packages, you get a package compatible with Python 3.x.

Quoting the Celery documentation: "Celery requires a solution to send and receive messages" - one of the options is rabbitmq, which can also be installed from the official repositories.

Configuration

Celery

For configuration files, the directory /etc/celery/ needs to be created, containing a configuration file named app.conf, where app is the name of your application. An example configuration file is provided in the Celery documentation.
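
A minimal /etc/celery/app.conf might look like the sketch below, modelled on that daemonization example from the Celery documentation. The exact set of variables your celery@.service consumes depends on the unit file shipped with the package; only CELERY_APP is relied upon later in this article, the other names are taken from the upstream example:

   CELERY_APP="app"
   CELERYD_NODES="worker1"
   CELERYD_OPTS=""
   CELERYD_PID_FILE="/var/run/celery/%n.pid"
   CELERYD_LOG_FILE="/var/log/celery/%n.log"
   CELERYD_LOG_LEVEL="INFO"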

Start/enable the celery@app.service.

To run Celery in a virtualenv, make a copy of celery@.service in /etc/systemd/system so you can customize it, and change the paths of the celery binary to point to the one in your virtualenv.

RabbitMQ

RabbitMQ stores its environment configuration in /etc/rabbitmq/rabbitmq-env.conf.

The default configuration:

   NODENAME=rabbit@rakieta
   NODE_IP_ADDRESS=0.0.0.0
   NODE_PORT=5672
   
   LOG_BASE=/var/log/rabbitmq
   MNESIA_BASE=/var/lib/rabbitmq/mnesia

You probably want to replace 0.0.0.0 with 127.0.0.1 if the broker is only used locally; note that RabbitMQ does not support Unix sockets.

For simple configurations, you may also want to add HOME=/var/lib/rabbitmq. Read more about environment variables in the RabbitMQ documentation.
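
With both of the above changes applied, a local-only rabbitmq-env.conf might look like this (keep your own node name):

   NODENAME=rabbit@rakieta
   NODE_IP_ADDRESS=127.0.0.1
   NODE_PORT=5672
   
   LOG_BASE=/var/log/rabbitmq
   MNESIA_BASE=/var/lib/rabbitmq/mnesia
   HOME=/var/lib/rabbitmq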

Start/enable the rabbitmq.service.

Note: rabbitmq.service is started as the rabbitmq user, whose home directory is /var/lib/rabbitmq - make sure the rabbitmq user owns this directory and all of its subdirectories.
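
If the ownership is wrong, it can be fixed with something along these lines (this assumes the package created a matching rabbitmq group):

# chown -R rabbitmq:rabbitmq /var/lib/rabbitmq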

Follow the RabbitMQ documentation and add your user and virtual host:

# cd /var/lib/rabbitmq
# su rabbitmq -c 'rabbitmqctl add_user myuser mypassword'
# su rabbitmq -c 'rabbitmqctl add_vhost myvhost'
# su rabbitmq -c 'rabbitmqctl set_user_tags myuser mytag'
# su rabbitmq -c 'rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"'

Read the RabbitMQ admin guide to understand the above.

If issuing su rabbitmq -c "rabbitmqctl status" results in badrpc,nodedown, visit this blog post for more information on how to fix the problem.

Note: You may also want to run su rabbitmq -c "erl"; you should get an Erlang prompt with no errors.

Security

You may want to read the security section of the relevant Celery documentation.

Example task

Celery application

Follow the Celery documentation to create a sample Python task:

$ nano test.py
from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost')

@app.task
def add(x, y):
    return x + y

amqp://myuser:mypassword@localhost:5672/myvhost - use the same credentials/vhost you created when configuring RabbitMQ.

backend='amqp' - this optional parameter configures a result backend, so that task return values are stored in RabbitMQ (over AMQP); without a result backend, results are discarded. The broker URL above is what actually points Celery at RabbitMQ.

Test run

While in the same directory as your test.py you can run:

$ celery -A test worker --loglevel=info

Then, from another console (but within the same directory), create:

$ nano call.py
from test import add

add.delay(4, 4)

Run it:

$ python call.py

The first console should log some information showing that the worker received and executed the task:

Received task: test.add[f4aff99a-7477-44db-9f6e-7e0f9342cd4e]
Task test.add[f4aff99a-7477-44db-9f6e-7e0f9342cd4e] succeeded in 0.0007182330009527504s: 8
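
Because a result backend was configured (backend='amqp'), the return value of the task can also be retrieved. As a quick check, from a python prompt in the same directory (the timeout value is arbitrary):

>>> from test import add
>>> result = add.delay(4, 4)
>>> result.get(timeout=10)
8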

Prepare module for Celery service

The procedure below is slightly different from what you will find in the Celery documentation.

Create the test_task module:

# mkdir /lib/python3.5/site-packages/test_task
# touch /lib/python3.5/site-packages/test_task/__init__.py
# touch /lib/python3.5/site-packages/test_task/test_task.py
# touch /lib/python3.5/site-packages/test_task/celery.py
# nano /lib/python3.5/site-packages/test_task/celery.py
from __future__ import absolute_import

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost')

if __name__ == '__main__':
    app.start()
# nano /lib/python3.5/site-packages/test_task/test_task.py
from __future__ import absolute_import

from test_task.celery import app

@app.task
def add(x, y):
    return x + y

At this point, if you start python in your console, you should be able to run the following without any error:

>>> from test_task import celery

In /etc/celery/celery.conf, replace:

CELERY_APP="proj"

with the following line:

CELERY_APP="test_task"

Restart the celery@celery.service.

Run tasks periodically

Tasks can be run periodically through Celery Beat; the basic setup is described in the relevant Celery documentation pages. An example:

If you want to specify CELERYBEAT_SCHEDULE within your celery.py, you need to add the app.conf prefix to make Celery recognise your scheduled tasks. After that, add the --beat --schedule=/var/lib/celery/celerybeat-schedule parameters when you start the Celery daemon. Further, the /var/lib/celery directory must exist in the environment Celery runs in and be owned by the user that runs Celery.
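
As an illustration building on the test_task module from the previous section (the schedule name, the 30-second interval and the arguments are only examples), celery.py could be extended like this:

# nano /lib/python3.5/site-packages/test_task/celery.py
from __future__ import absolute_import

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost')

# Note the app.conf prefix; a bare CELERYBEAT_SCHEDULE at module level
# would not be picked up by the application.
app.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'test_task.test_task.add',  # name of the task defined in test_task.py
        'schedule': 30.0,                   # run every 30 seconds
        'args': (4, 4),
    },
}

if __name__ == '__main__':
    app.start()

For the daemonized setup, the extra parameters could be added to /etc/celery/celery.conf, for example as CELERYD_OPTS="--beat --schedule=/var/lib/celery/celerybeat-schedule", assuming the unit file passes CELERYD_OPTS through to the worker.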