Switching from Redshift to ClickHouse



    For a long time iFunny used Redshift as a database for events that occur in backend services and mobile applications. He was chosen because at the time of implementation, by and large, there were no alternatives that are comparable in cost and convenience.

    However, everything changed after the public release of ClickHouse. We studied it for a long time, compared the cost, estimated the approximate architecture, and then, finally, this summer we decided to see how useful it was to us. In this article, you will learn what problem Redshift helped us solve, and how we transferred this solution to ClickHouse.

    Problem


    iFunny needed a service similar to Yandex.Metrica, but only for internal consumption. I will explain why.

    External clients write events. These can be mobile applications, web sites or internal backend services. It is very difficult for these customers to explain that the event reception service is now unavailable, “try sending in 15 minutes or an hour later”. There are many customers, they want to send events all the time and cannot wait at all.

    In contrast, there are internal services and users who are quite tolerant in this regard: they can work correctly even with an unavailable analytics service. And most of the product metrics and A / B test results generally make sense to look only once a day, and maybe less often. Therefore, reading requirements are quite low. In the event of an accident or update, we can afford to be inaccessible or inconsistent in reading for several hours or even days (in a particularly neglected case).

    If we talk about numbers, then we need to take about five billion events (300 GB of compressed data) per day, while storing the data for three months in a “hot” form available for SQL queries, and in a “cold” one for two years or more, but so that within a few days we can turn them into “hot” ones.

    Basically, data is a set of events ordered by time. Event types about three hundred, each has its own set of properties. There is also some data from third-party sources that should be synchronized with the analytics database: for example, a collection of application installations from MongoDB or the external service AppsFlyer.

    It turns out that under the database we need about 40 TB of disk, and under the “cold” storage - about 250 TB more.

    Redshift Solution




    So, there are mobile clients and backend services from which you need to receive events. The data is received by the HTTP service, it performs minimal validation, collects events on the local disk into files grouped by one minute, immediately compresses and sends them to the S3 batch. The availability of this service depends on the availability of the servers with the application and AWS S3. Applications do not store state, so they are easily balanced, scaled and interchanged. S3 is a relatively simple file storage service with a good reputation and availability, so you can rely on it.

    Next you need to somehow deliver the data to Redshift. It's all quite simple: Redshift has a built-in S3-importer, which is the recommended way to load data. Therefore, once every 10 minutes, a script is launched that connects to Redshift and asks it to load data by prefix. s3://events-bucket/main/year=2018/month=10/day=14/10_3*

    In order to track the status of the download task, we use Apache Airflow : it allows you to repeat the operation in case of errors and have a clear execution history, which is important when there are many such tasks. And in case of problems, you can repeat the download for some time intervals or download the “cold” data from S3 a year ago.

    In the same Airflow, in the same way, according to the schedule, the scripts work that connect to the database and perform periodic downloads from external repositories, or build aggregations over events in the form of INSERT INTO ... SELECT ...

    Redshift and have weak availability guarantees. Once a week, for up to half an hour (a time window is specified in the settings) AWS can stop the cluster for updating or any other scheduled work. In case of a crash on one node, the cluster also becomes unavailable until the host is restored. It usually takes about 15 minutes and happens about once every six months. In the current system, this is not a problem, it was originally designed for the fact that the base will be periodically unavailable.

    Under Redshift, 4 ds2.8xlarge instances (36 CPU, 16 TB HDD) were used, which in total gives us 64 TB of disk space.

    The last point is the backup. The backup schedule can be specified in the cluster settings, and it works fine.

    Motivation for the transition to ClickHouse


    Of course, if there were no problems, no one would have thought about the migration to ClickHouse. However, they were.

    If you look at the ClickHouse storage scheme with the MergeTree and Redshift engine, you can see that their ideology is very similar. Both databases are columnar, they work fine with a large number of columns and compress data on a disk very well (and in Redshift you can configure compression types for each individual column). Even the data is stored in the same way: they are sorted by the primary key, which allows you to read only specific blocks and not to keep separate indices in memory, and this is important when working with large amounts of data.

    The essential difference, as always, is in the details.

    Table for every day


    Sorting data on disk and the actual deletion in Redshift occurs at the moment when you execute:
    VACUUM <tablename>
    In this case, the vacuum process works with all the data in this table. If you store data for all three months in one table, this process takes an indecent amount of time, and you need to perform it at least daily, because old data is deleted and new data is added. We had to build separate tables for each day and combine them through the View, and this is not only the difficulty in rotating and supporting this View, but also slowing down queries. Upon request, judging by explain, all tables were scanned. And although scanning a single table takes less than a second, with their number in 90 pieces, it turns out that any query takes at least a minute. This is not very convenient.

    Duplicates


    The next problem is duplicates. Anyway, when transferring data over the network there are two options: either to lose data, or to receive duplicates. We could not lose the message, so we simply resigned to the fact that some small percentage of events would be duplicated. You can remove duplicates per day by creating a new table, inserting data from the old one into it, where using the window function, you deleted rows with duplicate id, deleting the old table and renaming the new one. Since there was a view over the day tables, it was necessary not to forget about it and delete the tables for the time of renaming the tables. At the same time, it was also necessary to keep track of the locks, otherwise, in the case of a query that blocked the view or one of the tables, this process could take a long time.

    Monitoring and maintenance


    No Redshift request takes less than a couple of seconds. Even if you just want to add a user or see a list of active requests, you will have to wait a couple of tens of seconds. Of course, you can suffer, and for this class of databases is acceptable, but in the end results in a lot of wasted time.

    Cost of


    According to our calculations, deploying ClickHouse on AWS instances with exactly the same resources is exactly two times cheaper. Of course, this is the way it should be, because using Redshift, you get a ready-made database, to which you can connect to any PostgreSQL client immediately after pressing a couple of buttons in the AWS console, and AWS does the rest for you. However, is it worth it? We already have the infrastructure, we seem to be able to do backups, monitoring and configuration, and we are doing this for a bunch of internal services. Why not take up the support of ClickHouse?

    Transition process


    To begin with, we raised a small ClickHouse installation from one machine, where we began to periodically, with the help of embedded tools, load data from S3. Thus, we were able to test our assumptions about the speed and capabilities of ClickHouse.

    After a couple of weeks of tests on a small copy of the data, it became clear that in order to replace Redshift with Clickhouse, I would have to resolve several issues:

    • on which types of instances and disks to deploy;
    • Do I use replication?
    • how to install, configure and run;
    • how to do monitoring;
    • what kind of scheme will be;
    • how to deliver data from S3;
    • How to rewrite all queries from standard SQL to non-standard?

    Instance types and disks . In the number of processors, disk and memory decided to build on the current installation of Redshift. There were several options, including i3 instances with local NVMe disks, but they decided to stop at r5.4xlarge and storage in the form of 8T ST1 EBS for each instance. According to estimates, this should have been comparable to Redshift performance at half the cost. In this case, due to the use of EBS-disks, we get simple backups and recovery through snapshots of disks, almost like in Redshift.

    Replication . Since they were repelled by what is already in Redshift, they decided not to use replication. In addition, this does not force ZooKeeper to be immediately studied, which is not yet in the infrastructure, but it’s great that now there is an opportunity to do replication on demand.

    Installation . This is the easiest part. A small enough Ansible role that installs ready-made RPM packages and makes the same configuration on each host.

    Monitoring . To monitor all services, Prometheus is used together with Telegraf and Grafana, so they simply put Telegraf agents on ClickHouse hosts, collected a dashboard in Grafana, which showed the current server load on the processor, memory and disks. Through the plug-in to Grafana, current active queries on the cluster, the status of imports from S3, and other useful things were brought to this dashboard. It turned out even better and more informative (and significantly faster) than the dashboard that gave the AWS console.

    Scheme. One of our most important mistakes in Redshift was to place in the separate columns only the main fields of events, and the fields that are rarely used, put
    in one big column properties. On the one hand, this gave us the flexibility to change the fields at the initial stages, when there was no complete understanding of what events we were going to collect, with what properties, besides, they changed 5 times a day. On the other hand, requests for a large column of properties took more and more time. In ClickHouse, we decided to do it right away, so we collected all possible columns and entered the best type for them. It turned out a table with about two hundred columns.

    The next task was to choose the right engine for storage and partitioning.
    With partitioning, they didn’t think again, but did the same as it was in Redshift, by partition for each day, but now all partitions are one table, which
    significantly speeds up queries and simplifies maintenance. The storage engine took ReplacingMergeTree, as it allows you to remove duplicates from a particular partition, simply by running OPTIMIZE ... FINAL . In addition, the daily partitioning scheme allows, in case of errors or accidents, to work only with data for a day, not a month, which is significantly faster.

    Delivery of data from s3 to ClickHouse. It was one of the longest processes. It was impossible to simply load the built-in ClickHouse tools, because the data on S3 is in JSON, each field needs to be retrieved in its own jsonpath, as we did in Redshift, and sometimes also apply a transformation: for example, DD96C92F-3F4D-44C6-BCD3-E25EB26389E9convert the UUID message from the standard record in bytes and put in type FixedString (16).

    I wanted to have a special service similar to what we had in Redshift as a COPY command. They did not find anything ready, so I had to do it. How it works, you can write a separate article, but in short, this is an HTTP service, which is deployed on each host with ClickHouse. You can contact any of them. The request parameters specify the S3 prefix, from which the files are taken, the jsonpath list for converting from JSON to the set of columns, and the set of transformations for each column. The server to which the request came, begins to scan files on S3 and distribute the work on parsing to the other hosts. In this case, it is important for us that the rows that could not be imported, together with the error, were added to a separate table ClickHouse. This greatly helps to investigate the problems and bugs in the service of receiving events and the clients that generate these events. Placing the importer directly on the database hosts we disposed of those resources that, as a rule, are idle, because complex requests to them do not go around the clock. Of course, if there are more requests, you can always bring the service of the importer to individual hosts.

    There was no big problem with importing data from external sources. In those scripts that were, just changed the assignment from Redshift to ClickHouse.

    There was an option to connect MongoDB in the form of a dictionary, and not to make daily copies. Unfortunately, it did not fit, because the dictionary must be placed in memory, and the size of most collections in MongoDB do not allow it. But the dictionaries are also useful to us: with their help, it is very convenient to connect GeoIP databases from MaxMind and use them in queries. For this we use the layout ip_trie and CSV files that are provided by the service. For example, the configuration of the geoip_asn_blocks_ipv4 dictionary looks like this:

    <dictionaries><dictionary><name>geoip_asn_blocks_ipv4</name><source><file><path>GeoLite2-ASN-Blocks-IPv4.csv</path><format>CSVWithNames</format></file><\/source><lifetime>300</lifetime><layout><ip_trie /></layout><structure><key><attribute><name>prefix</name><type>String</type></attribute></key><attribute><name>autonomous_system_number</name><type>UInt32</type><null_value>0</null_value></attribute><attribute><name>autonomous_system_organization</name><type>String</type><null_value>?</null_value></attribute></structure></dictionary></dictionaries>

    This config is enough to put in /etc/clickhouse-server/geoip_asn_blocks_ipv4_dictionary.xml, after which you can make queries to the dictionary to get the name of the provider by IP address:

    SELECT dictGetString('geoip_asn_blocks_ipv4', 'autonomous_system_organization', tuple(IPv4StringToNum('192.168.1.1')));
    

    Change data schema . As mentioned above, we decided not to use replication yet, as we can now afford to become inaccessible during accidents or planned works, and a copy of the data already lies on s3 and we can transfer it to ClickHouse within a reasonable time. If there is no replication, then ZooKeeper was not expanded, and the absence of ZooKeeper also leads to the impossibility of using the ON CLUSTER clause in DDL queries. This problem was solved by a small python script that connects to each ClickHouse host (as long as there are only eight of them) and executes the specified SQL query.

    Not full SQL support in ClickHouse. The process of transferring requests from the Redshift syntax to the ClickHouse syntax went along with the development of the importer, and it was mainly the team of analysts who worked on it. Strangely enough, but the matter turned out to be not even in JOIN, but in window functions. It took several days to understand how they can be done through arrays and lambda functions. It is good that this question is often covered in reports about ClickHouse, of which there are a huge number, for example, events.yandex.ru/lib/talks/5420. At this point, the data was already written in two places at once: both in Redshift and in the new ClickHouse, so when transferring queries, we compared the results. It was problematic to compare the speed, since we removed one large properties column, and most of the requests began to work only with the required columns, which naturally gave a significant increase, but those requests where the properties column did not participate, worked the same way, or slightly faster.

    The result was the following scheme:



    results


    In the bottom line, we got the following benefits:

    • One table instead of 90
    • Service requests are executed in milliseconds
    • The cost has decreased by half
    • Easy removal of duplicate events

    There are disadvantages to which we are ready:

    • In case of an accident, you will have to repair the cluster yourself.
    • Schema changes now need to be made on each host separately
    • Update on the new version will have on their own

    We cannot compare the speed of requests in the forehead, since the data scheme has changed significantly. Many queries have become faster, simply because they read less data from the disk. In an amicable way, such a change had to be made in Redshift, but it was decided to combine it with the migration to ClickHouse.

    The whole migration together with the preparation took about three months. She went from the beginning of July to the end of September and demanded the participation of two people. On September 27th, we turned off Redshift and since then we are working only on ClickHouse. It turns out, already a little more than two months. The term is small, but so far never encountered data loss or a critical bug, due to which the entire cluster would have risen. Ahead of us are waiting for updates on new versions!

    Also popular now: