
How to easily write a distributed web service in Python + AMQP
Hi, Habr. I have been writing in Python for quite some time. Recently I had to work with RabbitMQ, and I like it, because it clusters without any problems (with some subtleties, of course). Then I thought: it would be nice to use it as a message queue in part of the API of the project I'm working on. The API itself is written in Tornado; the main idea was to keep blocking code out of the API, so all synchronous operations were executed in a thread pool.
The first thing I decided to do was create a separate "worker" process (or several) that would take on all the synchronous work. The idea was to keep the worker as simple as possible: it does tasks from its queue one at a time, say, fetches something from the database, sends the answer back, and takes the next task. You can run as many workers as you like, and then AMQP acts as a kind of IPC between them.
Over time this grew into a module that takes care of all the routine associated with AMQP and with sending messages back and forth, and also gzip-compresses them when there is a lot of data. That is how crew was born. Using it, we will write a simple API consisting of a Tornado server and plain, uncomplicated worker processes. Looking ahead, I will say that all the code is available on GitHub, and everything discussed below is collected in the example folder.
Preparation
So, let's take it step by step. The first thing we need to do is install RabbitMQ. I will not describe how to do this; I will only say that on Ubuntu, for instance, it installs and works out of the box. On my Mac the only thing I had to do was install LaunchRocket, which picks up all the services installed via Homebrew and shows them in a GUI:

Next, create a virtualenv for our project and install the module itself via pip:
mkdir -p api
cd api
virtualenv env
source env/bin/activate
pip install crew tornado
The module's dependencies intentionally do not include tornado, since it may not be present on a host that only runs workers. For the web part, one usually creates a requirements.txt that lists all the remaining dependencies.
I will present the code in parts so as not to break the flow of the narrative. You can see what we end up with here.
Writing the code
The Tornado server itself consists of two parts. In the first we define the request handlers, and in the second we start the event loop. Let's write the server and create our first API method.
File master.py:
# encoding: utf-8
import tornado.ioloop
import tornado.gen
import tornado.web
import tornado.options

from crew.master.tornado import Client


class MainHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        # Call the "test" task with priority 100
        resp = yield self.application.crew.call('test', priority=100)
        self.write("{0}: {1}".format(type(resp).__name__, str(resp)))


application = tornado.web.Application(
    [
        ('/', MainHandler),
    ],
    autoreload=True,
    debug=True,
)
application.crew = Client()

if __name__ == "__main__":
    application.crew.connect()
    tornado.options.parse_command_line()
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
Thanks to Tornado's coroutines, the code looks simple. The same thing can be written without a coroutine.
File master.py (callback version):
class MainHandler(tornado.web.RequestHandler):
    # without a coroutine, the handler must stay open until the
    # callback fires, hence the asynchronous decorator and finish()
    @tornado.web.asynchronous
    def get(self):
        # Call the "test" task with priority 100
        self.application.crew.call('test', priority=100, callback=self._on_response)

    def _on_response(self, resp, headers):
        self.write("{0}: {1}".format(type(resp).__name__, str(resp)))
        self.finish()
Our server is ready. But if we run it and go to /, we will never get a response: there is no one to process the task.
Now let's write a simple worker:
File worker.py:
# encoding: utf-8
from crew.worker import run, context, Task


@Task('test')
def long_task(req):
    context.settings.counter += 1
    return 'Wake up Neo.\n'


run(
    counter=0,  # this is a part of this worker's context
)
As you can see in the code, it is a simple function wrapped by the Task('test') decorator, where 'test' is the task's unique identifier. A worker cannot have two tasks with the same identifier. Of course, it would be more correct to name the task something like 'crew.example.test' (as I usually do in production), but plain 'test' is enough for our example.
The context.settings.counter attribute immediately catches the eye. This context is initialized in the worker process when the run function is called. The context also provides context.headers: response headers that separate metadata from the response body. In the callback example, this dictionary is what gets passed to _on_response.
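For example, a task might attach metadata to its response. This is a hypothetical sketch: the task name and header key are made up, and we assume context.headers behaves like a plain dict, as described above:

@Task('crew.example.with_meta')
def with_meta(req):
    # assumption: context.headers is a plain dict that travels back with
    # the response and is handed to the master's callback as `headers`
    context.headers['X-Counter'] = str(context.settings.counter)
    return 'payload'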
Headers are reset after each response, but context.settings is not. I use context.settings to pass a database connection (or any other object, really) to the worker's functions.
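For instance, here is a minimal sketch of sharing a database connection through context.settings. The db keyword, the sqlite database, and the task name are illustrative assumptions, not part of crew:

# worker.py — any keyword passed to run() becomes an attribute
# of context.settings
import sqlite3

from crew.worker import run, context, Task


@Task('crew.example.get_user')
def get_user(req):
    # tasks are executed in a separate thread,
    # hence check_same_thread=False below
    row = context.settings.db.execute(
        'SELECT name FROM users WHERE id = ?', (req,)
    ).fetchone()
    return row[0] if row else 'not found'


run(
    db=sqlite3.connect('users.db', check_same_thread=False),
)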
The worker also handles a few command-line options:
$ python worker.py --help
Usage: worker.py [options]

Options:
  -h, --help            show this help message and exit
  -v, --verbose         make lots of noise
  --logging=LOGGING     Logging level
  -H HOST, --host=HOST  RabbitMQ host
  -P PORT, --port=PORT  RabbitMQ port
Database connection URLs and other such variables can be taken from environment variables, so the only parameters the worker expects are the AMQP connection settings (host and port) and the logging level.
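For example (the DATABASE_URL variable name here is just an illustration):

import os

from crew.worker import run

run(
    # read from the environment so the command line stays minimal
    database_url=os.environ.get('DATABASE_URL', 'sqlite:///users.db'),
)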
So, run everything and check:
$ python master.py & python worker.py

It works, but what happened behind the scenes?
When the Tornado server started, it connected to RabbitMQ, created the DLX exchange and began listening on the DLX queue. The Dead-Letter-Exchange is a special exchange that receives the tasks that no worker picked up within the given timeout. The server also created a queue with a unique identifier, which will receive the responses from the workers.
On startup, the worker created a queue for each function wrapped by the Task decorator, one by one, and subscribed to them. When a task arrives, the worker's main loop spawns a thread that executes the wrapped function, while the main thread tracks the task's execution time. Once the wrapped function returns, the worker serializes the result and puts it into the server's response queue.
When a request comes in, the Tornado server puts the task into the appropriate queue, attaching the identifier of its own unique queue that should receive the response. If no worker picks up the task, RabbitMQ redirects it to the DLX exchange, and the Tornado server receives a message saying that the queue's timeout has expired, and raises an exception.
Hanging task
To demonstrate the mechanism that terminates tasks that hang during execution, let's write another web method and a corresponding task in the worker.
Add to master.py:
from crew import TimeoutError, ExpirationError  # exceptions raised by crew


class FastHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        try:
            resp = yield self.application.crew.call(
                'dead', persistent=False, priority=255, expiration=3,
            )
            self.write("{0}: {1}".format(type(resp).__name__, str(resp)))
        except TimeoutError:
            self.write('Timeout')
        except ExpirationError:
            self.write('All workers are gone')
And add it to the list of handlers:
application = tornado.web.Application(
    [
        (r"/", MainHandler),
        (r"/fast", FastHandler),
    ],
    autoreload=True,
    debug=True,
)
And in worker.py:
@Task('dead')
def infinite_loop_task(req):
    while True:
        pass
As you can see from the example above, the task goes into an infinite loop. But if it does not complete within 3 seconds (counted from the moment it was taken from the queue), the worker's main loop will raise a SystemExit exception in the task's thread. And yes, you have to handle it.
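Handling it might look like this (a sketch; the cleanup step is hypothetical):

@Task('dead')
def infinite_loop_task(req):
    try:
        while True:
            pass
    except SystemExit:
        # raised in this thread by the worker's main loop once the
        # expiration passes; release any resources here, then re-raise
        raise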
Context
As mentioned above, the context is a special object that can be imported and that carries several built-in variables.
Let's build simple statistics on our worker's responses.
Add the following handler to the master.py file:
class StatHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        resp = yield self.application.crew.call('stat', persistent=False, priority=0)
        self.write("{0}: {1}".format(type(resp).__name__, str(resp)))
And register it in the list of request handlers:
application = tornado.web.Application(
    [
        (r"/", MainHandler),
        (r"/fast", FastHandler),
        (r"/stat", StatHandler),
    ],
    autoreload=True,
    debug=True,
)
This handler is not much different from the previous ones: it simply returns the value that the worker passed to it.
Now the task itself.
Add to the worker.py file:
@Task('stat')
def get_counter(req):
    context.settings.counter += 1
    return 'I\'m worker "%s". And I have served %s tasks' % (
        context.settings.uuid, context.settings.counter
    )
The function returns a string with information about the number of tasks processed by the worker.
PubSub and Long polling
Now let's implement a couple of handlers. The first will simply hang on request and wait for data, and the second will accept POST data. Once data is posted to the second, the first will hand it out.
master.py:
class LongPoolingHandler(tornado.web.RequestHandler):
    LISTENERS = []

    @tornado.web.asynchronous
    def get(self):
        self.LISTENERS.append(self.response)

    def response(self, data):
        self.finish(str(data))

    @classmethod
    def responder(cls, data):
        for cb in cls.LISTENERS:
            cb(data)
        cls.LISTENERS = []


class PublishHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def post(self, *args, **kwargs):
        resp = yield self.application.crew.call('publish', self.request.body)
        self.finish(str(resp))

...

application = tornado.web.Application(
    [
        (r"/", MainHandler),
        (r"/stat", StatHandler),
        (r"/fast", FastHandler),
        (r'/subscribe', LongPoolingHandler),
        (r'/publish', PublishHandler),
    ],
    autoreload=True,
    debug=True,
)

application.crew = Client()
application.crew.subscribe('test', LongPoolingHandler.responder)

if __name__ == "__main__":
    application.crew.connect()
    tornado.options.parse_command_line()
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
Let's write the publish task.
worker.py:
@Task('publish')
def publish(req):
    context.pubsub.publish('test', req)
If you do not need to hand control over to a worker, you can publish directly from the Tornado server:
class PublishHandler2(tornado.web.RequestHandler):
    def post(self, *args, **kwargs):
        self.application.crew.publish('test', self.request.body)
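Assuming the default settings from the listings above, you can try this from two terminals (the output depends on what you post):

# terminal 1: hangs until someone publishes
$ curl http://127.0.0.1:8888/subscribe

# terminal 2: push data to every waiting subscriber
$ curl -X POST -d 'hello' http://127.0.0.1:8888/publish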
Parallel tasks
There are often situations where several tasks can be executed in parallel. crew has a little syntactic sugar for this:
class Multitaskhandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        with self.application.crew.parallel() as mc:
            # mc - multiple calls
            mc.call('test')
            mc.call('stat')
            test_result, stat_result = yield mc.result()
            self.set_header('Content-Type', 'text/plain')
            self.write("Test result: {0}\nStat result: {1}".format(test_result, stat_result))
In this case, the two tasks will be queued in parallel, and the with block will exit when the last one completes.
But you need to be careful, as a task may raise an exception. In that case the exception object is assigned directly to the corresponding variable, so you have to check whether test_result and stat_result are instances of the Exception class.
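For example, a sketch continuing the handler above:

# inside the handler above, after `yield mc.result()`:
for name, result in (('test', test_result), ('stat', stat_result)):
    if isinstance(result, Exception):
        self.write("Task {0} failed: {1!r}\n".format(name, result))
    else:
        self.write("Task {0}: {1}\n".format(name, result))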
Future plans
When eigrad suggested writing a layer that could run any WSGI application on top of crew, I immediately liked the idea. Just imagine: requests would not hit your WSGI application directly, but would flow evenly through the queue to WSGI workers.
I have never written a WSGI server and don't even know where to start, but you can help me: pull requests are welcome.
I am also thinking about adding a client for another popular asynchronous framework, Twisted. But I am still getting to grips with it, and there is not enough free time.
Acknowledgments
Thanks to the developers of RabbitMQ and AMQP. Great ideas.
And thank you, readers. I hope you have not wasted your time.