Celery - distributed job queue

    This time we would like to talk about a wonderful product that we use in our work: Celery, a "distributed task queue". It is a distributed asynchronous job queue with a wide range of features. In our site builder we often need to run tasks asynchronously with respect to the user's request. Unfortunately, there is not much information about this product on Habr, yet it deserves a separate mention, and we would like to fix that.

    So, what can Celery do:

    • Execute tasks asynchronously or synchronously
    • Execute periodic tasks (a smart cron replacement)
    • Execute deferred tasks
    • Distributed execution (can be run on N servers)
    • Execute several tasks concurrently within a single worker
    • Retry a task if an exception is raised (see the sketch after this list)
    • Limit the number of tasks per unit of time (rate limit, per task or globally)
    • Route tasks (which worker handles what)
    • Easy task monitoring
    • Execute subtasks
    • Send exception reports by email
    • Check whether a task has completed (convenient for building Ajax applications where the user waits for completion)
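
    As an illustration of retries and rate limits, here is a minimal sketch in the decorator style of Celery 1.x/2.0 (the task name fetch_page and the URL handling are our own invention, not from the project we describe below):

    import urllib2
    from celery.decorators import task

    @task(rate_limit="10/m", max_retries=3)  # at most 10 runs per minute, up to 3 retries
    def fetch_page(url, **kwargs):
        try:
            return urllib2.urlopen(url).read()
        except IOError, exc:
            # re-enqueue the same task; kwargs carries the task id needed for retry bookkeeping
            fetch_page.retry(args=[url], kwargs=kwargs, exc=exc)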

    Interested? Then read on.

    Let's start with the configuration of the worker. This is a daemon that actually receives tasks from the queue and executes them. The recommended queue is RabbitMQ (a RabbitMQ variant of the broker settings is sketched after the config below), but for now we have limited ourselves to ghettoq with MongoDB. Redis and relational databases are also supported.

    celeryconfig.py:

    CARROT_BACKEND = "ghettoq.taproot.MongoDB"  # broker backend: MongoDB via ghettoq
    BROKER_HOST = "xxx"
    BROKER_PORT = 27017
    BROKER_VHOST = "celery"
    CELERY_SEND_TASK_ERROR_EMAILS = True  # email tracebacks of failed tasks to ADMINS
    ADMINS = (('Admin', 'admin@localhost'),)
    CELERYD_MAX_TASKS_PER_CHILD = 5  # recycle each worker process after 5 tasks
    CELERY_IMPORTS = ("tasks",)  # modules containing task definitions
    CELERY_DISABLE_RATE_LIMITS = True
    CELERY_RESULT_BACKEND = "mongodb"  # store task results in MongoDB
    CELERY_MONGODB_BACKEND_SETTINGS = {
        "host": "xxx",
        "port": 27017,
        "database": "celery",
        "taskmeta_collection": "my_taskmeta_collection",
    }
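
    If you switch to the recommended RabbitMQ broker, the broker section of the config would look roughly like this (a minimal sketch; the user, password and vhost values are placeholders you create in RabbitMQ yourself):

    BROKER_HOST = "localhost"
    BROKER_PORT = 5672          # default AMQP port
    BROKER_USER = "celeryuser"
    BROKER_PASSWORD = "secret"
    BROKER_VHOST = "celery"

    The CARROT_BACKEND line is simply dropped in this case, since AMQP is the default backend.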
    


    Launching the daemon: celeryd -l INFO -B
    The -l option enables logging to the console, and -B also starts the scheduler for periodic tasks inside the same daemon. The latter can be run separately with the celerybeat command.
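
    For example, running the worker and the scheduler as two separate daemons looks roughly like this (assuming celeryd and celerybeat are on your PATH):

    celeryd -l INFO
    celerybeat -l INFO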

    Now let's create a test task. Since the config imports the tasks module (CELERY_IMPORTS), our task file is tasks.py:

    from datetime import timedelta

    from celery.decorators import task
    from celery.decorators import periodic_task
    from celery.task.schedules import crontab

    @periodic_task(run_every=timedelta(seconds=60))
    def mail_queue():
        print "Task is executed every minute"

    @periodic_task(run_every=crontab(hour=0, minute=10))
    def transactions():
        print "Task is executed every day at 0:10"

    @task
    def delayed_function(id):
        some_function()  # placeholder for the real work

    @task
    def delayed_heavy_function(id):
        some_heavy_function()  # placeholder for the real work
    


    So, we have four tasks in tasks.py. The first two are periodic, as they are marked with the @periodic_task decorator. The last two will be called directly from application code, like this:

    from tasks import delayed_function, delayed_heavy_function

    delayed_function.apply_async(args=[id], countdown=300)  # will run in 300 seconds
    r = delayed_heavy_function.delay(id)  # will run immediately (as soon as a worker is free), asynchronously
    


    Now, to track the result and the completion status of the last task, we can use:

    r.ready()   # returns True if the task has completed
    r.result    # the return value of the executed function, or None if it has not completed yet (non-blocking)
    r.get()     # waits for the task to finish and returns its result (blocking)

    The variable r can be serialized with cPickle and stored in the cache, and an Ajax handler can then query the task status. Alternatively, you can take the task id and put that in the cache instead. You can even set the task id yourself; the only requirement is that it is unique.
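
    A minimal sketch of that polling scheme (the cache object and the key format are our assumptions; any cache such as memcached would do):

    from celery.result import AsyncResult
    from tasks import delayed_heavy_function

    r = delayed_heavy_function.delay(id)
    cache.set("task_for_%s" % id, r.task_id)  # `cache` is assumed to be e.g. a memcached client

    # later, in the Ajax handler polled by the browser:
    task_id = cache.get("task_for_%s" % id)
    res = AsyncResult(task_id)  # rebuild a result handle from the stored id
    if res.ready():
        print res.result

    To set the id yourself, pass it as the task_id argument of apply_async instead of letting Celery generate one.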

    After heavy use of Celery we found several bugs related to delayed execution of tasks with the ghettoq queue manager, but they were all fixed by the author on the day the issue was filed on GitHub, for which we thank him.

    Not long ago, version 2.0 was released, which is no longer Django-dependent; Django integration has been moved to the separate sub-project django-celery.

    Two limitations of Celery are worth mentioning, although they are really just quirks: workers will not run on stock FreeBSD, because there is no working Python multiprocessing there (though recipes for building a suitable kernel can be found online), and to reload task code you must restart the worker so that it picks up the new Python code for the tasks and related functions. On Linux everything works great.
