Stream API & ForkJoinPool

    We continue our series of useful tips that we share with you. This time the topic is Java again.

    If you have already used the Stream API, you know that it is a convenient way to process data. Built-in operations such as map, filter, and sorted let you transform the input data and obtain a result. Before streams appeared, the developer had to describe the processing imperatively: write a for loop over the elements, then compare, analyze, and sort as needed. The Stream API lets you describe declaratively what you want to get without spelling out how to do it, much like SQL when working with databases.
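
    To make the contrast concrete, here is a small sketch of the same task written both ways. The class name, the method names, and the sample data are illustrative assumptions, not part of the original material.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical example class: keeps names longer than 3 characters,
// upper-cases them, and sorts the result.
class DeclarativeVsImperative {

    // Imperative version: we spell out the loop, the condition, and the sort.
    static List<String> imperative(List<String> names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            if (name.length() > 3) {
                result.add(name.toUpperCase());
            }
        }
        Collections.sort(result);
        return result;
    }

    // Declarative version: we describe what we want as a pipeline.
    static List<String> declarative(List<String> names) {
        return names.stream()
                .filter(name -> name.length() > 3)
                .map(String::toUpperCase)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("lea", "doug", "stream", "fork");
        System.out.println(imperative(names));
        System.out.println(declarative(names));
    }
}
```

    Both methods return the same list; the stream version simply states the steps of the pipeline instead of managing the intermediate collection by hand.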



    Streams made Java code more compact and readable. Another goal of the Stream API was to give developers an easy way to parallelize work and gain performance on multi-core machines while avoiding the complexity inherent in multithreaded programming. This goal was achieved: the Stream API provides the BaseStream::parallel and Collection::parallelStream methods, which return a parallel stream.

    That is, if we had the code:

    Collection.stream().operation()

    then it is easy to parallelize by changing a single call:

    Collection.parallelStream().operation()

    or in the general case for an arbitrary stream:

    Source.stream().parallel().operation()
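
    The pseudocode above can be sketched as a runnable example. The class and method names here are hypothetical; the point is only that the pipeline stays the same and a single call decides whether it runs sequentially or in parallel.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical example class showing the one-call switch to a parallel stream.
class ParallelSwitch {

    // The pipeline is identical; only the way the stream is obtained differs.
    static int sumOfSquares(List<Integer> numbers, boolean parallel) {
        Stream<Integer> stream = parallel ? numbers.parallelStream() : numbers.stream();
        return stream.mapToInt(n -> n * n).sum();
    }

    // For an arbitrary stream, parallel() switches it to parallel mode.
    static boolean isParallelAfterSwitch(List<Integer> numbers) {
        return numbers.stream().parallel().isParallel();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
        System.out.println(sumOfSquares(numbers, false));
        System.out.println(sumOfSquares(numbers, true));
        System.out.println(isParallelAfterSwitch(numbers));
    }
}
```

    For a stateless operation like this one, both variants produce the same result; whether the parallel one is faster is a separate question.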

    Like any simple API, parallelStream() hides a complex mechanism for parallelizing operations. The developer has to face the fact that a parallel stream may not improve performance and can even make it worse, so it is important to understand what happens after the call to parallelStream(). Doug Lea has an article on when parallel streams pay off. The following factors deserve attention:

    F: the operation applied to each element of the stream. It must be independent, that is, it must neither affect other elements nor depend on them (a stateless, non-interfering function).

    S: the data source (collection) must be efficiently splittable. For example, ArrayList splits efficiently because it is easy to compute the indices and ranges that can be processed in parallel. HashMap also splits well. BlockingQueue, LinkedList, and most IO sources are poor candidates for parallel processing.

    An estimate of the benefit of parallel processing. On modern machines, it makes sense to parallelize tasks whose sequential execution time exceeds roughly 100 microseconds.

    Thus, before using this tool, you need to understand how well your task fits within these constraints.
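
    As an illustration of the first factor, here is a minimal sketch of a stateless, non-interfering operation on a parallel stream. The class and method names are assumptions made for this example. The commented-out fragment shows the kind of shared mutable state that should be avoided.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical example class: a parallel pipeline without shared mutable state.
class StatelessOperation {

    // Anti-pattern, shown only as a comment: several threads would append
    // to the same unsynchronized ArrayList, which is a data race.
    //   List<Integer> shared = new ArrayList<>();
    //   IntStream.range(0, n).parallel().forEach(i -> shared.add(i * i));

    static List<Integer> squares(int n) {
        return IntStream.range(0, n)
                .parallel()
                .map(i -> i * i)               // depends only on the current element
                .boxed()
                .collect(Collectors.toList()); // partial results are merged by the collector
    }

    public static void main(String[] args) {
        System.out.println(squares(5));
    }
}
```

    Because the mapping function touches nothing but its argument and the collection step is left to the collector, the result keeps the encounter order regardless of how the range is split.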

    While experimenting with parallel(), I came across another interesting point related to the current implementation. parallel() tries to execute your code on several threads, which raises the question of who creates these threads and how they are managed.

    Let's try to run this code:

    // requires: import java.util.stream.IntStream;
    void parallel() {
      int result = IntStream.range(0, 3)
          .parallel()
          // print which thread handles each element
          .peek(it -> System.out.printf("Thread [%s] peek: %d\n",
              Thread.currentThread().getName(), it))
          .sum();
      System.out.println("sum: " + result);
    }
    

    Thread [ForkJoinPool.commonPool-worker-1] peek: 0
    Thread [main] peek: 1
    Thread [ForkJoinPool.commonPool-worker-0] peek: 2
    sum: 3

    This is already interesting: it turns out that by default a parallel stream uses ForkJoinPool.commonPool. This pool is created statically, that is, when the ForkJoinPool class is first used; it does not respond to shutdown() / shutdownNow() and lives until System::exit is called. Tasks that are not submitted to a specific pool are executed in commonPool.
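
    The shutdown behavior is easy to check. The sketch below uses a hypothetical class name; it only relies on ForkJoinPool.commonPool(), shutdown(), shutdownNow(), isShutdown(), and submit(), and treats the pool as a shared singleton.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;

// Hypothetical example class: the common pool ignores shutdown requests.
class CommonPoolShutdown {

    // Returns true if the common pool still reports itself as running
    // after both shutdown calls.
    static boolean survivesShutdown() {
        ForkJoinPool common = ForkJoinPool.commonPool();
        common.shutdown();
        common.shutdownNow();
        return !common.isShutdown();
    }

    // Submits a task after the shutdown attempt to show the pool still accepts work.
    static int runAfterShutdown() {
        ForkJoinPool common = ForkJoinPool.commonPool();
        common.shutdown();
        Callable<Integer> task = () -> 6 * 7;
        return common.submit(task).join();
    }

    public static void main(String[] args) {
        System.out.println("still running: " + survivesShutdown());
        System.out.println("result: " + runAfterShutdown());
    }
}
```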

    Let's find out the size of commonPool by looking at the jdk1.8.0_111 sources. For readability, some calls unrelated to parallelism have been removed.

    ForkJoinPool::makeCommonPool
    private static ForkJoinPool makeCommonPool() {
      int parallelism = -1;
      try {  // ignore exceptions in accessing/parsing properties
        String pp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.parallelism");
        if (pp != null)
          parallelism = Integer.parseInt(pp);
      } catch (Exception ignore) {
      }
      if (parallelism < 0 && // default 1 less than #cores
           (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;
      if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
      return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
            "ForkJoinPool.commonPool-worker-");
    }
    

    A constant from the same class:

    static final int MAX_CAP      = 0x7fff;        // max #workers - 1


    We are interested in parallelism, which determines the number of workers in the pool. By default, the pool size is Runtime.getRuntime().availableProcessors() - 1, that is, one less than the number of available cores. When you create a custom ForkJoinPool, you can set the desired level of parallelism through the constructor. For commonPool, you can set it through a JVM parameter:

    -Djava.util.concurrent.ForkJoinPool.common.parallelism=n

    The value of this property is capped at 32767 (0x7fff).

    This can be useful if you do not want to give all the cores to ForkJoinPool tasks. For example, if your application normally uses 4 out of 8 CPUs, it makes sense to give the remaining 4 cores to ForkJoinPool.
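
    To see which values are actually in effect on a given machine, a small check like the following can help. The class and method names are hypothetical. Keep in mind that the property is read when the common pool is initialized, so it has to be set before ForkJoinPool is first used, for example on the command line.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical example class: inspects the parallelism of the common and custom pools.
class PoolParallelism {

    // Target parallelism of the common pool as reported by the JDK.
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    // A custom pool takes its parallelism from the constructor argument.
    static int customParallelism(int n) {
        ForkJoinPool custom = new ForkJoinPool(n);
        try {
            return custom.getParallelism();
        } finally {
            custom.shutdown(); // custom pools, unlike the common pool, should be shut down
        }
    }

    public static void main(String[] args) {
        System.out.println("cores: " + Runtime.getRuntime().availableProcessors());
        System.out.println("property: "
                + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));
        System.out.println("common pool parallelism: " + commonParallelism());
        System.out.println("custom pool parallelism: " + customParallelism(2));
    }
}
```

    Running it once without the property and once with -Djava.util.concurrent.ForkJoinPool.common.parallelism=2 shows how the setting changes the reported value.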

    The question is why the number of workers is one less than the number of cores. The answer can be found in the comments in ForkJoinPool.java:

    When external threads submit to the common pool, they can perform subtask processing (see externalHelpComplete and related methods) upon joins. This caller-helps policy makes it sensible to set common pool parallelism level to one (or more) less than the total number of available cores, or even zero for pure caller-runs

    That is, when a thread submits a task to the common pool, the pool can use the calling thread as a worker. That is why we saw main in the program output! The puzzle is solved: ForkJoinPool tries to put the calling thread to work on its tasks as well. In the code above this is main, but if we call the code from another thread, we will see that this works for an arbitrary thread too:

    Thread t = new Thread(() -> {
       parallel();
    }, "MyThread");
    t.start();
    t.join();
    

    Thread [ForkJoinPool.commonPool-worker-1] peek: 0
    Thread [MyThread] peek: 1
    Thread [ForkJoinPool.commonPool-worker-0] peek: 2
    sum: 3
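
    Instead of reading the log, the participating threads can also be collected programmatically. This is a sketch under assumptions: the class and method names are made up for the example, and the fact that the calling thread shows up in the set reflects the current implementation rather than a documented guarantee.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

// Hypothetical example class: records which threads process a parallel stream.
class CallerHelps {

    // Returns the names of all threads that handled at least one element.
    static Set<String> threadNames(int n) {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        IntStream.range(0, n)
                .parallel()
                .peek(i -> names.add(Thread.currentThread().getName()))
                .sum(); // terminal operation; the sum itself is not needed here
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> System.out.println(threadNames(1000)), "MyThread");
        t.start();
        t.join();
    }
}
```

    With the default settings the printed set is expected to contain MyThread next to some ForkJoinPool.commonPool-worker threads; the exact set of workers varies from run to run.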

    Now we know a little more about how ForkJoinPool and parallel streams work. It turns out that the number of workers available to a parallel stream is limited and that these workers are general-purpose, that is, they can be used by any other tasks running on commonPool. Let's try to understand what this means for us in development.

    Consider the following code. For clarity, we run it with -Djava.util.concurrent.ForkJoinPool.common.parallelism=2, that is, 2 workers and the calling thread are available to ForkJoinPool.commonPool.

    final long ms = System.currentTimeMillis();
    ForkJoinPool commonPool = ForkJoinPool.commonPool();
    System.out.println("Parallelism: " + commonPool.getParallelism());
    // submit parallelism + 1 long-running tasks to occupy the common pool
    IntStream.range(0, commonPool.getParallelism() + 1).forEach((it) -> commonPool.submit(() -> {
      try {
        System.out.printf("[%d sec] [%s]: #%d start()\n",
            TimeUnit.SECONDS.convert(System.currentTimeMillis() - ms, TimeUnit.MILLISECONDS),
            Thread.currentThread().getName(), it);
        TimeUnit.SECONDS.sleep(5);
      } catch (Exception e) {
        e.printStackTrace();
      }
      System.out.printf("[%d sec] [%s]: #%d finish()\n",
          TimeUnit.SECONDS.convert(System.currentTimeMillis() - ms, TimeUnit.MILLISECONDS),
          Thread.currentThread().getName(), it);
    }));
    int result = IntStream.range(0, 3)
        .parallel()
        .peek(it -> System.out.printf("Thread [%s] peek: %d\n",
            Thread.currentThread().getName(), it))
        .sum();
    System.out.println("sum: " + result);
    commonPool.awaitTermination(100, TimeUnit.SECONDS);

    Parallelism: 2
    [0 sec] [ForkJoinPool.commonPool-worker-1]: #0 start()
    Thread [main] peek: 1
    [0 sec] [ForkJoinPool.commonPool-worker-0]: #1 start()
    Thread [main] peek: 2
    Thread [main] peek: 0
    sum: 3
    [0 sec] [main]: #2 start()
    [5 sec] [ForkJoinPool.commonPool-worker-0]: #1 finish()
    [5 sec] [ForkJoinPool.commonPool-worker-1]: #0 finish()
    [5 sec] [main]: #2 finish()

    Here is what happens in the code: we try to occupy the pool completely by submitting parallelism + 1 tasks (3 in this case). After that, we start the parallel stream from the first example. The log shows that the parallel stream is executed on a single thread, because all the resources of the pool are exhausted. If you are unaware of this behavior, it will be hard to understand why the processing time of some request that goes through BaseStream::parallel has increased.

    What should you do if you want to be sure that your code is really parallelized? There is a solution: run parallel() on a custom pool. To do this, we slightly modify the example above and submit the data processing code as a Runnable to a custom ForkJoinPool:

    ForkJoinPool custom = new ForkJoinPool(2);
    // the stream is started from a worker of the custom pool
    custom.submit(() -> {
      int result = IntStream.range(0, 3)
          .parallel()
          .peek(it -> System.out.printf("Thread [%s] peek: %d\n",
              Thread.currentThread().getName(), it))
          .sum();
      System.out.println("sum: " + result);
    });

    Parallelism: 2
    [0 sec] [ForkJoinPool.commonPool-worker-1]: #0 start()
    Thread [ForkJoinPool-1-worker-0] peek: 0
    Thread [ForkJoinPool-1-worker-1] peek: 1
    [0 sec] [main]: #2 start()
    [0 sec] [ForkJoinPool.commonPool-worker-0]: #1 start()
    Thread [ForkJoinPool-1-worker-0] peek: 2
    sum: 3
    [5 sec] [ForkJoinPool.commonPool-worker-1]: #0 finish()
    [5 sec] [ForkJoinPool.commonPool-worker-0]: #1 finish()
    [5 sec] [main]: #2 finish()

    Now we have achieved our goal and can be confident that our calculations are under control and that no one can influence them from the outside.
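
    In real code you usually need the result back in the calling thread and want the custom pool released afterwards. Below is a minimal sketch of such a helper; the class name, the method name, and its parameters are assumptions made for this example. Note also that running a parallel stream inside a custom pool relies on how fork/join tasks behave when started from a pool worker, which, as far as the Stream API documentation goes, is not a stated guarantee.

```java
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

// Hypothetical example class: runs a parallel stream inside a dedicated pool.
class CustomPoolStream {

    // Sums 0..n-1 in a custom pool and records the names of the threads used.
    static int sumInCustomPool(int parallelism, int n, Set<String> threadNames) {
        ForkJoinPool custom = new ForkJoinPool(parallelism);
        try {
            Callable<Integer> task = () -> IntStream.range(0, n)
                    .parallel()
                    .peek(i -> threadNames.add(Thread.currentThread().getName()))
                    .sum();
            // join() waits for the result without checked exceptions
            return custom.submit(task).join();
        } finally {
            custom.shutdown(); // release the worker threads of the custom pool
        }
    }

    public static void main(String[] args) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        System.out.println("sum: " + sumInCustomPool(2, 100, names));
        System.out.println("threads: " + names);
    }
}
```

    Creating a pool per call keeps the sketch short; in an application it is usually more sensible to create the pool once and reuse it for all important calculations.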

    Before applying any tool, even the simplest one, you need to learn its features and limitations. Parallel streams have many such features, and you need to consider how well your task suits parallelization. A parallel stream works well if the operations are independent and stateless, the data source can be easily split into segments for parallel processing, and it really makes sense to execute the task in parallel. In addition, you need to take the implementation details into account and make sure that important calculations use a separate thread pool rather than sharing the application's common pool.

    Questions and suggestions are welcome, as always: this is part of our Java course, and we are interested in your opinion on the material.
