Building a real-time full-text search system for error logs on top of Yandex's ClickHouse

    In this article I will describe how to build a system for indexing and full-text search over error logs (or any other logs) on top of ClickHouse, a database management system from Yandex. Yandex has written about ClickHouse on Habr, first while it was still closed-source, and then when they open-sourced it. The database is primarily intended for analytics and was built for the Yandex.Metrica service, but it can actually be used for almost anything, as long as you are fine with loading data in large batches, deleting it in equally large batches, and never updating individual rows.

    What we will build


    We will implement a system for indexing and searching error logs. I assume you have already managed to deliver the logs to a central server (or several servers) and have already stored the message texts in a database, that is, you already have a table of roughly the following form:

    CREATE TABLE Messages (
        message_id BIGINT PRIMARY KEY AUTO_INCREMENT,
        created_ts DATETIME,
        message_text BLOB
    )
    


    We will learn how to quickly serve search results over such a log (always sorted by time) and how to index it in real time.


    Why not ElasticSearch / Sphinx / MySQL / other_solution?


    I find it interesting to look at what ClickHouse is and what kinds of tasks it can solve. The purpose of the article is to give an overview and food for thought rather than a turnkey solution. Elastic, Sphinx and the others are ready-made search engines, whereas ClickHouse is a general-purpose database out of which you can build almost anything. I also believe that the search system described here, built on ClickHouse, will handle searching through logs better than Sphinx, while not requiring two kinds of indexes (real-time and regular). Your experience may differ, so I recommend building a prototype before putting such a system into production.

    Server installation


    Entrust the installation of ClickHouse ( github ) to your system administrator, or install it yourself from the Docker image if you do not want to deal with anything or are simply feeling lazy. If you build it from source, keep in mind that you will need up to 30 GB of disk space.

    Client installation


    If for some reason you do not have curl or PHP on your system, install them. The examples below use curl to talk to the HTTP API and PHP to write the indexing and search system.
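
    All queries to ClickHouse in this article go over its HTTP interface. To keep the later sketches self-contained, here is a minimal hypothetical helper in PHP; ch_query() and the hostname:8123 address are my own placeholders mirroring the curl examples below, not part of any library:

    // A minimal sketch of a ClickHouse HTTP client; 'hostname:8123' is a placeholder.
    const CH_HOST = 'hostname:8123';

    // Sends $sql via the HTTP interface; $body carries data for INSERT ... FORMAT queries.
    function ch_query($sql, $body = '') {
        $ch = curl_init('http://' . CH_HOST . '/?query=' . rawurlencode($sql));
        curl_setopt($ch, CURLOPT_POSTFIELDS, $body); // setting POSTFIELDS implies a POST request
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1); // return the response body as a string
        $res = curl_exec($ch);
        if ($res === false) {
            die(curl_error($ch) . "\n");
        }
        curl_close($ch);
        return $res;
    }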

    Preparing data structures for the index


    As a rule, the data structures used for full-text search in search engines are very simple. The structure is called an Inverted Index , and we will implement it in a slightly simplified form. We will use the engine recommended by default for data that has both a primary key and a date, MergeTree :

    CREATE TABLE FT (
        EventDate Date,
        word_id UInt32,
        message_id UInt64
    ) ENGINE=MergeTree(EventDate, (word_id, message_id), 8192);
    


    To create a table in the database, you can use the following command:

    $ cat create.sql | curl 'http://hostname:8123/?query=' --data-binary @-
    

    In this command, the create.sql file must contain the query to be executed, hostname is the host where ClickHouse is running, and 8123 is the default HTTP port.

    In the structure above, word_id is the id of a word in the dictionary (we will create the dictionary later; it stores the word_text => word_id mapping), and message_id is the id of the corresponding entry in the log table (analogous to document_id in Sphinx).

    Parameters of the MergeTree engine: the first argument, EventDate, names the column with the event date; the tuple (word_id, message_id) defines the primary key (in effect, an ordinary index); and 8192 is a setting that affects index granularity, which we leave at its default.

    MergeTree sorts data by primary key and partitions it by date, so searching for a specific word on a specific day, sorted by message_id, should be very fast.
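
    For example, a lookup for a single word on a single day reads almost directly off this primary key. A sketch using the ch_query() helper from above (word_id 42 and the date are made-up values):

    // Hypothetical query: messages from a single day containing the word with id 42.
    echo ch_query(
        "SELECT message_id FROM FT" .
        " WHERE EventDate = '2017-08-01' AND word_id = 42" .
        " ORDER BY message_id DESC LIMIT 50"
    );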

    Creating structures for the dictionary


    To fill this index we need a dictionary-like structure, so that ClickHouse can store numbers instead of strings. The dictionary can be kept in a database; if that database is MySQL, the structure will look like this:

    CREATE TABLE Words (
      id int(11) unsigned NOT NULL AUTO_INCREMENT,
      word varchar(150) COLLATE ascii_bin NOT NULL DEFAULT '',
      PRIMARY KEY (id),
      UNIQUE KEY word (word)
    ) ENGINE=InnoDB DEFAULT CHARSET=ascii COLLATE=ascii_bin;
    

    Note the ASCII collation: it significantly improves the performance of text indexes when all the words are in English. If not all of your logs are in English, I recommend reconsidering that practice :), but you can also just keep the default collation (utf8_unicode_ci).

    Indexing process


    To control the indexing process and to drive the initial indexing, you can create a separate table in MySQL with a queue of messages that have not been indexed yet:

    CREATE TABLE IndexQueue (
      message_id bigint(20) unsigned NOT NULL DEFAULT '0',
      shard_id int(11) NOT NULL,
      PRIMARY KEY (shard_id,message_id)
    );
    


    To populate this table for the first time, you can use the following query:

    INSERT IGNORE INTO IndexQueue (message_id, shard_id) SELECT message_id, message_id % 4 FROM Messages
    


    Here 4 is the number of indexer processes we will use. With PHP 7, the code from the example below indexes about 3.5 MB/s per process, i.e. 14 MB/s across 4 processes. If you write error logs faster than 14 MB/s, you probably need to fix your production urgently, and a slightly lagging full-text search is the least of your worries :).
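
    For real-time operation, each newly written message must also be enqueued: an INSERT IGNORE INTO IndexQueue with shard_id = message_id % 4 alongside every insert into Messages. The indexer processes themselves can be launched with something like the following sketch (it assumes the script from the next section is saved as indexer.php; the file name is my own placeholder):

    // Hypothetical launcher: one indexer process per shard, running concurrently.
    $shards = 4;
    $procs = [];
    for ($i = 0; $i < $shards; $i++) {
        $procs[$i] = popen('php indexer.php ' . $i, 'r'); // starts immediately in the background
    }
    foreach ($procs as $p) {
        while (!feof($p)) {
            echo fgets($p); // relay each worker's progress output
        }
        pclose($p);
    }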

    The indexer algorithm is as follows:
    1. Scan the queue (IndexQueue) for the given shard
    2. Grab a batch of messages, split each message into words, and collect them into an $index array of the form message_id => array(word1, ..., wordN)
    3. For each word, look up the corresponding word_id in the dictionary, inserting the word if it is missing
    4. Insert records into the ClickHouse index for all words of all messages

    Below is a slightly simplified implementation of the queue processing and indexing; you will have to adapt it yourself if you want to use it at home:

    Simplified indexer implementation in PHP
    const CH_HOST = 'hostname:8123'; // ClickHouse host and HTTP port
    const MAX_WORD_LEN = 150; // must match the word column length in the Words table
    $mysqli = new mysqli(...); // connection to the MySQL database
    $limit = 10000; // maximum batch of messages per indexing pass
    $shard_id = intval($argv[1] ?? 0); // shard number (first script argument; defaults to 0 if omitted)
    echo "Indexing shard $shard_id\n";
    while ($mysqli->query('SELECT MAX(message_id) FROM IndexQueue WHERE shard_id = ' . $shard_id)->fetch_row()[0]) {
        $index = "";
        $start = microtime(true);
        $ids = [];
        foreach ($mysqli->query('SELECT message_id FROM IndexQueue WHERE shard_id = ' . $shard_id . ' ORDER BY message_id LIMIT ' . $limit)->fetch_all() as $row) {
            $ids[] = $row[0];
        }
        if (empty($ids)) {
            break;
        }
        $message_texts = $mysqli->query('SELECT message_id, `message_text` FROM Messages WHERE message_id IN(' .  implode(', ', $ids) . ')')->fetch_all(MYSQLI_ASSOC);
        $unknown_words = [];
        $msg_words = [];
        $total_length = 0;
        foreach ($message_texts as $msg) {
            $msg_id = $msg['message_id'];
            $text = $msg['message_text'];
            $total_length += strlen($text);
            $words = array_unique(
                array_filter(
                    preg_split('/\W+/s', $text),
                    function($a) {
                        $len = strlen($a);
                        return $len >= 2 && $len <= MAX_WORD_LEN;
                    }
                )
            );
            foreach ($words as $word) {
                $unknown_words[$word] = true;
            }
            $msg_words[$msg_id] = $words;
        }
        if (!count($message_texts)) {
            $mysqli->query('DELETE FROM IndexQueue WHERE shard_id = ' . $shard_id . ' AND message_id IN(' . implode(', ', $ids) . ')');
            continue;
        }
        if (!count($unknown_words)) {
            var_dump($message_texts);
            die("Empty message texts!\n");
        }
        $words_res = $mysqli->query('SELECT word, id FROM Words WHERE word IN(' . INstr(array_keys($unknown_words)) . ')')->fetch_all(MYSQLI_ASSOC);
        $word_ids = [];
        foreach ($words_res as $row) {
            $word_ids[$row['word']] = $row['id'];
            unset($unknown_words[$row['word']]);
        }
        if (count($unknown_words)) {
            echo "Inserting " . count($unknown_words) . " words into dictionary\n";
            $values = [];
            foreach ($unknown_words as $word => $_) {
                $values[] = "('" . $mysqli->escape_string($word) . "')";
            }
            $mysqli->query('INSERT IGNORE INTO Words (word) VALUES ' . implode(',', $values));
            $words_res = $mysqli->query('SELECT word, id FROM Words WHERE word IN(' . INstr(array_keys($unknown_words)) . ')')->fetch_all(MYSQLI_ASSOC);
            foreach ($words_res as $row) {
                $word_ids[$row['word']] = $row['id'];
                unset($unknown_words[$row['word']]);
            }
        }
        if (count($unknown_words)) {
            die("Could not fill dictionary\n");
        }
        foreach ($msg_words as $msg_id => $words) {
            foreach ($words as $word) {
            // here we implicitly assume that the unix timestamp can be recovered from message_id by cutting off the lower 32 bits
            $index .= date('Y-m-d', $msg_id >> 32) . "\t" . $word_ids[$word] . "\t" . $msg_id . "\n";
            }
        }
        $ch = curl_init('http://' . CH_HOST . '/?query=' . rawurlencode('INSERT INTO FT FORMAT TabSeparated'));
        curl_setopt($ch, CURLOPT_POSTFIELDS, $index);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
        $res = curl_exec($ch);
        if ($res !== "") {
            die($res . "\n");
        }
        $mysqli->query('DELETE FROM IndexQueue WHERE shard_id = ' . $shard_id . ' AND message_id IN(' . implode(', ', $ids) . ')');
        echo "Speed " . round($total_length / 1024 / (microtime(true) - $start), 2) . " KiB/sec\n";
    }
    function INstr(array $values) {
        global $mysqli;
        $res = [];
        foreach ($values as $v) $res[] = "'" . $mysqli->escape_string($v) . "'";
        return implode(',', $res);
    }
    



    Searching the index


    We do not need the result-ranking algorithms that Elastic, Sphinx and other solutions are so rich in; we only need sorting by date, so the search will be extremely simple. To find something for the query “hello world 111”, we first look up the word_id of each word in the dictionary (suppose they are 1, 2 and 3 respectively) and execute the following query:

    SELECT message_id FROM FT
    WHERE word_id IN(1, 2, 3)
    GROUP BY message_id
    HAVING uniq(word_id) = 3
    ORDER BY message_id DESC
    LIMIT 50
    


    Please note that every document we find must contain all the words from the query, which is why we write HAVING uniq(word_id) = 3 (uniq(word_id) is the analogue of COUNT(DISTINCT word_id) in conventional SQL databases), where 3 is the number of distinct words in the query.
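
    Putting the pieces together, the whole search flow might look like the sketch below. This is only an illustration: it reuses the hypothetical ch_query() helper from the client installation section, mirrors the indexer's word filter, and assumes the same Words and FT schemas:

    // Hedged sketch of search: split the query into words the same way the indexer does,
    // map them to word_ids via the Words dictionary, then query the FT index in ClickHouse.
    function search(mysqli $mysqli, $query, $limit = 50) {
        $words = array_unique(array_filter(
            preg_split('/\W+/', $query),
            function($a) {
                $len = strlen($a);
                return $len >= 2 && $len <= 150; // same bounds as the indexer
            }
        ));
        if (!$words) {
            return [];
        }
        $in = implode(',', array_map(
            function($w) use ($mysqli) { return "'" . $mysqli->escape_string($w) . "'"; },
            $words
        ));
        $word_ids = [];
        foreach ($mysqli->query("SELECT id FROM Words WHERE word IN($in)")->fetch_all() as $row) {
            $word_ids[] = (int)$row[0];
        }
        if (count($word_ids) < count($words)) {
            return []; // some word never occurs in the logs, so no document can match
        }
        $res = ch_query(
            'SELECT message_id FROM FT WHERE word_id IN(' . implode(',', $word_ids) . ')' .
            ' GROUP BY message_id HAVING uniq(word_id) = ' . count($word_ids) .
            ' ORDER BY message_id DESC LIMIT ' . intval($limit)
        );
        return array_map('intval', array_filter(explode("\n", $res)));
    }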

    We assume that sorting by message_id means sorting by time. This can be achieved by writing the event's UNIX timestamp in seconds into the upper 32 bits of message_id and filling the lower half with the event's microseconds (if available) and random bits.
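
    A sketch of one possible id scheme is shown below; the exact split of the lower 32 bits is my assumption for illustration, the article only relies on the timestamp occupying the upper 32 bits:

    // Hypothetical sketch: seconds in the upper 32 bits, microseconds (20 bits)
    // and random bits (12 bits) in the lower 32, so id order matches time order.
    function make_message_id($unix_ts, $microseconds) {
        return ($unix_ts << 32) | (($microseconds & 0xFFFFF) << 12) | mt_rand(0, 0xFFF);
    }

    // The indexer recovers the event date by cutting off the lower 32 bits.
    function message_id_to_date($message_id) {
        return date('Y-m-d', $message_id >> 32);
    }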

    Results


    To test the performance of this solution, I took a 3 GB error-log database from our development server (1.6 million events) and indexed it. The indexer showed a speed of 3.5 MB/s per process, which was more than enough for my case. At the moment we use Sphinx for full-text search over error logs, so I can roughly compare the performance of the two solutions, since they run under approximately the same conditions on the same hardware. Sphinx indexing (at least building a non-realtime index) is several times faster per core, but keep in mind that the Sphinx indexer is written in C++, while ours is written in PHP :).

    To construct the heaviest query for ClickHouse (and obviously for Sphinx too), I decided to find the most popular words in the index:
    $ echo 'SELECT word_id, count() AS cnt FROM FT GROUP BY word_id ORDER BY cnt DESC LIMIT 5' | curl 'http://hostname:8123/?query=' --data-binary @-
    5       1669487
    187     1253489
    183     1217494
    159     1216255
    182     1199507
    


    The query took 130 ms over a total of 86 million rows, impressive! (2 cores on the test machine.)

    So, taking the top 5 and turning the word_ids back into words, the query to execute becomes “php wwwrun _packages ScriptFramework badoo”. These words occur in almost every message and could safely be thrown out of the index, but I kept them to test search performance.

    We execute the query in ClickHouse:
    SELECT message_id FROM FT WHERE word_id IN(189, 159, 187, 5, 183) GROUP BY message_id HAVING uniq(word_id) = 5 ORDER BY message_id DESC LIMIT 51
    


    And a similar query in Sphinx:
    SELECT message_id FROM FT WHERE MATCH('php wwwrun _packages ScriptFramework badoo') ORDER BY message_id DESC LIMIT 51
    


    Query execution times (both daemons may use both cores to execute a query; everything fits in RAM):

    ClickHouse: 700 ms
    Sphinx: 1500 ms

    Given that Sphinx ranks its results while our system does not, Sphinx posts quite a good time. Do not forget that during query execution both daemons had to merge results for ~6 million documents (1.2 million documents per word) and did so on a modest 2 cores. It is possible that with proper tuning the times in this (somewhat synthetic) test would swap places, but I am personally very pleased with the results, and it is safe to say that ClickHouse is a very good fit for building real-time search over logs.

    Thank you for reading the article to the end; I hope you enjoyed it.

    P.S. I am not a Yandex employee and am not affiliated with Yandex in any way; I just wanted to try their database on a real task :).

    References


    1. ClickHouse website
    2. Habr article about ClickHouse before it was open-sourced
    3. Habr article about ClickHouse going open-source
    4. ClickHouse on GitHub
    5. ClickHouse Docker image


    UPD: it is better to use the uniqUpTo(N) function (for the five-word query above, HAVING uniqUpTo(5)(word_id) = 5), since uniq is approximate, although it gives a very accurate result when the number of distinct elements is below 65536.
