
Hadoop Part 3: Pig, Data Processing

In a previous publication we examined in detail the process of collecting data with the specialized Flume tool. But to make full use of the information, it is not enough just to collect and store it: it also needs to be processed so that something necessary and useful can be extracted from it.
Hadoop uses MapReduce technology to process data.
MapReduce Technology
History
Data processing in Hadoop is carried out using the MapReduce technology, which was originally developed by Google.
In 2004, Google engineers Jeffrey Dean and Sanjay Ghemawat published a paper in which they proposed the following approach to processing large volumes of raw data (indexed documents, query logs, etc.): the huge volume of information is split into parts, and the processing of each part is assigned to a separate server. As a rule, data is processed on the same servers where it is stored, which speeds up processing and avoids unnecessary data transfers between servers. After that, the partial results are combined into a single whole.
In the paper mentioned above, the Google engineers limited themselves to describing the basic algorithms without dwelling on implementation details. Nevertheless, this information was enough for the Hadoop developers to create their own MapReduce framework.
Today it is used in many well-known web projects: Yahoo!, Facebook, Last.fm and others.
Let's consider the architecture and operating principles of Hadoop MapReduce in more detail.
Architecture and principles of work
The MapReduce architecture is built on the "master - workers" principle. The JobTracker server acts as the master, distributing tasks to the worker nodes of the cluster and controlling their execution.

Data processing is divided into the following stages:
- Application launch: the application code is transferred to the master node and the worker nodes;
- The master assigns specific tasks (Map or Reduce) and distributes parts of the input data to the computing (worker) nodes;
- Map nodes read their assigned input data and begin processing it;
- Map nodes store intermediate results locally: each node saves its result to local disks;
- Reduce nodes read intermediate data from the Map nodes and perform the Reduce processing;
- Reduce nodes store the final results in output files, usually in HDFS.
Creating applications for MapReduce directly is rather laborious: writing all the functions, compiling and packaging takes a lot of time. To make things easier, Yahoo! developed a specialized tool called Pig, which raises the level of abstraction of data processing.
Pig
Pig consists of two parts:
- the Pig Latin language for describing data flows;
- a runtime environment for executing Pig Latin scripts (two modes are available: local execution in a single JVM or execution on a Hadoop cluster).
A Pig script consists of a series of operations (transformations) that are applied to the input data in order to produce the output. These operations describe a data flow, which the Pig runtime compiles into an executable representation and then runs. Internally, Pig turns the transformations into a series of MapReduce jobs.
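To make this concrete, here is a minimal hedged sketch of a Pig Latin data flow (the file path and field names are invented for illustration). The EXPLAIN statement prints the logical, physical and MapReduce plans that Pig builds for a relation, which makes the compilation into MapReduce jobs visible:
-- load invented sample data: one word per line (tab is the default delimiter)
words = LOAD '/tmp/words.txt' USING PigStorage() AS (word: chararray);
-- count how many times each word occurs
word_counts = FOREACH (GROUP words BY word) GENERATE group AS word, COUNT(words) AS cnt;
-- show the plans (including the MapReduce plan) that Pig generates for this relation
EXPLAIN word_counts;
-- write the result to the file system
STORE word_counts INTO '/tmp/word_counts';
Such a script can be tried locally with pig -x local or submitted to a cluster with pig -x mapreduce.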
Originally, Pig was designed to be used from the console (the Grunt shell). In Cloudera's distribution, you can also work with Pig through a simple and convenient web interface, accessible through the familiar Hue interface at http://[node_with_hue_installed_name]:8888/pig/

The web interface includes a full-fledged editor (there is even autocompletion of operators) and a script manager. With it, you can save scripts directly in Hue, run them, and view the list of running tasks, their results and execution logs.
Test task
As a test task, we will process the access logs of our storage for one day and calculate the following values:
- total number of requests;
- the number of requests from each unique IP;
- the number of requests for each unique URL;
- the amount of data transferred for each URL.
Below is a script that solves this task. It should be noted right away that this script (like any Pig script) is not executed line by line, as in interpreted languages. The Pig compiler analyzes the dependencies and determines the data flows. Compilation of the script starts from the end, that is, from the STORE commands. For data whose processing never reaches a save command, no jobs will be created and the data itself will not even be read. This allows you to write the script in a fairly free form; all the work of optimizing, determining the execution order and parallelizing is taken care of by Pig.
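As a hedged illustration of this behavior (the relation names and the simplified schema below are ours, not part of the original script): the unused_ip relation is never referenced by a STORE or DUMP, so Pig creates no jobs for it and drops it from the plan entirely.
-- simplified schema: only the first two columns of the log are declared here
records = LOAD '/log/flume/events/14-02-20/' USING PigStorage('\t') AS (date: chararray, clientip: chararray);
-- defined but never stored or dumped: no MapReduce job will be created for this relation
unused_ip = GROUP records BY clientip;
-- only this chain reaches a STORE, so only it is compiled into MapReduce jobs
count_total = FOREACH (GROUP records ALL) GENERATE COUNT(records);
STORE count_total INTO '/tmp/count_total';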
A complete listing of the script will look like this:
records = LOAD '/log/flume/events/14-02-20/' USING PigStorage('\t') AS (date: chararray, clientip: chararray, clientport: chararray, proto: chararray, statuscode: int, bytes: int, sq: chararray, bq: chararray, request: chararray);
count_total = FOREACH (GROUP records ALL) GENERATE COUNT(records);
count_ip = FOREACH (GROUP records BY clientip) GENERATE group AS ip, COUNT(records) AS cnt;
top_ip = ORDER count_ip BY cnt DESC;
filtered_req = FILTER records BY statuscode == 200 OR statuscode == 206;
count_req = FOREACH (GROUP filtered_req BY request) GENERATE group AS req, COUNT(filtered_req) AS cnt, SUM(filtered_req.bytes) AS bytes;
top_req = ORDER count_req BY bytes DESC;
%declare DT `date +%y%m%dT%H%M`
STORE count_total INTO '$DT/count_total';
STORE top_ip INTO '$DT/top_ip';
STORE top_req INTO '$DT/top_req';
It consists of three parts: loading, processing and storing the data. This order is typical for most tasks. In some cases a task may also include additional steps, such as generating data (for example, structured synthetic data to verify the algorithm) or saving intermediate results of the calculations. A detailed description of the syntax, data types and operators can be found in the official documentation.
Let's consider each stage in more detail.
Loading
records = LOAD '/log/flume/events/14-02-20/' USING PigStorage('\t') AS (date: chararray, clientip: chararray, clientport: chararray, proto: chararray, statuscode: int, bytes: int, sq: chararray, bq: chararray, request: chararray);
As input we use the web server logs of our storage. To better understand the subsequent processing, here is a sample of the input data:
07/Dec/2013:20:05:13 95.153.193.56 37877 http 200 1492030 0 0 GET /745dbda3-894e-43aa-9146-607f19fe4428.mp3 HTTP/1.1
08/Dec/2013:15:00:28 178.88.91.180 13600 http 200 4798 0 0 GET /public/cars/bmw7l/down.png HTTP/1.1
08/Dec/2013:15:00:29 193.110.115.45 64318 http 200 1594 0 0 GET /K1/img/top-nav-bg-default.jpg HTTP/1.1
First, let's look at the data model and terminology. The main object in Pig Latin is the "relation". All language operators work with relations, and both input and output data are represented as relations.
Each relation is a set of objects of the same type called "tuples". The database analogy: a tuple is a row, a relation is a table.
Tuples, in turn, consist of numbered or named objects called "fields", which have arbitrary basic types (number, string, boolean, etc.).
So, in Pig Latin, the result of any operator is a relation, that is, a collection of tuples.
The LOAD statement creates the records relation from the files in the HDFS directory '/log/flume/events/14-02-20/' using the standard PigStorage interface (we also indicate that the delimiter in the files is the tab character '\t'). Each line of the files becomes a tuple of the relation. The AS clause assigns names and types to the fields of the tuple, which makes it more convenient to refer to them later.
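To check that the schema was assigned as expected, the DESCRIBE operator can be used. A small sketch; the expected output (shown as a comment, approximately) follows directly from the AS clause above:
DESCRIBE records;
-- records: {date: chararray, clientip: chararray, clientport: chararray, proto: chararray, statuscode: int, bytes: int, sq: chararray, bq: chararray, request: chararray}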
Processing
We calculate the total number of log entries using the COUNT operator. To do this, all rows of the records relation must first be combined into a single group with the GROUP and FOREACH statements.
count_total = FOREACH (GROUP records ALL) GENERATE COUNT(records);
count_ip = FOREACH (GROUP records BY clientip) GENERATE group AS ip, COUNT(records) AS cnt;
top_ip = ORDER count_ip BY cnt DESC;
Translated from Pig Latin into natural language, the first statement reads: for the relation obtained by grouping all entries together (GROUP records ALL), generate (FOREACH ... GENERATE) the number of entries in records (COUNT).
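The same computation can be written in two explicit steps, which makes the grouping easier to see. This is an equivalent sketch, not part of the original script (the relation name grouped_all is ours):
-- collapse the whole relation into a single group
grouped_all = GROUP records ALL;
-- the single resulting tuple contains a bag of all records; count them
count_total = FOREACH grouped_all GENERATE COUNT(records);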
Now let's calculate the number of requests from each unique address. In the tuples of the records relation, the clientip field contains the IP address from which the request was made. We group the tuples of records by the clientip field and define a new count_ip relation consisting of two fields:
- the ip field, whose value is taken from the group key of the grouped records;
- the cnt field, the number of entries in the group calculated by the COUNT operator, that is, the number of requests made from the given IP address.
Next, we define another relation, top_ip, consisting of the same data as count_ip but sorted by the cnt field with the ORDER operator. Thus, top_ip will contain the list of client IP addresses from which requests were made most often. Later we could join this data with a GeoIP database and see in which cities and countries our storage is most popular =)
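If only the busiest clients are of interest, the sorted relation can be trimmed with the LIMIT operator. A small sketch, not part of the original script (the name top10_ip is ours):
-- keep only the ten most active IP addresses
top10_ip = LIMIT top_ip 10;
-- print them to the console instead of storing them in HDFS
DUMP top10_ip;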
filtered_req = FILTER records BY statuscode == 200 OR statuscode == 206;
count_req = FOREACH (GROUP filtered_req BY request) GENERATE group AS req, COUNT(filtered_req) AS cnt, SUM(filtered_req.bytes) AS bytes;
top_req = ORDER count_req BY bytes DESC;
After that, we calculate the number of successful requests for each URL, as well as the total amount of data transferred for each URL. To do this, we first use the FILTER operator, selecting only successful requests with the HTTP codes 200 OK and 206 Partial Content. This statement defines the new filtered_req relation by filtering the records relation on the statuscode field.
Next, similarly to the IP address count, we calculate the number of requests for each unique URL by grouping the entries of filtered_req by the request field. We are also interested in the amount of data transferred for each URL: it is calculated with the SUM operator, which sums the bytes fields within each group of the filtered_req relation.
Finally, we sort by bytes, defining the new top_req relation.
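If it is more convenient to report traffic in megabytes rather than bytes, the value can be converted on the fly with an extra FOREACH. A hedged sketch, not part of the original script (the relation count_req_mb and the field mbytes are our own names):
-- convert the summed byte count to megabytes for readability
count_req_mb = FOREACH count_req GENERATE req, cnt, (double)bytes / 1048576.0 AS mbytes;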
Saving Results
%declare DT `date +%y%m%dT%H%M`
STORE count_total INTO '$DT/count_total';
STORE top_ip INTO '$DT/top_ip';
STORE top_req INTO '$DT/top_req';
It is preferable to save the results of each script run in a separate directory whose name includes the date and time of execution. For this you can use Pig's ability to call an arbitrary shell command directly from the script (the command is written in backquotes). In the example, the output of the date command is stored in the DT variable, which is then substituted into the output path. The results are saved with the STORE command, each relation into its own directory.
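By default PigStorage writes tab-separated values; if a different delimiter is more convenient for downstream tools, it can also be passed to PigStorage in the STORE statement. A hedged sketch, not part of the original script (the comma delimiter and the top_ip_csv directory name are chosen purely for illustration):
-- store the same relation as comma-separated values in a separate directory
STORE top_ip INTO '$DT/top_ip_csv' USING PigStorage(',');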
You can view the output through the file manager in Hue; by default, the path in HDFS is relative to the home directory of the user who launched the script.

Information on the results of the tasks will be displayed in the Pig logs as follows:
http://cdh3:8888/pig/#logs/1100715
Input(s):
Successfully read 184442722 records (32427523128 bytes) from: "/log/flume/events/14-02-20"
Output(s):
Successfully stored 1 records (10 bytes) in: "hdfs://cdh3:8020/user/admin/140225T1205/count_total"
Successfully stored 8168550 records (1406880284 bytes) in: "hdfs://cdh3:8020/user/admin/140225T1205/top_req"
Successfully stored 2944212 records (49039769 bytes) in: "hdfs://cdh3:8020/user/admin/140225T1205/top_ip"
Counters:
Total records written: 11112763
Total bytes written: 1455920063
Report from Oozie:
Last Modified   Tue, 25 Feb 2014 00:22:00
Start Time      Tue, 25 Feb 2014 00:05:16
Created Time    Tue, 25 Feb 2014 00:05:15
End Time        Tue, 25 Feb 2014 00:22:00
The logs show that while performing the test task, more than 180 million records with a total volume of more than 32 GB were processed. The whole run took about 15 minutes.
During the active Map phase, 22 processor cores and 91 GB of RAM were in use. For a small cluster of three five-year-old servers, this is quite a good result.
As mentioned above, during script execution Pig creates MapReduce jobs and submits them to the MapReduce cluster. This process is clearly visible in the statistics graphs in the Cloudera Manager control panel:


- Map stage: the processors and disks on each node are busy processing their portions of the data.
- Reduce stage: the results obtained in the first stage are transferred over the network and combined.
- In the third stage, the results are saved to the file system (the graph shows a spike in HDFS writes).
The graphs show that solving the task involved two MapReduce passes: in the first pass the unique records were counted, and in the second the sorting was performed. These steps cannot be parallelized into a single pass, since the second works on the results of the first.
Conclusion
In this article we discussed the MapReduce architecture and looked at how it works in practice using the Pig tool. Writing programs directly for MapReduce is a rather difficult task that requires a special approach. These difficulties, however, are compensated by the power, scalability and high speed of processing huge volumes of arbitrary data.
In the near future we plan to continue a series of articles about Hadoop. The next publication will be devoted to working with the Impala database.
Readers who are not able to comment on posts on Habré are welcome to our blog.