ClickHouse for Product Analytics at VKontakte



    When developing any product, be it a video service, the news feed, Stories, or articles, you want to be able to measure the user's notional "happiness": to understand whether our changes make things better or worse, and to steer product development based not on intuition and gut feeling, but on metrics and numbers you can trust.

    In this article I will tell you how we managed to launch product statistics and analytics for a service with a monthly audience of 97 million, while getting extremely fast analytical queries. We will talk about ClickHouse, the engines we use, and the peculiarities of the queries. I will describe an approach to data aggregation that lets us obtain complex metrics in fractions of a second, and cover data transformation and testing.

    Right now we have about 6 billion product events per day; in the near future we will reach 20-25 billion, and then, at a slower pace, we will climb to 40-50 billion by the end of the year, once we have described all the product events we are interested in.

    1 rows in set. Elapsed: 0.287 sec. Processed 59.85 billion rows, 59.85 GB (208.16 billion rows / s., 208.16 GB / s.)

    Details under the cut.

    Foreword


    VKontakte had analytical tools before. Unique users were counted, and it was possible to build event charts by slices and thereby dig into the depths of a service. However, those tools meant slices fixed in advance, aggregated data, HLL for uniques, a certain rigidity, and an inability to quickly answer questions even slightly more complicated than "how many?".

    Of course, there was, is, and will be Hadoop, and a great many service usage logs were, are, and will be written into it. Unfortunately, HDFS was used only by some teams for their own tasks. Even more sadly, HDFS is not about fast analytical queries: many fields raised questions whose answers had to be dug out of the code rather than found in documentation accessible to everyone.

    We came to the conclusion that we could not go on living like this. Every team should have the data, queries over it should be fast, and the data itself should be accurate and rich in useful attributes.

    Therefore, we formulated clear requirements for the new statistics and analytics system:

    • analytical queries must be fast;
    • the data must be reasonably accurate, ideally raw events of user interaction with the service;
    • the structure of events must be described, understood, and accessible;
    • reliable data storage and guaranteed event delivery;
    • the ability to count uniques, audience (daily, weekly, monthly), retention metrics, time spent by the user in the service, actions per unique user, and other metrics across a set of slices;
    • testing, data transformation, and visualization must be in place.

    In the kitchen


    Experience suggested that we needed two databases: a slow one, where we would aggregate and enrich the data, and a fast one, where we could work with that data and build charts on top of it. This is one of the most common approaches, in which different projections, for uniques and for event counts by slices over a certain period of time, are built in the slow database, for example in HDFS.

    On a warm September afternoon, over a cup of tea in the kitchen overlooking the Kazan Cathedral, we had the idea to try ClickHouse as the fast database: at that time we were already using it to store technical logs. There were many doubts, related primarily to speed and reliability: the published performance tests seemed unrealistic, and new database releases periodically broke existing functionality. So the proposal was simple: just try it.

    First experiments


    We deployed a cluster of two machines with the following configuration:
    2 x E5-2620 v4 (32 cores in total), 256 GB RAM, 28 TB of storage (RAID 10 with ext4).

    Initially the array used the near layout, but we later switched to far. ClickHouse has many different table engines, but the main ones belong to the MergeTree family. We chose ReplicatedReplacingMergeTree with roughly the following settings:

    PARTITION BY dt 
    ORDER BY (toStartOfHour(time), cityHash64(user_id), event_microsec, event_id)
    SAMPLE BY cityHash64(user_id)
    SETTINGS index_granularity = 8192;
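
    For reference, a complete table definition with these settings might look roughly like the sketch below. The column set and the ZooKeeper path are illustrative assumptions, not our actual schema:

    -- Sketch: illustrative columns; the {shard}/{replica} macros assume a standard replicated setup.
    CREATE TABLE video_events_local
    (
        dt             Date,
        time           DateTime,
        event_microsec UInt64,
        event_id       UInt64,
        user_id        UInt64,
        event          String,
        platform       String
    )
    ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/video_events', '{replica}')
    PARTITION BY dt
    ORDER BY (toStartOfHour(time), cityHash64(user_id), event_microsec, event_id)
    SAMPLE BY cityHash64(user_id)
    SETTINGS index_granularity = 8192;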

    Replicated - means the table is replicated, which covers one of our reliability requirements.

    Replacing - the table supports deduplication by primary key. By default the primary key coincides with the sorting key, so the ORDER BY clause simply specifies what the primary key is.

    SAMPLE BY - we also wanted to try sampling: SAMPLE returns a uniform pseudo-random sample of rows.

    index_granularity = 8192 is the magic number of data rows between index marks (yes, the index is sparse); it is the default value, and we did not change it.

    We partitioned by day (the default is by month). We expected many intraday data queries - for example, building a per-minute chart of video views for a given day.
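
    Such an intraday query might look roughly like this (a sketch; the table and column names are illustrative):

    -- Per-minute chart of video view events for yesterday.
    SELECT
        toStartOfMinute(time) AS minute,
        count() AS views
    FROM video_events
    WHERE (dt = yesterday()) AND (event = 'view')
    GROUP BY minute
    ORDER BY minute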

    Next, we took a chunk of technical logs and loaded about a billion rows into the table. Excellent compression, grouping by Int* columns, counting unique values - everything worked incredibly fast!

    Speaking of speed, I mean that not a single query took longer than 500 ms, and most of them fit into 50-100 ms. And this was on two machines - and, in fact, only one of them was doing the computation.

    We looked at all this and imagined that instead of a UInt8 column there would be a country id, and an Int8 column would hold, say, the user's age. And we realized that ClickHouse suits us perfectly, provided we do everything right.

    Strong data typing


    The real benefit of ClickHouse begins exactly when the right data schema is in place. An example: platform String - bad; platform Int8 plus a dictionary - good; LowCardinality(String) - convenient and good (I will talk about LowCardinality a bit later).

    We created a special generator class in PHP that, on request, creates wrapper classes over events based on the tables in ClickHouse, as well as a single entry point for logging. Here is the workflow we ended up with:

    1. An analyst / data engineer / developer writes the documentation: which fields, possible values, and events need to be logged.
    2. A table is created in ClickHouse according to the data structure from the previous step.
    3. Wrapper classes for events are generated from the table.
    4. The product team implements filling in the fields of an object of this class and sending the event.

    Changing the schema or the types of logged data at the PHP level is impossible without first changing the table in ClickHouse. And that, in turn, cannot be done without coordinating with the team, updating the documentation, and describing the events.

    For each event you can set two knobs that control the percentage of events sent to ClickHouse and to Hadoop, respectively. These settings exist primarily for gradual rollout, with the ability to cut off logging if something goes wrong. Data is delivered to Hadoop in the standard way via Kafka. To ClickHouse it flies through a scheme based on KittenHouse in persistent mode, which guarantees at-least-once event delivery.

    An event is delivered to a Buffer table on the appropriate shard, chosen by the remainder of dividing a hash of user_id by the number of shards in the cluster. The Buffer table then flushes the data into the local ReplicatedReplacingMergeTree. On top of the local tables sits a table with the Distributed engine, which lets us query data from all the shards.
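
    A sketch of this chain, with illustrative table, database, and cluster names (the Buffer thresholds are assumptions as well):

    -- Buffer table: accumulates incoming events and flushes them into the local table.
    CREATE TABLE video_events_buffer AS video_events_local
    ENGINE = Buffer(default, video_events_local, 16, 10, 100, 10000, 1000000, 10000000, 100000000);

    -- Distributed table: reads video_events_local on every shard of the cluster.
    CREATE TABLE video_events AS video_events_local
    ENGINE = Distributed(analytics_cluster, default, video_events_local, cityHash64(user_id));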

    Denormalization


    ClickHouse is a columnar DBMS. It is not about normal forms, which means it is better to have all the information right in the event than to join for it. JOIN exists too, but if the right-hand table does not fit in memory, the pain begins. So we made a strong-willed decision: all the information we are interested in must be stored in the event itself. For example: gender, the user's age, country, city, birthday - all the public information that can be useful for audience analytics - as well as all the useful information about the interaction object. If we are talking about video, that is video_id, video_owner_id, the video upload date, its length, the quality at the moment of the event, the maximum quality, and so on.

    In total, each table has from 50 to 200 columns, and all tables contain service fields. For example, the error field error_log: essentially, we record an "out of range of the type" error there in case strange values overflow the size of the type in, say, the age field.

    The LowCardinality(T) type


    ClickHouse can use external dictionaries. They are stored in memory, updated periodically, and can be used effectively in various scenarios, including as classic reference tables. For example, suppose you want to log the operating system and you have two alternatives: a string, or a number plus a reference table. Of course, on large data volumes, and for high-performance analytical queries, it is logical to write a number and look up the string representation in the dictionary when you need it:

    dictGetString('os', 'os_name', toUInt64(os_id))

    But there is a much more convenient way - the LowCardinality(String) type, which builds a dictionary automatically. Provided the set of values has low cardinality, performance with LowCardinality is radically higher than with plain String.

    For example, we use LowCardinality(String) for event types such as 'play', 'pause', 'rewind', or for platform: 'web', 'android', 'iphone':

    SELECT
        vk_platform,
        count()
    FROM t
    WHERE dt = yesterday()
    GROUP BY vk_platform
    Elapsed: 0.145 sec. Processed 1.98 billion rows, 5.96 GB 
    (13.65 billion rows/s., 41.04 GB/s.)

    The feature is still experimental, so to use it you have to run:

    SET allow_experimental_low_cardinality_type = 1;

    But there is a feeling that before long it will no longer be hidden behind a setting.
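
    In a table schema such a column is simply declared with this type, and an existing String column can be converted in place. A sketch, assuming an existing vk_platform String column in an illustrative table:

    -- Convert an existing String column to LowCardinality(String).
    ALTER TABLE video_events_local MODIFY COLUMN vk_platform LowCardinality(String);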

    VKontakte data aggregation


    Since there are a lot of columns and a lot of events, the natural desire is to drop "old" partitions, but first to build aggregates from them. Occasionally we need to analyze raw events (from a month or a year ago), so we do not trim the data in HDFS: any analyst can reach the needed Parquet files for any date.

    As a rule, when aggregating over a time interval, you always run into the fact that the number of rows per unit of time equals the product of the cardinalities of the slices. This imposes restrictions: countries have to be collected into groups such as 'Russia', 'Asia', 'Europe', 'Rest of the world', and ages into intervals, in order to keep the dimensionality down to a conditional million rows per date.
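
    Whether it is built with Spark or in SQL, such a classic "date, slices" aggregate boils down to something like the sketch below (the table and grouping columns are illustrative, with country_group and age_bucket assumed to be precomputed bucket columns):

    -- Row count grows as the product of slice cardinalities, hence the coarse buckets.
    INSERT INTO agg_by_slices
    SELECT
        dt,
        country_group,         -- e.g. 'Russia', 'Asia', 'Europe', 'Rest of the world'
        age_bucket,            -- e.g. '18-24', '25-34', ...
        platform,
        count() AS events,
        uniq(user_id) AS uniques
    FROM video_events
    WHERE dt = yesterday()
    GROUP BY dt, country_group, age_bucket, platform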

    Aggregation by (dt, user_id)


    But we have a blazingly fast ClickHouse! Can we scale up to 50-100 million rows per date?
    Quick tests showed that we can, and at that moment a simple idea arose: to keep the user in the aggregate. Namely, to aggregate not by "date, slices" with Spark tooling, but by "date, user" with ClickHouse itself, performing a kind of "transposition" of the data along the way.

    With this approach we keep users in the aggregated data, which means we can still compute audience figures, retention, and frequency metrics. We can join aggregates together, counting the shared audience of several services, up to the entire VKontakte audience. All of this can be done over any slice present in the table, in roughly the same time.

    I will illustrate with an example:



    After aggregation (many more columns on the right):



    In this case, aggregation is done precisely by (dt, user_id). For fields with user information, such aggregation lets you use the functions any and anyHeavy (which selects a frequently occurring value). For example, you can put anyHeavy(platform) into the aggregate to know which platform the user mostly uses, judging by video events. If desired, you can use groupUniqArray(platform) and store an array of all the platforms from which the user generated events. If that is still not enough, you can create separate columns per platform and store, for example, the number of unique videos watched to the halfway point from a specific platform:

    uniqCombinedIf(cityHash64(video_owner_id, video_id), 
    (platform = 'android') AND (event = '50p')) as uniq_videos_50p_android

    With this approach we get a rather wide aggregate in which each row is a unique user and each column contains information either about the user or about his interaction with the service.
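
    Putting it together, building such a per-user aggregate might look roughly like this (a sketch; the table and column names are illustrative):

    INSERT INTO agg
    SELECT
        dt,
        user_id,
        anyHeavy(platform) AS top_platform,        -- the platform the user mostly uses
        groupUniqArray(platform) AS platforms,     -- all platforms seen for the user
        countIf(event = 'play') AS plays,
        uniqCombinedIf(cityHash64(video_owner_id, video_id),
            (platform = 'android') AND (event = '50p')) AS uniq_videos_50p_android
    FROM video_events
    WHERE dt = yesterday()
    GROUP BY dt, user_id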

    It turns out that to compute a service's DAU it is enough to run a query like this on top of its aggregate:

    SELECT
        dt,
        count() as DAU
    FROM agg
    GROUP BY dt
    Elapsed: 0.078 sec.
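
    Weekly or monthly audience can be computed just as easily on top of the same aggregate. A sketch:

    -- WAU: unique users over the last 7 days.
    SELECT uniq(user_id) AS WAU
    FROM agg
    WHERE dt > (yesterday() - 7)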

    Or calculate how many days users visited the service during the week:

    SELECT
        days_in_service,
        count() AS uniques
    FROM
    (
        SELECT uniqUpTo(7)(dt) AS days_in_service
        FROM agg2
        WHERE dt > (yesterday() - 7)
        GROUP BY user_id
    )
    GROUP BY days_in_service
    ORDER BY days_in_service ASC
    7 rows in set. Elapsed: 2.922 sec. 

    We can speed this up with sampling, almost without losing accuracy:

    SELECT
        days_in_service,
        10 * count() AS uniques
    FROM
    (
        SELECT uniqUpTo(7)(dt) AS days_in_service
        FROM agg2
        SAMPLE 1 / 10
        WHERE dt > (yesterday() - 7)
        GROUP BY user_id
    )
    GROUP BY days_in_service
    ORDER BY days_in_service ASC
    7 rows in set. Elapsed: 0.454 sec. 

    It should be noted right away that sampling is done not by the percentage of events but by the percentage of users - and as a result it becomes an incredibly powerful tool.

    Or the same over 4 weeks with 1/100 sampling - the results are about 1% less accurate:

    SELECT
        days_in_service,
        100 * count() AS uniques
    FROM
    (
        SELECT uniqUpTo(7)(dt) AS days_in_service
        FROM agg2
        SAMPLE 1 / 100
        WHERE dt > (yesterday() - 28)
        GROUP BY user_id
    )
    GROUP BY days_in_service
    ORDER BY days_in_service ASC
    28 rows in set. Elapsed: 0.287 sec. 

    Aggregation from the other side


    When aggregating by (dt, user_id) we do not lose the user and do not lose information about his interaction with the service, but, of course, we do lose metrics about a specific interaction object. We don't have to lose those either - let's build an aggregate over (dt, video_owner_id, video_id), following the same ideas. We keep as much information about the video as possible, we do not lose data about how users interact with the video, and we completely drop information about the specific user.
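
    Building this aggregate mirrors the per-user one. A sketch with illustrative names, one row per (dt, video_owner_id, video_id):

    INSERT INTO agg3
    SELECT
        dt,
        video_owner_id,
        video_id,
        countIf(event = 'start') AS starts,
        countIf(event = 'view') AS watches,
        uniq(user_id) AS viewers
    FROM video_events
    WHERE dt = yesterday()
    GROUP BY dt, video_owner_id, video_id

    For example, the number of playback starts for a specific video yesterday: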

    SELECT starts
    FROM agg3
    WHERE (dt = yesterday()) AND (video_id = ...) AND (video_owner_id = ...)
    1 rows in set. Elapsed: 0.030 sec

    Or the top 10 videos by views yesterday:

    SELECT
        video_id,
        video_owner_id,
        watches
    FROM video_agg_video_d1
    WHERE dt = yesterday()
    ORDER BY watches DESC
    LIMIT 10
    10 rows in set. Elapsed: 0.035 sec.

    As a result, we have a scheme of aggregates of the form:

    • aggregation by "date, user" within a product;
    • aggregation by "date, interaction object" within a product;
    • occasionally other projections arise.

    Azkaban and TeamCity


    Finally, a few words about the infrastructure. Aggregate collection starts at night, beginning with an OPTIMIZE on each table with raw data to trigger an unscheduled merge in ReplicatedReplacingMergeTree. The operation can take quite a while, but it is needed to remove duplicates if they appear. It is worth noting that so far I have never encountered duplicates, but there are no guarantees that they will not appear in the future.
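
    The trigger itself is a one-liner; a sketch, assuming an illustrative local table name:

    -- Force an unscheduled merge so that ReplacingMergeTree collapses possible duplicates.
    OPTIMIZE TABLE video_events_local FINAL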

    The next step is building the aggregates. These are bash scripts in which the following happens:

    • first we get the number of shards and some host from each shard:

      SELECT
          shard_num,
          any(host_name) AS host
      FROM system.clusters
      GROUP BY shard_num
    • then the script executes, sequentially for each shard (clickhouse-client -h $host), a query of the following form (for per-user aggregates):

      INSERT INTO ... SELECT ... FROM ... SAMPLE 1/$shards_count OFFSET 1/$shard_num

    This is not entirely optimal and can generate a lot of network traffic between hosts. However, when new shards are added, everything keeps working out of the box and data locality for the aggregates is preserved, so we decided not to worry too much about it.

    We use Azkaban as the task scheduler. I would not call it a super-convenient tool, but it copes with its job perfectly well, including when we need to build slightly more complex pipelines and one script has to wait for several others to finish.

    The total time currently spent converting the existing events into aggregates is 15 minutes.

    Testing


    Every morning we run automated tests that answer questions about the raw data as well as the readiness and quality of the aggregates, along the lines of: "Check that yesterday, in the raw data and in the aggregates, there is no more than half a percent less data or fewer uniques than on the same day a week ago."
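
    Such a check boils down to a simple query. A sketch, with an illustrative table name and threshold:

    -- Compare yesterday's row count with the same weekday a week earlier.
    SELECT
        (SELECT count() FROM agg WHERE dt = yesterday()) AS yesterday_rows,
        (SELECT count() FROM agg WHERE dt = yesterday() - 7) AS week_ago_rows,
        yesterday_rows >= 0.995 * week_ago_rows AS ok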

    Technologically, these are ordinary unit tests written with JUnit and a JDBC driver for ClickHouse. All tests run in TeamCity and take about 30 seconds in a single thread, and in case of failures we get VKontakte messages from our wonderful TeamCity bot.

    Conclusion


    Use only stable versions of ClickHouse and your hair will be soft and silky. It is worth adding that ClickHouse does not slow down.
