Centralized application logs using Heka + Elasticsearch + Kibana

This article describes how to set up centralized logging for different kinds of applications (Python, Java (java.util.logging), Go, bash) using Heka, a fairly new project.

Heka is developed at Mozilla and written in Go, which is why I use it instead of logstash, a tool with similar capabilities.

Heka is built around plugins, which come in five types:
  1. Inputs - receive data in some way (listen on ports, read files, etc.);
  2. Decoders - process incoming data and convert it into Heka's internal message structures;
  3. Filters - perform arbitrary actions on messages;
  4. Encoders - encode internal messages into the formats that output plugins send;
  5. Outputs - send data somewhere.

For Java applications, for example, the input plugin is LogstreamerInput, which watches log files for changes. New lines in the log are parsed by the PayloadRegexDecoder decoder (according to the specified format) and then sent to elasticsearch by the ElasticSearchOutput output plugin. The output plugin, in turn, uses ESJsonEncoder to encode the message from the internal structure into the format elasticsearch expects.
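The flow described above can be sketched in a few lines of Python. This is only a toy model of the stages, not how Heka is implemented: all function names, the simplified line format and the sample lines are assumptions made for illustration.

```python
import json
import re

# Toy model of the stages: input -> decoder -> filter -> encoder -> output.
# Every name here is hypothetical; real Heka plugins are configured in TOML.
LINE_RE = re.compile(r'^(?P<logger>[\w.]+)\[(?P<level>[A-Z]+)\]: (?P<text>.*)$')


def read_input(lines):
    """Input: yield raw lines (a real input tails a file or listens on a port)."""
    for line in lines:
        yield line.rstrip("\n")


def decode(raw):
    """Decoder: turn a raw line into an internal message (a dict), or None."""
    match = LINE_RE.match(raw)
    return match.groupdict() if match else None


def count_by_level(message, counters):
    """Filter: perform some action on the message, here counting by level."""
    counters[message["level"]] = counters.get(message["level"], 0) + 1


def encode(message):
    """Encoder: serialize the internal message into the output's format."""
    return json.dumps(message, sort_keys=True)


def run_pipeline(lines):
    """Run all stages; the returned list stands in for what an output would send."""
    counters, sent = {}, []
    for raw in read_input(lines):
        message = decode(raw)
        if message is None:
            continue  # lines that do not match the format are skipped
        count_by_level(message, counters)
        sent.append(encode(message))
    return sent, counters


if __name__ == "__main__":
    sent, counters = run_pipeline(["app[INFO]: started\n", "app[DEBUG]: tick\n", "garbage"])
    print(sent, counters)
```

Each function corresponds to one plugin type, which is also how the configuration sections in the rest of the article are split.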

Heka Installation

All installation methods are described in the documentation (http://hekad.readthedocs.org/en/v0.8.2/installing.html#binaries). The easiest way, however, is to download a ready-made binary package for your system from https://github.com/mozilla-services/heka/releases .

Since my servers run Ubuntu, the description below is for that system. Installation comes down to installing the deb package, setting up the /etc/hekad.toml configuration file, and adding an upstart service.

In the basic /etc/hekad.toml I configure the number of processes (I set it to the number of cores), the dashboard (where you can see which plugins are enabled), and a UDP server on port 5565 that accepts messages in Google protobuf format (used by the Python and Go applications):

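[hekad]
maxprocs = 4

[Dashboard]  # the section name is arbitrary because type is set explicitly
type = "DashboardOutput"
address = ":4352"
ticker_interval = 15

[UdpInput]
address = ""  # listen address as "host:port"; this article uses port 5565
parser_type = "message.proto"
decoder = "ProtobufDecoder"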

Configuration for upstart /etc/init/hekad.conf:

start on runlevel [2345]
exec /usr/bin/hekad -config=/etc/hekad.toml

Logging Python Applications

Python applications use the https://github.com/kalail/heka-py library and a custom handler for the logging module. The code:

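import logging
from traceback import format_exception

try:
    import heka
    # Map stdlib logging levels to Heka severities.
    # The dict name is not preserved in the original listing; SEVERITY_MAP is a stand-in.
    SEVERITY_MAP = {
        logging.CRITICAL: heka.severity.CRITICAL,
        logging.ERROR: heka.severity.ERROR,
        logging.WARNING: heka.severity.WARNING,
        logging.INFO: heka.severity.INFORMATIONAL,
        logging.DEBUG: heka.severity.DEBUG,
        logging.NOTSET: heka.severity.NOTICE,
    }
except ImportError:
    heka = None


class HekaHandler(logging.Handler):
    _notified = None
    conn = None
    host = ''  # Heka address as "host:port"

    def __init__(self, name, host=None):
        # Initialize the base class first: Handler.__init__ resets the handler name.
        super(HekaHandler, self).__init__()
        if host is not None:
            self.host = host
        self.name = name

    def emit(self, record):
        if heka is None:
            return

        fields = {
            'Message': record.getMessage(),
            'LineNo': record.lineno,
            'Filename': record.filename,
            'Logger': record.name,
            'Pid': record.process,
            'Exception': '',
            'Traceback': '',
        }

        if record.exc_info:
            trace = format_exception(*record.exc_info)
            fields['Exception'] = trace[-1].strip()
            fields['Traceback'] = ''.join(trace[:-1]).strip()

        # The Message arguments and the send_message call are a reconstruction
        # of lines missing here; check them against your heka-py version.
        msg = heka.Message(
            type=self.name,
            severity=SEVERITY_MAP.get(record.levelno, heka.severity.NOTICE),
            fields=fields,
        )

        try:
            if self.conn is None:
                # One connection is shared by all handler instances.
                self.__class__.conn = heka.HekaConnection(self.host)
            self.conn.send_message(msg)
        except Exception:
            # Report a delivery failure only once and never raise from logging.
            if self.__class__._notified is None:
                print("Sending HEKA message failed")
                self.__class__._notified = True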

By default, the handler expects Heka to listen on port 5565.
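To make the Exception and Traceback fields more concrete, here is a minimal sketch that uses only the standard library and sends nothing to Heka. The helper exception_fields and the CapturingHandler class are hypothetical names introduced for this illustration.

```python
import logging
from traceback import format_exception


def exception_fields(record):
    """Split exc_info into a one-line summary and the traceback text before it."""
    fields = {"Exception": "", "Traceback": ""}
    if record.exc_info:
        trace = format_exception(*record.exc_info)
        fields["Exception"] = trace[-1].strip()            # last line: "ErrorType: message"
        fields["Traceback"] = "".join(trace[:-1]).strip()  # everything before the last line
    return fields


class CapturingHandler(logging.Handler):
    """Hypothetical handler that stores the computed fields instead of sending them."""

    def __init__(self):
        super().__init__()
        self.captured = []

    def emit(self, record):
        self.captured.append(exception_fields(record))


logger = logging.getLogger("exception_fields_demo")
logger.propagate = False  # keep the demo output away from the root logger
handler = CapturingHandler()
logger.addHandler(handler)

try:
    1 / 0
except ZeroDivisionError:
    logger.exception("calculation failed")

print(handler.captured[0]["Exception"])
```

Keeping the one-line summary in its own field makes it easy to search by error type later, while the full traceback stays available in a separate field.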

Logging Go Applications

For Go, I forked the logging library https://github.com/ivpusic/golog and added the ability to send messages directly to Heka. The result is here: https://github.com/ildus/golog


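import "github.com/ildus/golog"
import "github.com/ildus/golog/appenders"

logger := golog.Default
// Enable the Heka appender. The Enable(appenders.Heka(...)) call shape follows
// golog's other appenders and is a reconstruction of lines missing here.
logger.Enable(appenders.Heka(golog.Conf{
    "addr": "",
    "proto": "udp",
    "env_version":  "2",
    "message_type": "myserver.log",
}))

logger.Debug("some message")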

Java Application Logging

Java applications use a LogstreamerInput input plugin with a custom regexp decoder. It reads the application logs from files, which must be written in a specific format.

The configuration for heka, responsible for reading and decoding logs:

[JavaLogs]  # the section name is arbitrary because type is set explicitly
type = "LogstreamerInput"
log_directory = "/some/path/to/logs"
file_match = 'app_(?P<Seq>\d+\.\d+)\.log'
decoder = "JDecoder"
priority = ["Seq"]

[JDecoder]
type = "PayloadRegexDecoder"
#Parses com.asdf[INFO|main|2014-01-01 3:08:06]: Server started
match_regex = '^(?P<LoggerName>[\w\.]+)\[(?P<Severity>[A-Z]+)\|(?P<Thread>[\w\d\-]+)\|(?P<Timestamp>[\d\-\s:]+)\]: (?P<Message>.*)'
timestamp_layout = "2006-01-02 15:04:05"
timestamp_location = "Europe/Moscow"

[JDecoder.severity_map]
INFO = 6
FINE = 6

[JDecoder.message_fields]
Type = "myserver.log"
Message = "%Message%"
Logger = "%LoggerName%"
Thread = "%Thread%"
Payload = ""
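Before pointing Heka at real logs, it can help to check the decoder pattern against a sample line. The sketch below does this in Python, whose named-group syntax is the same as Go's for this pattern. It assumes the capture groups are named LoggerName, Severity, Thread, Timestamp and Message (the names the message fields and the decoder conventions suggest); the function decode_line is hypothetical and only mimics what the decoder produces.

```python
import re
from datetime import datetime

# Same pattern as match_regex, with the capture group names assumed in the text above.
MATCH_REGEX = re.compile(
    r'^(?P<LoggerName>[\w\.]+)\[(?P<Severity>[A-Z]+)\|(?P<Thread>[\w\d\-]+)\|'
    r'(?P<Timestamp>[\d\-\s:]+)\]: (?P<Message>.*)'
)

# Mirrors the severity map from the configuration: level name -> numeric severity.
SEVERITY_MAP = {"INFO": 6, "FINE": 6}


def decode_line(line):
    """Return a dict resembling the decoded message, or None if the line does not match."""
    match = MATCH_REGEX.match(line)
    if match is None:
        return None
    groups = match.groupdict()
    return {
        "Type": "myserver.log",
        "Logger": groups["LoggerName"],
        "Thread": groups["Thread"],
        # Levels missing from the map yield None in this sketch.
        "Severity": SEVERITY_MAP.get(groups["Severity"]),
        # Python equivalent of the Go layout "2006-01-02 15:04:05".
        "Timestamp": datetime.strptime(groups["Timestamp"], "%Y-%m-%d %H:%M:%S"),
        "Message": groups["Message"],
    }


decoded = decode_line("com.asdf[INFO|main|2014-01-01 3:08:06]: Server started")
print(decoded)
```

If decode_line returns None for a line taken from your own logs, the formatter output and the pattern have drifted apart, and Heka would not be able to parse that line either.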

In the application, you need to change the Formatter through logging.properties. Example logging.properties:

handlers= java.util.logging.FileHandler java.util.logging.ConsoleHandler
java.util.logging.FileHandler.pattern = logs/app_%g.%u.log
java.util.logging.FileHandler.limit = 1024000
java.util.logging.FileHandler.formatter = com.asdf.BriefLogFormatter
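The FileHandler pattern and the file_match expression in the Heka configuration have to agree. A small sketch of that check, assuming the capture group in file_match is named Seq (as the priority setting suggests) and that the expression is applied to the whole file name; expand_pattern is a hypothetical helper:

```python
import re

# file_match from the Heka configuration, with the capture group name assumed to be Seq.
FILE_MATCH = re.compile(r'app_(?P<Seq>\d+\.\d+)\.log')


def expand_pattern(generation, unique):
    """Expand app_%g.%u.log: %g is the generation number, %u the unique number."""
    return "app_%d.%d.log" % (generation, unique)


for name in (expand_pattern(0, 0), expand_pattern(1, 0), "app_0.0.log.lck"):
    match = FILE_MATCH.fullmatch(name)
    print(name, "->", match.group("Seq") if match else "ignored")
```

The last name is the lock file that FileHandler keeps next to the log; under the whole-name assumption it is not picked up as a log file.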

BriefLogFormatter Code:

package com.asdf;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.logging.Formatter;
import java.util.logging.LogRecord;

public class BriefLogFormatter extends Formatter {

    private static final DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static final String lineSep = System.getProperty("line.separator");

    /**
     * A Custom format implementation that is designed for brevity.
     * Produces lines like: com.asdf[INFO|main|2014-01-01 03:08:06]: Server started
     */
    @Override
    public String format(LogRecord record) {
        String loggerName = record.getLoggerName();
        if(loggerName == null) {
            loggerName = "root";
        }
        StringBuilder output = new StringBuilder()
            .append(loggerName)
            .append("[")
            .append(record.getLevel()).append('|')
            .append(Thread.currentThread().getName()).append('|')
            .append(format.format(new Date(record.getMillis())))
            .append("]: ")
            .append(record.getMessage()).append(' ')
            .append(lineSep);
        return output.toString();
    }
}

Script Logging (bash)

For bash scripts, the Heka configuration gets a TcpInput input plugin (which listens on a specific port) and a PayloadRegexDecoder for decoding messages. Configuration in hekad.toml:

[TcpInput]
address = ""
parser_type = "regexp"
decoder = "TcpPayloadDecoder"

[TcpPayloadDecoder]
type = "PayloadRegexDecoder"
#Parses, e.g., space_checker[somehost|INFO|2014-01-01 3:08:06]: Need more space on disk /dev/sda6
match_regex = '^(?P<LoggerName>[\w\.\-]+)\[(?P<Hostname>[^\|]+)\|(?P<Severity>[A-Z]+)\|(?P<Timestamp>[\d\-\s:]+)\]: (?P<Message>.*)'
timestamp_layout = "2006-01-02 15:04:05"
timestamp_location = "Europe/Moscow"

[TcpPayloadDecoder.severity_map]
INFO = 6

[TcpPayloadDecoder.message_fields]
Type = "scripts"
Message = "%Message%"
Logger = "%LoggerName%"
Hostname = "%Hostname%"
Payload = "[%Hostname%|%LoggerName%] %Message%"

For logging, a function is written that sends messages to a TCP port in the specified format:

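log() {  # HEKA_HOST is a placeholder: set it to the address of the server running Heka
    if [ "$1" ]; then
            echo -e "app1[`hostname`|INFO|`date '+%Y-%m-%d %H:%M:%S'`]: $1" | nc "$HEKA_HOST" 5566 || true
            echo "$1"
    fi
}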

Sending an INFO-level message from the app1 logger:

log "test test test"
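The same line format can be produced from any language that can open a TCP socket. Below is a minimal Python sketch of the equivalent of the shell function. The function names and the host argument are assumptions for illustration; only port 5566 and the line format come from the configuration above.

```python
import socket
import time


def format_line(app, hostname, level, message, when=None):
    """Build 'app[hostname|LEVEL|YYYY-MM-DD HH:MM:SS]: message' plus a newline."""
    moment = when if when is not None else time.localtime()
    stamp = time.strftime("%Y-%m-%d %H:%M:%S", moment)
    return "%s[%s|%s|%s]: %s\n" % (app, hostname, level, stamp, message)


def send_line(line, host, port=5566, timeout=2.0):
    """Send one line over TCP. The host is a placeholder the caller has to supply."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            conn.sendall(line.encode("utf-8"))
        return True
    except OSError:
        # Same idea as `|| true` in the shell version: logging must not break the script.
        return False


if __name__ == "__main__":
    # Only prints the line; call send_line(line, host) with your Heka server address.
    print(format_line("app1", socket.gethostname(), "INFO", "test test test"), end="")
```

Separating formatting from sending makes it possible to check the format against the decoder pattern without a running Heka instance.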

Posting records to elasticsearch

The following configuration is added to hekad.toml:

[ESJsonEncoder]
index = "heka-%{Type}-%{2006.01.02}"
es_index_from_timestamp = true
type_name = "%{Type}"

[ElasticSearchOutput]
message_matcher = "Type == 'myserver.log' || Type=='scripts' || Type=='nginx.access' || Type=='nginx.error'"
server = "http://:9200"
flush_interval = 5000
flush_count = 10
encoder = "ESJsonEncoder"

Here we indicate where elasticsearch is located, how indexes should be formed, and what types of messages to send there.
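To illustrate how the index setting expands, here is a small Python sketch of the naming scheme and of the message matcher. Both functions are hypothetical stand-ins for what Heka does internally; the sample date is made up.

```python
from datetime import datetime

# Types listed in the message_matcher expression.
ACCEPTED_TYPES = ("myserver.log", "scripts", "nginx.access", "nginx.error")


def index_name(message_type, timestamp):
    """Expand heka-%{Type}-%{2006.01.02}; the Go layout 2006.01.02 means year.month.day."""
    return "heka-%s-%s" % (message_type, timestamp.strftime("%Y.%m.%d"))


def is_sent(message_type):
    """Mirror of the message_matcher: only the listed types go to elasticsearch."""
    return message_type in ACCEPTED_TYPES


when = datetime(2014, 1, 1, 3, 8, 6)
for message_type in ("scripts", "myserver.log", "something.else"):
    if is_sent(message_type):
        print(message_type, "->", index_name(message_type, when))
    else:
        print(message_type, "-> not sent")
```

With es_index_from_timestamp enabled, the date part comes from the message's own timestamp rather than from the moment of indexing, so each message type gets one index per day.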

View Logs

Kibana 4 is used to view the logs. It is still in beta but already quite usable. To install it, download the archive from http://www.elasticsearch.org/overview/kibana/installation/ , unpack it into a folder on the server, and set the location of the elasticsearch server in config/kibana.yml (the elasticsearch_url parameter).

On first start, you need to add indexes in the Settings tab. To be able to add an index pattern and have the fields detected correctly, first send a test message from each application. Then you can add an index pattern such as heka-* (which shows all messages) or heka-scripts-* (which separates applications from each other). After that, go to the Discover tab to see the records themselves.
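The effect of the two patterns can be illustrated with shell-style wildcard matching in Python. This is only an approximation of how such patterns select indexes, and the index names below are made-up examples that follow the naming scheme from the elasticsearch section.

```python
from fnmatch import fnmatchcase

# Hypothetical index names, following the heka-<Type>-<date> scheme.
INDEXES = [
    "heka-scripts-2014.01.01",
    "heka-myserver.log-2014.01.01",
    "other-index",
]


def select(pattern, names):
    """Return the names matched by a wildcard pattern such as heka-*."""
    return [name for name in names if fnmatchcase(name, pattern)]


print(select("heka-*", INDEXES))
print(select("heka-scripts-*", INDEXES))
```

The broad pattern covers every application at once, while the narrower one isolates the bash scripts.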


I have shown only part of what can be logged with Heka.
If anyone is interested, I can share part of the Ansible configuration that automatically installs heka on all servers, and elasticsearch with kibana on selected ones.
