
Hadoop Part 2: Collecting Data Through Flume
- Tutorial

In a previous post, we looked in detail at deploying a Cloudera-based Hadoop cluster. In this article, we would like to discuss the methods and tools for collecting data in Hadoop. Data can be loaded into the system either by simply copying it to HDFS or with specialized tools.
The easiest way to transfer data to the cluster is to copy files through the file manager web interface in the Hue control panel. The web interface is available at http://[Hue_node]:8888/filebrowser/ (replace [Hue_node] with the address of the node on which Hue is deployed). It is intuitive and requires no further explanation.
The web interface is good for novice users, and it is convenient for exploring the HDFS directory structure. At the same time, it is inconvenient for uploading large files (more than a few gigabytes).

To upload a large number of files, or very large files, it is preferable to copy them to HDFS with the hadoop utility. This is done with the following command, executed from any server that is part of the HDFS cluster:
hadoop fs -put file_for_hadoop /path/to/put/file/in/HDFS/
You can also combine this with traditional pipes, or copy the files to the server beforehand.
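For example, here is a minimal sketch of streaming a compressed log archive straight into HDFS through a pipe, without unpacking it to the local disk first (the file name and HDFS path are made up for illustration; the “-” argument tells -put to read from standard input):
gunzip -c access_log.gz | hadoop fs -put - /log/archive/access_log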
The described methods are well suited when existing data needs to be transferred to HDFS. But it seems more natural to collect data directly into Hadoop. Specialized tools exist for this purpose, and one of them is developed within the Apache Hadoop ecosystem: Flume, a universal tool for collecting logs and other data. In this article, we would like to describe Flume's architecture and share our practical experience of using it.
About Flume
The English word "flume" means a man-made channel for directing water. This tool is designed to manage data flows: to collect them from various sources and deliver them to a centralized repository.
The Flume project is developed by the Apache Software Foundation. The first versions of Flume were released in 2009. All versions released before 2011 (up to version 0.9.4) are referred to as Flume OG (OG stands for "old generation"). In 2011, work began on a new branch of the project, intended to significantly extend the functionality of Flume OG. This branch is known as Flume 728 (after the number of the JIRA task that collects the main comments and suggestions for improvement) or Flume NG ("new generation"). The latest version to date, 1.4.0, was released in July 2013.
Architecture
Basic concepts
We begin with a description of the Flume architecture by defining basic concepts:
- event - a unit of data transferred by Flume from the point of origin to the destination;
- flow - the movement of events from the point of origin to the destination;
- client - any application that passes data to a Flume agent;
- agent - an independent process that hosts components such as sources, channels, and sinks; it stores events and forwards them to the next node;
- source - an interface implementation that receives messages via various data transfer protocols. A source passes the received events to one or more channels. Out of the box, Flume supports the following ways of receiving data from external sources: Avro, log4j, syslog, and HTTP POST with a JSON body. Other protocols can be supported as well, for example with ExecSource, which ingests the output of an arbitrary command;
- channel - temporary storage for events. An event stays in a channel until it is taken out by a sink. Channels store queues of events, which makes it possible to decouple sinks and sources with different throughput and architecture. Depending on the channel type, events can be stored in memory, in an ordinary file on disk, or in a database (for example, the JDBC channel);
- sink - an interface implementation that takes events from a channel and either passes them to the next agent in the flow or writes them to the final storage (for example, HDFS). Sinks that deliver events to the final storage are called terminal sinks; examples of such destinations are the HDFS file system, the Hive database, and the Solr search server. A typical example of an intermediate sink is the Avro sink, which simply forwards events to other agents.
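To make these concepts more concrete, below is a minimal sketch of a single-agent configuration (the names a1, r1, c1, k1 and the port number are arbitrary, and this is not the configuration used later in the article): a netcat source listens on a TCP port, a memory channel buffers the events, and a logger sink simply prints them.
# declare the components of agent a1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source: accept plain text lines on TCP port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
# channel: keep the event queue in memory
a1.channels.c1.type = memory
# sink: write incoming events to the Flume log
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
Outside of Cloudera Manager, such an agent could be started manually with flume-ng agent --conf conf --conf-file example.conf --name a1; in our case, agents are managed through Cloudera Manager, as described below.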
Flow structure
A flow begins with a client, which passes an event to an agent (more precisely, to a source within the agent). The source that receives the event passes it to one or more channels. From the channels, the event is taken by the sinks belonging to the same agent. A sink can pass it on to another agent or, if this is the last agent in the flow, deliver it to the destination node.
Since a source can send events to several channels, a flow can be routed to several destination nodes. This is clearly shown in the figure below: the source writes the event to two channels (Channel 1 and Channel 2), from which it is then taken by independent sinks.

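A rough sketch of such a fan-out (the agent and component names here are hypothetical) consists of listing several channels for one source; with the replicating selector, which is the default, every event is copied into each of them:
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
# the source writes every event into both channels
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating
# each sink drains its own channel and can lead to a different destination
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2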
Several flows can also be merged into one. For this, several sources within the same agent must send data to the same channel. The interaction of the components when flows are merged is shown in the figure below (here each of the three agents contains several sources that send data to a single channel and then on to the sink):

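Within a single agent, such a merge might be sketched like this (again, the names are hypothetical): several sources list the same channel, and one sink drains it:
a1.sources = r1 r2 r3
a1.channels = c1
a1.sinks = k1
# all three sources feed the same channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1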
Reliability and error handling
Data transfer between sources and channels, as well as between agents, is carried out using transactions, which protects the data from being lost.
Error handling is also based on the transactional mechanism. When a flow passes through several agents and a communication problem occurs along the way, events are buffered on the last agent that is still reachable in the flow. The error handling scheme is illustrated in the figure below:

Error handling in a flow: (a) events move from the client to the central repository without communication problems; (b) a communication failure occurs between Agent2 and the central repository, and events are buffered on Agent2; (c) once the communication problem is fixed, the events buffered on Agent2 are sent on to the central repository.
Install Flume through Cloudera Manager
Since we have already deployed a Cloudera Hadoop cluster (see the previous article), we will install Flume using Cloudera Manager. On the page with the list of cluster services, select “Actions” → “Add a Service”.

Select “Flume” and click on the “Continue” button:

Now select the ZooKeeper service with which our Flume service will be associated. You can create several clusters or several services of the same type managed by different ZooKeeper instances.

Next, we indicate the hosts in the cluster on which Flume agents will be installed. You can configure several independent services whose agents run on different hosts and have different settings. In our case, we will select all available hosts:

Click the “Continue” button. After a short while, a message will appear confirming that the new service has been added successfully:

Now let's move on to the Flume dashboard by choosing “Services” → “flume1” in Cloudera Manager:

The service page opens with the following tabs: overall status, service instances (in this case, the agents are listed there), service management commands (start, stop, restart), service settings, access rights settings, and statistics and load graphs. Open the settings tab: “Configuration” → “View and Edit”:

By default, the settings of all Flume agents are stored in one configuration file (its contents are displayed in the Configuration File field). This file is shared by all agents and is inherited by each of them:

Configure Flume Agent
Let's look at an example of configuring a Flume agent that collects syslog logs over UDP and stores them in HDFS on the cluster.
### syslog cfg
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = cdh2.example.com
a1.sources.r1.channels = c1
# insert timestamp
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /log/flume/events/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = flume-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 5
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollCount = 100000
a1.sinks.k1.hdfs.rollSize = 0
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
As you can see from the example above, all entries in the file have a hierarchical structure, and the order of the lines is not important. Each parameter is prefixed with the name of the agent it belongs to, followed by the type of object (source, channel, or sink) and its name, then the parameter (and, if needed, sub-parameter) name, and finally the value.
By default, a single configuration file is created for all agents. Because the configuration file is shared, several agents can have the same name and therefore the same set of settings. This is convenient for making agents fault tolerant or for balancing load between them: to change an agent's role, it is enough to change its name, without rewriting the configuration file.
You can read more about configuring Flume agents in the documentation.
Let's go through the structure of the configuration file in more detail. First, we assign names to the main objects and “bind” them to a specific agent. In our case, for the agent “a1” we declare the source “r1”, the channel “c1”, and the sink “k1”:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
When several objects are specified, their names are separated by spaces (for example, “a1.sources = r1 r2 r3”).
Let's start with the channel. We use a so-called memory channel, which simply keeps the event queue in memory. To absorb unexpected bursts of activity, the maximum queue size is set to 1000 messages, although the number of messages in the queue usually does not exceed 10.
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
Syslog UDP Source Configuration
As the source, we use the syslog UDP source, which is part of the standard Flume distribution:
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = cdh2.example.com
a1.sources.r1.channels = c1
The type, port, and host parameters speak for themselves. The channels parameter lists the channels the source is connected to; if several channels are specified, their names are separated by spaces. Channel names are given without the agent name prefix: each agent can only use objects that belong to it.
The next object deserves a closer look: the interceptor. Interceptors are not standalone entities; they are part of a source.
# insert timestamp
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
A Flume event consists of a body (the data itself) and additional headers, whose set can vary depending on the type of source. An interceptor preprocesses the data on the fly, before it is passed to the channel.
Interceptors can be chained together; they are executed in the order given in the .interceptors directive (for example, “a1.sources.r1.interceptors = i1 i2 i3”). In our case, only one interceptor, “i1”, is used for the source “r1”.
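For example, a hypothetical chain that first sets the timestamp header and then adds the name of the host the agent runs on (using the standard host interceptor) would look like this:
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
# the host interceptor puts the agent's host name into the "host" header
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = host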
The syslog source writes only the message itself to the event body. All other fields (as defined by the syslog RFC) go into the corresponding headers of the Flume event.
However, the syslog source writes the timestamp to the timestamp header not in unixtime format but in a human-readable form (for example, “2013:09:17-09:03:00”). To fix this, we use the timestamp interceptor, which overwrites the timestamp header with a unixtime value. We will need this header in the sink settings, which are discussed below.
Setting up the HDFS sink
Finally, let's move on to configuring the sink, which will save our data to HDFS:
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /log/flume/events/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = flume-
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 5
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollCount = 100000
a1.sinks.k1.hdfs.rollSize = 0
The type and channel parameters are self-explanatory, but the rest deserve a closer look.
The path parameter specifies the path in HDFS under which data from the events will be saved. When saving logs, it is highly desirable to spread files across subdirectories based on timestamps: this simplifies monitoring and subsequent processing, since it is enough to specify a directory with a date mask to pick up the logs for a given period. To build the file path, the sink needs the event timestamp, which we redefined earlier with the interceptor.
The filePrefix parameter sets a prefix for file names, which is convenient when logs from different agents are collected into the same directories.
The fileType parameter specifies the format of the files in which events are saved. DataStream is the plain format, in which each event is saved as a line in an ordinary text file.
The round, roundValue, and roundUnit parameters indicate that the event timestamp is rounded down to a multiple of 5 minutes, so files are saved into subdirectories in 5-minute increments. Without these parameters, a new subdirectory would be created every minute, which is not very convenient.
When working with a large message flow, it also makes sense to split the data inside each subdirectory into several files instead of writing one huge file. This is done with the roll* parameters:
rollCount = 100000 sets the number of messages after which the current file is closed and a new one is started.
rollSize = 0 means that we do not limit the size of individual files.
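Putting the path and rounding parameters together: with the settings above, an event received, say, at 09:03 on 17 September 2013 (the example timestamp from the previous section) would be written under /log/flume/events/13-09-17/09-00/, since the timestamp is rounded down to the nearest multiple of 5 minutes, and a whole day of logs can later be picked up with a simple date mask:
hadoop fs -ls /log/flume/events/13-09-17/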
Client setup
So, our agent is configured and ready to receive and transform data and then save it to HDFS. All that remains is to send the data itself over UDP in syslog format.
As an example, consider the data collection procedure for our Cloud Storage service. We use haproxy as a load balancer, and it passes the HTTP request logs to the Flume agent. These logs contain the client address, the request URL, the amount of data transferred, and other service data.
Here is the part of the haproxy configuration responsible for logging:
global
    log [FQDN_Flume_agent]:5140 local0 info
    maxconn 60000
    nbproc 16
    user haproxy
    group haproxy
    daemon
    tune.maxaccept -1
    tune.bufsize 65536

defaults
    log global
    mode http
    # for hadoop
    option httplog
    #option logasap
    log-format \t%T\t%ci\t%cp\t%ft\t%b\t%s\t%ST\t%B\t%sq\t%bq\t%r
    option dontlognull
    #option dontlog-normal
The log directive specifies the address of the server and the port on which the Flume agent listens, as well as the syslog facility (local0) and the log level (info).
The mode http and option httplog directives indicate that we are logging HTTP access logs. You can read more about haproxy log formats in the documentation.
To save as much information as possible, the logasap and dontlog-normal options are disabled. With logasap disabled, haproxy logs an HTTP request only after it has completed, including the amount of data received and transmitted. With dontlog-normal disabled, haproxy logs all requests, including successful ones.
To keep the data in a format close to machine-readable and to simplify further processing, we changed the log formatting (the log-format directive). By default, a space is used as the field separator, but spaces can also occur inside the data itself, which complicates processing, so we replaced the separator with a tab character. We also disabled the quotes around the URL and the request method.
For reference: the Storage access logs amount to 20-30 GB per day. So far we have collected more than 200 TB of data for further research. The Flume agent puts practically no load on the server it runs on (load average ~0.01), and the HDFS service transparently distributes and replicates the received data across three independent cluster nodes. And all of this runs on far from the most powerful servers, with ordinary spinning disks.
Conclusion
In this article, we used our Cloud Storage logs as an example to walk through collecting and storing data in a Hadoop cluster with the Flume service. Naturally, this example far from exhausts Flume's capabilities.
In the next article, we will move on to the most interesting part: processing data in Hadoop using the Pig tool. Stay tuned!
For those who cannot comment on posts on Habré, we invite you to our blog.