Parsing a large number of HTML pages in parallel using Apache Ignite (GridGain) in 200 lines of code

From time to time I need to process a large number of files, usually converting them from one format to another: XSLT transformation, parsing, image or video conversion. For these tasks I adopted the GridGain In-Memory Data Fabric framework. It provides distributed computing, MapReduce, distributed caches and queues, an in-memory distributed file system, moving code to the data, job stealing, accelerators for Hadoop, and many other things that are fashionable today. All of this is easy to use and runs on different operating systems; you can try it all out on Windows.

I'll describe my experience using a simple task as an example.

Recently there was a task to extract product descriptions from a large site (2.5 million products).
The site was downloaded using the wget utility:
   C:>start wget110 --recursive  --level 10 -nc --no-clobber    --html-extension   --exclude-directories=it,fr   --convert-links   http://site.com


You can run several instances of the utility and they will download in parallel. Even so, it takes a long time, about a week.

Initially, I tried to parse using the Xidel console program:
REM walk the directory tree and run the utility for each file
FOR /R ./  %%G IN (dbfamily*.html) DO xidel  "%%G"  --quiet    --extract-file=mytemplate.txt 

The mytemplate.txt file contained the template:
//td/h1  || " # " ||  //td[contains(text(),"ЗНАЧ")] || " # " ||  //td[contains(text(),"ПОЛ")] || " # " ||  //td[contains(text(),"ПРИСХОЖДЕНИЕ")] || " # " ||  //td[contains(text(),"ПЕРЕВОД")] 

But sequential parsing took too long, so I decided to run it in parallel on several computers. Surprisingly, the whole thing ended up as a single Java class.

Each HTML page is parsed using XPath expressions. The document is first cleaned up with HtmlCleaner.

Code that parses one HTML page:
public static ArrayList<String> parseWithXPathList(String path, ArrayList<String> XPathList) {
	count++;
	ArrayList<String> list = new ArrayList<>();
	try {
		String content = readFile(path, StandardCharsets.UTF_8);
		// Clean up the raw HTML so that it can be converted to a well-formed DOM.
		TagNode tagNode = new HtmlCleaner().clean(content);
		org.w3c.dom.Document doc = new DomSerializer(new CleanerProperties()).createDOM(tagNode);
		// And then use the standard JAXP interfaces to query it:
		XPath xpath = XPathFactory.newInstance().newXPath();
		for (String expression : XPathList) {
			// STRING yields the text of the first matching node ("" if nothing matches).
			list.add((String) xpath.evaluate(expression, doc, XPathConstants.STRING));
		}
	} catch (Exception e) {
		// On failure, the exception text is returned in place of the results.
		list.add("" + e);
	}
	return list;
}


Called like this:
ArrayList<String> Xpaths = new ArrayList<>(Arrays.asList("//title", "//td/h1"));
ArrayList<String> ResultList = parseWithXPathList(param.toString(), Xpaths);
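
To see what each expression returns with XPathConstants.STRING, here is a minimal sketch that uses only the JDK. It assumes a small, already well-formed page, so the HtmlCleaner step is skipped; the class name XPathListDemo, the method evaluateAll, and the sample markup are made up for illustration and are not part of the project.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

class XPathListDemo {
    // Hypothetical sample page; real pages would first go through HtmlCleaner.
    static final String SAMPLE =
            "<html><head><title>Product 42</title></head>"
          + "<body><table><tr><td><h1>Blue widget</h1></td></tr></table></body></html>";

    // Evaluates every expression against the document and collects the string results.
    static List<String> evaluateAll(String xml, List<String> expressions) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            XPath xpath = XPathFactory.newInstance().newXPath();
            List<String> results = new ArrayList<>();
            for (String expression : expressions) {
                // STRING yields the text of the first match, or "" when nothing matches.
                results.add((String) xpath.evaluate(expression, doc, XPathConstants.STRING));
            }
            return results;
        } catch (Exception e) {
            throw new IllegalStateException("Could not evaluate XPath list", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(evaluateAll(SAMPLE, Arrays.asList("//title", "//td/h1", "//h2")));
    }
}
```

Note that an expression with no match (here "//h2") produces an empty string rather than an error, so a missing field in the result list does not by itself mean that parsing failed.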

Now all that remains is to walk the directory tree with the standard file walker:
Files.walkFileTree(startingDir, opts, Integer.MAX_VALUE, parseFiles);
and apply the parser to each file:
if (file.toString().endsWith(".html")) {
	ArrayList<String> Xpaths = new ArrayList<>(Arrays.asList("//title", "//td/h1"));
	ArrayList<String> ResultList = parseWithXPathList(file.toString(), Xpaths);
	System.out.format(" %d ) %s %n", count, "" + ResultList);
	// castOneAsync(ignite, "" + file);
}
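
The walking step on its own can be sketched as follows. This is only an illustration under a few assumptions: the class HtmlFileWalker and its methods are hypothetical names, the visitor collects the matching paths instead of parsing them, and the sample tree is created in a temporary directory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileVisitOption;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

class HtmlFileWalker {
    // Collects every regular file under startingDir whose name ends with ".html".
    static List<Path> findHtmlFiles(Path startingDir) {
        List<Path> found = new ArrayList<>();
        try {
            Files.walkFileTree(startingDir, EnumSet.of(FileVisitOption.FOLLOW_LINKS),
                    Integer.MAX_VALUE, new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                    if (attrs.isRegularFile() && file.toString().endsWith(".html")) {
                        found.add(file); // the article calls the parser at this point
                    }
                    return FileVisitResult.CONTINUE;
                }

                @Override
                public FileVisitResult visitFileFailed(Path file, IOException exc) {
                    System.err.println(exc); // skip unreadable entries and keep walking
                    return FileVisitResult.CONTINUE;
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return found;
    }

    // Builds a tiny hypothetical tree: two .html files and one .txt file.
    static Path createSampleTree() {
        try {
            Path root = Files.createTempDirectory("walker-demo");
            Files.createDirectories(root.resolve("sub"));
            Files.write(root.resolve("index.html"), "<html/>".getBytes());
            Files.write(root.resolve("sub").resolve("page.html"), "<html/>".getBytes());
            Files.write(root.resolve("sub").resolve("notes.txt"), "skip me".getBytes());
            return root;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(findHtmlFiles(createSampleTree()));
    }
}
```

Overriding visitFileFailed matters on a shared network folder: the default implementation rethrows the exception, so a single unreadable file would abort the whole walk.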


Parsing a million files sequentially on one machine takes about 12 hours.
I tried to parallelize the code with the GridGain framework, or rather with its open-source version, Apache Ignite.
It works like this: you start nodes (on one or several machines), the nodes find each other on the network, and each one prints in its console how many CPUs and how much memory the cluster now has (these are your slaves). I launched 12 nodes on 3 machines (each with 4 cores and 16 GB of RAM).
After the site has been downloaded (~500 GB), the folder with the HTML files is shared over the network. The shared folder must be visible to all nodes (check the permissions!).

Next, write a simple Java application that also starts the master node:
Ignite ignite = Ignition.start("D:\\grid\\1.4\\apache-ignite-fabric-1.4.0-bin\\config\\default-config.xml");


After that, you can ask it about the state of the cluster and send jobs to arbitrary nodes.
The division into master and slave is nominal here: the nodes are equal. I call the node on which the initial code runs the master. The cluster can actually be segmented by node type, but we do not need that now.

Code that sends a parsing job to a node:

public static void castOneAsync(Ignite ignite, String param) {
	// Enable asynchronous mode.
	IgniteCluster cluster = ignite.cluster();
	IgniteCompute asyncCompute = ignite.compute(cluster.forRemotes()).withAsync();
	// Asynchronously execute a job.
	asyncCompute.call(() -> {
		System.out.println("processing: " + param);
		ArrayList<String> Xpaths = new ArrayList<>(Arrays.asList("//title", "//td/h1"));
		ArrayList<String> ResultList = parseWithXPathList(param, Xpaths);
		System.out.format(" %d ) %s \n %n", count, "" + ResultList);
		return "" + param + " :" + ResultList;
	});
	// Get the future for the above invocation.
	IgniteFuture<String> fut = asyncCompute.future();
	// Asynchronously listen for completion and print out the result.
	fut.listen(f -> {
		String resultStr = f.get() + " \n";
		// System.out.println("Job result: " + resultStr);
		count++;
		try {
			// The result file must already exist, because only APPEND is specified.
			Files.write(Paths.get("d:\\grid\\result.txt"), resultStr.getBytes(), StandardOpenOption.APPEND);
		} catch (IOException e) {
			System.out.println("" + e);
		}
		if (count % 100 == 0) System.out.println("processed: " + count);
	});
}
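
The submit-then-listen pattern above can be tried without a cluster. The sketch below is not Ignite code: it swaps in the JDK's CompletableFuture as a stand-in for the Ignite compute call and future, and the names AsyncJobSketch, fakeParse, runAll, and tempResultFile are hypothetical. It also makes two choices that differ from the code above, both assumptions on my part: the counter is an AtomicLong because callbacks run on other threads, and the file is opened with CREATE as well as APPEND so that it does not have to exist beforehand.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

class AsyncJobSketch {
    // Thread-safe counter: completion callbacks run on pool threads.
    static final AtomicLong processed = new AtomicLong();

    // Hypothetical stand-in for the remote parsing job.
    static String fakeParse(String path) {
        return path + " :[parsed]";
    }

    // Appends one line; CREATE means the file does not have to exist beforehand.
    static synchronized void appendLine(Path file, String line) {
        try {
            Files.write(file, (line + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        long n = processed.incrementAndGet();
        if (n % 100 == 0) {
            System.out.println("processed: " + n);
        }
    }

    // Submits the job and registers a completion callback (the call + listen pair).
    static CompletableFuture<Void> castOneAsync(String path, Path resultFile) {
        return CompletableFuture
                .supplyAsync(() -> fakeParse(path))
                .thenAccept(result -> appendLine(resultFile, result));
    }

    // Submits all paths, waits for every callback, and returns the lines written.
    static List<String> runAll(List<String> paths, Path resultFile) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (String path : paths) {
            futures.add(castOneAsync(path, resultFile));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        try {
            return Files.readAllLines(resultFile, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns a path to a not-yet-existing result file in a fresh temp directory.
    static Path tempResultFile() {
        try {
            return Files.createTempDirectory("grid-sketch").resolve("result.txt");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(Arrays.asList("a.html", "b.html", "c.html"), tempResultFile()));
    }
}
```

The order of lines in the result file is not deterministic, since jobs finish in whatever order the pool runs them. The same holds for results coming back from cluster nodes, which is why each result line carries its own file path.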

Important points:
  • A node is a folder containing the Java program and its configuration in the default-config.xml file. It is started with ignite.bat.
    Copy the node folder to any machines on the local network. You can run several instances on the same machine.
    Download the node (they call it "fabric").
  • All nodes must have the same configuration file.
  • Ignite supports peer class loading (Zero Deployment). This means you do not need to copy your project to every node: Ignite deploys the classes itself. This is a very cool feature that saves a ton of time.
    You do need to enable this feature in the configuration file, though. Reportedly, Ignite works faster with the feature turned off; I did not check.
  • You can run several nodes on the same machine and emulate a cluster.
  • Yes, you can add nodes to the cluster and remove them while the application is running.
  • In the configuration file, specify the IP addresses of the machines in the cluster.
  • Java 8 is required because lambdas are used.
  • If you stop the master node, the jobs it sent to other machines will die.
  • You can ask the framework for data about any machine in the cluster (CPU load, free memory, number of jobs), but I let the master decide which machine each job should go to.

As a result, I managed to fit the entire project into a single Java class of about 200 lines of code, including comments. The class needs the HtmlCleaner and Apache Ignite jar files.

Instead of HtmlCleaner you can use the external Xidel utility, which supports XQuery and XPath.
In that case it must be added to the PATH system variable on every machine that runs a node and then called directly from Java. In return, you get to enjoy XQuery.
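
Calling Xidel from Java could look roughly like the sketch below. It assumes that an executable named xidel is on the PATH and reuses only the two flags from the batch loop earlier (--quiet and --extract-file); the class XidelRunner and its methods are hypothetical names, and I have not run this against a real Xidel installation.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

class XidelRunner {
    // Builds the same command line as the batch loop: xidel <file> --quiet --extract-file=<template>
    static List<String> buildCommand(String htmlFile, String templateFile) {
        return Arrays.asList("xidel", htmlFile, "--quiet", "--extract-file=" + templateFile);
    }

    // Runs the command and returns its combined stdout and stderr output.
    static String run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .redirectErrorStream(true) // merge stderr into stdout so one reader drains both
                .start();
        StringBuilder output = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                output.append(line).append('\n');
            }
        }
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            throw new IOException("xidel exited with code " + exitCode + ": " + output);
        }
        return output.toString();
    }

    public static void main(String[] args) {
        // Only prints the command; call run(...) on a machine where xidel is installed.
        System.out.println(buildCommand("page.html", "mytemplate.txt"));
    }
}
```

Passing the arguments as a list avoids quoting problems with file names that contain spaces, which the batch version had to handle with "%%G".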

If this post is of interest, I will write more about the distributed cache, queues, and other distributed features of this framework.

The source code of the project for Eclipse

package gridE;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.Ignition;
import org.apache.ignite.lang.IgniteFuture;
import org.htmlcleaner.CleanerProperties;
import org.htmlcleaner.DomSerializer;
import org.htmlcleaner.HtmlCleaner;
import org.htmlcleaner.TagNode;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Scanner;
import static java.nio.file.FileVisitResult.CONTINUE;
/**
 * Created by Veaceslav Kunitki on 11/13/2015.
 * This class parses files on a cluster with the "Apache Ignite" framework.
 */
public class ParseFilesOnCluster extends SimpleFileVisitor<Path> {
	Ignite ignite; 
	public static long count = 0; //  counter of parsed files
	// Standard Java file tree walker
	@Override
	public FileVisitResult visitFile(Path file, BasicFileAttributes attr) {
		if (attr.isSymbolicLink()) {
			System.out.format("Symbolic link: %s ", file);
		} else if (attr.isRegularFile()) {
			// System.out.format("Regular file: %s ", file);
			if (file.toString().endsWith(".html") ) {
			//if (file.toString().endsWith(".html") ) { // uncomment it for serial  processing
				//ArrayList Xpaths = new ArrayList(Arrays.asList("//title", "//td/h1"));
				// ArrayList
				// ResultList=parseWithXPathList(file.toString(),Xpaths);
				// System.out.format(" %d ) %s %n", count,""+ResultList);
				castOneAsync(ignite, "" + file); // parallel processing
			}
		} else {
			System.out.format("Other: %s ", file);
		}
		return CONTINUE;
	}
	// Print each directory visited.
	@Override
	public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
		System.out.format("Directory: %s%n", dir);
		return CONTINUE;
	}
	@Override
	public FileVisitResult visitFileFailed(Path file, IOException exc) {
		System.err.println(exc);
		return CONTINUE;
	}
	static String readFile(String path, Charset encoding) throws IOException {
		byte[] encoded = Files.readAllBytes(Paths.get(path));
		return new String(encoded, encoding);
	}
	public static ArrayList<String> parseWithXPathList(String path, ArrayList<String> XPathList) {
		count++;
		ArrayList<String> list = new ArrayList<>();
		try {
			String content = readFile(path, StandardCharsets.UTF_8);
			// Clean up the raw HTML so that it can be converted to a well-formed DOM.
			TagNode tagNode = new HtmlCleaner().clean(content);
			org.w3c.dom.Document doc = new DomSerializer(new CleanerProperties()).createDOM(tagNode);
			// And then use the standard JAXP interfaces to query it:
			XPath xpath = XPathFactory.newInstance().newXPath();
			for (String expression : XPathList) {
				// STRING yields the text of the first matching node ("" if nothing matches).
				list.add((String) xpath.evaluate(expression, doc, XPathConstants.STRING));
			}
		} catch (Exception e) {
			// On failure, the exception text is returned in place of the results.
			list.add("" + e);
		}
		return list;
	}
	/*
	 * Asynchronously execute a job on external PC
	 */
	public static void castOneAsync(Ignite ignite, String param) {
		// Enable asynchronous mode.
		IgniteCluster cluster = ignite.cluster();
		// IgniteCompute compute1 = ignite.compute(cluster.forRemotes());
		IgniteCompute asyncCompute = ignite.compute(cluster.forRemotes()).withAsync();
		// Asynchronously execute a job.
		asyncCompute.call(() -> {
			// Runs on some remote cluster node: parse the file and return the result as text.
			System.out.println("processing: " + param);
			ArrayList<String> Xpaths = new ArrayList<>(Arrays.asList("//title", "//li/@data-zoom"));
			ArrayList<String> ResultList = parseWithXPathList(param, Xpaths);
			System.out.format(" %d ) %s \n %n", count, "" + ResultList);
			return "{ 'url':" + param + " ,'ResultList'=" + ResultList + " }";
		});
		// Get the future for the above invocation.
		IgniteFuture<String> fut = asyncCompute.future();
		// Asynchronously listen for completion and print out the result.
		fut.listen(f -> {
			String resultStr = f.get() + " \n";
			// System.out.println("Job result: " + resultStr);
			count++;
			try {
				// Warning: the file must already exist; create it manually.
				Files.write(Paths.get("d:\\grid\\result.txt"), resultStr.getBytes(), StandardOpenOption.APPEND);
			} catch (IOException e) {
				System.out.println("" + e);
			}
			if (count % 100 == 0)
				System.out.println("processed: " + count);
		});
	}
	public static void main(String[] args) throws Exception {
		System.out.println("# Distributed parser!");
		Ignite ignite = Ignition.start("D:\\grid\\1.4\\apache-ignite-fabric-1.4.0-bin\\config\\default-config.xml");	
		IgniteCluster cluster = ignite.cluster();
		// Compute instance over remote nodes.
		IgniteCompute compute4remote = ignite.compute(cluster.forRemotes());
		// Print hello message on all remote nodes.
		compute4remote.broadcast(
				() -> System.out.println("---===Distributed parser started===---: " + cluster.localNode().id()));
		System.out.println( "Cluster ready!"   );
		if (true) { // start parsing job
			// final Path startingDir = Paths.get("d:/home/familytree.ru/");
			Path startingDir = Paths.get("\\\\SERGIU-PC\\temp"); // shared directory with HTML-files
			EnumSet<FileVisitOption> opts = EnumSet.of(FileVisitOption.FOLLOW_LINKS);
			ParseFilesOnCluster parseFiles = new ParseFilesOnCluster();
			parseFiles.ignite = ignite;
			// log time to file
			PrintWriter writer = new PrintWriter("d:\\grid\\start.txt", "UTF-8");			
			String dateTime = "" + (new Date());
			writer.println(dateTime + "\n");
			System.out.println(dateTime + "\n");
			writer.close();
			System.out.println("# walking...!");
			Files.walkFileTree(startingDir, opts, Integer.MAX_VALUE, parseFiles);
			// log end time			
			dateTime = "" + (new Date());
			Files.write(Paths.get("d:\\grid\\start.txt"), dateTime.getBytes(), StandardOpenOption.APPEND);
		}
	}
}


POM-file with project dependencies

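<project>
	<modelVersion>4.0.0</modelVersion>
	<groupId>gridE</groupId>
	<artifactId>gridE</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<repositories>
		<repository>
			<id>GridGain External Repository</id>
			<url>http://www.gridgainsystems.com/nexus/content/repositories/external</url>
		</repository>
	</repositories>
	<dependencies>
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-core</artifactId>
			<version>1.4.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-spring</artifactId>
			<version>1.1.4</version>
		</dependency>
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-indexing</artifactId>
			<version>1.4.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.ignite</groupId>
			<artifactId>ignite-examples</artifactId>
			<version>1.0.0-RC1</version>
		</dependency>
		<dependency>
			<groupId>net.sourceforge.htmlcleaner</groupId>
			<artifactId>htmlcleaner</artifactId>
			<version>2.15</version>
		</dependency>
	</dependencies>
</project>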


Configuration file for a node


                    
<value>127.0.0.1:47500..47509</value>
<value>192.168.4.110:47500..47509</value>
<value>192.168.4.117:47500..47509</value>
