FAQ on the architecture and operation of VKontakte

    The history of VKontakte is on Wikipedia; Pavel has told it himself, and it seems everyone knows it by now. Pavel talked about the internals, architecture and design of the site at HighLoad++ back in 2010. A lot of servers have come and gone since then, so it is time to update that information: we will dissect VK, pull out its insides, weigh them, and look at how it works from a technical point of view.



    Alexey Akulovich (AterCattus) is a backend developer on the VKontakte team. This transcript of his talk is a collective answer to frequently asked questions about how the platform operates: the infrastructure, the servers, and the interaction between them. It is not about development but about the hardware, and separately about databases and what VK uses in their place, about log collection, and about monitoring the project as a whole. Details under the cut.



    For more than four years I have been doing all kinds of tasks related to the backend.

    • Uploading, storage, processing, and distribution of media: video, live streaming, audio, photos, documents.
    • Infrastructure, platform, developer monitoring, logs, regional caches, CDN, our proprietary RPC protocol.
    • Integration with external services: push notifications, parsing of external links, RSS feeds.
    • Helping colleagues with various questions whose answers require diving into unfamiliar code.

    During this time, I had a hand in many components of the site. I want to share this experience.

    General architecture


    Everything, as usual, starts with a server or a group of servers that accept requests.

    Front server


    The front server accepts requests over HTTPS, RTMP, and WSS.

    HTTPS carries requests from the main and mobile web versions of the site, vk.com and m.vk.com, and from other official and unofficial clients of our API: mobile clients, instant messengers and so on. RTMP traffic is for live broadcasts, which have their own separate front servers, and WSS connections are for the Streaming API.

    For HTTPS and WSS, nginx is installed on these servers. For RTMP broadcasts, we recently switched to our own solution, kive, but it is beyond the scope of this talk. For fault tolerance, these servers announce shared IP addresses and act in groups, so that if one of them has a problem, user requests are not lost. For HTTPS and WSS, the same servers also terminate encryption, taking that part of the CPU load onto themselves.

    From here on we will not talk about WSS and RTMP, only about the standard HTTPS requests that a web project usually deals with.

    Backend


    Behind the front are usually the backend servers. They handle requests that the front server receives from clients.

    These are kPHP servers running an HTTP daemon, because HTTPS has already been terminated at the front. kPHP works on the prefork model: a master process starts a bunch of child processes, passes listening sockets to them, and they handle requests. The processes are not restarted between user requests; instead they simply reset their state to the initial zero values, request after request, instead of being restarted.
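
    Here is a rough Go sketch of the idea behind that model: one listening socket shared by a fixed pool of workers, each of which resets its per-request state instead of being restarted. Goroutines stand in for the forked worker processes here, which is only an approximation of how kPHP itself is implemented.

    package main

    import (
        "bufio"
        "fmt"
        "net"
    )

    // Per-request state that gets wiped between requests instead of
    // the worker process being restarted.
    type requestState struct {
        userID  int
        touched []string
    }

    func worker(id int, ln net.Listener) {
        state := requestState{}
        for {
            conn, err := ln.Accept() // workers compete for connections on the shared socket
            if err != nil {
                return
            }
            line, _ := bufio.NewReader(conn).ReadString('\n')
            state.touched = append(state.touched, line)
            fmt.Fprintf(conn, "worker %d handled: %s", id, line)
            conn.Close()
            state = requestState{} // reset to zero values, request after request
        }
    }

    func main() {
        ln, err := net.Listen("tcp", "127.0.0.1:8080")
        if err != nil {
            panic(err)
        }
        const workers = 4 // the master would normally fork this many children
        for i := 0; i < workers; i++ {
            go worker(i, ln)
        }
        select {} // the "master" just keeps the workers alive
    }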

    Load distribution


    Our backends are not one huge pool of machines that can handle any request. We divide them into separate groups: general, mobile, api, video, staging, and so on. A problem in one group of machines does not affect all the others: if something goes wrong with video, a user who is listening to music will not even notice. Which backend a request goes to is decided by nginx on the front, based on its config.

    Metrics collection and rebalancing


    To understand how many machines we need in each group, we do not rely on QPS. The backends are different, they serve different requests, and each request has a different cost, so QPS is a poor measure. Instead we use the load on the server as a whole: CPU usage and perf data.

    We have thousands of such servers. Each physical server runs a group of kPHP processes to utilize all the cores (because kPHP is single-threaded).

    Content server


    CS, or Content Server, is storage. A CS is a server that stores files and also processes uploaded files and all kinds of background tasks that the main web frontend assigns to it.

    We have tens of thousands of physical servers that store files. Users love to upload files, and we love to store and serve them. Some of these servers are hidden behind special pu/pp servers.

    pu / pp


    If you have ever opened the network tab while on VK, you have seen pu/pp.



    What are pu/pp? If we hide one server behind another, there are two options for uploading a file to, or downloading it from, the hidden server: directly, via http://cs100500.userapi.com/path, or through an intermediate server, http://pu.vk.com/c100500/path.

    Pu is the historical name for photo upload, and pp is photo proxy: one kind of server uploads photos, the other serves them. These days far more than photos goes through them, but the names have stuck.

    These servers terminate HTTPS sessions, which takes CPU load off the storage. And since user files are processed on the storage machines, the less sensitive information kept there the better: HTTPS encryption keys, for example.

    Since these machines sit behind our other machines, we can afford not to give them "white" external IPs and use "gray" ones instead. This saves on the IP pool and guarantees that the machines cannot be reached from outside: there is simply no IP to reach them by.

    Fault tolerance through a shared IP. In terms of fault tolerance the scheme works the same way: several physical servers share a common physical IP, and the hardware in front of them chooses which one to send a request to. Later I will talk about other options.

    A debatable point is that in this case the client holds fewer connections. If several machines share one IP and one host name, pu.vk.com or pp.vk.com, the client browser applies its limit on simultaneous requests to that single host. But in the era of ubiquitous HTTP/2, I believe this no longer matters as much.

    The obvious downside of the scheme is that all traffic to the storage has to be pumped through another server. Because we pump traffic through these machines, we cannot yet pump heavy traffic, video for example, the same way. Video is transferred directly: a separate direct connection to dedicated storages just for video. Lighter content we send through the proxy.

    Not so long ago we got an improved version of these proxies. I will now tell you how they differ from the ordinary ones and why this is necessary.

    Sun


    In September 2017, Oracle, which had earlier acquired Sun, laid off a huge number of Sun employees. You could say that at that moment the company ceased to exist. When choosing a name for the new system, our admins decided to pay tribute to that company and named the new system Sun. Among ourselves we simply call them "suns".



    Pp had a few problems. One IP per group means an inefficient cache. Several physical servers share a common IP address, and there is no way to control which server a request lands on. So if different users come for the same file and those servers have a cache, the file ends up in the cache of every one of them. This is very inefficient, but nothing could be done about it.

    As a result, we could not shard content, because we could not select a specific server within a group: they share one IP. Also, for internal reasons, we could not place such servers in the regions; they stood only in St. Petersburg.

    With the suns we changed the selection system. Now we have anycast routing: dynamic routing, anycast, and a self-check daemon. Each server has its own individual IP but shares a common subnet. Everything is configured so that if one server is lost, traffic automatically spreads out to the other servers of the same group. Now we can select a specific server, there is no redundant caching, and reliability does not suffer.

    Weight support. Now we can deploy machines of different capacities as needed, and in case of temporary problems we can change the weights of the working "suns" to reduce their load, let them "rest", and then bring them back into full service.

    Sharding by content id. The interesting thing about this sharding is that we usually shard content so that different users go to the same file through the same "sun", so that they share a common cache.

    We recently launched the Clover app. It is an online live-broadcast quiz where the host asks questions and users answer in real time by choosing options. The app has a chat where users can flood. More than 100 thousand people can connect to a broadcast at once. They all write messages that are sent to all participants, and along with each message comes an avatar. If 100 thousand people come for one avatar to a single "sun", that sun can occasionally go behind a cloud.

    To withstand bursts of requests for the same file, for certain kinds of content we enable a deliberately dumb scheme that spreads the files across all the available "suns" in the region.
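
    A minimal sketch of the two modes, assuming the sunN-M.userapi.com naming from the talk; the hash function and the way hot files are detected are my own assumptions for illustration.

    package main

    import (
        "fmt"
        "hash/crc32"
        "math/rand"
    )

    // pickSun chooses a "sun" host for a file in a region. Normally the choice
    // is sharded by content id, so all users share one cache for a given file;
    // for known-hot files the request is spread over all suns of the region.
    func pickSun(region int, fileID string, sunsInRegion int, isHot bool) string {
        var group int
        if isHot {
            group = rand.Intn(sunsInRegion) // dumb spread: any sun will do
        } else {
            group = int(crc32.ChecksumIEEE([]byte(fileID))) % sunsInRegion
        }
        return fmt.Sprintf("sun%d-%d.userapi.com", region, group+1)
    }

    func main() {
        fmt.Println(pickSun(4, "c100500/photo.jpg", 2, false)) // same sun for everyone
        fmt.Println(pickSun(4, "hot_quiz_avatar.jpg", 2, true)) // spread across the region
    }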

    Sun inside


    A reverse proxy on nginx, with a cache either in RAM or on fast Optane/NVMe disks. Example: http://sun4-2.userapi.com/c100500/path is a link to the "sun" in the fourth region, second server group. It fronts the file at path, which physically lies on server 100500.

    Cache


    We add one more node to our architectural scheme - the caching environment.



    Below is the layout of the regional caches; there are about 20 of them. These are the locations where the caches and "suns", which can cache traffic passing through them, are placed.



    This is caching of multimedia content only; no user data is stored here, just music, video, and photos.

    To determine the user's region, we collect the BGP network prefixes announced in the regions. As a fallback, we also parse a geoip database for the case when the IP could not be matched by prefixes. From the user's IP we determine the region. In the code we can look at one or several regions of the user: the points geographically closest to them.
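
    A small sketch of that lookup order, prefix match first and geoip as a fallback. The prefix-to-region mapping and the geoip stub are made up for illustration.

    package main

    import (
        "fmt"
        "net"
    )

    type regionPrefix struct {
        cidr   *net.IPNet
        region string
    }

    var prefixes []regionPrefix

    func addPrefix(cidr, region string) {
        _, ipnet, err := net.ParseCIDR(cidr)
        if err == nil {
            prefixes = append(prefixes, regionPrefix{ipnet, region})
        }
    }

    // geoipLookup stands in for parsing a geoip database.
    func geoipLookup(ip net.IP) string { return "spb" }

    func regionByIP(ip net.IP) string {
        for _, p := range prefixes {
            if p.cidr.Contains(ip) {
                return p.region // matched an announced BGP prefix
            }
        }
        return geoipLookup(ip) // fallback when no prefix matched
    }

    func main() {
        addPrefix("93.186.224.0/20", "msk") // hypothetical prefix-to-region mapping
        fmt.Println(regionByIP(net.ParseIP("93.186.225.1")))
        fmt.Println(regionByIP(net.ParseIP("198.51.100.7")))
    }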

    How it works?


    We track the popularity of files per region. There is the number of the regional cache where the user is located and a file identifier; we take this pair and increment its rating on every download.

    Meanwhile, daemons, services running in the regions, come to the API from time to time and say: "I am such-and-such a cache, give me a list of the most popular files in my region that I do not have yet." The API returns a batch of files sorted by rating, the daemon pulls them down to its region and serves them from there. This is the fundamental difference between pu/pp and the suns on one side and caches on the other: the former serve a file through themselves immediately, even if it is not in their cache, while a cache first downloads the file to itself and only then starts serving it.
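
    A sketch of that accounting, with an in-memory map standing in for the real API and engines; the names are illustrative only.

    package main

    import (
        "fmt"
        "sort"
        "sync"
    )

    // Popularity counts downloads per (regional cache, file id) pair.
    type Popularity struct {
        mu     sync.Mutex
        rating map[[2]string]int
    }

    func NewPopularity() *Popularity {
        return &Popularity{rating: make(map[[2]string]int)}
    }

    // RecordDownload is called on every download of fileID via a regional cache.
    func (p *Popularity) RecordDownload(regionCache, fileID string) {
        p.mu.Lock()
        p.rating[[2]string{regionCache, fileID}]++
        p.mu.Unlock()
    }

    // TopMissing is what the regional daemon asks for: the most popular files in
    // its region that it does not have cached yet, sorted by rating.
    func (p *Popularity) TopMissing(regionCache string, cached map[string]bool, limit int) []string {
        type entry struct {
            file   string
            rating int
        }
        p.mu.Lock()
        var candidates []entry
        for key, r := range p.rating {
            if key[0] == regionCache && !cached[key[1]] {
                candidates = append(candidates, entry{key[1], r})
            }
        }
        p.mu.Unlock()
        sort.Slice(candidates, func(i, j int) bool { return candidates[i].rating > candidates[j].rating })
        files := []string{}
        for i := 0; i < limit && i < len(candidates); i++ {
            files = append(files, candidates[i].file)
        }
        return files
    }

    func main() {
        p := NewPopularity()
        for i := 0; i < 5; i++ {
            p.RecordDownload("msk", "video123")
        }
        p.RecordDownload("msk", "photo42")
        fmt.Println(p.TopMissing("msk", map[string]bool{"photo42": true}, 10)) // [video123]
    }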

    This way we bring content closer to users and spread the network load. For example, from the Moscow cache alone we serve more than 1 Tbit/s during peak hours.

    But there are problems: cache servers are not infinitely elastic. For super-popular content, a single server sometimes does not have enough network capacity. Our cache servers have 40-50 Gbit/s links, but there is content that completely saturates such a channel. We are working towards storing more than one copy of popular files in a region; I hope we will get there by the end of the year.

    We examined the general architecture.

    • Front servers that accept requests.
    • Backends that handle requests.
    • Storage that is hidden behind two types of proxies.
    • Regional caches.

    What is missing from this scheme? Of course, the databases in which we store data.

    Databases or engines


    We call them not databases but engines, because we have practically no databases in the generally accepted sense.



    This was a forced measure. It came about because in 2008-2009, when VK's popularity was growing explosively, the project ran entirely on MySQL and Memcache, and there were problems. MySQL liked to crash and corrupt its files, after which it would not come back up, and Memcache gradually degraded in performance and had to be restarted.

    So a project that was rapidly gaining popularity had persistent storage that corrupted data and a cache that slowed down. Under such conditions it is hard to grow. It was decided to try rewriting the critical things the project depended on as our own home-grown solutions.

    The decision paid off. We had the ability to do it, and an urgent need, because other ways of scaling simply did not exist at the time. There was no pile of databases to choose from, NoSQL did not exist yet; there were only MySQL, Memcache, PostgreSQL, and that was it.

    Uniform operation. Development was done by our team of C developers, and everything was built in a consistent way. Regardless of the engine, there was roughly the same format of files written to disk, the same startup parameters, the same signal handling, and the same behavior in edge cases and failures. As the number of engines grows, it stays convenient for administrators to operate the system: there is no zoo of technologies to maintain and no need to relearn how to operate each new third-party database, which made it possible to increase their number quickly and conveniently.

    Types of engines


    The team has written quite a few engines. Here are just a few of them: friend, hints, image, ipdb, letters, lists, logs, memcached, meowdb, news, nostradamus, photo, playlists, pmemcached, sandbox, search, storage, likes, tasks, ...

    For every task that requires a specific data structure or processes atypical requests, the C-team writes a new engine. Why not.

    We have our own memcached engine, similar to the regular one but with a bunch of extra goodies, and it does not slow down. It is not ClickHouse, but it does its job. There is also pmemcached: persistent memcached, which can additionally store data on disk, and more data than fits in RAM, so that nothing is lost on restart. There are various engines for individual tasks: queues, lists, sets, everything our project requires.

    Clusters


    From the code's point of view, there is no need to think of engines or databases as particular processes, entities, or instances. The code works with clusters, with groups of engines, one type per cluster. Say there is a memcached cluster: it is just a group of machines.

    The code does not need to know the physical location, size, or number of servers; it addresses a cluster by an identifier.

    For this to work, one more entity is needed between the code and the engines: the proxy.

    RPC proxy


    The proxy is a connecting bus on which almost the entire site runs. We have no service discovery; instead there is the proxy config, which knows the location of all clusters and of every shard in each cluster. Admins maintain it.

    Programmers generally do not care how many servers there are, where they stand, or what they cost; they just go to the cluster. This gives us a lot. On receiving a request, the proxy forwards it, and it alone determines where.



    At the same time, the proxy is a point of protection against service failure. If some engine slows down or crashes, the proxy notices and responds to the client side accordingly. This lets us drop the timeout: the code does not sit waiting for the engine to answer, it learns that the engine is down and must behave differently. The code has to be prepared for the fact that the databases do not always work.

    Specific implementations


    Sometimes we still really want to use some third-party solution as an engine. In those cases it was decided not to use our ready-made rpc-proxy, created specifically for our engines, but to make a separate proxy for the task.

    For MySQL, which we still have in some places, we use db-proxy, and for ClickHouse - Kittenhouse .

    Overall it works like this. There is a server running kPHP, Go, Python, in general any code that can speak our RPC protocol. The code talks to a local RPC-proxy: every server that runs code also runs its own local proxy. Given a request, the proxy knows where to send it.



    If one engine wants to talk to another, even a neighboring one, it goes through the proxy, because the neighbor may be in a different data center. An engine should not depend on knowing the location of anything other than itself; that is our standard approach. But of course there are exceptions :)

    Here is an example of the TL schema that all the engines work with.

    memcache.not_found                                = memcache.Value;
    memcache.strvalue	value:string flags:int = memcache.Value;
    memcache.addOrIncr key:string flags:int delay:int value:long = memcache.Value;
    tasks.task
        fields_mask:#
        flags:int
        tag:%(Vector int)
        data:string
        id:fields_mask.0?long
        retries:fields_mask.1?int
        scheduled_time:fields_mask.2?int
        deadline:fields_mask.3?int
        = tasks.Task;
    tasks.addTask type_name:string queue_id:%(Vector int) task:%tasks.Task = Long;

    This is a binary protocol whose closest analogue is protobuf. The schema describes optional fields, complex types that extend the built-in scalars, and queries, all in advance. Everything runs over this protocol.
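
    To illustrate the fields_mask idea from the tasks.task constructor above, here is a simplified sketch: a bit in the mask decides whether the corresponding optional field is present on the wire. Constructor IDs, byte order, and string encoding here are my assumptions for illustration, not the real TL wire format.

    package main

    import (
        "bytes"
        "encoding/binary"
        "fmt"
    )

    type Task struct {
        FieldsMask    uint32
        Flags         int32
        Data          string
        ID            int64 // present if bit 0 of FieldsMask is set
        Retries       int32 // present if bit 1 is set
        ScheduledTime int32 // present if bit 2 is set
    }

    func (t *Task) Serialize() []byte {
        buf := new(bytes.Buffer)
        binary.Write(buf, binary.LittleEndian, t.FieldsMask)
        binary.Write(buf, binary.LittleEndian, t.Flags)
        // hypothetical string encoding: length prefix plus raw bytes
        binary.Write(buf, binary.LittleEndian, uint32(len(t.Data)))
        buf.WriteString(t.Data)
        if t.FieldsMask&(1<<0) != 0 {
            binary.Write(buf, binary.LittleEndian, t.ID)
        }
        if t.FieldsMask&(1<<1) != 0 {
            binary.Write(buf, binary.LittleEndian, t.Retries)
        }
        if t.FieldsMask&(1<<2) != 0 {
            binary.Write(buf, binary.LittleEndian, t.ScheduledTime)
        }
        return buf.Bytes()
    }

    func main() {
        t := Task{FieldsMask: 1<<0 | 1<<1, Data: "payload", ID: 42, Retries: 3}
        fmt.Printf("%d bytes on the wire\n", len(t.Serialize()))
    }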

    RPC over TL over TCP / UDP ... UDP?


    We have an RPC protocol for querying the engines, which runs on top of the TL schema. It all works over TCP or UDP connections. TCP is self-explanatory, but we are often asked why UDP.

    UDP helps avoid the problem of a huge number of connections between servers. If every server runs an RPC-proxy and in principle it can go to any engine, you end up with tens of thousands of TCP connections per server. That is load, and useless load at that. With UDP this problem does not exist.

    No redundant TCP handshake. This is a typical problem: when a new engine or a new server comes up, many TCP connections are established at once. For small lightweight requests that fit into a UDP payload, all communication between the code and the engine is two UDP packets: one flies in one direction, the other flies back. One round trip, and the code has the engine's response without any handshake.

    Yes, all of this works only with a very small percentage of packet loss. The protocol supports retransmits and timeouts, but if we lose a lot of packets we effectively get TCP, which defeats the purpose. We do not push UDP across oceans.
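
    A minimal sketch of that "two packets, one round trip" exchange with a timeout and retransmits. The address and payload are hypothetical; the real protocol carries TL-serialized RPC queries with sequence numbers and checksums.

    package main

    import (
        "fmt"
        "net"
        "time"
    )

    // udpCall sends one request datagram and waits for one response datagram,
    // retransmitting the request on timeout.
    func udpCall(addr string, request []byte, attempts int, timeout time.Duration) ([]byte, error) {
        conn, err := net.Dial("udp", addr)
        if err != nil {
            return nil, err
        }
        defer conn.Close()

        buf := make([]byte, 64*1024)
        for i := 0; i < attempts; i++ {
            if _, err := conn.Write(request); err != nil {
                return nil, err
            }
            conn.SetReadDeadline(time.Now().Add(timeout))
            n, err := conn.Read(buf)
            if err == nil {
                return buf[:n], nil // got the answer in one round trip
            }
            // timeout or transient error: retransmit the request
        }
        return nil, fmt.Errorf("no response from %s after %d attempts", addr, attempts)
    }

    func main() {
        resp, err := udpCall("127.0.0.1:3360", []byte("ping"), 3, 200*time.Millisecond)
        fmt.Println(resp, err)
    }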

    We have thousands of such servers, and the scheme is the same everywhere: a pack of engines sits on each physical server. They are mostly single-threaded, to run as fast as possible without locking, and are sharded as single-threaded solutions. At the same time, we have nothing more reliable than these engines, and a lot of attention goes into persistent data storage.

    Persistent data storage


    Engines write binlogs. A binlog is a file to whose end events describing state or data changes are appended. Different systems call it different things: binary log, WAL, AOF, but the principle is the same.

    So that on restart an engine does not have to re-read a binlog accumulated over many years, engines write snapshots of their current state. When needed, they read the snapshot first and then catch up from the binlog. All binlogs are written in the same binary format, according to the TL schema, so that admins can manage them uniformly with their tools. Snapshots have no such requirement: there is a common header indicating whose snapshot it is, an int with the engine's magic, and the body is of no concern to anyone else; that is the business of the engine that wrote the snapshot.

    Let me briefly describe how it works. There is a server running the engine. It opens a new empty binlog for writing and appends change events to it.



    At some point it either decides to take a snapshot on its own or receives a signal. The server creates a new file, writes its entire state into it, appends the current binlog size (the offset) to the end of that file, and keeps writing to the binlog. A new binlog is not created.



    At some point the engine restarts, and there are a binlog and a snapshot on disk. The engine reads the snapshot in full and restores its state as of a certain moment.



    It then reads out the position the binlog had at the moment the snapshot was created, that is, the binlog size recorded in the snapshot.



    It reads the tail of the binlog from that offset to reach the current state, and continues writing new events. It is a simple scheme, and all our engines work by it.
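
    Here is a minimal sketch of that binlog plus snapshot cycle, with a trivial counter standing in for a real engine's state. File names, the event format, and the snapshot layout are assumptions for illustration; real engines use the TL binary format for binlogs.

    package main

    import (
        "bufio"
        "encoding/binary"
        "fmt"
        "io"
        "os"
    )

    type Engine struct {
        state  int64    // toy state: just a counter
        binlog *os.File // events are appended here
    }

    // Apply applies one event (add a delta) and appends it to the binlog.
    func (e *Engine) Apply(delta int64) error {
        e.state += delta
        return binary.Write(e.binlog, binary.LittleEndian, delta)
    }

    // Snapshot writes the full state plus the current binlog offset into a new
    // file. The binlog itself keeps growing; no new binlog is started.
    func (e *Engine) Snapshot(path string) error {
        offset, err := e.binlog.Seek(0, io.SeekCurrent)
        if err != nil {
            return err
        }
        f, err := os.Create(path)
        if err != nil {
            return err
        }
        defer f.Close()
        w := bufio.NewWriter(f)
        binary.Write(w, binary.LittleEndian, e.state) // full state
        binary.Write(w, binary.LittleEndian, offset)  // binlog position at snapshot time
        return w.Flush()
    }

    // Recover restores state from the snapshot, then replays the binlog tail.
    func Recover(snapPath, binlogPath string) (*Engine, error) {
        e := &Engine{}
        var offset int64
        if snap, err := os.Open(snapPath); err == nil {
            binary.Read(snap, binary.LittleEndian, &e.state)
            binary.Read(snap, binary.LittleEndian, &offset)
            snap.Close()
        }
        log, err := os.OpenFile(binlogPath, os.O_RDWR|os.O_CREATE, 0o644)
        if err != nil {
            return nil, err
        }
        log.Seek(offset, io.SeekStart)
        var delta int64
        for binary.Read(log, binary.LittleEndian, &delta) == nil { // replay the tail
            e.state += delta
        }
        e.binlog = log // file position is now at the end; keep appending
        return e, nil
    }

    func main() {
        e, _ := Recover("engine.snap", "engine.binlog")
        e.Apply(1)
        e.Snapshot("engine.snap")
        fmt.Println("state:", e.state)
    }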

    Data replication


    As a result, replication is statement-based: we write to the binlog not page changes but change requests, very similar to what arrives over the network, only slightly transformed.

    The same scheme is used not only for replication but also for backups. We have an engine, a writing master, that writes to a binlog. Anywhere the admins configure it, a copy of this binlog is kept up to date, and that is it: we have a backup.



    If a read replica is needed to reduce the CPU load from reads, a reading engine is simply brought up; it reads the tail of the binlog and executes those commands locally.

    The lag is very small, and it is possible to find out how far the replica is behind the master.

    Data sharding in RPC-proxy


    How does sharding work? How does the proxy figure out which shard of the cluster to send a request to? The code does not say "send it to shard 15!"; the proxy decides.

    The simplest scheme is firstint: the first number in the request.

    get(photo100_500) => 100 % N.

    This is an example with the simple memcached text protocol; real requests are, of course, complex and structured. The example takes the first number in the key and its remainder when divided by the cluster size.

    This is useful when we want data locality for a single entity. Say 100 is a user or group ID, and we want all the data of that entity to live on one shard so that complex queries stay local.

    If we do not care how requests are spread across the cluster, there is another option: hashing the entire key.

    hash(photo100_500) => 3539886280 % N

    We take the hash of the key, its remainder modulo the cluster size, and that is the shard number.

    Both of these options work only if we are prepared to grow the cluster by splitting or multiplying it by a whole factor. For example, we had 16 shards, they are no longer enough, we want more: we can safely go to 32 without downtime. But if we want to grow to a non-multiple size, there will be downtime, because it is impossible to re-split everything carefully without loss. These options are useful, but not always.
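
    A sketch of the two shard-selection schemes for a memcached-style key such as photo100_500. The hash function (crc32) is an assumption for illustration; the real proxy works on structured RPC requests, not raw keys.

    package main

    import (
        "fmt"
        "hash/crc32"
        "regexp"
        "strconv"
    )

    var firstNumber = regexp.MustCompile(`\d+`)

    // firstint: take the first number in the key modulo cluster size, so all
    // data of one entity (user 100, say) lands on the same shard.
    func shardByFirstInt(key string, n int) int {
        num, _ := strconv.Atoi(firstNumber.FindString(key))
        return num % n
    }

    // hash of the whole key: spreads requests evenly, but gives up data locality.
    func shardByHash(key string, n int) int {
        return int(crc32.ChecksumIEEE([]byte(key))) % n
    }

    func main() {
        const n = 16 // cluster size
        fmt.Println(shardByFirstInt("photo100_500", n)) // 100 % 16 = 4
        fmt.Println(shardByHash("photo100_500", n))
    }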

    If we need to add or remove an arbitrary number of servers, we use consistent hashing on a ring, a la Ketama. But then we completely lose data locality, and we have to issue a merge request to the cluster so that each shard returns its small piece of the answer, and the proxy combines the responses.
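
    A minimal sketch of Ketama-style consistent hashing: each server gets many virtual points on a ring, a key is served by the first point clockwise from its hash, and adding or removing one server moves only a small share of keys. The hash function and the number of virtual points are assumptions.

    package main

    import (
        "fmt"
        "hash/crc32"
        "sort"
    )

    type Ring struct {
        points []uint32          // sorted hashes of virtual nodes
        owner  map[uint32]string // hash of virtual node -> server name
    }

    func NewRing(servers []string, vnodes int) *Ring {
        r := &Ring{owner: make(map[uint32]string)}
        for _, s := range servers {
            for i := 0; i < vnodes; i++ {
                h := crc32.ChecksumIEEE([]byte(fmt.Sprintf("%s#%d", s, i)))
                r.points = append(r.points, h)
                r.owner[h] = s
            }
        }
        sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
        return r
    }

    // Pick returns the server responsible for the key.
    func (r *Ring) Pick(key string) string {
        h := crc32.ChecksumIEEE([]byte(key))
        i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
        if i == len(r.points) {
            i = 0 // wrap around the ring
        }
        return r.owner[r.points[i]]
    }

    func main() {
        ring := NewRing([]string{"srv1", "srv2", "srv3"}, 160)
        fmt.Println(ring.Pick("photo100_500"))
    }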

    There are also some highly specific request types. Overall it looks like this: the RPC-proxy receives a request, determines which cluster to go to, and picks a shard. Then it sends the request either to a writing master or, if the cluster supports replicas and the request allows it, to a replica. All of this is done by the proxy.



    Logs


    We write logs in several ways. The most obvious and simplest is writing logs to memcache.

    ring-buffer: prefix.idx = line

    There is a key prefix, the name of the log, and there is the size of this log, its number of lines. We take a random number from 0 to the number of lines minus one. The key in memcache is the prefix concatenated with this random number, and in the value we store the log line and the current time.

    When we need to read the logs, we do a Multi Get of all the keys, sort them by time, and get a real-time production log. The scheme is used when you need to debug something in production in real time without breaking anything, without stopping the service or diverting traffic to other machines, but such a log does not live long.
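
    A sketch of that ring-buffer log, using the gomemcache client as a stand-in for VK's own memcached engine. The server address, log name, and ring size are hypothetical.

    package main

    import (
        "fmt"
        "math/rand"
        "sort"
        "strings"
        "time"

        "github.com/bradfitz/gomemcache/memcache"
    )

    const (
        logPrefix = "mylog"
        ringSize  = 1000
    )

    var mc = memcache.New("127.0.0.1:11211")

    // writeLog stores a line under prefix.<random index>, timestamp included in the value.
    func writeLog(line string) error {
        key := fmt.Sprintf("%s.%d", logPrefix, rand.Intn(ringSize))
        value := fmt.Sprintf("%d\t%s", time.Now().UnixNano(), line)
        return mc.Set(&memcache.Item{Key: key, Value: []byte(value)})
    }

    // readLog multi-gets every slot and returns the lines sorted by timestamp.
    func readLog() ([]string, error) {
        keys := make([]string, ringSize)
        for i := range keys {
            keys[i] = fmt.Sprintf("%s.%d", logPrefix, i)
        }
        items, err := mc.GetMulti(keys)
        if err != nil {
            return nil, err
        }
        lines := make([]string, 0, len(items))
        for _, it := range items {
            lines = append(lines, string(it.Value))
        }
        sort.Strings(lines) // the fixed-width timestamp prefix makes lexical order chronological
        return lines, nil
    }

    func main() {
        writeLog("request handled in 12ms")
        lines, _ := readLog()
        fmt.Println(strings.Join(lines, "\n"))
    }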

    For reliable log storage we have the logs-engine. That is what it was created for, and it is widely used in a huge number of clusters. The largest cluster I know of stores 600 TB of packed logs.

    The engine is quite old; there are clusters that are already 6-7 years old. It has problems we are trying to solve; for example, we have started actively using ClickHouse for log storage.

    Collection of logs in ClickHouse


    This diagram shows how we go to our engines.



    There is code that goes over RPC to the local RPC-proxy, and the proxy knows which engine to go to. If we want to write logs to ClickHouse, we need to change two parts of this scheme:

    • replace some engine with ClickHouse;
    • replace the RPC-proxy, which cannot talk to ClickHouse, with a solution that can, and can do it over RPC.

    The engine part is simple: we replace it with a server or a cluster of servers running ClickHouse.

    And to talk to ClickHouse we made KittenHouse. Going to ClickHouse directly would not work: it cannot cope. Even without actual requests, it falls over just from the HTTP connections of a huge number of machines. For the scheme to work, a local reverse proxy is raised on the server with ClickHouse, written so that it can withstand the required volume of connections, and it can also buffer data within itself relatively reliably.



    Sometimes we do not want to implement our RPC scheme in third-party components, nginx for example. So KittenHouse can also receive logs over UDP.



    If the sender and the receiver of the logs run on the same machine, the probability of losing a UDP packet on localhost is quite low. As a compromise between having to implement RPC in a third-party component and reliability, we simply send over UDP. We will come back to this scheme.
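
    A sketch of that compromise: fire-and-forget log lines over UDP to a collector on the same machine. The port and line format are hypothetical, not KittenHouse's actual interface.

    package main

    import (
        "fmt"
        "net"
        "time"
    )

    func main() {
        conn, err := net.Dial("udp", "127.0.0.1:13338")
        if err != nil {
            panic(err)
        }
        defer conn.Close()

        line := fmt.Sprintf("%s\taccess\tGET /feed 200 12ms\n", time.Now().Format(time.RFC3339))
        // On localhost the loss probability is low; there is no ack and no retry.
        if _, err := conn.Write([]byte(line)); err != nil {
            fmt.Println("send failed:", err)
        }
    }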

    Monitoring


    We have two types of logs: those that administrators collect on their servers and those that developers write from code. Two types of metrics correspond to them: system and product .

    System metrics


    We run Netdata on all servers; it collects statistics and sends them to Graphite Carbon. At the same time, ClickHouse is used as the storage backend rather than, say, Whisper. When needed, we can read directly from ClickHouse, or use Grafana for metrics, graphs, and reports. As developers, we have access to Netdata and Grafana.

    Product Metrics


    For convenience we have written a lot of tooling. For example, there is a set of ordinary functions for writing Counts and UniqueCounts values into statistics, which are then shipped further down the pipeline.

    statlogsCountEvent   ( ‘stat_name’,            $key1, $key2, …)
    statlogsUniqueCount ( ‘stat_name’, $uid,    $key1, $key2, …)
    statlogsValuetEvent  ( ‘stat_name’, $value, $key1, $key2, …)
    $stats = statlogsStatData($params)
    

    Later we can filter, sort, group, and do whatever we want with the statistics: build graphs, configure watchdogs.

    We write a lot of metrics: 600 billion to 1 trillion events per day. At the same time we want to keep them for at least a couple of years to understand trends. Putting it all together is a big problem that we have not fully solved yet. Let me tell you how it has worked for the past few years.

    We have functions that write these metrics to the local memcache, to reduce the number of writes. At short intervals the locally running stats-daemon collects all the records. The daemon then merges the metrics into two layers of logs-collectors servers, which aggregate statistics from a heap of our machines so that the layer behind them does not die under the load.
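
    A sketch of the local aggregation step: counters accumulate in memory (standing in for the local memcache) and a background goroutine (standing in for the stats-daemon) flushes the merged values upstream at short intervals. The flush destination is left as a stub; the names are hypothetical.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type StatsDaemon struct {
        mu     sync.Mutex
        counts map[string]int64
    }

    func NewStatsDaemon(flushEvery time.Duration) *StatsDaemon {
        d := &StatsDaemon{counts: make(map[string]int64)}
        go func() {
            for range time.Tick(flushEvery) {
                d.flush()
            }
        }()
        return d
    }

    // CountEvent is what code calls; many calls collapse into one counter.
    func (d *StatsDaemon) CountEvent(name string, keys ...string) {
        d.mu.Lock()
        d.counts[name+"|"+fmt.Sprint(keys)]++
        d.mu.Unlock()
    }

    // flush ships the aggregated counters to the logs-collectors layer (stubbed here).
    func (d *StatsDaemon) flush() {
        d.mu.Lock()
        batch := d.counts
        d.counts = make(map[string]int64)
        d.mu.Unlock()
        for k, v := range batch {
            fmt.Printf("send to logs-collector: %s = %d\n", k, v)
        }
    }

    func main() {
        daemon := NewStatsDaemon(time.Second)
        for i := 0; i < 1000; i++ {
            daemon.CountEvent("page_view", "feed")
        }
        time.Sleep(1500 * time.Millisecond) // let one flush happen
    }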



    If necessary, we can write directly to logs-collectors.



    But writing from code directly to the collectors, bypassing the stats-daemon, scales poorly because it increases the load on the collector. It is only suitable when for some reason we cannot run memcache and the stats-daemon on the machine, or it has crashed and we go direct.

    The logs-collectors then merge the statistics into meowDB, our database that also knows how to store metrics.



    From the code we can then run binary "near-SQL" queries against it.



    Experiment


    In the summer of 2018 we had an internal hackathon, and the idea came up to try replacing the red part of the diagram with something that could store metrics in ClickHouse. We already had logs in ClickHouse, so why not try?



    We had a scheme that wrote logs through KittenHouse.



    We decided to add another "*House" to the scheme, one that accepts metrics over UDP in the format our code writes them. This *House then turns them into inserts, like logs, that KittenHouse understands. KittenHouse knows how to deliver them to ClickHouse, which should be able to read them.



    The scheme with memcache, the stats-daemon, and the logs-collectors database is replaced with the following.




    • Code sends metrics locally to StatsHouse.
    • StatsHouse writes the UDP metrics to KittenHouse, already converted into batched SQL inserts.
    • KittenHouse sends them to ClickHouse.
    • When we want to read them, we bypass StatsHouse and read directly from ClickHouse with regular SQL.

    This is still an experiment, but we like the result. If we fix the scheme's problems, we may switch to it entirely. Personally, I hope we will.

    The scheme does not save on hardware. Fewer servers are needed, since local stats-daemons and logs-collectors are no longer required, but ClickHouse needs beefier servers than those in the current scheme. Fewer servers, but they have to be more expensive and more powerful.

    Deploy


    First, let's look at how PHP is deployed. We develop in git and use GitLab and TeamCity for deployment. Developer branches are merged into master; from master, for testing, into staging; and from staging into production.

    Before a deployment, the current production branch and the previous one are taken and a file diff is computed between them: which files were created, deleted, or changed. This changeset is written into the binlog of a special copyfast engine, which can quickly replicate the changes to our whole fleet of servers. It does not copy directly; it uses gossip replication: one server sends the changes to its nearest neighbors, those to their neighbors, and so on. This lets us roll out code across the fleet in tens of seconds, or even single seconds. When the change reaches a local replica, it applies the patches to its local file system. Rollback works by the same scheme.
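
    A toy sketch of that gossip-style fan-out: a node that sees a new version applies it and forwards it to a few random neighbors, so one master does not have to push to the whole fleet. The fan-out factor and the in-memory "network" are assumptions; a real implementation gossips in repeated rounds rather than a single recursive pass.

    package main

    import (
        "fmt"
        "math/rand"
    )

    type Node struct {
        name      string
        neighbors []*Node
        version   int
    }

    func (n *Node) Receive(version int) {
        if version <= n.version {
            return // already have this patch, stop the spread here
        }
        n.version = version
        fmt.Printf("%s applied version %d\n", n.name, version)
        // forward to a couple of random neighbors instead of everyone
        const fanout = 2
        perm := rand.Perm(len(n.neighbors))
        for i := 0; i < fanout && i < len(perm); i++ {
            n.neighbors[perm[i]].Receive(version)
        }
    }

    func main() {
        // build a tiny fully connected "fleet"
        nodes := make([]*Node, 8)
        for i := range nodes {
            nodes[i] = &Node{name: fmt.Sprintf("srv%d", i)}
        }
        for _, a := range nodes {
            for _, b := range nodes {
                if a != b {
                    a.neighbors = append(a.neighbors, b)
                }
            }
        }
        // the deploy master seeds the new version into one node; gossip does the rest
        nodes[0].Receive(1)
    }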

    We also deploy kPHP itself a lot, and it has its own git development flow following the scheme above. Since it is an HTTP server binary, we cannot produce a diff: a release binary weighs hundreds of MB. So here the approach is different: the version is written to the copyfast binlog. It is incremented with every build, and on a rollback it is incremented too. The version is replicated to the servers. Local copyfast daemons see that a new version has appeared in the binlog and, with the same gossip replication, pull the fresh binary without exhausting our master server, spreading the load gently over the network. Then comes a graceful restart to the new version.

    For our engines, which are also essentially binaries, the scheme is very similar:

    • git master branch;
    • the binary is packaged into .deb;
    • the version is written to the copyfast binlog;
    • the version is replicated to the servers;
    • the server pulls a fresh .deb;
    • dpkg -i;
    • graceful restart to the new version.

    The difference is that the binary is packaged into .deb archives, and once downloaded they are installed with dpkg -i. Why is kPHP deployed as a bare binary and the engines via dpkg? It just happened that way. It works, so we do not touch it.

    Useful links:


    Alexey Akulovich is one of the people on the Program Committee helping PHP Russia on May 17 become the largest event for PHP developers in recent years. Look how strong our PC is and what speakers we have (two of them develop the PHP core!); if you write PHP, it seems you should not miss it.
