MapReduce for processing weakly structured data in HDInsight
In this example, we will examine the creation and execution of a typical MapReduce job in Microsoft's cloud-based Hadoop implementation, called HDInsight.
In the previous example, we created a 3-node Hadoop cluster and loaded a sample log of a weakly structured format, which we now need to process. The log is, generally speaking, a large text file (small in our specific example, but that does not affect the demonstration of the idea) whose lines are marked with one of the levels TRACE, DEBUG, INFO, WARN, ERROR, FATAL. Our elementary task is to count the number of lines at each level, i.e. how many times a WARN situation occurred, how many ERROR, and so on.
In SQL terms, this is a COUNT(*) ... GROUP BY on the level field, something like SELECT level, COUNT(*) FROM log GROUP BY level. Of course, there is no such field: the file is not a table but a set of lines with a textual description of a problem, each of which contains a substring naming the level. We have to go over all the lines, extract that substring, and add things up. Simply put, from
2012-02-03 18:35:34 SampleClass6 [INFO] everything normal for id 577725851
2012-02-03 18:35:34 SampleClass4 [FATAL] system problem at id 1991281254
2012-02-03 18:35:34 SampleClass3 [DEBUG] detail for id 1304807656
2012-02-03 18:35:34 SampleClass3 [WARN] missing id 423340895
2012-02-03 18:35:34 SampleClass5 [TRACE] verbose detail for id 2082654978
2012-02-03 18:35:34 SampleClass0 [ERROR] incorrect id 1886438513
...
Script 1

we need to get something like
[TRACE] 10
[DEBUG] 20
[INFO] 30
[WARN] 555
[ERROR] 777
[FATAL] 1
Script 2

The idea behind the MapReduce model is very simple. Given a distributed system such as a Hadoop cluster, the overall task is split (Map) into parallel sub-tasks. As noted in the previous example, the file to be processed is, transparently to the user, split into fragments across the nodes when it is saved to the Hadoop file system. In theory these nodes can be geographically distributed, i.e. located in different places. To minimize the cost of transferring data between data centers (or simply between individual nodes), Hadoop takes data locality into account: each sub-task works with its own piece of the data. In our case there are only 3 nodes in the cluster, so no such luxury; the sub-tasks will simply run on the same nodes where the data fragments reside. The results of the sub-tasks are then aggregated by the Reduce functions into a single result that is returned to the user. In other words, each node produces its own partial sub-result, for example the first:
[TRACE] 1
[DEBUG] 2
[INFO] 3
...
Script 3

the second:
[TRACE] 9
[DEBUG] 5
[INFO] 7
...
Script 4

from which the overall result (Script 2) is compiled. This is the general idea of parallel processing, implemented, among other places, in traditional relational database servers (e.g. Oracle RAC, Microsoft SQL Server Parallel Data Warehouse) and in cloud relational data services (e.g. federations in Windows Azure SQL Database, formerly known as sharding in SQL Azure). In our case, however, the input is not relational but weakly structured, so instead of SQL queries we will have to write the functions that play the roles of Map and Reduce ourselves; a toy, single-process sketch of this data flow is shown below. The MapReduce idea has been implemented in various languages. The free open-source Apache Hadoop project, for example, uses Java for this purpose. Since Microsoft HDInsight is compatible with Apache Hadoop, we will also use Java and the org.apache.hadoop.mapred package.
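To make the Map, group, Reduce flow tangible before diving into the Hadoop API, here is a toy, single-process sketch in plain Java. It is not HDInsight code: the class name MapReduceToy and the hard-coded sample lines are made up for the illustration, and it only shows the data flow that the cluster performs in parallel.

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class MapReduceToy
{
    private static final Pattern LEVEL = Pattern.compile("TRACE|DEBUG|INFO|WARN|ERROR|FATAL");
    public static void main(String[] args)
    {
        List<String> lines = Arrays.asList(
            "2012-02-03 18:35:34 SampleClass6 [INFO] everything normal for id 577725851",
            "2012-02-03 18:35:34 SampleClass3 [WARN] missing id 423340895",
            "2012-02-03 18:35:34 SampleClass6 [INFO] everything normal for id 577725852");
        // "Map": each line yields a (level, 1) pair
        List<AbstractMap.SimpleEntry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines)
        {
            Matcher m = LEVEL.matcher(line);
            if (m.find())
            { pairs.add(new AbstractMap.SimpleEntry<>(m.group(), 1)); }
        }
        // Grouping by key: in Hadoop this happens between the map and reduce phases
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (AbstractMap.SimpleEntry<String, Integer> p : pairs)
        { grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue()); }
        // "Reduce": sum the ones for each key
        for (String level : grouped.keySet())
        {
            int count = 0;
            for (int one : grouped.get(level)) { count += one; }
            System.out.println("[" + level + "]\t" + count);
        }
    }
}

In the real job below, the Hadoop runtime takes care of the grouping and distributes the map and reduce steps across the cluster nodes.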
First, the Map class, derived from Mapper, is implemented. The Mapper converts the initial set of key/value pairs into an intermediate one. In our case the input values are the lines of the text log file, passed as the value parameter of type Text to the map method. Inside the method we split each line on spaces and square brackets, compare the resulting tokens against the constant set of level names stored in the pattern variable at the top of the class and, if a token matches (if (matcher.matches())), we emit an output key-value pair. The key is the TRACE/DEBUG/... level substring (the Text variable logLevel), and the value is 1. The value is held in the accumulator variable of type IntWritable, initialized to one. IntWritable is a wrapper around the Java int type that implements the Writable interface; Hadoop uses its own serialization format. These ones will be summed in the Reduce function to compute the number of occurrences of each level. The intermediate (output) values are grouped by the Hadoop runtime per output key. At the map stage you can also pre-aggregate with setCombinerClass() to reduce the amount of data sent to the Reducer; this feature is not used in our example (a hypothetical sketch follows below). The Reporter class (the last parameter of the map method) is intended for reporting status and progress, updating counters, etc.; it is also not used in our simple example.
The Reduce class, derived from Reducer, solves the inverse problem. It collects the intermediate results of mapping and aggregates them, in our case effectively performing the COUNT(), since the GROUP BY on the keys (including sorting) has already been done by the framework between the map and reduce phases. The input types of Reduce (Text, IntWritable) must match the output types of Map. While merging the results at the Reduce stage, the Hadoop runtime performs a secondary sort, since results coming from different mappers may contain the same keys. Thus the input to the reduce method is a set of records of the form key plus the collection of values corresponding to it. For example, one of them will be the key TRACE and a collection containing as many ones as the number of occurrences of this level found by one or another mapper instance. All we have to do is iterate over the collection and sum the ones into the variable count. Into the OutputCollector we write the traditional key-value pair, only the value here is the result of aggregation by key.
The main() method creates a Hadoop job based on the Map and Reduce classes and executes it. The JobConf object forms the job specification. The code is packaged into a JAR file, which Hadoop will distribute across the cluster. Instead of specifying the file name explicitly, you can pass the enclosing class containing the executable code (MapReduceTest) to the JobConf constructor, from which Hadoop will locate the corresponding JAR file. The setOutputKeyClass() and setOutputValueClass() methods set the output types of the Map and Reduce functions. As a rule they coincide, i.e. Map emits the same types as Reduce; if they differ, the output types of the Map function can be set separately with setMapOutputKeyClass() and setMapOutputValueClass(). Which class does the mapping and which the reducing is, as you might guess, set with setMapperClass() and setReducerClass(). It remains to specify the input/output format, which is done with setInputFormat() and setOutputFormat(); strictly speaking, this could have been omitted here, because the text format is the default. Finally, you need to register the paths to the source data and the results using the static methods FileInputFormat.setInputPaths() and FileOutputFormat.setOutputPath(). We will pass the file names through command-line arguments. As the method name suggests, there can be several input paths; a path may also be a directory, in which case all files in it are taken, and file name patterns are allowed. The output location is a directory into which the result files will be written. It must not exist beforehand, otherwise an error occurs at run time; this is a kind of protection so that one job does not clobber the results of another.
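As an aside, here is a minimal sketch of what enabling the combiner mentioned above could look like with the old mapred API used in this article. Because the reduce operation (summing ones) is associative and commutative, the Reduce class defined below can double as the combiner. The sketch assumes the same imports as the full listing; the method name runWithCombiner is mine and is not part of the job we actually run.

// Hypothetical variant of the job setup (not used in this article): reuse Reduce
// as a combiner so each mapper pre-aggregates its (level, 1) pairs locally and
// sends at most one partial count per level to the reducer.
public static void runWithCombiner(String[] args) throws Exception
{
    final JobConf conf = new JobConf(MapReduceTest.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class); // map-side pre-aggregation
    conf.setReducerClass(Reduce.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
}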
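If you do want to rerun the job into the same location, one common workaround (not part of this article's code) is to delete the old output directory from HDFS before submitting. A minimal sketch, assuming an extra import of org.apache.hadoop.fs.FileSystem and the conf variable built in main():

// Hypothetical guard: remove previous results so the job does not fail with
// an "output directory already exists" error. Only do this if discarding
// the old results is really what you want.
final Path outputPath = new Path(args[1]);
final FileSystem fs = FileSystem.get(conf);   // conf is the JobConf from main()
if (fs.exists(outputPath))
{
    fs.delete(outputPath, true);              // true = delete recursively
}
FileOutputFormat.setOutputPath(conf, outputPath);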
Putting it together, we get the following code:
//Standard Java imports
import java.io.IOException;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
//Hadoop-related imports
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class MapReduceTest
{
    /*
     * Mapping
     */
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
    {
        private static final Pattern pattern = Pattern.compile("(TRACE)|(DEBUG)|(INFO)|(WARN)|(ERROR)|(FATAL)"); //list of level patterns to look for
        private static final IntWritable accumulator = new IntWritable(1); //constant one used as the value when a level is found
        private Text logLevel = new Text();
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> collector, Reporter reporter)
            throws IOException
        {   //split on the '[' and ']' delimiters (and spaces)
            final String[] tokens = value.toString().split("[ \\[\\]]");
            if(tokens != null)
            {
                //extract the logLevel token
                for(final String token : tokens)
                {
                    final Matcher matcher = pattern.matcher(token);
                    if(matcher.matches()) //level found
                    {
                        logLevel.set(token);
                        collector.collect(logLevel, accumulator); //emit the key-value pair
                    }
                }
            }
        }
    }
    /*
     * Reducing
     */
    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> collector, Reporter reporter) throws IOException
        {
            int count = 0;
            //aggregate the number of occurrences of the level into count
            while(values.hasNext())
            { count += values.next().get(); }
            System.out.println(key + "\t" + count);
            collector.collect(key, new IntWritable(count));
        }
    }
    /*
     * Creating the job
     */
    public static void main(String[] args) throws Exception
    {
        //job configuration: the enclosing class and the classes performing Map/Reduce
        final JobConf conf = new JobConf(MapReduceTest.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        //input and output paths are taken from the command-line arguments
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        //run the job
        JobClient.runJob(conf);
    }
}
Script 5
Let's connect to the Hadoop cluster through Remote Desktop, as shown in the previous article, and save this code as MapReduceTest.java, say, in the same d:\Temp. HDInsight's Java support libraries are located in C:\apps\java\bin, but the command prompt does not know about this path. It makes sense to open the Hadoop command prompt window (D:\Windows\system32\cmd.exe /k pushd "c:\apps\dist\hadoop-1.1.0-SNAPSHOT" && "c:\apps\dist\hadoop-1.1.0-SNAPSHOT\bin\hadoop.cmd"; for convenience there is a shortcut on the HDInsight desktop) and append this path to the %PATH% environment variable:
set PATH=%PATH%;C:\apps\java\bin
Script 6

Go to the d:\Temp directory and compile the Java file into bytecode class files. The -encoding switch is needed because I saved MapReduceTest.java in Unicode encoding.
javac -encoding UNICODE -classpath C:\apps\dist\hadoop-1.1.0-SNAPSHOT\hadoop-core-*.jar d:\Temp\MapReduceTest.java
Script 7

In d:\Temp, the compiler produces MapReduceTest.class along with MapReduceTest$Map.class and MapReduceTest$Reduce.class, corresponding to the nested classes. Package them into a JAR:
jar -cvf MapReduceTest.jar *.class
Script 8 
Fig. 1
In the current directory d:\Temp, the Java archive MapReduceTest.jar has been created. Now we can submit the job to Hadoop, passing it the input log and the output directory:
hadoop jar MapReduceTest.jar MapReduceTest Sample1/input/Sample.log Sample1/output
Script 9 
Fig. 2
Here Sample1/input/Sample.log is the log file to be processed, which we uploaded from the local d:\Temp directory to the HDFS directory Sample1/input (see Fig. 5 of the previous article). Last time I forgot to point out that before uploading you need to explicitly create the HDFS input directory (hadoop fs -mkdir Sample1/input/) and only then put the file there (hadoop fs -put d:\Temp\Sample.log Sample1/input/). If you try to upload the file without first creating the directory, the directory gets created, but the file is not loaded into it, as hadoop fs -ls Sample1/input/ shows.
Meanwhile, the job has completed successfully. In the HDFS output directory Sample1/output, a results file was generated containing the number of occurrences of each level in the log, just as ordered:
hadoop fs -cat Sample1/output/part-00000
Script 10 
Fig. 3