ZooKeeper or write a distributed lock service

    disclaimer It so happened that for the last month I have been sorting out with ZooKeeper, and I had a desire to systematize what I found out, actually the post about it, and not about the lock service, as you might think from the name. Go!

    When moving from multi-threaded programming to programming distributed systems, many standard techniques stop working. One of these techniques is synchronized, since their scope is limited to one process, therefore, they not only do not work on different nodes of a distributed system, but also not between different application instances on the same machine; it turns out that you need a separate mechanism for locking.

    It is reasonable to require a distributed lock service:
    1. working capacity in conditions of network blinking (the first rule of distributed systems is that the network is unreliable to no one to talk about distributed systems )
    2. lack of a single point of failure

    ZooKeeper will help us create such a service.

    imageWikipedia says that ZooKeeper is a distributed configuration and synchronization service , I don’t know about you, but this definition does not reveal much to me. Looking back on my experience, I can give an alternative definition of ZooKeeper, this is a distributed key / value storage with the following properties :
    • the key space forms a tree (hierarchy similar to a file system)
    • values ​​can be contained in any node of the hierarchy, and not just in leaves (as if the files were also directories at the same time), the hierarchy node is called znode
    • there is bidirectional communication between the client and the server, therefore, the client can be subscribed as a change in a specific value or part of a hierarchy
    • it is possible to create a temporary key / value pair that exists while the client who created it is connected to the cluster
    • all data must be stored in memory
    • death resistance of non-critical number of cluster nodes

    Acquaintance with an unfamiliar system should begin first of all with the API that it offers, so

    Supported Operations


    existschecks for the existence of znode and returns its metadata
    createcreates znode
    deleteremoves znode
    getDatagets data associated with znode
    setDataassociates new data with znode
    getChildrengets children of the specified znode
    syncwaiting for the synchronization of the cluster node to which we are connected, and the wizard.

    These operations can be divided into the following groups
    callbackCas
    existsdelete
    getDatasetData
    getChildrencreate
    sync

    Callback - read-only operations to which callbacks can be indicated, the callback will work when the requested entity changes. A callback will work no more than once, in the case when you need to constantly monitor the value, you need to constantly re-sign in the event handler .

    CAS - write requests. The problem of concurrent access in ZooKeeper has been resolved through compare-and-swap : its version is stored with each znode, when changing it, you must indicate if the znode has already been changed, the version does not match and the client will receive a corresponding exception. Operations from this group require specifying the version of the object being modified.

    create- creates a new znode (key / value pair) and returns the key. It seems strange that the key is returned if it is indicated as an argument, but the fact is that ZooKeeper can be prefixed with the key and say that znode is sequential, then an aligned number will be added to the prefix and the result will be used as the key. It is guaranteed that by creating sequential znode with the same prefix, the keys will form an increasing (in the lexico-graphic sense) sequence .

    In addition to serial znode, you can create ephemeral znode that will be deleted as soon as the client who created them disconnect (I remind you that the connection between the cluster and the client in ZooKeeper is kept open for a long time). Ephemeral znode cannot have children.

    Znode can be both ephemeral and sequential.

    sync - synchronizes the cluster node to which the client is connected with the master. For good, it should not be called, since synchronization occurs quickly and automatically. About when to call it will be written below.

    Based on sequential ephemeral znode and subscriptions for their removal, you can easily create a distributed lock system.

    Distributed lock system


    In fact, everything was invented before us - we go to the ZooKeeper website in the recipe section and look for the blocking algorithm there:

    1. Create an ephemeral serial znode using "_locknode_ / guid-lock-" as a prefix, where _locknode_ is the name of the resource that we are locking and guid is the freshly generated guide
    2. Get the list of children _locknode_ without an event subscription
    3. If the znode created in the first step has the minimum numerical suffix in the key: exit the algorithm - we grabbed the resource
    4. Otherwise, we sort the list of children by suffix and call exists with a callback on znode, which is in the list before what we created in step 1
    5. If you get false, go to step 2, otherwise we are waiting for the event and go to step 2

    To check the assimilation of the material, try to understand it yourself; in descending or ascending order, you need to sort the list in step 4.

    Since, in the event of a fall in any operation during the operation of ZooKeeper, we cannot find out whether the operation passed or not , we need to take this check to the application level. Guid is needed just for this: knowing it and asking the children, we can easily determine whether we have created a new node or not, and the operation is worth repeating.

    By the way, I didn’t say that, but I think you already guessed that to calculate the suffix for a sequential znode, it’s not using a unique prefix sequence, but a unique parent sequence in which znode will be created.

    Wtf


    In theory, it would be possible to finish, but as practice has shown, the most interesting begins - wtf 'ki. By wtf, I mean the discrepancy between my intuitive ideas about the system and its real behavior, attention, wtf does not carry a value judgment, in addition, I perfectly understand why the creators of ZooKeeper made such architectural decisions.

    WTF # 1 - turn the code inside out


    Any API method can throw a checked exception and force you to handle it. This is not familiar, but correct, since the first rule of distributed systems is that the network is not reliable. One of the exceptions that can fly is loss of connection (blinking network). Do not confuse the loss of the connection with the cluster node (CONNECTIONLOSS), in which the client will restore it with the saved session and callbacks (will connect to another or wait), and force the connection to be closed from the cluster and the loss of all callbacks (SESSIONEXPIRED), in this case the task of restoring the context lies with the programmer. But we moved away from the topic ...

    How to handle winks? In fact, withWhen opening a connection with a cluster, we specify a callback that is called repeatedly, and not just once, like the others, and which delivers events about the connection being lost and restored . It turns out when the connection is lost, you need to pause the calculations and continue them when the desired event arrives.

    Does this remind you of anything? On the one hand - events, on the other - the need to "play" with the flow of the program, in my opinion somewhere near continuation and the monad.

    In general, I designed the steps of the program in the form:

    public interface Task {
        Task continueWith(Task continuation); // объединяем шаги в цепочку
        void run(Executor context, Object arg); // нормальное выполнение 
        void error(Executor context, Exception error); // вместо того, чтобы кидать исключение - передаем его
    }
    

    where is executor

    public interface Executor {
        void execute(Task task, Object arg, Timer timeout); // timeout ограничивает время выполнения таски передавая/кидая TimeoutException в error таски, создавая мягкий real-time
    }
    

    Adding the necessary combinators, you can build the following programs, somewhere using idempotency of steps, somewhere explicitly cleaning up the garbage:

    image

    • the square is a useful operation, and the arrow is the execution thread and / or error stream
    • rhombus - a combinator that ignores the specified errors and repeats the last useful operation
    • honeycomb is a combinator that performs operation A in the case of a normal flow of execution, and operation B in the case of a specified error
    • rounded parallelepiped - a combinator that launches a successful thread of execution, and the erroneous one immediately goes further

    When implementing Executor, I added wrapper functions to the ZooKeeper class in it so that it is the handler of all events and decides for itself which Watcher (event handler to call). Inside the implementation, I placed three BlockingQueue and three threads that read them, as a result it turned out that when an event arrives it is added to eventQueue, thereby the stream almost instantly returns to ZooKeeper internals, by the way, inside ZooKeeper all Watchers work in one thread , therefore, a situation is possible when processing one event blocks all the others and ZooKeeper itself. In the second turn of taskQueue are added Task'i together with arguments. The processing of these queues (eventQueue and taskQueue) is allotted by stream, eventThread and taskThread, respectively, these threads read their queues and wrap each incoming object in Job and put in the jobQueue, which is associated with its own thread, which actually runs the task code or handler messages. In case of a connection failure, the taskThread thread is suspended, and in case of a network uplift, it resumes. Executing code of shuffles and handlers in one thread allows you not to worry about locks and facilitates business logic.

    WTF # 2 - the main server


    We can say that in ZooKeeper the server (cluster) is the main one, and clients have practically no rights. Sometimes this goes to the absolute, for example ... In the ZooKeeper configuration there is such a parameter as session timeout, it determines how much the connection between the cluster and the client can disappear, if the maximum is exceeded, then the session of this client will be closed and all ephemeral znode of this client will be deleted; if the connection is restored, the client will receive the SESSIONEXPIRED event. So, when the connection is lost (CONNECTIONLOSS) and the session timeout is exceeded, it stupidly waits and does nothing, although, on the idea, he could guess that the session was dead and throw SESSIONEXPIRED to its handlers.

    Due to this behavior, the developer at some point tear his hair out, for example, you raised the ZooKeeper server and try to connect to it, but you made a mistake in the config and knock on the wrong address, or on the wrong port, then, according to the behavior described above , you will just wait until the client enters the CONNECTED state and receive no error message, as would be the case with MySQL or something similar.

    Interestingly, this scenario allows you to painlessly update ZooKeeper on production:
    • turn off ZooKeeper - all clients enter the CONNECTIONLOSS state
    • updating ZooKeeper
    • we turn on ZooKeeper, the connection with the server was restored, but the server does not send SESSIONEXPIRED, since the time for the disconnection was stopped relative to the server

    By the way, it is precisely because of this behavior that I pass to the Executor Timer, which cancels the execution of the Task if we cannot connect for too long.

    WTF # 3 - Int overflows


    Suppose you implemented locks according to the algorithm described above and started this business in highload production, where, say, you take 10MM locks per day. About a year later, you find yourself in hell - locks will stop working. The fact is that after a year the zversion counter cversion overflows and the principle of a monotonously increasing sequence of consecutive znode names is violated , and our implementation of locks is based on this principle.

    What to do? It is necessary to periodically delete / re-create _locknode_ - while the counter associated with it is reset and the principle of a monotonous sequence is violated again, but the fact is that znode can only be deleted when it has no childrenand now guess yourself why resetting cversion for _locknode_ when there are no children in it does not affect the locking algorithm.

    WTF # 4 - quorum write but not read


    When ZooKeeper returned OK to the write request, this means that the data was written to the quorum (most machines in the cluster, in the case of 3 machines, the quorum consists of 2), but when reading the user receives data from the machine to which he connected. That is, a situation is possible when the user receives old data.

    In the case when clients do not communicate otherwise than through ZooKeeper, this is not a problem, since all operations in ZooKeeper are strictly ordered and then there is no way to find out that the event happened except how to wait for it. Guess for yourself why it follows from this that everything is fine. In fact, the client may know that the data was updated, even if no one told him - in the case when he made the changes himself, but ZooKeeper supportsread your writes consistency , so this is not a problem.

    But nevertheless, if one client learned about the change in part of the data through the ZooKeeper communication channel, forced synchronization can help him - this is what the sync command is for .

    Performance


    Most distributed key / value storages use distributed to store large amounts of data. As I already wrote, the data that ZooKeeper stores in itself should not exceed the size of RAM, it is asked why it is distributed - it is used to ensure reliability. Remembering the need to get a quorum for recording, it is not surprising that the performance drops by 15% when using a cluster of three machines, compared to one machine.

    Another feature of ZooKeeper is that it provides persistence - it is also necessary for it, since the time it takes to write to disk is included during request processing.

    And the last blow to performance is due to strict ordering of requests - to ensure that all write requests go through one cluster machine.

    I tested on a local laptop, yes it strongly resembles:
    image
    Devops borat
    Big Data Analytic is show 90% of devops which are use word 'benchmark' in sentence are also use word 'laptop' in same sentence.
    but it’s obvious that ZooKeeper performs best in a single-node configuration, with a fast disk and a small amount of data, so my X220 with SSD and i7 was ideal for this. I mainly tested write requests.

    The performance ceiling was somewhere around 10K operations per second during intensive recording, it takes 1ms to write, therefore, from the point of view of one client, the server can run no faster than 1K operations per second.

    What does it mean? In conditions when we don’t rest against the disk (utilization of ssd at the level of 10%, for the sake of fidelity, I also tried to place data in memory via ramfs - I got a small increase in performance), we run into cpu. In total, it turned out that ZooKeeper is only 2 times slower than the numbers indicated by its creators on the site, which is not bad considering that they know how to get everything out of it.

    Summary


    Despite everything I wrote here, ZooKeeper is not as bad as it may seem. I like its laconicism (only 7 teams), I like the way it pushes and directs its API programmer to the right approach when developing distributed systems, namely, everything can fall at any time, because every operation should leave the system in a consistent state . But these are my impressions, they are not so important as the fact that ZooKeeeper solves the tasks for which it was created well, including: storing cluster configs, monitoring cluster status (number of connected nodes, node status), node synchronization (blocking , barriers) and communication of nodes of a distributed system (a-la jabber).

    I will list again what is worth keeping in mind when developing with ZooKeeper:
    • main server
    • the client receives notification of the record only when the data got to disk
    • quorum write + read your writes consistency
    • strict ordering
    • at any point in time, everything may fall, so after each change the system must be in a consistent state
    • in the event of a loss of communication, we are in a state in which the state of the last write operation is unknown
    • explicit error handling (for me the best strategy is to use CPS)


    About distributed locks


    Returning to the blocking algorithm described above, I can say that it does not work, more precisely, it works smoothly until actions inside the critical section occur on the same and only the same ZooKeeper cluster that is used for blocking. Why is that? - Try to guess for yourself. And in the next article I will write how to make distributed locks more honest and extend the class of operations inside the critical section to any key / value storage with CAS support.

    Some links to information on ZooKeeper


    zookeeper.apache.org
    outerthought.org/blog/435-ot.html
    highscalability.com/zookeeper-reliable-scalable-distributed-coordination-system
    research.yahoo.com/node/3280

    Also popular now: