How to just write a distributed web service in Python + AMQP

Hi, Habr. I have been writing in Python for quite some time. Recently I had to deal with RabbitMQ. I like it. Because without any problems (it is clear that with some subtleties) he is going to a cluster. Then I thought: it would be nice to use it as a message queue in a piece of the API of the project I'm working on. The API itself is written in tornado, the main idea was to exclude the blocking code from the API. All synchronous operations were performed in the thread pool.

The first thing I decided was to create a separate “worker” process (s) that would take care of all the synchronous work. Thought that “worker” was as simple as possible, and did tasks from the queue one by one. Say, I chose something from the database, answered, took on the next task, and so on. You can run a lot of “workers” themselves, and then AMQP already acts as a kind of IPC.

After some time, a module grew out of this, which takes on all the routine associated with AMQP and sending messages back and forth, and also compresses them with gzip if there is too much data. So the crew was born . Actually, using it, we will write a simple API that will consist of a server on tornado and simple and uncomplicated “worker” processes. Looking ahead I will say that all the code is available on github, and what I will talk about later is collected in the example folder .

Training


So, let's figure it out in order. The first thing we will need to do is install RabbitMQ. I will not describe how to do this. I can only say that on the same ubuntu it is installed and works out of the box. On my Mac, the only thing I had to do was install LaunchRocket, which collected all the services that were installed via homebrew and displayed in the GUI:

Launch rocket

Next, create our virtualenv project and install the module itself via pip:

mkdir -p api
cd api
virtualenv env
source env/bin/activate
pip install crew tornado


The module’s dependencies intentionally do not specify tornado, since it may not exist on a host with a worker. And on the web part, they usually create requirements.txt, where all the other dependencies are indicated.

I will write the code in parts so as not to violate the order of the narrative. What we get in the end, you can see here .

Writing a code


The tornado server itself consists of two parts. In the first part, we define handlers of request handlers, and in the second, an event-loop is launched. Let's write a server and create our first api method.

Master.py file:
# encoding: utf-8
import tornado.ioloop
import tornado.gen
import tornado.web
import tornado.options
class MainHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        # Вызываем задачу test c приоритетом 100
        resp = yield self.application.crew.call('test', priority=100)
        self.write("{0}: {1}".format(type(resp).__name__, str(resp)))
application = tornado.web.Application(
    [
        ('/', MainHandler),
    ],
    autoreload=True,
    debug=True,
)
if __name__ == "__main__":
    tornado.options.parse_command_line()
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()


Thanks to coroutine's tornado, the code looks simple. You can write the same thing without coroutine.

Master.py file:
class MainHandler(tornado.web.RequestHandler):
    def get(self):
        # Вызываем задачу test c приоритетом 100
        self.application.crew.call('test', priority=100, callback=self._on_response)
    def _on_response(resp, headers):
        self.write("{0}: {1}".format(type(resp).__name__, str(resp)))


Our server is ready. But if we run it, and go to /, then we will not wait for an answer, there is no one to process it.

Now let's write a simple worker:

File worker.py:
# encoding: utf-8
from crew.worker import run, context, Task
@Task('test')
def long_task(req):
    context.settings.counter += 1
    return 'Wake up Neo.\n'
run(
    counter=0,      # This is a part of this worker context
)


So, as you can see in the code, there is a simple function wrapped by the Task (“test”) decorator , where test is the unique identifier of the task. Your worker cannot have two tasks with the same identifiers. Of course, it would be right to name the task “crew.example.test” (as I usually call it in the production environment), but for our example, just “test” is enough.

Context.settings.counter is immediately apparent. This is a certain context that is initialized in the worker process when the run function is called. Also, context already has context.headers - these are response headers to separate metadata from the response. In the callback function example, this dictionary is passed to _on_response.

Headers are reset after each response, but context.settings are not. I use context.settings to pass to the worker (s) function a database connection and any other object in general.

Worker also processes startup keys, there are not many of them:

$ python worker.py --help
Usage: worker.py [options]
Options:
  -h, --help            show this help message and exit
  -v, --verbose         make lots of noise
  --logging=LOGGING     Logging level
  -H HOST, --host=HOST  RabbitMQ host
  -P PORT, --port=PORT  RabbitMQ port


Database connection URLs and other variables can be taken from environment variables. Therefore, the worker in the parameters only waits for him to connect to AMQP (host and port) and the logging level.

So, run everything and check:

$ python master.py & python worker.py


image

It works, but what happened behind the screen?


When the tornado server started, tornado connected to RabbitMQ, created Exchange DLX and started listening to the DLX queue. This Dead-Letter-Exchange is a special queue that receives tasks that no worker took in a specific timeout. He also created a queue with a unique identifier, which will receive responses from workers.

After starting, the worker created in turn for each queue wrapped by the Task decorator and subscribed to them. When a task arrives, the main-loop worker creates one thread, controlling the execution time of the task in the main thread and executing the wrapped function. After returning from the wrapped function, it serializes it and queues the server responses.

