Effective multithreading in Python

I want to share a simple recipe for efficiently performing a large number of HTTP requests and other I/O tasks from plain Python. The most correct thing to do would be to use an asynchronous framework like Tornado or gevent. But sometimes that option doesn't fit, because integrating an event loop into an existing project is problematic.

In my case there was an existing Django application from which, about once a month, a large number of very small files had to be uploaded to AWS S3. Time passed, the number of files approached 50 thousand, and uploading them one by one became tedious. As you know, S3 does not support updating multiple objects in a single PUT request, and the experimentally established maximum request rate from an EC2 server in the same data center is about 17 per second (which is not bad, by the way). So the update time for 50 thousand files was approaching an hour.

Pythonists learn early on that there is no point in using threads (operating system threads) because of the global interpreter lock. But few realize that, like any lock, this one is released from time to time. In particular, it is released during I/O operations, including network I/O. This means threads can be used to parallelize HTTP requests: while one thread is waiting for a response, another can quietly process the result of the previous request or prepare the next one.

It turns out that all we need is a pool of threads to execute the requests. Fortunately, such a pool has already been written. Since version 3.2, Python has had the concurrent.futures library for unifying all asynchronous work; for Python 2 there is a backport called futures. The code is embarrassingly simple:

from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(concurrency) as executor:
    for _ in executor.map(upload, queryset):
        pass

Here concurrency is the number of worker threads, upload is the function that performs the task itself, and queryset is an iterator of objects that will be passed to the task one by one. With a concurrency of 150, this code could already push ≈450 requests per second through on the Amazon server.

A remark about the tasks is needed here: they must be thread safe. That is, several parallel tasks must not share resources, or must manage them correctly. The interpreter's global lock is a poor helper here: it does not guarantee that a thread will not be interrupted at the most inconvenient moment. If you only use urllib3, requests or boto, there is nothing to worry about; they are already thread safe. Other libraries need to be checked. Your own code must be thread safe as well.
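For illustration, the task might look something like the sketch below. This is only an assumption for the example: the article does not show the real upload code, the bucket URL and the obj attributes are made up, and a real S3 upload would also need authentication (for example via boto or a presigned URL). The point is simply one HTTP PUT per object, done with a thread-safe library:

import requests

BUCKET_URL = 'https://example-bucket.s3.amazonaws.com'  # hypothetical bucket

def upload(obj):
    # one PUT per object; requests is thread safe, so this function can
    # run in many worker threads at the same time
    response = requests.put(
        '{0}/{1}'.format(BUCKET_URL, obj.key),  # obj.key is an assumption
        data=obj.payload,                       # obj.payload is an assumption
        timeout=30,
    )
    response.raise_for_status()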

Time passed, and the number of files approached 200 thousand. How much memory do you think 200 thousand Django models take up? What about 200 thousand futures? And 200 thousand tasks? All together, about a gigabyte. It became clear that sending everything to the executor at once was not an option. But why not add new tasks as the previous ones finish? At the very start, submit as many tasks as there are threads, and keep track of how many tasks have been submitted and how many have completed. We don't store the futures and don't hand them out. The result is a very handy function that can be reused (careful, this is not the final version):

from concurrent.futures import ThreadPoolExecutor, Future

def task_queue(task, iterator, concurrency=10):
    def submit():
        try:
            obj = next(iterator)
        except StopIteration:
            return
        stats['delayed'] += 1
        future = executor.submit(task, obj)
        future.add_done_callback(upload_done)

    def upload_done(future):
        # as soon as one task finishes, pull the next object and submit it
        submit()
        stats['delayed'] -= 1
        stats['done'] += 1

    executor = ThreadPoolExecutor(concurrency)
    stats = {'done': 0, 'delayed': 0}
    # prime the pool: one task per worker thread
    for _ in range(concurrency):
        submit()
    return stats

It has only three moving parts: a submit function that takes the next object from the iterator and creates a task for it; upload_done, which is called when a task finishes and submits the next one; and a loop that submits the initial tasks. Try running it:

import sys
import time

stats = task_queue(upload, queryset.iterator(), concurrency=5)
while True:
    print '\rdone {done}, in work: {delayed}  '.format(**stats),
    sys.stdout.flush()
    if stats['delayed'] == 0:
        break
    time.sleep(0.2)

Great, it works! Here queryset's iterator() method is already being used. It seems like it could have been used in the first example with executor.map, but executor.map consumes the whole iterator right away, which defeats its purpose. Here objects really are fetched lazily, one per worker thread.
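You can see this eagerness for yourself with a noisy generator (a toy example, unrelated to the project; in the standard library implementation map() submits a future for every item before any result is consumed):

from concurrent.futures import ThreadPoolExecutor
import time

def noisy_source(n):
    for i in range(n):
        print('pulled object %d' % i)
        yield i

def slow_task(i):
    time.sleep(1)
    return i

with ThreadPoolExecutor(2) as executor:
    # all "pulled object" lines appear immediately, long before the
    # slow tasks finish: the iterable is exhausted up front
    for _ in executor.map(slow_task, noisy_source(5)):
        pass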

True, there is a problem: if you increase the number of threads, exceptions like "ValueError: generator already executing" start pouring in. The code uses the same generator from all threads, so sooner or later two threads try to pull values from it at the same time (in fact, this can happen with just two threads, only with lower probability). The same applies to the counters: sooner or later two threads read the same value simultaneously, both add one, and both write back "initial value + 1" instead of "initial value + 2". Therefore, all work with shared objects must be wrapped in locks.
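In isolation, the fix boils down to something like this minimal sketch; the final version of task_queue below does the same thing with a single lock around all shared state:

import threading

lock = threading.Lock()

def next_object(iterator):
    # only one thread at a time may advance the shared generator
    with lock:
        return next(iterator)

def count_done(stats):
    # the read-modify-write of a shared counter must also be atomic
    with lock:
        stats['done'] += 1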

There are other problems too. Errors that occur during task execution are not handled at all. If you interrupt execution with Ctrl+C, an exception is raised in the main thread while the rest keep running to the very end, so a mechanism for forcing the queue to stop is needed. The executor has a shutdown method for exactly this purpose, and we could expose the executor to the outside so it can be stopped when the user presses Ctrl+C. But there is a better option: return a future that resolves when all the work is done and shuts the executor down if someone cancels it from the outside. Here is a version that takes all of this into account:

import threading
from concurrent.futures import ThreadPoolExecutor, Future

def task_queue(task, iterator, concurrency=10, on_fail=lambda *_: None):
    def submit():
        try:
            obj = next(iterator)
        except StopIteration:
            return
        if result.cancelled():
            return
        stats['delayed'] += 1
        future = executor.submit(task, obj)
        future.obj = obj
        future.add_done_callback(upload_done)

    def upload_done(future):
        with io_lock:
            submit()
            stats['delayed'] -= 1
            stats['done'] += 1
        if future.exception():
            on_fail(future.exception(), future.obj)
        if stats['delayed'] == 0:
            # the last task has finished and nothing new was submitted
            result.set_result(stats)

    def cleanup(_):
        with io_lock:
            executor.shutdown(wait=False)

    io_lock = threading.RLock()
    executor = ThreadPoolExecutor(concurrency)
    result = Future()
    result.stats = stats = {'done': 0, 'delayed': 0}
    result.add_done_callback(cleanup)
    with io_lock:
        for _ in range(concurrency):
            submit()
    return result

A reentrant lock is needed here, because there is some chance that a very short task finishes before its handler is attached via add_done_callback; in that case the handler runs immediately in the same thread and tries to acquire the lock again, which with a plain lock would be a deadlock. A reentrant lock lets the thread that acquired it first calmly enter again, but will not let another thread acquire it until the first thread releases it as many times as it acquired it.
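The difference is easy to demonstrate in a couple of lines (a toy example, unrelated to the project code):

import threading

def reenter(lock):
    with lock:
        with lock:  # the same thread takes the lock a second time
            return 'ok'

print(reenter(threading.RLock()))  # fine: the owning thread may re-acquire
# reenter(threading.Lock())        # would hang forever on the inner acquire

The code that uses this task queue changes a little: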

import sys
from concurrent.futures import ThreadPoolExecutor, Future, TimeoutError

result = task_queue(upload, queryset.iterator(), concurrency=5)
try:
    while not result.done():
        try:
            result.result(.2)
        except TimeoutError:
            pass
        print '\rdone {done}, in work: {delayed}  '.format(**result.stats),
        sys.stdout.flush()
except KeyboardInterrupt:
    result.cancel()
    raise

There is no longer any need to blindly sleep for 200 milliseconds; instead we sleep smartly, waiting for the queue to finish. And if execution is interrupted, the queue is stopped.

It was getting dark. Time passed, and the number of files approached 1.5 million. Even though everything looked as if it should run with constant memory consumption (the number of threads, futures and Django models should not change during execution), memory usage kept growing. It turned out that queryset.iterator() does not work quite as expected. Objects really are instantiated only when explicitly pulled from the iterator, but the raw database response is still fetched by the driver all at once. That comes out to roughly 500 megabytes per million rows. The solution is fairly obvious: fetch the objects in chunks rather than querying them all at once. At the same time, offset-based pagination should be avoided, because a query like LIMIT 100 OFFSET 200000 actually means the DBMS has to walk over 200100 records. Instead of an offset, you should paginate on an indexed field.

def real_queryset_iterator(qs, pk='pk', chunk_size=5000):
    qs = qs.order_by(pk)
    chunk = list(qs[:chunk_size])
    while chunk:
        for item in chunk:
            yield item
        # keyset pagination: continue strictly after the last seen key
        last_pk = getattr(chunk[-1], pk)
        chunk = list(qs.filter(**{pk + '__gt': last_pk})[:chunk_size])

Here pk is more of a pagination key than a primary key, although the primary key is often well suited for the role. Such an iterator really does consume a fixed amount of memory and works no slower than fetching everything at once. But if you increase the number of threads, another problem appears. In Django, database connections are thread-local, so when yet another thread makes a query, a new connection is created. Sooner or later the number of connections reaches the limit and an exception like this occurs:

OperationalError: FATAL:  remaining connection slots are reserved for non-replication superuser connections

The correct solution is to use the same connection for all threads, since we have already restricted the ability to make queries from different threads at the same time. Django has no standard tools for this, but it can be achieved with a hack: replacing the threading.local object with a regular object:

from django.db import connections, DEFAULT_DB_ALIAS

# replace the thread-local connection storage with a plain object,
# so all threads share the same connection
connections._connections = type('empty', (object,), {})()
connections[DEFAULT_DB_ALIAS].allow_thread_sharing = True

But you need to understand that this kills the thread safety of the database layer in the rest of the application, so this option is only suitable for commands launched from the console. A more humane option is to close the connection after each request, or after each element, which adds only a small overhead.

def close_connection_iterator(iterator, db=DEFAULT_DB_ALIAS):
    for item in iterator:
        connections[db].close()  # Django reopens it on the next query
        yield item

result = task_queue(
    upload,
    close_connection_iterator(real_queryset_iterator(queryset)),
    concurrency=150
)

There is a third solution: use a separate thread that talks to the database and hands objects to the other threads. This option does not break anything in the rest of the application and does not add the overhead of constantly reopening connections, but its implementation is rather involved and deserves an article of its own.
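Very roughly, the idea looks like this. This is only a sketch built on the standard Queue module, not the author's implementation, and it omits the shutdown handling a production version would need: one reader thread owns the database connection and pushes objects into a bounded queue, while everyone else only touches the queue.

import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

_SENTINEL = object()

def queued_iterator(source, maxsize=100):
    # the bounded queue keeps memory usage flat: the reader blocks
    # once the workers fall behind
    q = queue.Queue(maxsize=maxsize)

    def reader():
        for item in source:
            q.put(item)
        q.put(_SENTINEL)

    t = threading.Thread(target=reader)
    t.daemon = True  # do not keep the process alive if the consumer bails out
    t.start()

    while True:
        item = q.get()
        if item is _SENTINEL:
            return
        yield item

It would be plugged in the same way as close_connection_iterator above, for example queued_iterator(real_queryset_iterator(queryset)).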

Perhaps more time will pass, the number of files will grow to 10 million, and new problems will appear. So far, though, it seems the main problem will be that such an update will take about eight hours and cost about $50 for the PUT requests alone at current Amazon prices.

Some takeaways from all of this:
  1. Threads work well for I/O in Python, but you have to take care of isolation.
  2. Tens and hundreds of thousands of tasks must be launched very carefully, keeping an eye on memory consumption.
  3. queryset.iterator() in the Django ORM does not work quite as expected.

The task_queue and real_queryset_iterator helpers are on GitHub:
https://gist.github.com/homm/b8caf60c11997da69b1e
