When the Old MapReduce is Better than the New Tez



    As everyone knows, the amount of data in the world keeps growing, and collecting and processing this flow of information is becoming increasingly difficult. A popular answer is Hadoop, which aims to simplify the development and debugging of parallel applications through the MapReduce paradigm. This paradigm does not cope well with every task, so after a while a "superstructure" over Hadoop appeared: Apache Tez with its DAG paradigm. Hive, the SQL layer over HDFS, was adapted to Tez as well. But the new is not always better than the old. In most cases HiveOnTez is significantly faster than HiveOnMapReduce, but a few pitfalls can seriously hurt the performance of your solution. Here I want to describe the nuances I ran into. I hope this helps you speed up your ETL or another Hadoop use case.

    MapReduce, Tez and Hive


    As I said earlier, there is more and more data in the world, and people come up with increasingly clever solutions for storing and processing it. Hadoop is one of them. To make processing data stored on HDFS easy even for an average analyst, there are several SQL add-ons for Hadoop. The oldest and "simplest" of them is Hive. The essence of Hive is this: we keep the data in some sensible column-store format, we register it in the metadata, we write standard SQL with a number of restrictions, and Hive generates a chain of MapReduce jobs that solve our problem. Great and comfortable, but slow. For example, here is a simple query:

    select 
        t1.column1, 
        t2.column2 
    from 
        table1 t1
        inner join table2  t2 on t1.column1 = t2.column1
    union 
    select 
        t3.column1, 
        t4.column2 
    from 
        table3 t3
        inner join table4 t4 on t3.column1 = t4.column1
    order by 
        column1;
    

    This query spawns four jobs:

    • table1 inner join table2;
    • table3 inner join table4;
    • union;
    • sort.



    The steps are performed sequentially, and each of them ends by writing its data to HDFS. This looks very suboptimal. For example, steps 1 and 2 could be executed in parallel. There are also situations where it would be wise to apply the same Mapper at several steps and then apply several kinds of Reducers to the results of these Mappers. But the MapReduce concept does not allow this within a single job. Apache Tez with its DAG concept appeared fairly quickly to solve this problem. The essence of the DAG is that instead of a Mapper-Reducer pair (+ epsilon) we build a directed acyclic graph in which each vertex is a Mapper.class or a Reducer.class, and the edges denote data flows and execution order. Besides the DAG, Tez brought several more bonuses: faster job launch (you can submit DAG jobs through an already running Tez engine), the ability to keep resources in a node's memory between steps, the ability to parallelize execution on its own, and so on. Naturally, a corresponding add-on for Hive came out together with Tez. With this add-on, our query turns into a DAG job with approximately the following structure:

    1. Mapper reads table1.
    2. Mapper reads table2 and joins it with the result of step 1.
    3. Mapper reads table3 and filters column1 IS NOT NULL.
    4. Mapper reads table4 and filters column1 IS NOT NULL.
    5. Reducer joins the results of steps 3 and 4.
    6. Reducer performs the union.
    7. Reducer performs the Group By and the sort.
    8. The result is collected.
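
    To make the shape of this plan tangible, here is a small Python sketch that writes the eight steps down as a graph and groups them into "waves" of steps that have no unfinished inputs. The vertex names and the edges are my own reading of the list above, not something Hive prints, and real Tez does not wait for whole waves: a vertex can start as soon as its own inputs are ready. The sketch needs Python 3.9 or newer for graphlib.

```python
from graphlib import TopologicalSorter

# Each vertex maps to the set of vertices whose output it consumes.
# Names and edges are illustrative; they follow the eight steps listed above.
PLAN = {
    "map_table1": set(),
    "map_table2_join": {"map_table1"},
    "map_table3": set(),
    "map_table4": set(),
    "reduce_join_3_4": {"map_table3", "map_table4"},
    "reduce_union": {"map_table2_join", "reduce_join_3_4"},
    "reduce_group_sort": {"reduce_union"},
    "collect": {"reduce_group_sort"},
}


def execution_waves(dependencies):
    """Group vertices into waves; everything inside one wave is independent."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # vertices whose inputs are all done
        waves.append(ready)
        sorter.done(*ready)                 # mark the whole wave as finished
    return waves


for number, wave in enumerate(execution_waves(PLAN), start=1):
    print(number, wave)
```

    The first wave contains all three plain table reads, and the second contains both joins, which is exactly the parallelism a chain of sequential MapReduce jobs cannot express.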



    In the plan above, steps 1 and 2 form the first join, and steps 3, 4 and 5 form the second join (I deliberately picked tables of different sizes so that the joins would be processed differently). The two blocks do not depend on each other and can be executed in parallel. This is already very cool. Tez really does give a significant speedup on complex queries. But sometimes Tez can be worse than MapReduce, so before sending a query to production you should try it with both set hive.execution.engine=tez and set hive.execution.engine=mr.
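
    A minimal sketch of such a comparison in Python, assuming the hive CLI is on the PATH and the query is stored in a file (query.hql is a hypothetical name). It relies only on the --hiveconf and -f options of the hive command line. The function names are mine, and the runner is passed in as a parameter so the timing logic can be tried without a cluster.

```python
import subprocess
import time


def build_hive_command(engine, script_path):
    """Build a hive CLI call that runs one script on the given engine."""
    if engine not in ("mr", "tez"):
        raise ValueError("engine must be 'mr' or 'tez'")
    return ["hive", "--hiveconf", "hive.execution.engine=" + engine,
            "-f", script_path]


def time_engines(script_path, run=subprocess.run):
    """Run the same script on both engines and return wall-clock seconds."""
    timings = {}
    for engine in ("mr", "tez"):
        started = time.monotonic()
        run(build_hive_command(engine, script_path), check=True)
        timings[engine] = time.monotonic() - started
    return timings


# Usage on a machine with Hive installed (hypothetical file name):
#     print(time_engines("query.hql"))
```

    A single run per engine is only a rough signal: cluster load and a warm Tez session can change the numbers noticeably, so it is worth repeating the measurement a few times.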

    So what is Tez?


    All you need to know about Tez: it replaces MapReduce logic with DAG (directed acyclic graph) logic, which makes it possible to run several different processes simultaneously within the same data flow, be they Mappers or Reducers. All that matters is that a process's input is ready. Between steps, data can be stored locally on the nodes, and sometimes simply in a node's RAM, without resorting to disk operations. Tez can optimize the number and placement of Mappers and Reducers to minimize data transfer over the network even in multi-step calculations, reuse containers that have already finished work in neighboring processes within one Tez job, and adjust parallelism to statistics collected at the previous step. Moreover, the engine lets the end user create DAG jobs as easily as MapReduce jobs, while it takes care of resources, restarts and DAG management on the cluster itself. Tez is very portable: adding Tez support does not break processes that are already running, and a new version can be tested locally "on the client side" while all other cluster jobs keep using the old version of Tez. Last but not least, Tez can run on the cluster as a background service, which lets it submit tasks for execution much faster than a regular MapReduce launch does. If you have not tried Tez and still have doubts, look at the speed comparison published in the HortonWorks presentations:



    And paired with Hive:



    But for all the beauty of the graphs and descriptions, there are problems with HiveOnTez.

    Tez is less resistant to uneven data distribution than MapReduce


    The first and biggest problem lies in the difference between how a DAG job and a MapReduce job are created. The principle is the same: the number of Mappers and Reducers is calculated when the job starts. But when a query runs as a chain of MapReduce jobs, Hadoop calculates the number of tasks for each job from the actual results of the previous steps together with the statistics collected on the sources. For a DAG job this happens before any step has been computed, based on statistics alone.

    Let me explain with an example. Somewhere in the middle of the query, after the nested queries have run, we join two tables. According to the statistics, each has n rows and k unique values of the join key. At the output we expect approximately n * k rows. Suppose this amount fits comfortably in one container, so Tez allocates a single Reducer for the next step (say, sorting). That number of Reducers will not change during execution, no matter what. Now suppose that the tables are in fact badly skewed: one key value has n - k + 1 rows, and each of the other k - 1 values has a single row. The join then produces (n - k + 1)^2 + (k - 1) = n^2 + k^2 - 2kn - k + 2n rows. That is (n + 2 - 2k) / k + (k - 1) / n times more than the expected n * k. A single Reducer will take an eternity to get through that many rows. With MapReduce, once this step has produced n^2 + k^2 - 2kn - k + 2n rows, Hadoop will soberly assess the workload and allocate as many Mappers and Reducers as needed. As a result, everything runs much faster with MapReduce.
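
    The arithmetic is easy to check with a few lines of Python. The sketch below builds the skewed key distribution from the example (the values n = 1000 and k = 10 are arbitrary and chosen by me), counts the rows an inner join on that key would produce, and compares the count with the closed-form expression and with the n * k estimate. The helper names are mine.

```python
from collections import Counter


def join_row_count(left_keys, right_keys):
    """Rows produced by an inner join on the key: sum of per-key products."""
    left = Counter(left_keys)
    right = Counter(right_keys)
    return sum(count * right[key] for key, count in left.items())


def skewed_keys(n, k):
    """n rows, k distinct keys: one key has n - k + 1 rows, the rest have one."""
    return [0] * (n - k + 1) + list(range(1, k))


n, k = 1000, 10
keys = skewed_keys(n, k)

actual = join_row_count(keys, keys)
closed_form = n**2 + k**2 - 2 * k * n - k + 2 * n
ratio = actual / (n * k)  # how far the n * k estimate is off

print(actual, closed_form, round(ratio, 3))
# prints: 982090 982090 98.209
```

    With these numbers a Reducer sized for about ten thousand rows receives almost a million, roughly 98 times more than planned.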

    These dry calculations may look far-fetched, but the situation is quite real. If it has never happened to you, consider yourself lucky. I have run into this Tez DAG effect when using lateral view in complex queries, and with custom Mappers.

    Tuning Tez


    Ironically, the last important Tez feature I know of that can do real harm is its very strength, the DAG. Most often a cluster is not just a data store. It is also a system in which data is processed and post-processed, and it is important that other activity does not interfere with that part of the cluster. Nodes are a finite resource, so the number of containers available to you is usually limited. When you run a job, it is therefore better not to occupy all the containers, so that the regular processes are not slowed down significantly. And this is where the DAG can play a dirty trick on you. On average, a DAG needs fewer containers thanks to container reuse, smoother load, and so on. But when there are many quick steps, the containers start to multiply very rapidly. The first Mappers have not finished yet, but their data is already being handed to the next Mappers, containers are allocated for all of this, and boom! Your cluster is packed to the ceiling and nobody else can start a single job. There are not enough resources, and you watch the numbers on the progress bar change ever so slowly. Being sequential, MapReduce is spared this effect, but, as always, you pay for that with speed.

    We have long known how to deal with the fact that the standard MapReduce takes up too many containers. Adjust the parameters:

    • mapreduce.input.fileinputformat.split.maxsize: decreasing it increases the number of Mappers;
    • mapreduce.input.fileinputformat.split.minsize: increasing it reduces the number of Mappers;
    • mapreduce.input.fileinputformat.split.minsize.per.node, mapreduce.input.fileinputformat.split.minsize.per.rack: finer tuning of split sizes at the node or rack level;
    • hive.exec.reducers.bytes.per.reducer: increasing it reduces the number of Reducers;
    • mapred.tasktracker.reduce.tasks.maximum: sets the maximum number of reduce tasks that one TaskTracker runs simultaneously;
    • mapred.reduce.tasks: sets a specific number of Reducers.

    Caution! With a DAG, every reduce step will get as many processes as you specify in mapred.reduce.tasks! Tez's own parameters are trickier, though, and the parameters we set for MapReduce do not always affect it. Firstly, Tez is very sensitive to hive.tez.container.size, and the Internet advises choosing a value between yarn.scheduler.minimum-allocation-mb and yarn.scheduler.maximum-allocation-mb. Secondly, take a look at the parameters that control how long an unused container is held:

    • tez.am.container.idle.release-timeout-max.millis;
    • tez.am.container.idle.release-timeout-min.millis.

    The tez.am.container.reuse.enabled option turns container reuse on or off. If it is disabled, the previous two parameters have no effect. And thirdly, look at the grouping options:

    • tez.grouping.split-waves;
    • tez.grouping.max-size;
    • tez.grouping.min-size.

    The point is that, to parallelize the reading of external data, Tez changed the way tasks are formed. First, Tez estimates how many tasks can run on the cluster at once (w). This number is multiplied by the tez.grouping.split-waves parameter, and the input splits are grouped into the resulting number of tasks (N). If the amount of data per task falls between tez.grouping.min-size and tez.grouping.max-size, everything is fine and the job starts with N tasks. If not, the number is adjusted to fit within these bounds. The Tez documentation advises setting tez.grouping.split-count "only as an experiment": it overrides all of the logic above and groups the splits into exactly the number of groups given in the parameter. I try not to use this property, because it leaves Tez, and Hadoop in general, no flexibility to optimize for the specific input data.
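
    The following Python sketch restates that logic so you can play with the numbers. It is my simplified reading of the description above, not the Tez source code. The function name is mine, and the parameter values in the signature (1.7 waves, 50 MB, 1 GB) are assumptions for illustration, so check what your cluster actually uses.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3


def estimate_task_count(total_bytes, available_slots, split_waves=1.7,
                        min_size=50 * MB, max_size=1 * GB):
    """Rough estimate of how many tasks the input is grouped into.

    split_waves, min_size and max_size stand in for tez.grouping.split-waves,
    tez.grouping.min-size and tez.grouping.max-size; the values are assumed.
    """
    # N = slots available at once, multiplied by the waves factor.
    desired = max(1, round(available_slots * split_waves))
    bytes_per_task = total_bytes / desired

    if bytes_per_task > max_size:
        # Groups would be too big: use more tasks so each stays under the cap.
        return math.ceil(total_bytes / max_size)
    if bytes_per_task < min_size:
        # Groups would be too small: use fewer tasks so each reaches the floor.
        return max(1, total_bytes // min_size)
    return desired


print(estimate_task_count(100 * GB, available_slots=100))   # within bounds
print(estimate_task_count(1 * GB, available_slots=100))     # hits min-size
print(estimate_task_count(1024 * GB, available_slots=100))  # hits max-size
```

    With 100 free slots, a 100 GB input keeps the planned 170 tasks, a 1 GB input collapses to 20 tasks because of the minimum size, and a 1 TB input grows to 1024 tasks because of the maximum size.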

    Nuances of Tez


    Besides the major problems, Tez has its share of small flaws. For example, if you use the Hadoop ResourceManager web interface, you will not see in it how many containers a Tez job occupies, let alone what state its Mappers and Reducers are in. To monitor the state of the cluster, I use this little Python script:

    import subprocess
    import threading

    result = []
    lock = threading.Lock()

    def run(command):
        # Run a yarn CLI command and return its stdout as text.
        return subprocess.check_output(command, text=True)

    def get_containers(app):
        # app is [application id, fourth listing column, container count];
        # the count is filled in here from all RUNNING attempts.
        attempt_lines = run(["yarn", "applicationattempt", "-list", app[0]]).splitlines()
        # The first two lines are the "Total number ..." line and the column header.
        for attempt in attempt_lines[2:]:
            fields = attempt.split('\t')
            if len(fields) > 1 and fields[1].strip() == "RUNNING":
                container_lines = run(["yarn", "container", "-list", fields[0].strip()]).splitlines()
                # The first line looks like "Total number of containers :N".
                app[2] += int(container_lines[0].split("Total number of containers :")[1].strip())
        with lock:
            result.append(app)

    # Every record in the listing starts with "application_", so split on that prefix.
    listing = run(["yarn", "application", "-list", "-appStates", "RUNNING"])
    apps = []
    for record in listing.split('application_')[1:]:
        fields = record.split('\t')
        apps.append(['application_' + fields[0].strip(), fields[3], 0])

    # The yarn CLI is slow, so query every application in its own thread.
    threads = [threading.Thread(target=get_containers, args=(app,)) for app in apps]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # Print applications sorted by container count, then the cluster-wide total.
    result.sort(key=lambda app: app[2])
    total = 0
    for app in result:
        print(app[0].strip() + '\t' + app[1].strip() + '\t' + str(app[2]))
        total += app[2]
    print("Total:", total)
    

    Despite the assurances of HortonWorks, our practice shows that for a simple SELECT smth FROM table WHERE smth in Hive, MapReduce is most often faster, though not by much. Also, at the beginning of the article I deceived you a little: parallelization is possible in HiveOnMapReduce too, just not as smart. It is enough to run set hive.exec.parallel=true and configure set hive.exec.parallel.thread.number=... and the independent steps (Mapper + Reducer pairs) will be executed in parallel. True, the output of one Mapper cannot feed several Reducers or further Mappers. True, this parallelization is much more primitive, but it speeds things up as well.

    Another interesting feature of Tez: it starts its engine on the cluster and keeps it running for a while. On the one hand, this really speeds things up, since tasks start on the nodes much faster. On the other hand, there is an unexpected downside: you cannot run important processes in this mode, because over time the Tez engine spawns too many classes and crashes with a GC overflow. It goes like this: in the evening you start nohup hive -f ....hql > hive.log &, you come in the morning, and it has crashed somewhere in the middle, the Hive session is over, the temporary tables are gone, and everything has to be computed again. Unpleasant.

    Another item for the collection of minor problems: good old MapReduce has long been part of a stable release, whereas Tez, despite its popularity and progressiveness, is still at version 0.8.4, and you can run into bugs in it at any step. The worst kind of bug for me would be data loss, but I have not seen that. We have, however, seen Tez compute a result incorrectly where MapReduce computes it correctly. For example, my colleague had two tables, table1 and table2, in each of which the EntityId field is unique. He ran this query through Tez:

    select 
        table1.EntityId, count(1)
    from 
        table1
        left join table2 on table1.EntityId = table2.EntityId
    group by 
        EntityId 
    having 
        count(1) > 1
    

    And he got rows in the output, although MapReduce, as expected, returned an empty result. There is a bug report about a similar problem.

    Conclusion


    Tez is an unquestionable good that in most cases makes life easier: it lets you write more complex queries in Hive and expect quick answers to them. But, like any good thing, it calls for a cautious approach, prudence and knowledge of a few nuances. As a result, using the old, proven, reliable MapReduce is sometimes better than using Tez. I was very surprised that I could not find a single article (either in RuNet or in English) about the downsides of HiveOnTez, and decided to fill this gap. I hope the information will be useful to someone. Thanks, and good luck to everyone!
