Yandex open-sources ClickHouse

    Today ClickHouse, a database management system developed internally at Yandex, becomes available to everyone. The source code is published on GitHub under the Apache 2.0 license. ClickHouse lets you run analytical queries interactively over data that is updated in real time. The system scales to tens of trillions of records and petabytes of stored data. Using ClickHouse opens up opportunities that were previously hard to imagine: you can store the entire stream of data without pre-aggregation and quickly build reports with any breakdown you like. ClickHouse was developed at Yandex for the needs of Yandex.Metrica, the second largest web analytics system in the world.

    In this article we will tell how and why ClickHouse appeared at Yandex and what it can do, compare it with other systems, and show how to set it up yourself with minimal effort.

    Where Is ClickHouse's Niche?


    Why would anyone need ClickHouse when so many other technologies for working with big data already exist?

    If you just need to store logs, you have plenty of options. You can upload logs to Hadoop and analyze them with Hive, Spark, or Impala; in that case you don't need ClickHouse. Things get more complicated when you need to run interactive queries over non-aggregated data that enters the system in real time. Until now, no open technology of adequate quality existed to solve this problem.

    There are adjacent areas where other systems can be used. They can be classified as follows:

    1. Commercial OLAP DBMSs for use in your own infrastructure.
      Examples: HP Vertica, Actian Vector, Actian Matrix, EXASol, Sybase IQ, and others.
      Our differences: we made the technology open and free.

    2. Cloud solutions.
      Examples: Amazon Redshift and Google BigQuery.
      Our differences: you can use ClickHouse in your own infrastructure without paying for the cloud.

    3. Add-ons over Hadoop.
      Examples: Cloudera Impala, Spark SQL, Facebook Presto, Apache Drill.
      Our differences:
      • unlike Hadoop, ClickHouse can serve analytical queries even inside a publicly available, high-traffic service such as Yandex.Metrica;
      • ClickHouse does not require a deployed Hadoop infrastructure; it is easy to use and suits even small projects;
      • ClickHouse can ingest data in real time and handles its storage and indexing itself;
      • unlike Hadoop, ClickHouse can operate in geographically distributed data centers.

    4. Open-source OLAP DBMSs.
      Examples: InfiniDB, MonetDB, LucidDB.
      All of these projects have been abandoned; they never matured and, in fact, never left alpha. They were also not distributed systems, which is critical for big data processing. ClickHouse's active development, the maturity of the technology, and its focus on the practical needs of big data processing are all driven by Yandex's own tasks. Without "battle-testing" on real tasks that go beyond the capabilities of existing systems, it would have been impossible to create a quality product.

    5. Open-source analytics systems that are not relational OLAP DBMSs.
      Examples: Metamarkets Druid, Apache Kylin.
      Our differences: ClickHouse does not require pre-aggregating the data. It supports a dialect of SQL and provides the convenience of a relational DBMS.

    Within the rather narrow niche that ClickHouse occupies, it still has no alternatives. In a wider range of applications, ClickHouse can prove more advantageous than other systems in query processing speed, resource efficiency, and ease of operation.


    A click map in Yandex.Metrica and the corresponding query in ClickHouse

    Initially we developed ClickHouse solely for the needs of Yandex.Metrica: to build reports interactively from non-aggregated logs of user actions. Because the system is a full-fledged DBMS with very broad functionality, detailed documentation was written as early as 2012, when it first went into use. This distinguishes ClickHouse from many typical internal developments - specialized, embedded data structures for specific problems, such as Metrage and OLAPServer, which I described in a previous article.

    The rich functionality and the availability of detailed documentation led to ClickHouse gradually spreading across many Yandex departments. It suddenly turned out that the system could be installed by following the instructions and worked out of the box, with no need to involve its developers. ClickHouse came to be used in Yandex.Direct, Market, Mail, AdFox, Webmaster, in monitoring, and in business analytics. It made it possible either to solve problems for which no suitable tools existed before, or to solve them orders of magnitude more efficiently than other systems.

    Gradually, demand arose for using ClickHouse beyond Yandex's internal products. For example, in 2013 ClickHouse was used to analyze metadata about events of the LHCb experiment at CERN. The system could have been used more widely, but its closed status stood in the way at the time. Another example: the open-source Yandex.Tank technology uses ClickHouse inside Yandex to store telemetry data, while external users only had MySQL available as a backend, which is poorly suited to this task.

    As the user base expanded, we had to put somewhat more effort into development, although not much compared to the labor spent on Metrica's own tasks. In return we get better product quality, especially in terms of usability.

    Expanding the user base lets us see use cases that would hardly have occurred to us otherwise. It also helps to quickly find the bugs and inconveniences that matter, including for ClickHouse's primary application in Metrica. Without a doubt, all of this improves the quality of the product. That is why it is beneficial for us to make ClickHouse open today.

    How to Stop Being Afraid and Start Using ClickHouse


    Let's try working with ClickHouse on a "toy" open dataset: information about flights in the USA from 1987 to 2015. This can hardly be called big data (166 million rows in total, 63 GB uncompressed), but it is quick to download and convenient for experimenting. You can download the data from here.

    The data can also be downloaded from the original source; how to do that is described here.

    To get started, install ClickHouse on a single server. Further below we will also show how to install ClickHouse on a cluster with sharding and replication.

    On Ubuntu and Debian Linux, you can install ClickHouse from ready-made packages. On other Linux systems, you can build ClickHouse from source and install it yourself.
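
    For example, assuming the package repository is already configured on your machine, installation might look like this (the package names are the ones described below):

    sudo apt-get update
    sudo apt-get install clickhouse-client clickhouse-server-base clickhouse-server-common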

    The clickhouse-client package contains the clickhouse-client program, an interactive ClickHouse client. The clickhouse-server-base package contains the clickhouse-server binary, and clickhouse-server-common contains the server configuration files.

    The server configuration files live in /etc/clickhouse-server/. Before you start, pay attention first of all to the path element, which determines where the data is stored. Modifying the config.xml file directly is not advisable - it is inconvenient when packages are updated. Instead, override the necessary elements in files inside the config.d directory.
    It also makes sense to review the access rights settings.
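
    For instance, a minimal override that moves data storage to another disk might look like this; the file name and the path value here are illustrative, not prescribed:

    <!-- /etc/clickhouse-server/config.d/path.xml -->
    <yandex>
        <path>/mnt/data/clickhouse/</path>
    </yandex>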

    The server does not start automatically when the package is installed, and it does not restart itself during updates.
    To start the server, run:

    sudo service clickhouse-server start

    Server logs are located by default in /var/log/clickhouse-server/.
    After the Ready for connections message appears in the log, the server will accept connections.

    To connect to the server, use the clickhouse-client program.

    Short help
    Interactive work:
    
    clickhouse-client
    clickhouse-client --host=... --port=... --user=... --password=...
    

    Enable multiline queries:
    
    clickhouse-client -m
    clickhouse-client --multiline
    

    Executing queries in batch mode:
    
    clickhouse-client --query='SELECT 1'
    echo 'SELECT 1' | clickhouse-client
    

    Inserting data in a specified format:
    
    clickhouse-client --query='INSERT INTO table VALUES' < data.txt
    clickhouse-client --query='INSERT INTO table FORMAT TabSeparated' < data.tsv
    



    Create a table for test data


    Table creation
    
    $ clickhouse-client --multiline
    ClickHouse client version 0.0.53720.
    Connecting to localhost:9000.
    Connected to ClickHouse server version 0.0.53720.
    :) CREATE TABLE ontime
    (
        Year UInt16,
        Quarter UInt8,
        Month UInt8,
        DayofMonth UInt8,
        DayOfWeek UInt8,
        FlightDate Date,
        UniqueCarrier FixedString(7),
        AirlineID Int32,
        Carrier FixedString(2),
        TailNum String,
        FlightNum String,
        OriginAirportID Int32,
        OriginAirportSeqID Int32,
        OriginCityMarketID Int32,
        Origin FixedString(5),
        OriginCityName String,
        OriginState FixedString(2),
        OriginStateFips String,
        OriginStateName String,
        OriginWac Int32,
        DestAirportID Int32,
        DestAirportSeqID Int32,
        DestCityMarketID Int32,
        Dest FixedString(5),
        DestCityName String,
        DestState FixedString(2),
        DestStateFips String,
        DestStateName String,
        DestWac Int32,
        CRSDepTime Int32,
        DepTime Int32,
        DepDelay Int32,
        DepDelayMinutes Int32,
        DepDel15 Int32,
        DepartureDelayGroups String,
        DepTimeBlk String,
        TaxiOut Int32,
        WheelsOff Int32,
        WheelsOn Int32,
        TaxiIn Int32,
        CRSArrTime Int32,
        ArrTime Int32,
        ArrDelay Int32,
        ArrDelayMinutes Int32,
        ArrDel15 Int32,
        ArrivalDelayGroups Int32,
        ArrTimeBlk String,
        Cancelled UInt8,
        CancellationCode FixedString(1),
        Diverted UInt8,
        CRSElapsedTime Int32,
        ActualElapsedTime Int32,
        AirTime Int32,
        Flights Int32,
        Distance Int32,
        DistanceGroup UInt8,
        CarrierDelay Int32,
        WeatherDelay Int32,
        NASDelay Int32,
        SecurityDelay Int32,
        LateAircraftDelay Int32,
        FirstDepTime String,
        TotalAddGTime String,
        LongestAddGTime String,
        DivAirportLandings String,
        DivReachedDest String,
        DivActualElapsedTime String,
        DivArrDelay String,
        DivDistance String,
        Div1Airport String,
        Div1AirportID Int32,
        Div1AirportSeqID Int32,
        Div1WheelsOn String,
        Div1TotalGTime String,
        Div1LongestGTime String,
        Div1WheelsOff String,
        Div1TailNum String,
        Div2Airport String,
        Div2AirportID Int32,
        Div2AirportSeqID Int32,
        Div2WheelsOn String,
        Div2TotalGTime String,
        Div2LongestGTime String,
        Div2WheelsOff String,
        Div2TailNum String,
        Div3Airport String,
        Div3AirportID Int32,
        Div3AirportSeqID Int32,
        Div3WheelsOn String,
        Div3TotalGTime String,
        Div3LongestGTime String,
        Div3WheelsOff String,
        Div3TailNum String,
        Div4Airport String,
        Div4AirportID Int32,
        Div4AirportSeqID Int32,
        Div4WheelsOn String,
        Div4TotalGTime String,
        Div4LongestGTime String,
        Div4WheelsOff String,
        Div4TailNum String,
        Div5Airport String,
        Div5AirportID Int32,
        Div5AirportSeqID Int32,
        Div5WheelsOn String,
        Div5TotalGTime String,
        Div5LongestGTime String,
        Div5WheelsOff String,
        Div5TailNum String
    )
    ENGINE = MergeTree(FlightDate, (Year, FlightDate), 8192);
    


    We have created a MergeTree table. Tables of the MergeTree family are recommended for any serious use. Such a table has a primary key by which the data is incrementally sorted, which makes queries over a range of the primary key fast.

    For example, if we have the logs of an advertising network and need to show reports for specific advertiser clients, the primary key should begin with the client identifier, so that fetching the data for one client only requires reading a small range of data.
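
    A minimal sketch of such a table; the table and column names are hypothetical and unrelated to this article's dataset:

    -- ClientID comes first in the primary key, so all rows for one client
    -- form a small contiguous range on disk.
    CREATE TABLE ad_events
    (
        EventDate Date,
        ClientID UInt32,
        Impressions UInt64,
        Clicks UInt64
    ) ENGINE = MergeTree(EventDate, (ClientID, EventDate), 8192);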

    Loading data into a table


    xz -v -c -d < ontime.csv.xz | clickhouse-client --query="INSERT INTO ontime FORMAT CSV"

    An INSERT query in ClickHouse can load data in any supported format, consuming O(1) memory regardless of volume; you can feed any amount of data into a single INSERT. Data should always be inserted in batches that are not too small. Inserting a block of up to max_insert_block_size rows (1,048,576 by default) is atomic: the block is either inserted entirely or not at all. If the connection breaks during insertion, you may not know whether a given block made it in. To achieve exactly-once semantics, replicated tables support idempotent inserts: the same data block can be inserted repeatedly, possibly on a different replica, and it will be stored only once. In this example we insert data from localhost, so we don't have to worry about batching or exactly-once semantics.

    An INSERT into a MergeTree table is non-blocking, as is SELECT. Once the data is loaded, or even while the load is still in progress, we can already run SELECT queries.
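
    For example, you can watch the row count grow while the load is still running:

    SELECT count() FROM ontime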

    One suboptimal aspect of this example is that the table uses the String data type where an Enum or a numeric type would fit. If the set of distinct string values is known to be small (for example, operating system names or mobile phone manufacturers), we recommend Enums or numbers for maximum performance. If the set of strings is potentially unbounded (for example, search queries or URLs), use the String type.
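
    A sketch of what this looks like, on a hypothetical table unrelated to this dataset:

    CREATE TABLE os_stats
    (
        EventDate Date,
        -- stored as one byte per value, but reads back as a string
        OS Enum8('Windows' = 1, 'Linux' = 2, 'macOS' = 3, 'Other' = 4),
        Hits UInt64
    ) ENGINE = MergeTree(EventDate, (EventDate, OS), 8192);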

    Second, note that the table structure contains the redundant columns Year, Quarter, Month, DayofMonth, DayOfWeek, even though FlightDate alone would suffice. They are most likely there for the sake of other DBMSs, in which date and time manipulation functions may not be fast enough. In ClickHouse this is unnecessary, since the corresponding functions are well optimized. Still, extra columns are not a problem: since ClickHouse is a column-oriented DBMS, you can afford to have quite a few columns in a table. Hundreds of columns are fine for ClickHouse.
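
    For instance, the redundant columns can be computed on the fly from FlightDate (the short aliases avoid clashing with the table's own Year and Month columns):

    SELECT
        toYear(FlightDate) AS y,
        toMonth(FlightDate) AS m,
        toDayOfWeek(FlightDate) AS dow,
        count() AS c
    FROM ontime
    GROUP BY y, m, dow
    ORDER BY c DESC
    LIMIT 10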

    Examples of working with the loaded data



    • which destinations were the most popular in 2015;
      SELECT
          OriginCityName,
          DestCityName,
          count(*) AS flights,
          bar(flights, 0, 20000, 40)
      FROM ontime WHERE Year = 2015 GROUP BY OriginCityName, DestCityName ORDER BY flights DESC LIMIT 20
      

      SELECT
          OriginCityName < DestCityName ? OriginCityName : DestCityName AS a,
          OriginCityName < DestCityName ? DestCityName : OriginCityName AS b,
          count(*) AS flights,
          bar(flights, 0, 40000, 40)
      FROM ontime WHERE Year = 2015 GROUP BY a, b ORDER BY flights DESC LIMIT 20
      


    • which cities have the most departing flights;
      SELECT OriginCityName, count(*) AS flights FROM ontime GROUP BY OriginCityName ORDER BY flights DESC LIMIT 20
      

    • from which cities you can fly to the largest number of destinations;
      SELECT OriginCityName, uniq(Dest) AS u FROM ontime GROUP BY OriginCityName ORDER BY u DESC LIMIT 20
      

    • how departure delays depend on the day of the week;
      SELECT DayOfWeek, count() AS c, avg(DepDelay >  60) AS delays FROM ontime GROUP BY DayOfWeek ORDER BY DayOfWeek
      

    • in which cities departures are delayed by more than an hour most often;
      SELECT OriginCityName, count() AS c, avg(DepDelay >  60) AS delays
      FROM ontime
      GROUP BY OriginCityName
      HAVING c >  100000
      ORDER BY delays DESC
      LIMIT 20
      

    • what are the longest flights;
      SELECT OriginCityName, DestCityName, count(*) AS flights, avg(AirTime) AS duration
      FROM ontime
      GROUP BY OriginCityName, DestCityName
      ORDER BY duration DESC
      LIMIT 20
      

    • the distribution of departure delay times by airline;
      SELECT Carrier, count() AS c, round(quantileTDigest(0.99)(DepDelay), 2) AS q
      FROM ontime GROUP BY Carrier ORDER BY q DESC
      

    • which airlines have stopped flying;
      SELECT Carrier, min(Year), max(Year), count()
      FROM ontime GROUP BY Carrier HAVING max(Year) < 2015 ORDER BY count() DESC
      

    • to which cities people flew more often in 2015;
      SELECT
          DestCityName,
          sum(Year = 2014) AS c2014,
          sum(Year = 2015) AS c2015,
          c2015 / c2014 AS diff
      FROM ontime
      WHERE Year IN (2014, 2015)
      GROUP BY DestCityName
      HAVING c2014 >  10000 AND c2015 >  1000 AND diff >  1
      ORDER BY diff DESC
      

    • flights to which cities depend most on seasonality.
      SELECT
          DestCityName,
          any(total),
          avg(abs(monthly * 12 - total) / total) AS avg_month_diff
      FROM
      (
          SELECT DestCityName, count() AS total
          FROM ontime GROUP BY DestCityName HAVING total > 100000
      )
      ALL INNER JOIN
      (
          SELECT DestCityName, Month, count() AS monthly
          FROM ontime GROUP BY DestCityName, Month HAVING monthly > 10000
      )
      USING DestCityName
      GROUP BY DestCityName
      ORDER BY avg_month_diff DESC
      LIMIT 20
      



    How to Install ClickHouse on a Cluster of Several Servers


    From the point of view of installed software, a ClickHouse cluster is homogeneous, with no dedicated nodes. You install ClickHouse on every server of the cluster, then describe the cluster configuration in the config file, create a local table on each server, and finally create a Distributed table.

    A Distributed table is essentially a "view" over the local tables of a ClickHouse cluster. A SELECT from a Distributed table executes in a distributed fashion, using the resources of all shards of the cluster. You can declare configurations for several different clusters and create several Distributed tables that look at different clusters.

    A cluster of three shards, each holding its data on a single replica
    <remote_servers>
        <perftest_3shards_1replicas>
            <shard>
                <replica>
                    <host>example-perftest01j.yandex.ru</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>example-perftest02j.yandex.ru</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>example-perftest03j.yandex.ru</host>
                    <port>9000</port>
                </replica>
            </shard>
        </perftest_3shards_1replicas>
    </remote_servers>


    Creating a local table:
    CREATE TABLE ontime_local (...) ENGINE = MergeTree(FlightDate, (Year, FlightDate), 8192);

    Creating a distributed table that looks at local tables on the cluster:
    CREATE TABLE ontime_all AS ontime_local ENGINE = Distributed(perftest_3shards_1replicas, default, ontime_local, rand());

    A Distributed table can be created on all servers of the cluster - then any server can accept distributed queries. Besides the Distributed table, you can also use the remote table function.
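
    For example, a sketch of reading from a single server's local table via the remote function, using one of the hosts from the configuration above:

    SELECT count() FROM remote('example-perftest01j.yandex.ru', default, ontime_local)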

    To spread the data of the table across the three servers, run an INSERT SELECT into the Distributed table.

    INSERT INTO ontime_all SELECT * FROM ontime;

    Note that this approach is not suitable for resharding large tables; separate built-in resharding functionality exists for that.

    As expected, reasonably heavy queries run several times faster on three servers than on one.
    Example
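
    For instance, one of the earlier queries can simply be pointed at the distributed table instead of the local one:

    SELECT OriginCityName, count(*) AS flights
    FROM ontime_all
    GROUP BY OriginCityName
    ORDER BY flights DESC
    LIMIT 20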


    You may notice that the quantile values differ slightly. This is because the implementation of the t-digest algorithm is non-deterministic: the result depends on the order in which the data is processed.

    In this example we used a cluster of three shards, each consisting of a single replica. For real workloads, fault tolerance requires each shard to consist of two or three replicas located in different data centers. (An arbitrary number of replicas is supported.)

    Cluster configuration with a single shard whose data is stored on three replicas
    <remote_servers>
        ...
        <!-- the cluster name below is illustrative -->
        <perftest_1shards_3replicas>
            <shard>
                <replica>
                    <host>example-perftest01j.yandex.ru</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>example-perftest02j.yandex.ru</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>example-perftest03j.yandex.ru</host>
                    <port>9000</port>
                </replica>
            </shard>
        </perftest_1shards_3replicas>
    </remote_servers>


    Replication requires ZooKeeper for metadata storage and coordination. ClickHouse itself ensures data consistency across replicas and runs recovery after failures. It is recommended to deploy the ZooKeeper cluster on separate servers.

    Strictly speaking, ZooKeeper is not mandatory: in the simplest cases you can duplicate data by writing it to all replicas manually, without the built-in replication mechanism. But this method is not recommended, since in that case ClickHouse cannot guarantee data consistency across replicas.

    Specify the ZooKeeper addresses in the configuration file:
    <zookeeper>
        <node>
            <host>zoo01.yandex.ru</host>
            <port>2181</port>
        </node>
        <node>
            <host>zoo02.yandex.ru</host>
            <port>2181</port>
        </node>
        <node>
            <host>zoo03.yandex.ru</host>
            <port>2181</port>
        </node>
    </zookeeper>


    We also define the substitutions that identify the shard and the replica; they will be used when creating the table:

    <macros>
        <shard>01</shard>
        <replica>01</replica>
    </macros>

    If no other replicas exist when the replicated table is created, the first replica is instantiated; if some already exist, the new replica clones the data from the existing ones. You can either create all the replica tables first and then load the data, or create some replicas first and add the others later, during or after loading.

    CREATE TABLE ontime_replica (...)
    ENGINE = ReplicatedMergeTree(
        '/clickhouse_perftest/tables/{shard}/ontime',
        '{replica}',
        FlightDate,
        (Year, FlightDate),
        8192);
    

    Here you can see that we use the ReplicatedMergeTree table type, passing as parameters the ZooKeeper path (which includes the shard identifier) and the replica identifier.

    INSERT INTO ontime_replica SELECT * FROM ontime;

    Replication works in multi-master mode. Data can be inserted into any replica, and it automatically propagates to all the others. Since replication is asynchronous, at any given moment some replicas may lack the most recently written data. At least one replica must be up for a write to be accepted; the rest download the new data and restore consistency as soon as they become active again. Note that this scheme admits the possibility of losing just-inserted data.
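
    A simple way to watch replication catch up is to compare row counts across replicas; the counts converge once the asynchronous transfer completes:

    SELECT count() FROM ontime_replica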

    How to Influence the Development of ClickHouse


    If you have questions, ask them in the comments to this article or on Stack Overflow with the "clickhouse" tag. You can also start a discussion topic in the group or send your suggestions to the mailing list clickhouse-feedback@yandex-team.ru. And if you would like to try working on ClickHouse from the inside, we invite you to join our team at Yandex. We have open vacancies and internships.
