Loading large datasets into HBase via bulk load

    Hello, colleagues.
    I want to share my experience with HBase, specifically with bulk loading. It is another way of loading data, and it differs fundamentally from the usual approach of writing to the table through the client. There is an opinion that bulk load lets you load huge amounts of data very quickly, and that is what I decided to look into.

    So, first things first. A bulk load proceeds in three stages:

    • Put the source data files into HDFS.
    • Run a MapReduce job that converts the source data directly into HFiles, the file format in which HBase stores its data.
    • Run the bulk load function, which attaches (binds) the generated files to the HBase table.
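    Stage two is where each CSV line becomes an HBase row. The mapper itself (DataMapper) is not shown in this article, so what follows is only a hypothetical sketch of one plausible piece of it, in plain Java without Hadoop classes: deriving a fixed-width 16-byte row key by MD5-hashing the first CSV field. The MD5 choice and the field layout are assumptions; they merely fit the 16-byte key range used for the pre-split, and hashed keys spread roughly evenly across such a range.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not part of the original project.
final class RowKeySketch {

    // Assumption: the first comma-separated field identifies the record.
    static String idField(String csvLine) {
        return csvLine.split(",", 2)[0];
    }

    // Assumption: the row key is the 16-byte MD5 digest of that field,
    // which spreads keys roughly uniformly from 0x00..00 to 0xFF..FF.
    static byte[] rowKeyFor(String csvLine) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            return md5.digest(idField(csvLine).getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide MD5.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] key = rowKeyFor("42,Alice,Moscow");
        StringBuilder hex = new StringBuilder();
        for (byte b : key) {
            hex.append(String.format("%02x", b));
        }
        System.out.println(key.length + " bytes: " + hex);
    }
}
```

    Two lines with the same first field get the same key, so a later line overwrites (or adds a newer version to) the earlier row; whether that is desirable depends on the data.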




    I wanted to get a feel for this technology and understand it in numbers: what speed it reaches and how that speed depends on the number and size of the files. Such numbers depend heavily on the environment, but they do show how normal loading and bulk load compare in order of magnitude.

    Test setup:


    Cluster managed by Cloudera CDH4, HBase 0.94.6-cdh4.3.0.
    Three virtual hosts on a hypervisor, each with CentOS, 4 CPUs, 8 GB RAM and a 50 GB HDD.
    The test data was stored in CSV files of various sizes, with total volumes of 2 GB, 3.5 GB, 7.1 GB and 14.2 GB.

    First, the results:

    Bulk loading


    Speed:
    • Max: 29.2 MB/s, or 58K records/s (3.5 GB in 28 files)
    • Average: 27 MB/s, or 54K records/s (closer to the realistic working speed)
    • Min: 14.5 MB/s, or 29K records/s (2 GB in 100 files)
    • A single file loads about 20% faster than the same data split into 100 files


    Size of one record (row): 0.5 KB
    MapReduce job initialization time: 70 s
    Time to upload files from the local file system to HDFS:
    • 3.5 GB in 1 file: 65 s
    • 7.5 GB in 100 files: 150 s
    • 14.2 GB in 1 file: 285 s
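    The records-per-second figures follow from the record size: throughput in MB/s divided by 0.5 KB per record. A small Java check of that arithmetic, which also derives the HDFS upload rates implied by the copy timings. It assumes decimal units (1 GB = 1000 MB, 1 MB = 1000 KB), which is what reproduces the numbers above.

```java
// Sanity check of the throughput figures quoted above.
// Assumption: decimal units, 1 GB = 1000 MB and 1 MB = 1000 KB.
final class ThroughputMath {

    static final double RECORD_SIZE_KB = 0.5; // size of one row, from the text

    // MB/s -> records/s for 0.5 KB rows.
    static double recordsPerSecond(double megabytesPerSecond) {
        return megabytesPerSecond * 1000.0 / RECORD_SIZE_KB;
    }

    // Average rate implied by moving `gigabytes` in `seconds`.
    static double megabytesPerSecond(double gigabytes, double seconds) {
        return gigabytes * 1000.0 / seconds;
    }

    public static void main(String[] args) {
        for (double mbps : new double[] {29.2, 27.0, 14.5}) {
            System.out.printf("%.1f MB/s = %.0f records/s%n", mbps, recordsPerSecond(mbps));
        }
        // HDFS upload rates implied by the copy timings above.
        System.out.printf("3.5 GB in 65 s: %.1f MB/s%n", megabytesPerSecond(3.5, 65));
        System.out.printf("14.2 GB in 285 s: %.1f MB/s%n", megabytesPerSecond(14.2, 285));
    }
}
```

    This gives 58400, 54000 and 29000 records/s, matching the rounded figures, and upload rates of about 53.8 and 49.8 MB/s, i.e. copying into HDFS runs at roughly twice the bulk load speed.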


    Loading through clients:


    Loading was done from 2 hosts with 8 threads each.
    The clients were started by cron at the same time; CPU load did not exceed 40%.
    The size of one record (row), as in the previous case, was 0.5 KB.



    What is the result?




    I ran this test prompted by talk of bulk load as a way of ultra-fast data loading. It is worth noting that the official documentation only speaks of reduced network and CPU load. Either way, I do not see a dramatic gain in speed: the tests show that bulk load is only about 1.5 times faster, and that figure does not include the initialization of the MapReduce job. On top of that, the data must first be delivered to HDFS, which also takes time.
    I think it’s worth treating bulk load simply as another way to load data, architecturally different (in some cases very convenient).
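    To see how much the two excluded costs matter, here is a rough end-to-end estimate for the largest data set, using only figures from this article: 285 s to upload 14.2 GB to HDFS, 70 s of job initialization, and the 27 MB/s average bulk load speed. It assumes that the 14.2 GB run went at roughly the average speed (its individual speed is not given) and that 1 GB = 1000 MB.

```java
// Rough end-to-end throughput of a bulk load once the costs excluded from the
// measured speed (HDFS upload, job start-up) are added back.
// Assumptions: 1 GB = 1000 MB; the load itself runs at the average speed.
final class EndToEndEstimate {

    static double effectiveMegabytesPerSecond(double dataMegabytes,
                                              double hdfsUploadSeconds,
                                              double jobInitSeconds,
                                              double loadMegabytesPerSecond) {
        double loadSeconds = dataMegabytes / loadMegabytesPerSecond;
        double totalSeconds = hdfsUploadSeconds + jobInitSeconds + loadSeconds;
        return dataMegabytes / totalSeconds;
    }

    public static void main(String[] args) {
        // 14.2 GB: 285 s HDFS upload, 70 s job init, 27 MB/s average load speed.
        double rate = effectiveMegabytesPerSecond(14_200, 285, 70, 27);
        System.out.printf("effective rate: %.1f MB/s%n", rate);
    }
}
```

    With these inputs the load itself takes about 526 s of roughly 881 s in total, so upload and job start-up account for about 40% of the wall-clock time, and the effective rate falls to about 16.1 MB/s.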

    And now for the implementation


    Theoretically, everything is quite simple, but in practice there are several technical nuances.

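    // Imports used below (HBase 0.94 / Hadoop "mapreduce" API)
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

    // Create the job
    Job job = new Job(configuration, JOB_NAME);
    job.setJarByClass(BulkLoadJob.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    job.setMapperClass(DataMapper.class);
    // No setNumReduceTasks(...) here: configureIncrementalLoad() below sets one
    // reducer per table region, the total-order partitioner and PutSortReducer.
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HFileOutputFormat.class);
    FileInputFormat.setInputPaths(job, inputPath);
    HFileOutputFormat.setOutputPath(job, new Path(JOB_OUTPUT_PATH));
    HTable dataTable = new HTable(configuration, TABLE_NAME);
    HFileOutputFormat.configureIncrementalLoad(job, dataTable);
    // Run the job
    ControlledJob controlledJob = new ControlledJob(
        job,
        null
    );
    JobControl jobController = new JobControl(JOB_NAME);
    jobController.addJob(controlledJob);
    Thread thread = new Thread(jobController);
    thread.start();
    // ...
    // Grant permissions on the job output
    setFullPermissions(JOB_OUTPUT_PATH);
    // Run the bulk load function
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
    loader.doBulkLoad(
            new Path(JOB_OUTPUT_PATH),
            dataTable
    );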
    


    • A MapReduce job creates its output files with the permissions of the user who launched it.
    • bulk load always runs as the hbase user, so it cannot read the files prepared for it and fails with this exception: org.apache.hadoop.security.AccessControlException: Permission denied: user = hbase


    Therefore, you must either run the job as the hbase user or grant permissions on the output files (which is exactly what I did).

    • You must create the HBase table correctly. By default, a table is created with a single region. As a result, only one reducer is created and all writes go to a single node, loading it at 100% while the others sit idle.
      Therefore, when creating a new table you should pre-split it. In my case, the table was split into 10 regions distributed evenly across the cluster.
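    Why one region means one reducer: HFileOutputFormat.configureIncrementalLoad sets one reduce task per region and installs a total-order partitioner, so each reducer receives exactly one region's key range. The stdlib-only sketch below models that routing rule (a row belongs to the region with the largest start key not greater than the row key, comparing bytes as unsigned). It is a simplified illustration, not HBase's partitioner code; with no split keys every row lands in region 0, which is the single-node bottleneck described above.

```java
// Simplified model of routing row keys to regions (and hence to reducers).
// Illustration only, not HBase code.
final class RegionRouter {

    // Lexicographic comparison treating bytes as unsigned, as HBase orders row keys.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xFF;
            int y = b[i] & 0xFF;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // splitKeys: sorted start keys of regions 1..n-1 (region 0 starts at the empty key).
    // Returns the region index, i.e. the number of split keys <= rowKey (binary search).
    static int regionFor(byte[] rowKey, byte[][] splitKeys) {
        int lo = 0;
        int hi = splitKeys.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (compareUnsigned(splitKeys[mid], rowKey) <= 0) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    public static void main(String[] args) {
        byte[][] noSplits = {};
        byte[][] fourRegions = {{(byte) 0x40}, {(byte) 0x80}, {(byte) 0xC0}};
        byte[][] rows = {{0x01}, {0x45}, {(byte) 0x90}, {(byte) 0xF0}};
        for (byte[] row : rows) {
            System.out.printf("row %02x -> region %d without pre-split, %d with pre-split%n",
                    row[0] & 0xFF, regionFor(row, noSplits), regionFor(row, fourRegions));
        }
    }
}
```

    Without split keys all four rows go to region 0; with three split keys they spread over regions 0 to 3, so four reducers (and potentially four nodes) share the work.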


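    // Imports used below (HBase 0.94 client API)
    import java.util.Arrays;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.util.Bytes;

    // Create the table and pre-split it
    HTableDescriptor descriptor = new HTableDescriptor(
            Bytes.toBytes(tableName)
    );
    descriptor.addFamily(
            new HColumnDescriptor(Constants.COLUMN_FAMILY_NAME)
    );
    HBaseAdmin admin = new HBaseAdmin(config);
    // Lowest and highest 16-byte keys: startKey becomes the end key of the first
    // region, endKey the start key of the last one, and the range between them
    // is split evenly so that the table gets REGIONS_COUNT regions in total.
    byte[] startKey = new byte[16];
    Arrays.fill(startKey, (byte) 0);
    byte[] endKey = new byte[16];
    Arrays.fill(endKey, (byte) 255);
    admin.createTable(descriptor, startKey, endKey, REGIONS_COUNT);
    admin.close();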
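    HBaseAdmin.createTable(descriptor, startKey, endKey, numRegions) creates numRegions regions: startKey becomes the end key of the first region, endKey becomes the start key of the last one, and the range between them is divided evenly. The plain-Java sketch below approximates that arithmetic with BigInteger (treating equal-length keys as unsigned big-endian numbers) so the boundaries can be inspected; it is an illustration, not HBase's own implementation, and its rounding may differ. One consequence for the setup above, assuming fixed-width 16-byte row keys: the first region (below 0x00 × 16) and the last one (from 0xFF × 16 upward) can hold essentially no keys, so of 10 regions only 8 actually receive data.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of evenly spaced pre-split boundaries between startKey and endKey.
// Approximates the arithmetic behind HBaseAdmin.createTable(desc, start, end, n);
// not HBase code, rounding may differ.
final class PreSplitSketch {

    // Fixed-width big-endian encoding of a non-negative value.
    static byte[] toKey(BigInteger value, int width) {
        byte[] raw = value.toByteArray(); // may carry an extra leading 0x00 sign byte
        byte[] key = new byte[width];
        int copy = Math.min(raw.length, width);
        System.arraycopy(raw, raw.length - copy, key, width - copy, copy);
        return key;
    }

    // Returns numRegions - 1 boundaries: startKey, evenly spaced interior keys, endKey.
    static List<byte[]> boundaries(byte[] startKey, byte[] endKey, int numRegions) {
        BigInteger lo = new BigInteger(1, startKey); // unsigned interpretation
        BigInteger hi = new BigInteger(1, endKey);
        if (numRegions < 3 || startKey.length != endKey.length || hi.compareTo(lo) <= 0) {
            throw new IllegalArgumentException("need >= 3 regions and equal-length keys, start < end");
        }
        int intervals = numRegions - 2; // regions lying between startKey and endKey
        BigInteger step = hi.subtract(lo).divide(BigInteger.valueOf(intervals));
        List<byte[]> keys = new ArrayList<>();
        for (int i = 0; i < intervals; i++) {
            keys.add(toKey(lo.add(step.multiply(BigInteger.valueOf(i))), startKey.length));
        }
        keys.add(endKey.clone());
        return keys;
    }

    public static void main(String[] args) {
        byte[] startKey = new byte[16];
        byte[] endKey = new byte[16];
        Arrays.fill(endKey, (byte) 0xFF);
        for (byte[] key : boundaries(startKey, endKey, 10)) {
            StringBuilder hex = new StringBuilder();
            for (byte b : key) {
                hex.append(String.format("%02x", b));
            }
            System.out.println(hex);
        }
    }
}
```

    For 10 regions this prints 9 boundaries, from 00…00 through 1fff…ff, 3fff…fe and so on up to ff…ff. Even spacing only balances the load if row keys are spread uniformly over the whole byte range, as hashed keys would be.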
    


    • The MapReduce job writes to the output directory you specify, but inside it creates subdirectories named after the column families, and the files are created there.


    That's about it. This is a rather crude test, without any clever optimizations, so if you have something to add, I will be glad to hear it.

    All project code is available on GitHub: github.com/2anikulin/hbase-bulk-load
