VShard - horizontal scaling in Tarantool

    My name is Vladislav, and I take part in developing Tarantool, a DBMS and an application server in one bottle. Today I will tell you how we implemented horizontal scaling in Tarantool with the VShard module.

    First, a little theory.

    There are two kinds of scaling: horizontal and vertical. Horizontal scaling in turn comes in two kinds: replication and sharding. Replication is used to scale computation, sharding to scale data.

    Sharding, in turn, comes in two kinds: range-based sharding and hash-based sharding.

    With range-based sharding, we compute a shard key for each record in the cluster. The shard keys are projected onto a line, which is divided into ranges, and each range is assigned to a physical node.

    Hash-based sharding is simpler: we compute a hash function for each record in the cluster, and records with the same hash value are placed on the same physical node.
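    A minimal sketch of both placement rules in plain Lua. The range boundaries, node names, and the toy string hash (a stand-in for a real hash such as CRC32) are assumptions made for illustration. They are not what Tarantool uses.

```lua
-- Range-based sharding: shard keys are ordered on a line that is cut into
-- ranges, and each range is assigned to a physical node.
local ranges = {
    {upper = 1000, node = 'node_a'},       -- keys up to 1000
    {upper = 2000, node = 'node_b'},       -- keys 1001..2000
    {upper = math.huge, node = 'node_c'},  -- everything above 2000
}

local function node_by_range(key)
    for _, r in ipairs(ranges) do
        if key <= r.upper then
            return r.node
        end
    end
end

-- Hash-based sharding: records with equal hash values (modulo the node
-- count) land on the same node.
local nodes = {'node_a', 'node_b', 'node_c'}

-- Toy polynomial string hash standing in for a real hash function.
local function toy_hash(s)
    local h = 0
    for i = 1, #s do
        h = (h * 31 + s:byte(i)) % 2147483647
    end
    return h
end

local function node_by_hash(key)
    return nodes[toy_hash(tostring(key)) % #nodes + 1]
end
```

    Range sharding keeps neighboring keys together, which helps range scans. Hash sharding spreads keys evenly without any knowledge of how the keys are distributed.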

    I will talk about horizontal scaling with hash-based sharding.

    Previous implementation

    Our first horizontal scaling module was Tarantool Shard. It is a very simple hash-based sharding scheme that computes the shard key from the primary key of every record in the cluster.

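    local digest = require('digest')
    function shard_function(primary_key) -- shard_count: number of shards, set elsewhere
        return digest.guava(digest.crc32(tostring(primary_key)), shard_count)
    end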

    But then we ran into requirements that Tarantool Shard could not meet, for three fundamental reasons.

    First, we needed locality of logically related data. When data is logically connected, we want to always store it on the same physical node, no matter how the cluster topology changes and how balancing is performed. Tarantool Shard does not guarantee this. It computes the hash from primary keys only, and during rebalancing even records with the same hash can be split for a while, because the transfer is not atomic.

    The lack of data locality hurt us the most. Here is an example. A bank client opens an account. The data about the account and the client must always be stored physically together, so that they can be read in one request and changed in one transaction, for example, when money is transferred from the account. With classic sharding in Tarantool Shard, accounts and customers are placed by their own primary keys, so their shard function values differ. The data may end up on different physical nodes, which greatly complicates both reading and transactional work with such a client.

    format = {{'id', 'unsigned'},
              {'email', 'string'}}
    box.schema.create_space('customer', {format = format})
    format = {{'id', 'unsigned'},
              {'customer_id', 'unsigned'},
              {'balance', 'number'}}
    box.schema.create_space('account', {format = format})

    In the example above, the id fields of an account and of its customer can easily differ. They are linked through the account's customer_id field and the customer's id. Giving an account the same id as its customer would break the uniqueness of the accounts' primary key, because one customer can have several accounts. And Shard cannot shard by anything else.

    The next problem is slow resharding. This is the classic problem of all hash-based sharding. The bottom line is that when we change the composition of a cluster, we usually change the shard function too, because it usually depends on the number of nodes. When the function changes, we need to go through all the records in the cluster, recompute the shard function, and possibly move some records. While we are moving them, we do not know whether the data needed by the next incoming request has already been moved or is still in transit. Therefore, during resharding every read has to be looked up with two shard functions: the old one and the new one. Requests become twice as slow, and for us this was unacceptable.
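    A small illustration of both costs. It uses the placement rule node = key mod N, which is an assumption chosen for simplicity; Tarantool Shard used CRC32 plus a consistent hash instead. The has_key callback is hypothetical.

```lua
-- Placement rule that depends on the node count (plain modulo, chosen
-- only for illustration).
local function node_for(key, node_count)
    return key % node_count + 1
end

-- Every record must be re-hashed to find out whether it has to move.
local function count_moved(first_key, last_key, old_count, new_count)
    local moved = 0
    for key = first_key, last_key do
        if node_for(key, old_count) ~= node_for(key, new_count) then
            moved = moved + 1
        end
    end
    return moved
end

-- While data is in flight, a read cannot know which placement is current,
-- so it tries the new node and then the old one. has_key(node, key) is a
-- hypothetical callback asking a node whether it holds the key.
local function locate_during_resharding(key, old_count, new_count, has_key)
    local new_node = node_for(key, new_count)
    if has_key(new_node, key) then
        return new_node
    end
    local old_node = node_for(key, old_count)
    if has_key(old_node, key) then
        return old_node
    end
    return nil
end
```

    Growing from 4 to 5 nodes moves 800 of the keys 1..1000 under this rule. A consistent hash such as guava moves far fewer keys. It still has to be evaluated for every record, and reads still need the double lookup while the move is in progress.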

    Another weakness of Tarantool Shard was poor read availability: if some nodes of a replica set failed, reads suffered.

    New solution

    To solve these three problems, we created Tarantool VShard. Its key difference is that the storage level is virtualized: virtual storages sit on top of the physical ones, and records are distributed over the virtual storages. These virtual storages are called buckets. The user does not need to think about which data lies on which physical node. A bucket is an atomic, indivisible unit of data, as a single tuple is in classic sharding. VShard always stores each bucket entirely on one physical node and transfers all the data of a bucket atomically during resharding. This ensures locality: we only need to put related data into one bucket, and we can always be sure that this data stays together through any change to the cluster.

    How do we put data into one bucket? In the schema we introduced earlier for a bank customer, we add a new field, bucket_id, to each space. If linked records have the same bucket_id, they are in the same bucket. The advantage is that records with the same bucket_id can be stored in different spaces, and even in different engines. Locality by bucket_id is ensured regardless of how these records are stored.

    format = {{'id', 'unsigned'},
              {'email', 'string'},
              {'bucket_id', 'unsigned'}}
    box.schema.create_space('customer', {format = format})
    format = {{'id', 'unsigned'},
              {'customer_id', 'unsigned'},
              {'balance', 'number'},
              {'bucket_id', 'unsigned'}}
    box.schema.create_space('account', {format = format})

    Why do we care so much about this? With classic sharding, the data can spread across every physical storage we have. In the bank example, requesting all the accounts of a customer means contacting all the nodes. So a read costs O(N), where N is the number of physical storages. That is terribly slow.

    Thanks to buckets and locality by bucket_id, we can always read the data from one node in one request, regardless of the cluster size. You have to compute bucket_id and assign equal values to related records yourself. For some this is an advantage, for others a disadvantage. I consider it an advantage that you can choose the function for computing bucket_id.
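    A sketch of the difference for one customer (id 7) with two accounts (ids 101 and 102), in plain Lua. The bucket count, the toy hash standing in for CRC32, and the even bucket-to-replica-set table are assumptions for illustration. In VShard the router keeps the real routing table.

```lua
local bucket_count = 3000
local replicasets = {'rs1', 'rs2', 'rs3'}

-- Toy polynomial string hash standing in for CRC32.
local function toy_hash(s)
    local h = 0
    for i = 1, #s do
        h = (h * 31 + s:byte(i)) % 2147483647
    end
    return h
end

-- Classic sharding: every record is placed by hashing its own primary key.
local function node_by_primary_key(id)
    return replicasets[toy_hash(tostring(id)) % #replicasets + 1]
end

-- Virtual sharding: the application derives bucket_id from customer_id, so
-- a customer and all of their accounts get the same bucket_id.
local function customer_bucket_id(customer_id)
    return toy_hash(tostring(customer_id)) % bucket_count + 1
end

-- Hypothetical routing table (bucket_id -> replica set), split evenly here.
local function node_by_bucket(bucket_id)
    return replicasets[(bucket_id - 1) % #replicasets + 1]
end

-- Number of distinct nodes a request for these placements must visit.
local function count_nodes(placements)
    local seen, n = {}, 0
    for _, node in ipairs(placements) do
        if not seen[node] then
            seen[node], n = true, n + 1
        end
    end
    return n
end

-- Customer 7 owns accounts 101 and 102.
local by_pk = {node_by_primary_key(7), node_by_primary_key(101), node_by_primary_key(102)}
local b = customer_bucket_id(7)
local by_bucket = {node_by_bucket(b), node_by_bucket(b), node_by_bucket(b)}
print(count_nodes(by_pk), count_nodes(by_bucket)) -- prints 3 and 1
```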

    What is the key difference between classic sharding and virtual sharding with buckets?

    In the first case, when we change the composition of the cluster, we have two states: the current (old) one and the new one we have to move to. During the transition we must not only move the data but also recompute the hash function for all records. This is very inconvenient, because at any given moment we do not know which data has been moved and which has not. It is also neither reliable nor atomic: to move a set of records with the same hash value atomically, the transfer state must be stored persistently so that it can be recovered after a failure. Conflicts and errors happen, and the procedure has to be restarted again and again.

    Virtual sharding is much simpler. There are no two distinguished cluster states; there is only the state of each bucket. The cluster becomes more maneuverable: it moves gradually from one state to another, passing through many intermediate states rather than just two. Thanks to this smooth transition, you can change the balancing on the fly and even remove storages that were just added. In other words, balancing becomes much more controllable and granular.


    Suppose we have chosen a function for bucket_id and loaded so much data into the cluster that it ran out of space. Now we want to add nodes and move some of the data onto them. In VShard this is done as follows. First, we start the new nodes and Tarantool on them, and then we update the VShard configuration. The configuration describes all cluster members: all replicas, replica sets, masters, assigned URIs, and more. We add the new nodes to the configuration and apply it on all nodes of the cluster with vshard.storage.cfg.

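    local digest = require('digest')
    local bucket_count = 30000 -- must match the cluster configuration
    -- Same bucket_id for a customer and all of their accounts, in [1, bucket_count].
    local function customer_bucket_id(customer_id)
        return digest.crc32(tostring(customer_id)) % bucket_count + 1
    end
    function create_user(email)
        local customer_id = next_id() -- application-defined id generator
        box.space.customer:insert({customer_id, email, customer_bucket_id(customer_id)})
    end
    function add_account(customer_id)
        local id = next_id()
        box.space.account:insert({id, customer_id, 0, customer_bucket_id(customer_id)})
    end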
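    A sketch of the configuration table after a second replica set has been added. All UUIDs, URIs, names, and the bucket count are placeholders. The field names (bucket_count, sharding, replicas, uri, name, master) follow the VShard configuration format, but check them against the reference for the VShard version you run.

```lua
-- All UUIDs, URIs and names below are placeholders.
local cfg = {
    bucket_count = 30000, -- fixed for the lifetime of the cluster
    sharding = {
        ['aaaaaaaa-0000-4000-a000-000000000001'] = { -- existing replica set
            replicas = {
                ['aaaaaaaa-0000-4000-a000-00000000000a'] = {
                    uri = 'storage:password@10.0.0.1:3301',
                    name = 'storage_1_a',
                    master = true,
                },
            },
        },
        ['bbbbbbbb-0000-4000-b000-000000000002'] = { -- newly added replica set
            replicas = {
                ['bbbbbbbb-0000-4000-b000-00000000000a'] = {
                    uri = 'storage:password@10.0.0.2:3301',
                    name = 'storage_2_a',
                    master = true,
                },
            },
        },
    },
}
-- Applied on each storage with vshard.storage.cfg(cfg, <this instance's UUID>)
-- and on each router with vshard.router.cfg(cfg).
```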

    As you remember, with classic sharding a change in the number of nodes also changes the shard function. In VShard this does not happen: we have a fixed number of virtual storages, the buckets. This is a constant you choose when you start a cluster. It may seem that this limits scalability, but in fact it does not. You can choose a huge number of buckets, tens or hundreds of thousands. The main thing is that there should be at least two orders of magnitude more buckets than the maximum number of replica sets you will ever have in the cluster.

    Since the number of virtual storages does not change, and the shard function depends only on this value, we can add as many physical storages as we want without recomputing the shard function.

    How do buckets get distributed across physical storages on their own? When vshard.storage.cfg is called, the rebalancer process wakes up on one of the nodes. It is an analytical process that computes the ideal balance in the cluster. It polls all physical nodes, asks how many buckets each one has, and builds routes for moving buckets so that they become evenly distributed. The rebalancer sends the routes to the overloaded storages, and they start sending buckets. After some time, the cluster becomes balanced.
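    The rebalancer's core idea can be sketched in plain Lua. This is a simplified, hypothetical model, not VShard's code. It takes the bucket counts reported by each replica set, computes an even target, and pairs overloaded replica sets with underloaded ones.

```lua
-- current: {[replicaset_name] = bucket_count}. Returns a list of routes
-- {from = ..., to = ..., count = ...} that bring every replica set to an
-- even share.
local function plan_rebalance(current)
    local names, total = {}, 0
    for name, count in pairs(current) do
        names[#names + 1] = name
        total = total + count
    end
    table.sort(names) -- deterministic order
    local base = math.floor(total / #names)
    local rest = total - base * #names -- first `rest` replica sets get one extra

    local senders, receivers = {}, {}
    for i, name in ipairs(names) do
        local target = base + (i <= rest and 1 or 0)
        local diff = current[name] - target
        if diff > 0 then
            senders[#senders + 1] = {name = name, extra = diff}
        elseif diff < 0 then
            receivers[#receivers + 1] = {name = name, need = -diff}
        end
    end

    -- Surplus equals deficit, so every sender finds enough receivers.
    local routes, r = {}, 1
    for _, s in ipairs(senders) do
        while s.extra > 0 do
            local recv = receivers[r]
            local n = math.min(s.extra, recv.need)
            routes[#routes + 1] = {from = s.name, to = recv.name, count = n}
            s.extra = s.extra - n
            recv.need = recv.need - n
            if recv.need == 0 then
                r = r + 1
            end
        end
    end
    return routes
end
```

    Suppose an empty fourth replica set joins three that hold 1,000 buckets each. The plan then moves 250 buckets from each old replica set to the new one, a quarter of the data. No bucket_id value changes.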

    But in real projects the notion of ideal balance may differ. For example, I may want to store less data on one replica set than on another because it has smaller hard drives. VShard thinks that everything is well balanced, while in fact my storage is about to overflow. We have provided a mechanism for adjusting the balancing rules with weights. Each replica set and storage can be given a weight. When the rebalancer decides how many buckets to send where, it takes into account the ratios of the weights of all pairs.

    For example, one storage has a weight of 100 and another a weight of 200. Then the first will store half as many buckets as the second. Note that I am talking about the ratio of weights; absolute values do not matter. You can choose weights based on a 100% split across the cluster: one storage gets 30%, the other 70%. You can use storage capacity in gigabytes as the basis, or you can measure weights in numbers of buckets. The main thing is to keep the ratio you need.

    This scheme has an interesting side effect: if a storage is assigned zero weight, the rebalancer will order it to give away all its buckets. After that you can remove the whole replica set from the configuration.
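    The weight rule can be written as a proportional split. This is a hypothetical helper for illustration, not VShard's code. In VShard the weight is set per replica set in the configuration. Only ratios matter, and a zero weight yields a target of zero buckets.

```lua
-- weights: {[replicaset_name] = weight}. Returns the ideal bucket count for
-- each replica set, proportional to its weight.
local function target_counts(bucket_count, weights)
    local names, total_weight = {}, 0
    for name, w in pairs(weights) do
        names[#names + 1] = name
        total_weight = total_weight + w
    end
    assert(total_weight > 0, 'at least one replica set needs a non-zero weight')
    table.sort(names) -- deterministic order for handing out the remainder

    local targets, assigned = {}, 0
    for _, name in ipairs(names) do
        local t = math.floor(bucket_count * weights[name] / total_weight)
        targets[name] = t
        assigned = assigned + t
    end
    -- Give buckets lost to rounding, one at a time, to non-zero weights only.
    local i = 1
    while assigned < bucket_count do
        local name = names[i]
        if weights[name] > 0 then
            targets[name] = targets[name] + 1
            assigned = assigned + 1
        end
        i = i % #names + 1
    end
    return targets
end
```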

    Atomic bucket transfer

    We have a bucket that accepts any read and write requests, and now the rebalancer asks to move it to another storage. The bucket stops accepting write requests. Otherwise it could be updated during the transfer, then that update would have to be transferred, then the update of the update, and so on to infinity. So writes are blocked, but reads from the bucket are still allowed. The data is then transferred to the new location chunk by chunk. When the transfer is complete, the bucket starts accepting requests again in its new location. It still lies in the old place too, but it is already marked as garbage, and later the garbage collector removes it chunk by chunk.

    Each bucket has associated metadata that is physically stored on disk. All the steps above are persisted, so whatever happens to the storage, the state of the bucket will be restored automatically.
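    The sequence above can be modeled as a few status changes. This is a simplified, hypothetical in-memory model. The status names mirror those in vshard.storage.info() output, but the code is not VShard's implementation and skips persistence and intermediate statuses.

```lua
local Bucket = {}
Bucket.__index = Bucket

function Bucket.new(id, status)
    return setmetatable({id = id, status = status or 'active'}, Bucket)
end

-- Reads are served while the bucket is active or being sent.
function Bucket:can_read()
    return self.status == 'active' or self.status == 'sending'
end

-- Writes are served only while the bucket is active.
function Bucket:can_write()
    return self.status == 'active'
end

-- Move one bucket. send_chunk(i) is a hypothetical callback that ships the
-- i-th chunk of data. In VShard each status change is also stored on disk.
local function transfer(src, chunk_count, send_chunk)
    assert(src.status == 'active', 'only an active bucket can be sent')
    src.status = 'sending'                      -- writes blocked, reads allowed
    local dst = Bucket.new(src.id, 'receiving') -- not yet serving requests
    for i = 1, chunk_count do
        send_chunk(i)
    end
    dst.status = 'active'                       -- new copy serves requests
    src.status = 'garbage'                      -- old copy waits for the GC
    return dst
end
```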

    You might have questions:

    • What happens to the requests that were working with a bucket when its transfer began?

      The metadata of each bucket holds two kinds of references: read and write. When a user makes a request to a bucket, they specify how the request will work with it: read-only or read-write. For each request, the corresponding reference counter is incremented.

      Why do we need a reference counter for read requests? Suppose a bucket has been transferred successfully, and then the garbage collector comes and wants to delete it. It sees that the reference counter is greater than zero, so it cannot delete the bucket yet. Once those requests finish, the garbage collector can complete its work.

      The reference counter for write requests guarantees that the bucket will not even start being transferred while at least one write request is working with it. But write requests may arrive constantly, and then the bucket would never be transferred. To handle this, once the rebalancer has decided to transfer the bucket, new write requests start to be blocked, and the system waits for the current ones to finish within a timeout. If they do not finish in time, the system starts accepting new write requests again and postpones the transfer for a while. The rebalancer keeps retrying until the transfer succeeds.

      VShard has a low-level bucket_ref API in case the high-level features are not enough for you. If you really want to do something yourself, you can call this API from your code.
    • Can I avoid blocking writes altogether?

      No, that is impossible. If a bucket contains critical data that you need constant write access to, you will have to forbid its transfer altogether. There is a function for this, bucket_pin: it rigidly attaches the bucket to the current replica set and does not allow it to be transferred. Neighboring buckets can still move without restrictions.

      There is an even stronger tool than bucket_pin: replica set locking. It is set not in code but in the configuration. A lock forbids moving any buckets out of this replica set and accepting new ones. Accordingly, all of its data stays permanently available for writing.
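    The reference counting and pinning rules from these answers can be modeled in a few lines. This is a simplified, hypothetical model written for illustration. VShard's own low-level entry points are vshard.storage.bucket_ref(bucket_id, mode), vshard.storage.bucket_unref(bucket_id, mode) and vshard.storage.bucket_pin(bucket_id). Replica set locking is the lock = true option of a replica set in the configuration.

```lua
local BucketRefs = {}
BucketRefs.__index = BucketRefs

function BucketRefs.new()
    return setmetatable({ro = 0, rw = 0, rw_lock = false, pinned = false}, BucketRefs)
end

-- Called at the start of every request; mode is 'read' or 'write'.
function BucketRefs:ref(mode)
    if mode == 'write' then
        if self.rw_lock then
            return false -- a transfer is pending: new writes are refused
        end
        self.rw = self.rw + 1
    else
        self.ro = self.ro + 1
    end
    return true
end

-- Called when the request finishes.
function BucketRefs:unref(mode)
    if mode == 'write' then
        self.rw = self.rw - 1
    else
        self.ro = self.ro - 1
    end
end

-- A pinned bucket never moves.
function BucketRefs:pin()
    self.pinned = true
end

-- The rebalancer wants to send the bucket: block new writes, then check
-- whether in-flight writes are done. The real system waits up to a timeout
-- before giving up; here the caller simply retries.
function BucketRefs:try_start_send()
    if self.pinned then
        return false
    end
    self.rw_lock = true
    return self.rw == 0
end

-- Transfer postponed: accept writes again.
function BucketRefs:cancel_send()
    self.rw_lock = false
end

-- The garbage collector may delete a sent bucket only when no reads use it.
function BucketRefs:can_collect()
    return self.ro == 0
end
```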


    VShard consists of two submodules: vshard.storage and vshard.router. They can be created and scaled independently, and can even run on the same instance. When we access the cluster, we do not know where a bucket is, and vshard.router finds it for us by bucket_id.

    Let's look at an example. Back to the bank cluster and customer accounts. I want to be able to fetch all the accounts of a specific customer from the cluster. To do this, I write an ordinary function for local search:

    It finds all of a customer's accounts by the customer id. Now I need to decide which storage to call this function on. To do this, I compute the bucket_id from the customer id in my request and ask vshard.router to call that function on the storage where the bucket with this bucket_id lives. The router keeps a routing table that records which replica set holds each bucket, and vshard.router proxies my request.
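    A sketch of both halves in Tarantool Lua. It assumes three things. The account space has a secondary index named customer_id. The router script has loaded VShard into the global vshard, as VShard's example scripts do. And customer_bucket_id is the application's own, hypothetical bucket_id function, the same one used when the rows were inserted. vshard.router.callro(bucket_id, function_name, args, opts) is the router's read-only call.

```lua
-- Storage side: an ordinary local lookup. It is a global function so that
-- the router can call it by name.
function customer_accounts(customer_id)
    return box.space.account.index.customer_id:select({customer_id})
end

-- Router side: compute bucket_id from the customer id and let the router
-- forward the call to the replica set that owns the bucket. callro allows
-- the call to be served by a replica; writes would use callrw.
function get_customer_accounts(customer_id)
    local bucket_id = customer_bucket_id(customer_id)
    return vshard.router.callro(bucket_id, 'customer_accounts',
                                {customer_id}, {timeout = 1})
end
```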

    Of course, resharding may have started at this moment, and buckets may be moving. The router gradually updates its table in the background in large batches: it asks the storages for their current bucket tables.

    We may even access a bucket that has just moved before the router has updated its routing table. Then the router goes to the old storage, which either tells the router where to find the bucket or simply replies that it does not have the required data. In the latter case, the router goes through all the storages looking for the bucket. All of this is transparent to us: we will not even notice that the routing table was stale.
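    The lookup logic can be modeled in plain Lua. This is a hypothetical sketch, not VShard's implementation. Each storage is a table of the buckets it holds plus optional hints about buckets that moved away.

```lua
-- storages[name] = {buckets = {[bucket_id] = true}, moved = {[bucket_id] = new_name}}
local function new_router(storages)
    return {storages = storages, routes = {}} -- routes: bucket_id -> name
end

local function sorted_keys(t)
    local keys = {}
    for k in pairs(t) do
        keys[#keys + 1] = k
    end
    table.sort(keys)
    return keys
end

local function find_bucket(router, bucket_id)
    local name = router.routes[bucket_id]
    if name ~= nil then
        local storage = router.storages[name]
        if storage.buckets[bucket_id] then
            return name -- routing table was up to date
        end
        -- Stale entry: the old owner may say where the bucket went.
        local hint = (storage.moved or {})[bucket_id]
        local target = hint and router.storages[hint]
        if target and target.buckets[bucket_id] then
            router.routes[bucket_id] = hint
            return hint
        end
    end
    -- No entry or no usable hint: ask every storage.
    for _, candidate in ipairs(sorted_keys(router.storages)) do
        if router.storages[candidate].buckets[bucket_id] then
            router.routes[bucket_id] = candidate
            return candidate
        end
    end
    router.routes[bucket_id] = nil
    return nil
end
```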

    Read instability

    Recall what problems we originally had:

    • There was no data locality. Solved by adding buckets.
    • Resharding slowed everything down and was slow itself. Solved by atomic bucket transfer, which also eliminated recomputing the shard function.
    • Reads were unstable.

    The last problem is solved by vshard.router with its automatic read failover subsystem.

    The router periodically pings the storages listed in the configuration. Suppose one of them stops responding. The router keeps a hot standby connection to each replica, so if the current one stops responding, it switches to another. The read request is processed normally, because we can read from replicas (but not write to them). We can set replica priorities that the router uses when choosing a failover target for reads. We do this with zones.

    We assign a zone number to each replica and each router and define a table with the distance between each pair of zones. When the router decides where to send a read request, it selects a replica in the zone closest to its own.
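    The selection rule can be sketched as "the closest available replica wins". This is a hypothetical model, not VShard's code. Here weights[a][b] is the distance from zone a to zone b, and a replica in the router's own zone counts as distance zero.

```lua
-- replicas: list of {name = ..., zone = ..., available = true/false}.
-- Returns the available replica closest to router_zone, or nil.
local function pick_read_replica(router_zone, weights, replicas)
    local best, best_dist = nil, math.huge
    for _, r in ipairs(replicas) do
        if r.available then
            local dist
            if r.zone == router_zone then
                dist = 0
            else
                dist = (weights[router_zone] or {})[r.zone] or math.huge
            end
            -- A replica with unknown distance is still used as a last resort.
            if best == nil or dist < best_dist then
                best, best_dist = r, dist
            end
        end
    end
    return best
end
```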

    How it looks in the configuration:
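    A sketch of such a configuration. It has one replica set with a master in zone 1, a replica in zone 2, and a backup replica in a distant zone 3. UUIDs, URIs and zone numbers are placeholders. The field names (a zone per replica, a top-level weights table of zone distances, and zone for the router) are taken from the VShard configuration format as I understand it. Verify them against the reference for your VShard version.

```lua
-- Placeholder UUIDs, URIs and zones. weights[a][b] is the distance from
-- zone a to zone b; a lower value means "prefer".
local cfg = {
    weights = {
        [1] = {[2] = 1, [3] = 10},
        [2] = {[1] = 1, [3] = 10},
    },
    sharding = {
        ['<replicaset_uuid>'] = {
            replicas = {
                ['<master_uuid>'] = {uri = 'storage:password@10.0.1.1:3301',
                                     name = 'storage_a', master = true, zone = 1},
                ['<replica_uuid>'] = {uri = 'storage:password@10.0.2.1:3301',
                                      name = 'storage_b', zone = 2},
                ['<backup_uuid>'] = {uri = 'storage:password@10.0.3.1:3301',
                                     name = 'storage_backup', zone = 3},
            },
        },
    },
    zone = 1, -- zone of the router that loads this configuration
}
```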

    In general, you can read from any replica, but if the cluster is large, complex, and highly distributed, zoning is very useful. Zones can be different server racks, so that the network is not loaded with traffic between them. Or they can be geographically distant sites.

    Zoning also helps when replicas differ in performance. For example, suppose each replica set has one backup replica that should not serve requests but only keep a copy of the data. We put it in a zone that is very far from all routers in the table, so they will use it only as a last resort.

    Write instability

    Since we have discussed read failover, what about write failover when the master changes? Here things are not so rosy in VShard: it does not implement election of a new master, so you have to do that yourself. Once you have somehow chosen one, this instance has to take over the master's role. We update the configuration, setting master = false for the old master and master = true for the new one, apply it with vshard.storage.cfg, and roll it out to the storages. Then everything happens automatically. The old master stops accepting write requests and starts synchronizing with the new one, because some data may already have been applied on the old master but not yet reached the new one. After that, the new master takes over and starts accepting requests, and the old master becomes a replica. This is how write failover works in VShard.

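    -- new_cfg: a copy of the current configuration; rs_uuid: the replica set being switched
    local replicas = new_cfg.sharding[rs_uuid].replicas
    replicas[old_master_uuid].master = false
    replicas[new_master_uuid].master = true
    vshard.storage.cfg(new_cfg, instance_uuid) -- run on each storage with its own UUID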

    How do we monitor all this variety of events?

    In general, two handles are enough: vshard.storage.info and vshard.router.info.

    vshard.storage.info shows information in several sections.

    - replicasets:
        <replicaset_2>:
          uuid: <replicaset_2>
          master:
            uri: storage@
        <replicaset_1>:
          uuid: <replicaset_1>
          master: missing
      bucket:
        receiving: 0
        active: 0
        total: 0
        garbage: 0
        pinned: 0
        sending: 0
      status: 2
      replication:
        status: slave
      alerts:
      - ['MISSING_MASTER', 'Master is not configured for replicaset <replicaset_1>']

    The first is the replication section. Here you can see the status of the replica set on which you called this function. That includes its replication lag, which nodes it is connected to and which it is not, which nodes are available and which are not, which master is configured for which replica set, and so on.

    In the bucket section, you can see in real time how many buckets are being moved to the current replica set, how many are leaving it, how many are working on it normally, how many are marked as garbage, and how many are pinned.

    The alerts section is a hodgepodge of all the problems VShard was able to detect on its own: the master is not configured, the redundancy level is insufficient, the master is alive but all replicas have failed, and so on.

    The last section is a status indicator that lights up red when things get very bad. It is a number from zero to three; the higher, the worse.

    vshard.router.info has the same sections, but they mean slightly different things.

    - replicasets:
        <replicaset_2>:
          replica: &0
            status: available
            uri: storage@
            uuid: 1e02ae8a-afc0-4e91-ba34-843a356b8ed7
          bucket:
            available_rw: 500
          uuid: <replicaset_2>
          master: *0
        <replicaset_1>:
          replica: &1
            status: available
            uri: storage@
            uuid: 8a274925-a26d-47fc-9e1b-af88ce939412
          bucket:
            available_rw: 400
          uuid: <replicaset_1>
          master: *1
      bucket:
        unreachable: 0
        available_ro: 800
        unknown: 200
        available_rw: 700
      status: 1
      alerts:
      - ['UNKNOWN_BUCKETS', '200 buckets are not discovered']

    The first section is again replication, but here it shows accessibility rather than replication lag. It lists the router's connections and which replica set each one belongs to. It shows which connection is active and which is kept in reserve in case the master fails, and which instance is the master. It also shows how many buckets on each replica set are available for reading and writing, and how many only for reading.

    The bucket section shows the total number of buckets currently available to this router for reading and writing, or only for reading. It also shows how many buckets the router does not know the location of, and how many it knows about but cannot reach because it has no connection to the required replica set.

    The alerts section mostly reports on connections, failover operations, and undiscovered buckets.

    Finally, there is the same simple indicator from zero to three.
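    A hypothetical monitoring helper built on top of these handles. It assumes the returned table has a numeric status field (0 is fine, 3 is critical) and an alerts list whose items are {code, message} pairs, as in the outputs printed above. The threshold is an arbitrary choice.

```lua
-- info: the table returned by vshard.storage.info() or vshard.router.info().
-- Returns whether the status is acceptable and the alerts as readable text.
local function summarize(info, max_ok_status)
    max_ok_status = max_ok_status or 1
    local lines = {}
    for _, alert in ipairs(info.alerts or {}) do
        lines[#lines + 1] = alert[1] .. ': ' .. alert[2]
    end
    return info.status <= max_ok_status, table.concat(lines, '\n')
end
```

    On a storage or router this could be called as summarize(vshard.router.info()) from a periodic health check.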

    What do you need to use VShard?

    First, choose a constant number of buckets. Why not just set it to INT32_MAX? Because each bucket carries metadata: 30 bytes on the storage and 16 bytes on the router. The more buckets you have, the more space the metadata takes. On the other hand, more buckets means smaller buckets, which gives finer cluster granularity and faster transfer of each bucket. So you have to decide what matters more to you and how much scalability you want to build in.
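    The trade-off is easy to estimate with the per-bucket sizes quoted above. This is a hypothetical back-of-the-envelope helper. data_bytes is the total amount of user data you expect to store. Storage-side metadata is counted once across all replica sets.

```lua
local STORAGE_BYTES_PER_BUCKET = 30
local ROUTER_BYTES_PER_BUCKET = 16

-- Rough metadata footprint and average bucket size for a bucket count.
local function estimate(bucket_count, data_bytes)
    return {
        storage_metadata = bucket_count * STORAGE_BYTES_PER_BUCKET, -- all replica sets together
        router_metadata = bucket_count * ROUTER_BYTES_PER_BUCKET,   -- on every router
        avg_bucket_size = data_bytes / bucket_count,
    }
end
```

    With 30,000 buckets and 1 TB of data, every router needs 480,000 bytes of metadata, and a bucket averages about 33 MB. With INT32_MAX buckets, each router would need roughly 34 GB and the storages roughly 64 GB in total, for buckets of under 500 bytes each.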

    Second, choose the shard function for computing bucket_id. The rules are the same as for choosing a shard function in classic sharding, because buckets play the role that storages play in classic sharding, except that their number is fixed. The function must distribute its output values evenly. Otherwise the buckets will grow unevenly, and VShard balances only by the number of buckets, not by their size. If your shard function turns out to be unbalanced, you will have to change it and move data from bucket to bucket. So choose it carefully.
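    Before committing to a bucket_id function, it is worth checking its spread on a sample of real keys. A hypothetical helper in plain Lua. It also catches functions that return values outside 1..bucket_count.

```lua
-- Count how many keys land in each bucket and return the emptiest and the
-- fullest bucket. A good bucket_id function keeps the two close together.
local function bucket_spread(bucket_id_fn, keys, bucket_count)
    local counts = {}
    for b = 1, bucket_count do
        counts[b] = 0
    end
    for _, key in ipairs(keys) do
        local b = bucket_id_fn(key)
        assert(counts[b] ~= nil, 'bucket_id out of range: ' .. tostring(b))
        counts[b] = counts[b] + 1
    end
    local min, max = math.huge, 0
    for b = 1, bucket_count do
        if counts[b] < min then min = counts[b] end
        if counts[b] > max then max = counts[b] end
    end
    return min, max
end
```

    For keys 1..1000 and 100 buckets, key % 100 + 1 gives exactly 10 keys per bucket. key % 10 + 1 uses only 10 buckets and leaves 90 empty.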


    VShard provides:

    • data locality;
    • atomic resharding;
    • higher cluster maneuverability;
    • automatic read failover;
    • many knobs for controlling buckets.

    VShard is under active development, and work on some planned tasks has already begun. The first is load balancing on the router. Some read requests are heavy, and it is not always advisable to send them to the master. The router should be able to balance read requests across replicas on its own.

    The second is lock-free bucket transfer. We have already implemented an algorithm that avoids blocking buckets for writes during the transfer. Writes will need to be blocked only at the very end, to record the fact of the transfer itself.

    The third is atomic application of the configuration. Applying the configuration to every storage separately is inconvenient and not atomic: a storage may be unavailable, the configuration is not applied there, and what then? So we are working on a mechanism for distributing the configuration automatically.

    The original version of my talk
