Getting out of Tarantool networks. Node synchronization when filtering traffic


    Variti specializes in protection against bots and DDoS attacks and also performs stress and load testing. Since we operate as an international service, it is extremely important for us to ensure uninterrupted exchange of information between servers and clusters in real time. At the Saint HighLoad++ 2019 conference, Variti developer Anton Barabanov explained how we use UDP and Tarantool, why we chose this combination, and how we had to rewrite a Tarantool module from Lua to C.

    Below you can read the report abstracts; the video of the talk is under the spoiler.

    Report video


    When we started building the traffic filtering service, we decided right away not to deal with IP transit but to protect HTTP, API and game services. So we terminate traffic at the L7 level, in the TCP protocol, and pass it on; protection at L3/L4 happens automatically at the same time. The diagram below shows the scheme of the service: requests from people go through the cluster, that is, the servers and network equipment, while bots (shown as a ghost) are filtered out.



    For filtering, it is necessary to split the traffic into separate requests, analyze sessions accurately and quickly, and, since we do not block by IP address, distinguish bots from people within connections coming from the same IP address.

    What happens inside the cluster


    Inside the cluster we have independent filter nodes, that is, each node works on its own and only with its own share of the traffic. Traffic is distributed between the nodes randomly: if, for example, 10 connections come in from one user, they are spread across different servers.

    We have very stringent performance requirements, since our customers are located in different countries. If, for example, a user from Switzerland visits a French site, he already faces 15 milliseconds of network latency because of the longer traffic route. We therefore have no right to add another 15-20 milliseconds inside our processing center: the request would take critically long. Besides, if we spent 15-20 milliseconds processing every HTTP request, a simple attack of 20 thousand RPS would bring down the entire cluster. That, of course, is unacceptable.

    Another requirement for us was not just tracking a request but also understanding its context. Suppose a user opens a web page and sends a request for "/". After that the page loads, and if it is HTTP/1.1, the browser opens 10 connections to the backend and in 10 streams requests static and dynamic content, makes AJAX requests and subrequests. If, instead of proxying a subrequest while the page is being returned, you start interacting with the browser and try to give it, say, a JS challenge for that subrequest, you will most likely break the page. On the very first request you can serve a CAPTCHA (although that is bad) or a JS challenge, make a redirect, and then any browser will handle everything correctly. After the check, it is necessary to spread the information that the session is legitimate to all clusters. If there is no exchange of information between the clusters, the other nodes will run the checks again and break the user's session.

    It is also important to react quickly to all load surges and traffic changes. If something spikes on one node, then within 50-100 milliseconds the spike will reach all the other nodes. So it is better if the nodes learn about the change in advance and set the protection parameters beforehand, so that the spike does not happen on the other nodes at all.
    An additional bot-protection service is post-markup: we put a pixel on the site, record bot/person information and hand this data out via an API. These verdicts have to be stored somewhere. That is, whereas earlier we were talking about synchronization within a cluster, now we also add synchronization of information between clusters. Below is the scheme of the service at the L7 level.



    Between clusters


    After we had built the cluster, we started to scale. We work via BGP anycast: our subnets are announced from all clusters, and traffic arrives at the closest one. Simply put, a request from France goes to the cluster in Frankfurt, and one from St. Petersburg to the cluster in Moscow. The clusters must be independent; it is acceptable for the network flows to be independent as well.

    Why is this important? Suppose a person is driving a car, working with a website over mobile Internet, and crosses a certain boundary after which the traffic suddenly switches to another cluster. Or another case: the traffic route is rebuilt because a switch or router burned out somewhere, something went down, a network segment got disconnected. In such cases we give the browser (for example, in cookies) enough information so that, when it switches to another cluster, it can pass along the necessary parameters about the checks it has passed or failed.
    In addition, the protection mode must be synchronized between clusters. This matters for low-volume attacks, which are most often carried out under the cover of a flood. Since the attacks run in parallel, people think that the flood is what is breaking their site and do not notice the low-volume attack. For the case when the low-volume attack hits one cluster and the flood hits another, synchronization of the protection mode is necessary.

    And, as already mentioned, we synchronize between clusters the very verdicts that accumulate and are handed out via the API. There can be many verdicts, and they must be synchronized reliably. In protection mode you can afford to lose something inside a cluster, but not between clusters.

    It is worth noting that the latency between clusters is large: between Moscow and Frankfurt it is 20 milliseconds. Synchronous requests are out of the question here; all interaction has to be asynchronous.

    Below we show the interaction between clusters. M, l, p are technical parameters of the exchange; u1 and u2 are users marked as illegitimate and legitimate.



    Internal interaction between nodes


    Initially, when we built the service, L7 filtering ran on a single node. This worked fine for two clients, but no further. When scaling, we wanted maximum responsiveness and minimum latency.

    It was important to minimize the CPU resources spent on processing packets, so interaction via, say, HTTP would not do. It was also necessary to keep the overhead minimal not only in computing resources but also in packet rate. After all, we are talking about filtering attacks, that is, situations in which performance is by definition in short supply. Usually, when building a web project, a x3 or x4 headroom over the load is enough, but we effectively always have x1, since a large-scale attack can arrive at any moment.

    Another requirement for the interaction interface was having a place where we can write information and from which we can later read what state we are in. It is no secret that C++ is often used to develop filtering systems. But, unfortunately, programs written in C++ sometimes crash. Sometimes such programs have to be restarted in order to update or, for example, because the configuration was not re-read. And if we restart a node under attack, we need to take from somewhere the context in which that node existed. That is, the service must not be stateless; it has to remember that there is a certain number of people whom we have blocked or whom we are checking. There has to be the same internal communication so that the service can receive an initial set of information. We even considered putting a small database, such as SQLite, next to the service.

    In fact, we work with only three operations. The first is "send to all nodes". This applies, for example, to messages synchronizing the current load: every node must know the total load on the resource within the cluster in order to track peaks. The second operation is "save"; it concerns verification verdicts. And the third operation is a combination of "send to all" and "save": these are state-change messages that we send to all nodes and then save so that they can be read back later. Below is the resulting interaction scheme, to which we will still need to add parameters for saving.
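
    In Lua terms, the three operations boil down to something like the following sketch; multicast_send() and the messages space are placeholders for the transport and storage described further below, not our production code.

    ```lua
    -- stand-in for the real UDP multicast sender (see the sketch in the next section)
    local function multicast_send(msg) end

    -- 1. "send to all": e.g. current-load synchronization, nothing is persisted
    local function send_to_all(msg)
        multicast_send(msg)
    end

    -- 2. "save": e.g. a verification verdict, persisted locally
    local function save(msg)
        box.space.messages:insert{msg.id, msg.kind, msg.payload, os.time()}
    end

    -- 3. "send to all" + "save": state-change messages that other nodes must
    --    both react to now and be able to read back later
    local function send_and_save(msg)
        multicast_send(msg)
        save(msg)
    end
    ```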



    Options and Result


    What options for storing verdicts did we consider? First, we thought about the classics: RabbitMQ, RedisMQ and our own TCP-based service. We rejected these solutions because they are slow. TCP alone adds x2 to the packet rate. Besides, if one node sends a message to all the others, we either need many sending nodes, or that node can send only 1/16 of the messages that the 16 machines can send to it. Clearly, that is unacceptable.

    As a result, we took UDP multicast, because in this case the sending center is the network equipment, which is not limited in performance and completely solves the problems of sending and receiving speed. It goes without saying that with UDP we do not bother with text formats but send binary data.
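
    A minimal sketch of such a sender on top of Tarantool's built-in socket and msgpack modules; the multicast group, port and message fields are illustrative, not the real ones.

    ```lua
    local socket  = require('socket')
    local msgpack = require('msgpack')

    local MCAST_GROUP = '239.1.1.1'   -- assumed intra-cluster multicast group
    local MCAST_PORT  = 4000

    local sender = socket('AF_INET', 'SOCK_DGRAM', 'udp')

    local function broadcast_state(code, f1, f2, f3)
        -- binary, not text: pack the message with msgpack
        local packet = msgpack.encode({os.time(), code, f1, f2, f3})
        sender:sendto(MCAST_GROUP, MCAST_PORT, packet)
    end

    -- example: announce the current load of this node to the cluster
    broadcast_state(1, 'load', 0.42, 'node-01')
    ```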

    In addition, we immediately added packing of messages and a database. We took Tarantool because, firstly, all three founders of the company had experience with this database, and secondly, it is as flexible as possible, in effect also an application server. Besides, Tarantool has a C API, and the ability to write in C is a matter of principle for us, because protecting against DDoS requires maximum performance. No interpreted language can provide enough of it, unlike C.

    In the diagram below, a database that stores the states for internal communication has been added to the cluster.



    Adding the database


    In the database we store the state as a log of calls. When we were deciding how to save the information, there were two options. We could store a single state with constant updates and changes, but that is rather difficult to implement. So we took a different approach.

    The point is that the structure of the data sent over UDP is uniform: there is a timestamp, some kind of code, and three or four data fields. So we started writing this structure into a Tarantool space and added a TTL field to the record, which makes it clear when the record is outdated and should be deleted. Thus a message log accumulates in Tarantool, which we clean up on a set schedule. To delete old data we initially took expirationd. Later we had to abandon it because it caused certain problems, which we discuss below. For now, the scheme: two databases have been added to our structure.
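
    To make the layout concrete, here is a minimal Lua sketch of such a log space; the space name, field order and timings are illustrative, not our production schema.

    ```lua
    box.schema.space.create('msg_log', {if_not_exists = true})
    box.space.msg_log:create_index('primary', {
        parts = {{field = 1, type = 'unsigned'}},   -- message id
        if_not_exists = true,
    })

    -- Each incoming UDP message becomes one tuple: id, timestamp, code,
    -- a few data fields and an explicit expiry time (the TTL).
    local TTL = 3600  -- keep messages for one hour

    local function log_message(id, code, f1, f2, f3)
        local now = os.time()
        box.space.msg_log:replace{id, now, code, f1, f2, f3, now + TTL}
    end

    -- Background cleanup in the spirit of expirationd / IExpire:
    -- periodically walk the space and drop tuples whose TTL has passed.
    local fiber = require('fiber')
    fiber.create(function()
        while true do
            local now, expired = os.time(), {}
            for _, t in box.space.msg_log:pairs() do
                if t[7] < now then table.insert(expired, t[1]) end
            end
            for _, id in ipairs(expired) do
                box.space.msg_log:delete(id)
            end
            fiber.sleep(10)
        end
    end)
    ```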



    As we already mentioned, in addition to storing cluster state we also need to synchronize verdicts, and verdicts are synchronized between clusters. Accordingly, we had to add one more Tarantool installation. It would be strange to use a different solution when Tarantool is already there and fits our service perfectly. In the new installation we began to write verdicts and replicate them to the other clusters. Here we use not master/slave but master/master. Tarantool currently offers only asynchronous master/master, which does not suit many cases, but for us this model is optimal: given the latency between clusters, synchronous replication would only get in the way, while asynchronous replication causes no problems.
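
    A minimal configuration sketch of such an asynchronous master/master pair might look as follows; the URIs, ports and credentials are placeholders.

    ```lua
    -- Instance in cluster A:
    box.cfg{
        listen      = 3301,
        read_only   = false,   -- both sides accept writes
        replication = {
            'replicator:pass@tarantool-a.example:3301',   -- itself
            'replicator:pass@tarantool-b.example:3301',   -- peer in the other cluster
        },
    }
    -- The instance in cluster B uses the same replication list, also with
    -- read_only = false, which gives asynchronous master/master replication.
    ```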

    Problems


    But we ran into plenty of problems. The first block of difficulties is related to UDP: it is no secret that the protocol can corrupt and lose packets. We solved these problems by the ostrich method, that is, we simply buried our heads in the sand. Still, packet corruption and reordering are practically impossible in our setup, since all communication happens within a single switch and there are no unstable links or unstable network equipment.

    Packet loss can still happen if a machine freezes, an I/O stall occurs somewhere, or a node is overloaded. If such a freeze is short, say 50 milliseconds, it is not a disaster and is solved by enlarging the queues via sysctl: we take sysctl, configure the queue sizes and get a buffer in which everything sits until the node starts working again. If the freeze is longer, the problem will not be the loss of connectivity but the part of the traffic that goes to that node. So far we simply have not had such cases.

    The problems with Tarantool asynchronous replication were much harder. Initially we did not take master/master but the more traditional operating model, master/slave. And everything worked fine until the slave took over the master's load for a long time. As a result, expirationd ran and deleted data on the master, but not on the slave. Accordingly, after we had switched from master to slave and back several times, so much data accumulated on the slave that at some point everything broke. So, for full fault tolerance, we had to switch to asynchronous master/master replication.

    And here difficulties arose again. Firstly, keys can collide between different replicas. Suppose that within a cluster we wrote data to one master, at that moment the connection broke, we wrote everything to the second master, and after asynchronous replication it turned out that the same primary key appeared in the space twice and replication fell apart.

    We solved this problem simply: we adopted a model in which the primary key necessarily contains the name of the Tarantool node we are writing to. Thanks to this, conflicts stopped occurring, although it became possible for user data to be duplicated. This is an extremely rare case, so we again simply neglected it. If duplication were frequent, Tarantool has plenty of different indexes, so deduplication is always possible.
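
    A sketch of this keying scheme; the space, index and field names are illustrative.

    ```lua
    local NODE_NAME = 'fra-node-03'   -- each instance has its own name

    box.schema.space.create('verdicts', {if_not_exists = true})
    -- The node name is part of the primary key, so tuples written on
    -- different masters can never collide during replication.
    box.space.verdicts:create_index('primary', {
        parts = {
            {field = 1, type = 'string'},     -- writing node name
            {field = 2, type = 'unsigned'},   -- local sequence number
        },
        if_not_exists = true,
    })

    local seq = 0
    local function save_verdict(user_id, is_bot)
        seq = seq + 1
        box.space.verdicts:insert{NODE_NAME, seq, user_id, is_bot, os.time()}
        -- a secondary index on user_id could later be used for deduplication
    end
    ```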

    Another problem concerns saving verdicts and arises when data written on one master has not yet appeared on the other, while a request for it has already arrived there. To be honest, we have not solved this issue yet and simply delay the verdict. If that turns out to be unacceptable, we will organize some kind of push notification about data readiness. That is how we dealt with master/master replication and its problems.
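
    The "delayed verdict" can be as simple as a retry loop; the following sketch assumes the verdicts space from the sketch above and is only an illustration of the idea.

    ```lua
    local fiber = require('fiber')

    -- Wait for a tuple written on the other master to show up locally,
    -- giving asynchronous replication a moment to catch up.
    local function wait_for_verdict(node, id, attempts, delay)
        for _ = 1, attempts or 10 do
            local t = box.space.verdicts:get{node, id}
            if t ~= nil then
                return t
            end
            fiber.sleep(delay or 0.01)
        end
        return nil   -- still missing: the caller decides what to do
    end
    ```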

    Another block of problems was directly related to Tarantool, its drivers and the expirationd module. Some time after launch, attacks started arriving every day, and so the number of messages we save in the database for synchronization and context storage became very large. During cleanup so much data had to be deleted that the garbage collector stopped coping. We solved this by writing our own expirationd module in C, called IExpire.

    However, there is one more difficulty with expirationd that we have not yet dealt with: expirationd works on only one master. And if the node with expirationd goes down, the cluster loses critical functionality. Suppose we delete all data older than one hour; it is clear that if the node is down for, say, five hours, the amount of data will be five times the usual. If a large attack arrives at that very moment, that is, two bad cases coincide, the cluster will go down. We do not yet know how to deal with this.

    Finally, there remained difficulties with the Tarantool driver for C. When the service broke (for example, because of a race condition), it took a long time to find the cause and debug. So we simply wrote our own Tarantool driver. Implementing the protocol, together with testing, debugging and launching into production, took us only five days, since we already had our own code for working with the network.

    Problems outside


    Recall that we already have Tarantool replication in place and already know how to synchronize verdicts, but there is no infrastructure yet for passing messages about attacks or problems between clusters.
    We had plenty of different ideas about that infrastructure, including the thought of writing our own TCP service. But there is the Tarantool Queue module from the Tarantool team. Besides, we already had Tarantool with cross-cluster replication, and the necessary "holes" were already opened, so there was no need to go to the admins and ask them to open ports or route traffic. And again, integration into the filtering software was already in place.
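
    A sketch of what messaging on top of the tarantool/queue module can look like; the tube name, message shape and the handle_event() stub are illustrative, not our actual protocol.

    ```lua
    local queue = require('queue')

    queue.create_tube('cluster_events', 'fifottl', {if_not_exists = true})

    -- Producer: announce an attack or a problem to the other clusters.
    queue.tube.cluster_events:put({kind = 'attack', target = 'example.com'},
                                  {ttl = 300})

    -- Consumer (runs only on the node responsible for the queue, see below):
    local function handle_event(payload) end   -- placeholder for the real handler

    local task = queue.tube.cluster_events:take(1)   -- wait up to 1 second
    if task ~= nil then
        handle_event(task[3])                 -- task[1] is the id, task[3] the payload
        queue.tube.cluster_events:ack(task[1])
    end
    ```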

    There was a difficulty with the coordinating node. Suppose there are n independent nodes inside a cluster and we need to choose the one that will work with the queue, because otherwise 16 copies of a message will be sent, or the same message will be read from the queue 16 times. We solved this simply: we register the responsible node in a Tarantool space, and if that node dies, we just change the record in the space, provided we do not forget to. But if we forget, it is a problem, one we also want to solve in the future.
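
    A sketch of such a "responsible node" record; the space and field names are illustrative.

    ```lua
    box.schema.space.create('queue_owner', {if_not_exists = true})
    box.space.queue_owner:create_index('primary', {
        parts = {{field = 1, type = 'string'}},
        if_not_exists = true,
    })

    -- The single record says which node currently reads the queue;
    -- on failover it is updated by hand.
    box.space.queue_owner:replace{'cluster_events', 'fra-node-01'}

    -- Every node checks the record before consuming, so only one of the
    -- sixteen nodes actually takes messages from the queue.
    local function i_am_responsible(my_name)
        local rec = box.space.queue_owner:get{'cluster_events'}
        return rec ~= nil and rec[2] == my_name
    end
    ```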

    Below is a more detailed diagram of the cluster with the interaction interface.



    What we want to improve and add


    First, we want to open-source IExpire. It seems to us to be a useful module, because it does everything expirationd does but with almost zero overhead. We should also add a sorted index there, so that only the oldest tuples are removed. We have not done this so far, since our main operation in Tarantool is writing, and an extra index would add extra load to maintain it. We also want to rewrite most of the methods through the C API, so as not to bring the database down.
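
    Such a sorted index could look roughly like this, continuing the message-log sketch above; the field numbers and names are illustrative.

    ```lua
    -- Non-unique secondary index on the expiry field, so cleanup can walk
    -- tuples oldest-first and stop as soon as it hits a live one.
    box.space.msg_log:create_index('expires', {
        parts = {{field = 7, type = 'unsigned'}},   -- the expires_at field
        unique = false,
        if_not_exists = true,
    })

    local function drop_expired(limit)
        local now, dropped = os.time(), 0
        while dropped < (limit or 1000) do
            local t = box.space.msg_log.index.expires:min()
            if t == nil or t[7] >= now then
                break                       -- the remaining tuples are still alive
            end
            box.space.msg_log:delete(t[1])
            dropped = dropped + 1
        end
        return dropped
    end
    ```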

    The question of choosing the logical master remains open, and it seems this problem cannot be solved completely. That is, if the node with expirationd goes down, all we can do is manually pick another node and start expirationd on it. This is unlikely to happen automatically, because replication is asynchronous. Although we will probably consult the Tarantool team about this.

    If the number of clusters grows explosively, we will also have to ask the Tarantool team for help. The thing is, all-to-all replication is used both for Tarantool Queue and for inter-cluster saving of verdicts. This works well while there are, say, three clusters, but when there are a hundred of them, the number of connections to monitor becomes enormous and something will be breaking constantly. Secondly, it is not certain that Tarantool can withstand such a load.

    Conclusions


    The first conclusions concern UDP multicast and Tarantool. There is no need to be afraid of multicast; using it inside a cluster is good, correct and fast. There are many cases with constant synchronization of states where after 50 milliseconds it no longer matters what happened before, and there the loss of a single state will most likely not be a problem. So using UDP multicast is justified: you do not cap performance and you get the optimal packet rate.

    The second point is Tarantool. If you have a service in Go, PHP and so on, Tarantool will most likely work as is. But if you have heavy loads, you will need to rework it with the proverbial file, that is, fine-tune it by hand. To be honest, though, in that case the file is needed for everything: for Oracle and for PostgreSQL alike.

    Of course, there is an opinion that you should not reinvent the wheel, and if you have a small team you should take a ready-made solution: Redis for synchronization, standard Go, Python and so on. That is not quite true. If you are sure that you need a new solution, if you have worked with open source and found that nothing suits you, or you know in advance that there is no point in even trying, then building your own solution pays off. Another matter is that it is important to stop in time. That is, you do not need to write your own Tarantool, you do not need to implement your own messaging, and if you just need a broker, take Redis and be happy.