After a request is received, the tornado server puts the task in the appropriate queue, while indicating the identifier of its unique queue, which should receive the response. If no worker took the task, then RabbitMQ redirects the task to exchange DLX and the tornado server receives a message that the queue timeout has expired, throwing an exception.

Hanging task


To demonstrate how the mechanism for completing tasks that hang during execution works, we write another web method and a task in worker.

Add to master.py file:

class FastHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        try:
            resp = yield self.application.crew.call(
                'dead', persistent=False, priority=255, expiration=3,
            )
            self.write("{0}: {1}".format(type(resp).__name__, str(resp)))
        except TimeoutError:
            self.write('Timeout')
        except ExpirationError:
            self.write('All workers are gone')


And add it to the list of handlers:

application = tornado.web.Application(
    [
        (r"/", MainHandler),
        (r"/stat", StatHandler),
    ],
    autoreload=True,
    debug=True,
)


And in worker.py:
@Task('dead')
def infinite_loop_task(req):
    while True:
        pass


As can be seen from the above example, the task will go into an infinite loop. However, if the task does not complete in 3 seconds (counting the time it was received from the queue), the main-loop in the worker will send a SystemExit exception to the thread. And yes, you have to process it.

Context


As mentioned above, a context is such a special object that is imported and has several built-in variables.

Let's make simple statistics on the answers of our worker.

Add the following handler to the master.py file:

class StatHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        resp = yield self.application.crew.call('stat', persistent=False, priority=0)
        self.write("{0}: {1}".format(type(resp).__name__, str(resp)))


We also register in the list of request handlers:

application = tornado.web.Application(
    [
        (r"/", MainHandler),
        (r"/fast", FastHandler),
        (r"/stat", StatHandler),
    ],
    autoreload=True,
    debug=True,
)


This handler is not very different from the previous ones, it simply returns the value that worker passed to it.

Now the task itself.

Add to the worker.py file:

@Task('stat')
def get_counter(req):
    context.settings.counter += 1
    return 'I\'m worker "%s". And I serve %s tasks' % (context.settings.uuid, context.settings.counter)


The function returns a string with information about the number of tasks processed by the worker.

PubSub and Long polling


Now we implement a couple of handlers. One upon request will simply hang and wait, and the second will receive POST data. After the transfer of the latter, the first will give them away.

master.py:

class LongPoolingHandler(tornado.web.RequestHandler):
    LISTENERS = []
    @tornado.web.asynchronous
    def get(self):
        self.LISTENERS.append(self.response)
    def response(self, data):
        self.finish(str(data))
    @classmethod
    def responder(cls, data):
        for cb in cls.LISTENERS:
            cb(data)
        cls.LISTENERS = []
class PublishHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def post(self, *args, **kwargs):
        resp = yield self.application.crew.call('publish', self.request.body)
        self.finish(str(resp))
...
application = tornado.web.Application(
    [
        (r"/", MainHandler),
        (r"/stat", StatHandler),
        (r"/fast", FastHandler),
        (r'/subscribe', LongPoolingHandler),
        (r'/publish', PublishHandler),
    ],
    autoreload=True,
    debug=True,
)
application.crew = Client()
application.crew.subscribe('test', LongPoolingHandler.responder)
if __name__ == "__main__":
    application.crew.connect()
    tornado.options.parse_command_line()
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()


Let's write the publish task.

worker.py:

@Task('publish')
def publish(req):
    context.pubsub.publish('test', req)


If you do not need to transfer control to worker, you can simply publish directly from the tornado server

class PublishHandler2(tornado.web.RequestHandler):
    def post(self, *args, **kwargs):
        self.application.crew.publish('test', self.request.body)


Parallel tasks


Often there is a situation where we can complete several tasks in parallel. Crew has a little syntactic sugar for this:

class Multitaskhandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        with self.application.crew.parallel() as mc:
            # mc - multiple calls
            mc.call('test')
            mc.call('stat')
            test_result, stat_result = yield mc.result()
            self.set_header('Content-Type', 'text/plain')
            self.write("Test result: {0}\nStat result: {1}".format(test_result, stat_result))


In this case, the task will be set two tasks in parallel and exit from with will be done at the end of the last.

But you need to be careful, as some task may cause an exception. It will be equated directly to the variable. So you need to check if test_result and stat_result are instances of the Exception class.

Future plans


When eigrad suggested writing a layer that could run any wsgi application using crew, I immediately liked this idea. Just imagine, requests will not flow to your wsgi application, but will flow evenly through the queue to wsgi-worker.

I never wrote a wsgi server and I don’t even know where to start. But you can help me, pull-requests I accept.

I also think of adding client for another popular asynchronous framework, for twisted. But while I deal with him, and there is not enough free time.

Acknowledgments


Thanks to the developers of RabbitMQ and AMQP. Great ideas.

Also thank you readers. I hope that you have not wasted your time.

Also popular now: