Celery: best practices
- Translation
If you work with Django, then at some stage of development you may need background processing for long-running tasks. Chances are you already use some tool to manage such task queues. Celery is currently one of the most popular projects for this in the Python and Django world, although there are others.
While working on several projects that use Celery to manage task queues, I collected some best practices that I decided to document. That said, "best practices" is a big phrase for what is really my view of the right approach to such problems, plus a few capabilities offered by the Celery project community that tend to be underused.
No.1: Do not use a DBMS as your AMQP broker
Let me explain why I think this is wrong (in addition to the limitations described in the Celery documentation).
A DBMS was not designed for the job that a full-fledged AMQP broker such as RabbitMQ does. It will fall over under production ("combat") conditions, even on a project with a fairly modest traffic/user base.
I suspect the most popular reason people decide to use a DBMS as a broker is that they already have one for their web application, so why not reuse it? Getting started this way is easy, and there is no need to worry about extra components (such as RabbitMQ).
Consider a not-so-hypothetical scenario: you have 4 background workers processing tasks that you put into the database. That means 4 processes polling the database for new tasks quite frequently, and each of them may have its own concurrent threads on top of that. At some point you notice that task processing lags: new tasks arrive faster than they are completed, so you add more workers. Suddenly your database starts to "sink" under the flood of queries from the workers, disk I/O exceeds its limits, and all of this hits your application, because the workers have effectively launched a DDoS attack on your own database.
This would not happen with a full-fledged AMQP broker, because the queue lives in memory, which removes the heavy load on the disk. Consumers (workers) do not need to poll for work, since the queue has a mechanism for pushing new tasks to them, and even if the broker is overloaded for some other reason, it will not crash or slow down the user-facing web application.
I will go even further and say that you should not use a DBMS as a broker even during development, now that things like Docker and plenty of pre-built images provide a configured RabbitMQ out of the box.
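As a minimal sketch of that setup (assuming the official rabbitmq image from Docker Hub, whose default guest/guest account works for localhost connections, and the BROKER_URL setting name matching the old-style settings used elsewhere in this article), getting a local broker is a one-liner:

docker run -d -p 5672:5672 rabbitmq

And pointing Celery at it in celeryconfig.py is a single setting:

BROKER_URL = 'amqp://guest:guest@localhost:5672//'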
No.2: Use more queues (i.e. not just the default one)
Celery is very easy to start using, and it provides a single default queue out of the box: all tasks land there until a different behavior is explicitly configured. The most common example you will see:
@app.task()
def my_taskA(a, b, c):
    print("doing something here...")

@app.task()
def my_taskB(x, y):
    print("doing something here...")
Both tasks will end up in the same queue unless configured otherwise in celeryconfig.py. I fully understand the appeal of this approach: one decorator gives you pleasant background tasks. My concern is that taskA and taskB, while sitting in the same queue, may do completely different things, and one of them may be far more important than the other, so why keep them in the same basket? Even with a single worker, the less important taskB can arrive in such volume that the worker cannot give the more important taskA the attention it needs. This brings us to the next point.
No.3: Use Worker Priorities
You can solve the problem above by placing taskA in one queue (Q1) and taskB in another (Q2), then assigning x workers to Q1 and the rest to Q2, since Q2 receives more tasks. This way you can be sure that taskB always has enough workers, while the remaining ones handle the lower-priority task when it arrives, without causing long waits. So define your queues yourself:
from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('for_task_A', Exchange('for_task_A'), routing_key='for_task_A'),
    Queue('for_task_B', Exchange('for_task_B'), routing_key='for_task_B'),
)
And your routes, which determine which queue each task goes to:
CELERY_ROUTES = {
    'my_taskA': {'queue': 'for_task_A', 'routing_key': 'for_task_A'},
    'my_taskB': {'queue': 'for_task_B', 'routing_key': 'for_task_B'},
}
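Routing can also be overridden for a single call: apply_async accepts queue and routing_key arguments directly (a standard Celery option, shown here with made-up argument values):

my_taskB.apply_async(args=[1, 2], queue='for_task_B', routing_key='for_task_B')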
This will let you run a dedicated worker for each task:
celery worker -E -l INFO -n workerA -Q for_task_A
celery worker -E -l INFO -n workerB -Q for_task_B
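To actually give the busier queue more processing power, you can also raise a worker's process count; the -c/--concurrency flag is standard Celery, though the counts below are just an illustration:

celery worker -E -l INFO -n workerA -Q for_task_A -c 2
celery worker -E -l INFO -n workerB -Q for_task_B -c 8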
No.4: Use Celery's mechanisms for error handling
Most tasks I have seen have no error handling at all. If an error occurs in a task, it simply crashes. That may be fine for some tasks, but most of the tasks I saw talked to external APIs and died on network errors or other "resource availability" problems. The simplest way to handle such errors is to retry the task, since by the time it runs again the problem with the external API may well have cleared up.
@app.task(bind=True, default_retry_delay=300, max_retries=5)
def my_task_A(self):
    try:
        print("doing stuff here...")
    except SomeNetworkException as e:
        print("maybe do some cleanup here....")
        self.retry(exc=e)
I like to define a default wait time before a task retries, and how many attempts it makes before finally raising the error (default_retry_delay and max_retries, respectively). This is the simplest form of error handling I can imagine, yet I almost never see it used. Of course, Celery offers more sophisticated error-handling methods, described in the Celery documentation.
No.5: Use Flower
Flower is a great tool for monitoring the state of your tasks and Celery workers. It has a web interface and allows things such as:
- task progress
- execution details
- worker status
- launch new workers
You can see the full list of features in the Flower documentation.
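Flower ships as a regular pip package and, assuming your Celery application lives in a module named proj (a made-up name for illustration), can be started like this:

pip install flower
celery flower -A proj --port=5555

The dashboard is then available at http://localhost:5555.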
No.6: Track task status only if you need it
A task's status tells you whether the task finished successfully or not. It can be useful for some statistical metrics. The important thing to understand here: the status is not the resulting data of the work the task performed; that kind of result usually takes the form of implicit changes written to the database (such as an update to a user's friends list).
In most projects I have seen, nobody really cared about a task's status after it completed, yet they stored it either in the sqlite database offered by default or, better still, went to the trouble of writing it to a big DBMS like PostgreSQL. Why load your application's database for nothing? Set CELERY_IGNORE_RESULT = True in your celeryconfig.py settings file and discard this data.
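If only some of your tasks produce results that nobody reads, the same thing can be set per task instead of globally (ignore_result is a standard task option):

@app.task(ignore_result=True)
def my_taskA(a, b, c):
    print("doing something here...")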
No.7: Do not pass database/ORM objects to tasks
After I discussed the points above at meetups of local Python developer groups, some people suggested adding one more item to the list. What is it about? You should not pass database objects (for example, a user model instance) to a background task, because the serialized object may contain stale, incorrect data by the time the task runs. If you need the object, pass the user's ID to the task, and inside the task fetch that user from the database.
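A minimal sketch of the difference, assuming Django's standard User model (the task itself is made up for illustration):

from django.contrib.auth.models import User

# Bad: send_welcome_email(user) would serialize a snapshot of the object.
# Good: pass the primary key and fetch a fresh row inside the task.
@app.task()
def send_welcome_email(user_id):
    user = User.objects.get(pk=user_id)
    print("sending email to %s" % user.email)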