Apache Spark - advantages, disadvantages, wishes

    I have long wanted to share my impressions of Apache Spark, and then this article by Pivotal employee Robert Bennett, published just recently on June 26, 2018, caught my eye.

    This will not be a translation, but rather my own impressions and comments on the topic.

    What makes Spark popular?


    Quote:
    It's easy to see why Apache Spark is so popular. It does in-memory, distributed and iterative computation, which is particularly useful when working with machine learning algorithms. Other tools might require writing intermediate results to disk and reading them back into memory, which can make using iterative algorithms painfully slow.
    To begin with, this is not entirely true. In-memory? Well, yes, Spark will try, but what is said here about other tools applies to Spark as well. In the end, memory, processor cores and the network are limited resources, so sooner or later any tool runs up against their limits.

    In a sense, Spark is no more in-memory than classic map-reduce. One way or another, the data has to either end up on disk (which, among other things, lets you survive failures more reliably without restarting the computation from scratch) or be transferred over the network (shuffle and similar processes). Not to mention that nothing stops you, as a programmer, from calling persist and saving intermediate results to disk if you want to. Would you want to keep them in memory when they amount to, say, terabytes of data? I doubt it.

    I would rather say that, unlike other tools (which usually means classic map-reduce), Spark lets you think a little less about the optimal use of resources and does more of that optimization itself. And the final speed ultimately depends on the skill of the person who writes the program.

    Next, the author lists the qualities of Spark that he considers the best:

    Attractive APIs and Lazy Execution


    In general, I agree with this. As a development tool, Spark is much more convenient than classic map-reduce, and somewhat more convenient than Apache Crunch and other tools of the so-called "second" generation. It is also somewhat more flexible than Hive, for example, and is not limited to SQL as such.

    Lazy execution is not always a good thing. Sometimes it would be better if, say, differences between the Hive and Dataset schemas were diagnosed not after all the data has been processed but a little earlier, so that everything failed at startup rather than after a couple of hours or a day.
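    One way to get such a failure at startup is to compare the two schemas explicitly before any data is processed. Below is a minimal sketch in plain Scala with no Spark dependency. `Field`, `SchemaCheck`, `schemaMismatches` and `requireSameSchema` are hypothetical names, and in a real job the two field lists would have to be built from the Dataset schema and the Hive table metadata. Comparing column names case-insensitively is an assumption, made because Hive keeps column names in lower case.

```scala
// Hypothetical, simplified column description: name plus type name.
final case class Field(name: String, dataType: String)

object SchemaCheck {
  // Returns one message per difference; an empty result means the schemas match.
  // Names are compared case-insensitively (an assumption, see the text above).
  def schemaMismatches(expected: Seq[Field], actual: Seq[Field]): Seq[String] = {
    val actualByName = actual.map(f => f.name.toLowerCase -> f).toMap
    val expectedNames = expected.map(_.name.toLowerCase).toSet

    val missingOrWrongType = expected.flatMap { e =>
      actualByName.get(e.name.toLowerCase) match {
        case None =>
          Some(s"missing column '${e.name}'")
        case Some(a) if a.dataType != e.dataType =>
          Some(s"column '${e.name}': expected ${e.dataType}, found ${a.dataType}")
        case _ =>
          None
      }
    }
    val unexpected = actual
      .filterNot(a => expectedNames.contains(a.name.toLowerCase))
      .map(a => s"unexpected column '${a.name}'")

    missingOrWrongType ++ unexpected
  }

  // Meant to be called first thing in the job: throws before heavy work starts.
  def requireSameSchema(expected: Seq[Field], actual: Seq[Field]): Unit = {
    val problems = schemaMismatches(expected, actual)
    require(problems.isEmpty, problems.mkString("schema mismatch: ", "; ", ""))
  }
}
```

    The check costs next to nothing compared with the job itself, and the list of messages names every mismatch at once instead of stopping at the first one.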

    Easy Conversion


    Here the author mainly had in mind conversions between Spark and Python / Pandas structures. This is far from my area, so I will not comment. I will perhaps say a few words about PySpark below.

    Easy Transformations


    Quote:
    Another asset of Spark is the “map-side join” broadcast method. This method speeds up joins significantly when one of the tables is smaller than the other and can fit in its entirety on individual machines. The smaller one gets sent to all nodes so the data from the bigger table doesn't need to be moved around. This also helps mitigate problems from skew. If the big table has a lot of skew on the join keys, it will try to send a large amount of data from the big table to a small number of nodes to perform the join and overwhelm those nodes.
    I don't know how things are in Python, but in our world a map-side join is easily done either by hand or with any tool such as Crunch. I don't see any special advantage here; many tools can do it, Hive for example. Given the de facto absence of indexes in the Hadoop ecosystem, the map-side join is perhaps one of the main join optimization tools in general.
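    To show what "by hand" means, here is a minimal sketch of the mechanism in plain Scala collections, without Spark or Hadoop. `MapSideJoin` and its parameters are hypothetical names, and the nested sequences only model partitions that in reality live on different nodes.

```scala
object MapSideJoin {
  // Inner-joins every partition of the big table against a full in-memory
  // copy of the small table. The big table is never regrouped by key.
  def join[K, A, B](
      bigPartitions: Seq[Seq[(K, A)]],
      small: Seq[(K, B)]
  ): Seq[Seq[(K, A, B)]] = {
    // "Broadcast" step: build the lookup table once; each node would get a copy.
    val lookup: Map[K, Seq[B]] =
      small.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }

    // "Map" step: partitions are processed independently, so rows with a
    // skewed key stay where they already are instead of piling up on one node.
    bigPartitions.map { partition =>
      for {
        (k, a) <- partition
        b <- lookup.getOrElse(k, Seq.empty)
      } yield (k, a, b)
    }
  }
}
```

    The only requirement is the one from the quote: the small table must fit in memory on every node.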

    The transformation API is quite convenient, although heterogeneous. For example, the "old" RDD API, while probably a little more flexible, also leaves more room for mistakes, especially if you work not with fixed-structure classes (Java Beans) but with Row and a flexible data structure. A mismatch between the actual and the expected Spark schema is quite common in that case.

    As for the Dataset API, I would say that it is very good. After some practice, it is quite possible to write everything in it as easily as in SQL, supplementing it with your own UDFs and gaining extra flexibility. UDFs themselves are easier to write than for Hive, and difficulties arise only when returning complex data structures from them (arrays, maps, structs), and even then only in Java, mostly because Scala structures are expected.

    For example, I managed quite easily to use a Java port of pymorphy2 as a UDF. Or a geocoder. In essence, all you need to do is initialize your UDF properly, keeping in mind how Spark serialization works.
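    A common way to do this initialization is to keep the heavy object in a `@transient lazy val`, so that it is not serialized with the function but is rebuilt on first use in each JVM. The sketch below uses plain Java serialization to imitate the trip from driver to executor; it contains no Spark code, and `HeavyAnalyzer`, `NormalizeFn` and `SerializationDemo` are hypothetical stand-ins rather than the classes from my project.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a heavy, non-serializable resource
// (a morphological analyzer, a geocoder client, ...).
class HeavyAnalyzer {
  def normalize(word: String): String = word.trim.toLowerCase
}

// The function object that would be shipped to the executors.
class NormalizeFn extends (String => String) with Serializable {
  // @transient: skipped during serialization; lazy: rebuilt on first use per JVM.
  @transient private lazy val analyzer = new HeavyAnalyzer
  override def apply(word: String): String = analyzer.normalize(word)
}

object SerializationDemo {
  // Round-trips a value through Java serialization, imitating the copy
  // that is made when a function travels to another JVM.
  def roundTrip[A <: AnyRef](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A]
    finally in.close()
  }
}
```

    Without `@transient`, `roundTrip` would fail with a `NotSerializableException` as soon as the analyzer had been created, which is the same kind of error a Spark job reports in this situation.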

    The Spark ML API, on the other hand, looks as if it had been designed by completely different people. This does not mean that it is bad; it is just different.

    Open source community


    Quote:
    Spark has a massive open-source community behind it. The community improves the core software and contributes practical add-on packages. For example, a team has developed a natural language processing library for Spark. Previously, a user would either have to use other software or rely on slow user-defined functions to leverage Python packages such as Natural Language Toolkit.
    There is not much to add here. The community really is big, skilled and friendly. A huge number of extensions are being written for Spark.

    We will leave the passage about slow UDFs to the Pythonista's conscience: Scala / Java UDFs are not slow at all, and are very convenient too.

    What I would add myself:

    Development in different languages


    Probably one of the reasons for its popularity is the support for several development languages (Scala, Java, Python and R). By and large, the APIs for the different languages are about equally convenient, but I would not call this support ideal. For example, when you launch a Spark application you immediately choose between Java / Scala and Python, and you cannot combine languages within one run. So integration between the parts of an application written in PySpark (which is often used for the ML or NLP parts) and the parts written in Java / Scala is really only possible through files or databases. Or through something like Kafka, REST and similar options.

    Streaming


    Spark Streaming (not to be confused with Hadoop Streaming, which is something completely different) is another attractive part of Spark's capabilities. Described in one sentence, it is the processing of streaming data coming from, say, Kafka, ZeroMQ, etc. by the same means as data taken from a database.

    The whole charm is precisely that the means are the same, i.e. you hardly have to change anything in the program to start processing data from Kafka. Neither map-reduce, nor Crunch, nor Cascading lets you pull off such a trick.

    Disadvantages


    Everyone has their own shortcomings (c). What problems can you run into when working with Spark?

    Cluster management


    Quote:
    Spark is notoriously difficult to tune and maintain. That means ensuring top performance so that it doesn't buckle under heavy data science workloads is challenging. If your cluster isn't expertly managed, this can negate “the Good” as we described above. Jobs failing with out-of-memory errors is very common and having many concurrent users makes resource management even more challenging.
    Did anyone promise otherwise? As I wrote above, everything can be wonderful and simple in exactly one case: when you have either a task that is not very large or as many resources as you like. In other words, when the task is not too complicated.

    In all other cases, which are obviously the majority, Spark applications need to be tuned, configured and maintained.
    Quote:
    Do you go with fixed or dynamic memory allocation? How many of your cluster's cores do you allow Spark to use? How much memory does each executor get? How many partitions should Spark use when it shuffles data? Getting all these settings right for data science workloads is difficult.
    Take the seemingly simple task of choosing the number of executors. In principle, knowing something about your data, you can calculate this number with confidence. But when you are not the only one using the resources, things get much more fun. And if your process also involves calls to other applications, then ...

    For example, I have an application in which part of the functionality is reverse geocoding, and that part is handled by a separate ArcGIS server. ArcGIS has only 4 cores at its disposal, while the Hadoop cluster where Spark runs has dozens of nodes. As a result, even if we give Spark only 8 executors, the CPU load on the ArcGIS server jumps to 100% and stays there for the couple of hours the application runs. If we instead move this task into Spark (after rewriting the application code first), the running time drops by a couple of orders of magnitude, because we can use the cluster's resources for this task as well.

    That is, we often have a bottleneck where either a fixed amount of resources is allocated, or the resources are managed in some other way that Spark cannot influence. Accordingly, it would be naive to expect Spark to optimize the use of these resources.

    Debugging


    It's true. That is to be expected, though. We have a distributed parallel system, and debugging and monitoring it is a non-trivial task. To some extent, the Spark UI covers monitoring and Spark Metrics covers performance measurement, but try, say, attaching a debugger to a running application: you know neither the host it runs on nor the port that is free for the connection. The same metrics that are easy to obtain for an ordinary application, for example via JMX, must in a distributed application first be sent over the network and only then collected. Yes, all of this is relatively bad.

    Poor UDF performance in PySpark (Slowness of PySpark UDFs)


    Well, what can I say here? You reap what you sow (c). As far as I understand, a UDF in Python causes a double conversion of the data between the application and the UDF, simply because Python is still a foreign language to the JVM ecosystem that Spark runs on, and the UDF executes outside of it.

    There is only one piece of advice to give here: do not write in Python, write in Scala / Java. Clearly, people do not always want to follow this advice, or are not always able to, but I am afraid that only Graal can solve this problem globally, once its Python implementation reaches production quality.

    It is difficult to guarantee the maximum level of parallelism (Hard-to-Guarantee Maximal Parallelism)


    Quote:
    One of Spark's key value propositions is distributed computation, yet it can be difficult to ensure Spark parallelizes computations as much as possible. Spark tries to elastically scale how many executors a job uses based on the job's needs, but it often fails to scale up on its own. So if you set the minimum number of executors too low, your job may not utilize more executors when it needs them. Also, Spark divides RDDs (Resilient Distributed Dataset) / DataFrames into partitions, which is the smallest unit of work that an executor takes on. If you set too few partitions, then there may not be enough chunks of work for all the executors to work on. Also, fewer partitions means larger partitions, which can cause executors to run out of memory.
    If only it were that simple. Let's start with something simple: the launch parameters have to be tuned for each particular cluster. A prod cluster can have an order of magnitude more nodes, and many times more memory available on each. Settings tuned for the dev cluster will probably be too low when used on prod. All this becomes even more complicated once you start taking the current load on the cluster into account. In general, allocating cluster resources is an optimization problem that is quite nontrivial and has no single correct solution.

    If there are few partitions, parallelism is insufficient. If there are too many, the size of each may fall below some notional lower limit, such as the size of an HDFS block. Since launching each task costs resources, there is obviously a lower limit on task size below which you should not go, because overhead grows faster than throughput.

    A simple example is an application that needs a significant amount of reference data. In a "normal" map-reduce task on Hadoop we usually deliver the code to the data, i.e. copy our application plus parts of Spark to the cluster nodes where our file or files reside. Reference data, however, behaves like a map-side join: it has to be delivered along with the code. Suddenly the amount of data delivered to each node grows by a couple of orders of magnitude, from, say, 10 megabytes (a small Spark application, without Spark itself) to, say, 20 gigabytes (a very real case: the reference data needed to normalize addresses, phone numbers and the like easily reaches that volume). Well, there it is: the price of excessive parallelism is plain to see.

    Perhaps there is a certain natural number of partitions, determined by the number of blocks into which our input file is divided, taking the replication factor into account. This number is probably close to optimal as far as reading the data is concerned. That is, if the file has three blocks and each block has copies on 2 cluster nodes, then we can naturally process it in 6 parallel threads, each handling a replica on its own node. Of course, Spark takes these parameters into account when allocating resources dynamically.
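    The arithmetic behind this estimate fits in a few lines. This is only a sketch of the reasoning above, not of what Spark actually computes; `NaturalParallelism` and its method names are hypothetical, and the 128 MB block size used in the example below is an assumption (a commonly used HDFS setting).

```scala
object NaturalParallelism {
  // Number of HDFS blocks a file of `fileBytes` occupies (ceiling division).
  def blockCount(fileBytes: Long, blockBytes: Long): Long = {
    require(fileBytes >= 0 && blockBytes > 0, "file size must be >= 0 and block size > 0")
    (fileBytes + blockBytes - 1) / blockBytes
  }

  // Upper bound from the reasoning above: one local reader per block replica.
  def replicaSlots(fileBytes: Long, blockBytes: Long, replication: Int): Long = {
    require(replication > 0, "replication factor must be positive")
    blockCount(fileBytes, blockBytes) * replication
  }
}
```

    For a 300 MB file with 128 MB blocks, `blockCount` gives 3 blocks, and with a replication factor of 2 `replicaSlots` gives the 6 parallel readers from the example.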

    For better or worse, Spark is not a cluster resource scheduler. YARN, for example, is. So Spark may simply not have enough information to plan the use of all resources optimally.

    Less than ideal integration with Hive


    On the one hand, Spark works great with Hive data and metadata. I would say that most of the applications I have come across do exactly that. But it is not without annoying problems. For example, if you use Spark's partitionBy and bucketBy, it is quite likely that Hive will not see the results of your work. And all you get is an obscure warning somewhere in the logs.

    Compatibility


    Unfortunately, my experience here is rather bad. We ran into multiple problems when trying to run applications on clusters where the Spark version differed from the expected one. Having developed on Spark 2.2.0, we had problems when running on both 2.1 and 2.3.

    For example, in our case Spark for some reason could not find one of the codecs (namely snappy) when running on version 2.3. This is not a very serious problem if you need to write data (you can specify the codec when writing and choose any, including uncompressed output), but if you need to read something that is snappy-compressed, you are clearly out of luck.

    Some of the problems may have been caused by errors in the installer, but that does not make things any easier. Still, it seems to me that migration between minor versions should be smoother.

    And, alas, Spark does not officially support installing two different versions of the same line (say, 2.2 and 2.3) side by side on one cluster.

    The ugly parts


    API awkwardness


    Quote:
    Since much of the Spark API is so elegant, the inelegant parts really stand out. For example, we consider accessing array elements to be an ugly part of Spark life.
    I would not say that working with arrays is that terrible. Some inconvenience comes from the fact that the Spark API was originally written in Scala, which has its own collection types, and when working from Java you have to convert to the Scala ones. Otherwise, if you can write a UDF, you can do anything with arrays. Ah, yes, in Python everything is bad with UDFs; I keep forgetting.

    Not very convenient and not too efficient? Yes, perhaps. The new Spark 2.4 release tries to address this by introducing higher-order functions for working with complex structures (which will make it possible to avoid explode / collect).

    In my opinion, a much more inconvenient side of the API is that, looking at the code, it is far from always obvious which part will run on the driver and which parts will run on other nodes. Meanwhile, the mechanism that distributes code across nodes relies on serialization (in one form or another), so the code that runs on executors must be serializable. While sorting out serialization errors, you can learn a lot of new and interesting things about your code :).

    Classloaders


    Unfortunately, the problem of isolating application code from Spark's own code is not well solved. The same is true, however, of classic Hadoop map-reduce applications. On top of that, the Hadoop code uses an ancient version of Google Guava, and its other libraries are, frankly, far from new either. If you recall that the Guava authors like to break backward compatibility in their API by removing deprecated methods, you get a completely absurd picture: you write your code against a fresh version of Guava, run it, and it crashes. Either you are actually running against the (much older) Guava version from Hadoop and your code cannot find methods from the new version, or Hadoop crashes because it is incompatible with the new version. Unfortunately, this is a fairly typical problem that probably every second developer runs into.

    SQL without bind variables


    Alas, the typical code for executing a query in Spark looks like this:

    val sqlDF = spark.sql("SELECT * FROM people WHERE id = 1")

    The API offers no way to write id = ? and substitute a parameter at each execution. Well, suppose the problem of SQL injection does not bother the authors; developers still need to substitute parameters into the query, so escaping special characters is left entirely to you and me. To be fair, Hive suffers from the same problem: there, too, you cannot define a query with parameters.
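    In the absence of bind variables, one hedge is to keep untrusted text out of the query string altogether and let only typed values through. A minimal sketch for the numeric case, in plain Scala: `PeopleQueries` and its methods are hypothetical helpers, not part of any Spark API. String parameters are deliberately left out, because quoting them correctly depends on the escaping rules of the SQL parser.

```scala
object PeopleQueries {
  // Only a Long can get in here, so the interpolated text is always a plain number.
  def byId(id: Long): String =
    s"SELECT * FROM people WHERE id = $id"

  // Parses untrusted input first; anything that is not a whole number is rejected.
  def byIdFromUserInput(raw: String): Either[String, String] =
    try Right(byId(raw.trim.toLong))
    catch {
      case _: NumberFormatException => Left(s"not a valid id: '$raw'")
    }
}
```

    The resulting string can be handed to `spark.sql` as before; an input such as `1 OR 1=1` never reaches the query because it fails to parse as a number.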

    Even funnier, for JDBC sources it is formally impossible to write a query at all: you can only specify a table, not columns. Informally, it turns out that you can put something like (select a, b, c from d) t in place of the table name, but nobody will tell you for sure whether this works in all cases.

    Lack of Maturity and Feature Completeness


    Hmm. Another person's mind is a mystery.
    Quote:
    Another example feature gap is difficulty creating sequential unique record identifiers with Spark. A sequential, unique index column is helpful for some types of analysis. According to the documentation, “monotonically_increasing_id()” generates a unique ID for each row, but does not guarantee that the IDs are consecutive. If consecutive IDs are important to you, then you may need to use Spark's older RDD format.
    I do not understand such complaints. The source code is available, so it is quite possible to take a look and at least read the comments:

    Returns monotonically increasing 64-bit integers.

    The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the lower 33 bits represent the record number within each partition. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records.

    In other words, this function simply takes the partition number and appends a counter to it. Naturally, nothing guarantees that nobody else calls it between two of your consecutive calls. One Spark application is potentially many JVMs running on different nodes of the cluster, and probably many threads within each JVM.
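    The layout described in that comment can be reproduced with two bit operations. The sketch below is my own illustration of the scheme in plain Scala, not Spark's source code; `MonotonicId` and its method names are hypothetical.

```scala
object MonotonicId {
  private val RecordBits = 33
  private val RecordMask = (1L << RecordBits) - 1

  // Partition ID goes into the upper bits, the per-partition record number
  // into the lower 33 bits. Partition IDs are kept below 2^30 (about 1 billion,
  // as in the quoted comment) so that the sign bit stays clear.
  def encode(partitionId: Int, recordNumber: Long): Long = {
    require(partitionId >= 0 && partitionId < (1 << 30), "partition ID out of range")
    require(recordNumber >= 0 && recordNumber <= RecordMask, "record number must fit in 33 bits")
    (partitionId.toLong << RecordBits) | recordNumber
  }

  def partitionOf(id: Long): Int = (id >>> RecordBits).toInt

  def recordOf(id: Long): Long = id & RecordMask
}
```

    Each partition counts its own records without talking to anyone, which is exactly why the IDs are unique and increasing but not consecutive: partition 0 produces 0, 1, 2, ..., while the first ID of partition 1 is already 8589934592.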

    Now let the author think a little more and try to come up with a way to generate exactly the IDs he wants in this parallel, distributed system without creating a single point of generation (which would be a bottleneck by design) and without locking (which amounts to the same thing).

    What we expect from Spark 2.4


    The already mentioned higher-order functions


    This is really good. The main thing is that it works.

    In essence, this is a set of built-in functions for working with arrays and maps, plus the ability to transform them using your own functions (lambdas).
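    The difference from the explode / collect approach is easiest to see on a small model. The sketch below uses plain Scala collections instead of Spark, so `ArrayColumn`, `Row`, `viaExplodeAndCollect` and `viaTransform` are hypothetical names; the corresponding Spark 2.4 SQL expression would, to my knowledge, look like transform(values, x -> x + 1).

```scala
object ArrayColumn {
  // A row with an ID and an array-typed column. IDs are assumed to be unique.
  final case class Row(id: Int, values: Seq[Int])

  // Old style: explode into one row per element, transform each element,
  // then group by ID to collect the arrays back (results sorted by ID).
  def viaExplodeAndCollect(rows: Seq[Row], f: Int => Int): Seq[Row] = {
    val exploded = rows.flatMap(r => r.values.map(v => (r.id, f(v))))
    exploded
      .groupBy(_._1)
      .toSeq
      .sortBy(_._1)
      .map { case (id, pairs) => Row(id, pairs.map(_._2)) }
  }

  // Higher-order style: apply the lambda to each array in place, row by row.
  def viaTransform(rows: Seq[Row], f: Int => Int): Seq[Row] =
    rows.map(r => r.copy(values = r.values.map(f)))
}
```

    Besides saving the grouping step, the second variant keeps rows whose array is empty, whereas in this model the explode-based one silently drops them.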

    Here you can see some examples of use.

    New execution mode


    This is the so-called barrier scheduler and runtime. The authors intend it for machine learning tasks, but the set of suitable tasks is of course somewhat wider. In essence, these are tasks that do not fit Spark's usual map-reduce model. As I understand it, they are mostly message-passing components that are started once, or restarted if they crash.

    If the API for such tasks turns out to be convenient, there is definitely a need for it. In our company, for example, such components are built as YARN applications and work somewhat apart from Spark. Tighter and more convenient integration within Spark would be worthwhile.

    Improved Avro Support


    Avro support was decent already. Support has now been added for some additional data types, namely the so-called "logical types" (in effect, derived types), which include Decimal, Date, Time, Duration, and others.

    Frankly, I am looking forward more to the day when the authors of Hive (and Spark while they are at it) learn to support Parquet better, creating tables based on its schema. This is possible now, but with Avro it looks and works more conveniently.

    Here you can read in more detail.

    Support for Scala 2.12 (experimental)


    It might seem that this does not matter to me, as mainly a Java programmer, but as part of this work they promised to improve interoperability with Java 8, for example the serialization of lambdas, which would be very, very nice.
