Introduction to reactive programming in Spring

Original author: Matt Raible
Hi, Habr!

This week we are expecting a new book on Spring 5 back from the printers:


Among the interesting features of Spring 5, reactive programming deserves special mention; its implementation in this framework is briefly described in the article by Matt Raible offered below. In the book itself, reactive patterns are covered in Chapter 11.

Matt co-wrote the article with Josh Long, the author of another excellent book about Java and Spring, "Java in the Cloud", published last summer.

Reactive programming is a way to build systems that hold up under high load. Handling huge amounts of traffic is no longer a problem, because the server is non-blocking and client processes do not have to wait for responses. The client cannot directly observe how the program runs on the server or synchronize with it. When an API struggles to process requests, it should still give reasonable responses: it should not fail or drop messages in an uncontrolled manner, and it should tell its upstream components that it is working under load so that they can take some of that load off it. This technique is called "backpressure", and it is an important aspect of reactive programming.

I co-wrote this article with Josh Long. Josh is a Java Champion, Spring Developer Advocate and all-round great guy who works at Pivotal. I have been working with Spring for a long time, but it was Josh who showed me Spring Boot, at the Devoxx conference in Belgium. Since then we have become close friends; we share an interest in Java and write cool applications together.

Reactive programming, or: I/O, I/O, it's off to work we go...

Reactive programming is an approach to software development that makes heavy use of asynchronous I/O. Asynchronous I/O is a small idea that carries big changes for programming. The idea itself is simple: fix the inefficient allocation of resources by freeing up the resources that would otherwise sit idle, waiting for I/O to complete. Asynchronous I/O inverts the usual approach to handling I/O: the client is released and can get on with other tasks while waiting for new notifications.

Let's consider what synchronous and asynchronous I/O have in common, and how they differ.

We will write a simple program that reads data from a source (specifically, a java.io.File reference). Let's start with an implementation that uses the good old java.io.InputStream:

Example 1. Synchronous reading of data from a file

package com.example.io;

import lombok.extern.log4j.Log4j2;
import org.springframework.util.FileCopyUtils;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.function.Consumer;

@Log4j2
class Synchronous implements Reader {

    @Override
    public void read(File file, Consumer<BytesPayload> consumer) throws IOException {
        try (FileInputStream in = new FileInputStream(file)) { // 1
            byte[] data = new byte[FileCopyUtils.BUFFER_SIZE];
            int res;
            while ((res = in.read(data, 0, data.length)) != -1) { // 2
                consumer.accept(BytesPayload.from(data, res)); // 3
            }
        }
    }
}

  1. We supply the file to read via an ordinary java.io.File
  2. We pull the results out of the source one read at a time...
  3. This code accepts a Consumer<BytesPayload> that is called whenever new data arrives (a sketch of the Reader and BytesPayload types used here follows after this list)
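
The listings in this article rely on a Reader interface and a BytesPayload holder that are never shown. Here is a minimal sketch of what they might look like, inferred purely from the calls above; the getBytes accessor and the copying behaviour are assumptions, not part of the original article.

package com.example.io;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.function.Consumer;

// Hypothetical contract implemented by both readers, inferred from the
// method signatures used in the examples.
interface Reader {
    void read(File file, Consumer<BytesPayload> consumer) throws IOException;
}

// Hypothetical value holder: from(data, length) copies the first `length`
// bytes so each consumer gets an independent array.
class BytesPayload {

    private final byte[] bytes;

    private BytesPayload(byte[] bytes) {
        this.bytes = bytes;
    }

    static BytesPayload from(byte[] data, int length) {
        return new BytesPayload(Arrays.copyOf(data, length));
    }

    byte[] getBytes() {
        return this.bytes;
    }
}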

Simple enough, wouldn't you say? Run this code and you will see in the log output (to the left of each line) that everything happens on a single thread.
Here we pull bytes out of the data source (in this case a java.io.FileInputStream, a subclass of java.io.InputStream). What is wrong with this example? Nothing, really: we use an InputStream that points to data sitting in our file system. If the file is there and the hard disk is working, this code will do what we expect.

But what happens if we read the data not from a File but from a network socket, using a different InputStream implementation? Nothing to worry about! Well, nothing to worry about as long as the network is infinitely fast and the link between this node and the other one never fails. If those conditions hold, the code will work perfectly.

And what happens if the network slows down or goes down? In that case the time it takes for in.read(…) to return will grow; in fact, it may never return at all! That is a problem if we want to do anything else with the thread we are reading on. Of course, we can always spin up another thread and read through it. Up to a point that works, but eventually we reach a limit where simply adding threads is not enough to keep scaling: we cannot have true concurrency beyond the number of cores in our machine. Dead end! Here we can only scale I/O handling (in this case, reading) by adding threads, and sooner or later we hit the ceiling.

In this example the bulk of the work is the reading itself; almost nothing else is going on. We are I/O-bound. Let's look at how an asynchronous solution helps us partially avoid monopolizing our threads.

Example 2. Asynchronous reading of data from a file

package com.example.io;

import lombok.extern.log4j.Log4j2;
import org.springframework.util.FileCopyUtils;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

@Log4j2
class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> {

    private int bytesRead;
    private long position;
    private AsynchronousFileChannel fileChannel;
    private Consumer<BytesPayload> consumer;
    private final ExecutorService executorService = Executors.newFixedThreadPool(10);

    public void read(File file, Consumer<BytesPayload> c) throws IOException {
        this.consumer = c;
        Path path = file.toPath(); // 1
        this.fileChannel = AsynchronousFileChannel.open(path,
            Collections.singleton(StandardOpenOption.READ), this.executorService); // 2
        ByteBuffer buffer = ByteBuffer.allocate(FileCopyUtils.BUFFER_SIZE);
        this.fileChannel.read(buffer, position, buffer, this); // 3
        while (this.bytesRead > 0) {
            this.position = this.position + this.bytesRead;
            this.fileChannel.read(buffer, this.position, buffer, this);
        }
    }

    @Override
    public void completed(Integer result, ByteBuffer buffer) { // 4
        this.bytesRead = result;
        if (this.bytesRead < 0)
            return;
        buffer.flip();
        byte[] data = new byte[buffer.limit()];
        buffer.get(data);
        // 5
        consumer.accept(BytesPayload.from(data, data.length));
        buffer.clear();
        this.position = this.position + this.bytesRead;
        this.fileChannel.read(buffer, this.position, buffer, this);
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        log.error(exc);
    }
}

  1. This time we adapt the java.io.File, turning it into a Java NIO java.nio.file.Path
  2. When creating the Channel we specify, among other things, a java.util.concurrent.ExecutorService that will be used to invoke the CompletionHandler when data becomes available
  3. We start reading, passing a reference to a CompletionHandler<Integer, ByteBuffer> (this)
  4. In the callback we read the bytes out of the ByteBuffer into a byte[] holder
  5. Just as in the Synchronous example, the byte[] data is handed to the consumer

Let me say right away: this code turned out muuuuch more complicated! There is a dizzying amount going on here, but allow me to point out that this code reads data from a Java NIO Channel and then processes that data on a separate thread responsible for the callbacks. That way the thread on which the read was started is not monopolized. We return almost instantly after calling .read(..), and when the data is finally available the callback is invoked on a different thread. If there is a delay between .read() calls, we can get on with other work on our own thread. The duration of an asynchronous read, from the first byte to the last, is at best no shorter than that of a synchronous read; usually it is marginally longer. But in exchange for this extra complexity we can handle our threads more efficiently: do more work, multiplexing I/O across a pool with a finite number of threads.
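
To see the difference in practice, here is a hedged sketch of a driver that runs both readers against the same file; the ReaderDemo class, the file path and the output format are assumptions, not part of the original examples. The synchronous consumer is always invoked on the calling thread, while the asynchronous callbacks arrive on threads from the channel's executor.

package com.example.io;

import java.io.File;

// Hypothetical driver class; the file path is assumed to exist on your machine.
public class ReaderDemo {

    public static void main(String[] args) throws Exception {
        File file = new File("/tmp/content.txt"); // assumed sample file

        Reader sync = new Synchronous();
        sync.read(file, payload ->
            System.out.println("sync  [" + Thread.currentThread().getName() + "] "
                + payload.getBytes().length + " bytes"));

        Reader async = new Asynchronous();
        async.read(file, payload ->
            System.out.println("async [" + Thread.currentThread().getName() + "] "
                + payload.getBytes().length + " bytes"));

        Thread.sleep(1_000); // crude wait so the asynchronous callbacks have time to run
    }
}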

I work for a cloud computing company, and we would love for you to buy lots of new application instances to solve your problems by scaling horizontally! Of course, I am being a little sly here. Asynchronous I/O does complicate things a bit, but I hope this example shows why reactive code is so useful: it lets you handle more requests and do more work on the hardware you already have, provided performance is bound by I/O. If performance is bound by the CPU (say, Fibonacci numbers, bitcoin mining or cryptography), reactive programming buys us nothing.

These days most of us do not work with Channel or InputStream implementations in everyday code! We think about problems in terms of higher-level abstractions: things like arrays, or rather the java.util.Collection hierarchy. A java.util.Collection maps very well onto an InputStream: both assume that you can work with all the data at once, almost instantly, and that you will finish reading most InputStreams sooner rather than later. Collection types become awkward as the amount of data grows. What do you do with something potentially infinite (unbounded), such as WebSockets or server-sent events? What if there is a delay between records?

We need a better way to describe this kind of data: asynchronous values that will arrive eventually. Future<T> or CompletableFuture<T> may seem like a good fit, but they describe only a single thing that arrives eventually. Java really does not offer a suitable metaphor for this kind of data. Both Iterator and the Java 8 Stream types can be unbounded, but both are pull-oriented: you ask for the next record yourself, instead of the type pushing a callback into your code. If push-based processing were supported, which allows much more at the thread level, one would expect the API to also provide control over threading and scheduling. Iterator implementations say nothing about threading, and all parallel Java 8 streams share the same fork-join pool.
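
A tiny sketch makes the pull/push distinction concrete (PullVersusPush and process are made-up names): with an Iterator the consumer drives the loop, while a push-based source decides for itself when to hand each value to a previously registered callback.

import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

class PullVersusPush {

    // Pull: the consumer drives the loop and asks for each next element itself.
    static void pull(List<String> source) {
        Iterator<String> it = source.iterator();
        while (it.hasNext()) {
            process(it.next());
        }
    }

    // Push: the consumer hands over a callback and the source decides
    // when (and on which thread) to deliver each element. A real push-based
    // source would deliver asynchronously; the loop here merely stands in
    // for "the producer calls you back".
    static void push(Consumer<String> callback, List<String> source) {
        source.forEach(callback);
    }

    private static void process(String value) {
        System.out.println("got " + value);
    }
}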

If Iterator and Stream did support push-based processing, we would run into another problem that really only bites in the context of I/O: we would need some kind of backpressure mechanism! Since the consumer processes data asynchronously, we have no idea when data will show up in the pipeline or how much of it there will be. We do not know whether the next callback will have to handle one byte or one terabyte!

When pulling data out of an InputStream, you read only as much as you are prepared to process, and no more. In the previous examples we read data into a byte[] buffer of a fixed, known size. In an asynchronous context we need some way of telling the provider how much data we are ready to handle.
Yes, sir. There is definitely something missing here.

Finding the missing metaphor

So we are looking for a metaphor that cleanly captures the essence of asynchronous I/O, supports this mechanism for negotiating the flow of data and lets us control execution in distributed systems. In reactive programming, the ability of a client to signal how much load it can handle is called "backpressure".

There are a number of good projects today - Vert.x, Akka Streams and RxJava - that support reactive programming. The Spring team runs its own project, called Reactor. There is a fairly broad common ground between these different approaches, extracted into a de facto standard by the Reactive Streams initiative. The Reactive Streams initiative defines four types:

A Publisher<T> is a producer of values that may arrive eventually. A Publisher<T> produces values of type T for a Subscriber<T>.

Example 3. Reactive Streams: the Publisher<T> interface.

package org.reactivestreams;

public interface Publisher<T> {

    void subscribe(Subscriber<? super T> s);
}

A Subscriber subscribes to a Publisher<T> and receives notifications of any new values of type T through its onNext(T) method. If any errors occur, its onError(Throwable) method is called. When processing completes normally, the subscriber's onComplete method is called.

Example 4. Reactive Streams: the Subscriber<T> interface.

package org.reactivestreams;

public interface Subscriber<T> {

    public void onSubscribe(Subscription s);

    public void onNext(T t);

    public void onError(Throwable t);

    public void onComplete();
}

When a Subscriber first connects to a Publisher, it is given a Subscription in the Subscriber#onSubscribe method. The Subscription is arguably the most important part of the whole specification: it is what provides backpressure. The Subscriber uses the Subscription#request method to ask for more data, or the Subscription#cancel method to stop processing.

Example 5. Reactive Streams: the Subscription interface.

package org.reactivestreams;

public interface Subscription {

    public void request(long n);

    public void cancel();
}
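
To show how these three types interact, here is an illustrative Subscriber that uses its Subscription to exercise backpressure by requesting exactly one element at a time; the class name and the logging are assumptions, not part of the specification.

package org.reactivestreams.example;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Illustrative Subscriber: it signals demand to the Publisher by asking
// for exactly one element per onNext notification.
public class OneAtATimeSubscriber<T> implements Subscriber<T> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s;
        s.request(1); // ask for the first element only
    }

    @Override
    public void onNext(T t) {
        System.out.println("received: " + t);
        this.subscription.request(1); // ready for exactly one more
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("done");
    }
}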

The Reactive Streams specification provides one more useful, if obvious, type: Processor<A,B> is simply an interface that extends both Subscriber<A> and Publisher<B>.

Example 6. Reactive Streams: the Processor<T, R> interface.

package org.reactivestreams;

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

The specification is not meant to prescribe implementations; its goal is to define types that support interoperability. The Reactive Streams types have proved useful enough that they made it into the Java 9 release, where they correspond one-to-one, semantically, to the interfaces nested in the java.util.concurrent.Flow class, e.g. java.util.concurrent.Flow.Publisher.
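
For comparison, the Java 9 counterparts are nested inside java.util.concurrent.Flow and mirror the Reactive Streams methods; the abbreviated excerpt below shows only the four interfaces.

package java.util.concurrent;

// Abbreviated excerpt of the JDK type, shown for comparison only.
public final class Flow {

    public static interface Publisher<T> {
        public void subscribe(Subscriber<? super T> subscriber);
    }

    public static interface Subscriber<T> {
        public void onSubscribe(Subscription subscription);
        public void onNext(T item);
        public void onError(Throwable throwable);
        public void onComplete();
    }

    public static interface Subscription {
        public void request(long n);
        public void cancel();
    }

    public static interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
}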

Meet: Reactor

The Reactive Streams types alone are not enough; higher-order implementations are needed to support operations such as filtering and transformation. This is where the Reactor project comes in handy: it builds on the Reactive Streams specification and provides two specializations of Publisher<T>.

The first, Flux<T>, is a Publisher that produces zero or more values. The second, Mono<T>, is a Publisher<T> that produces zero or one value. Both publish values and can process them accordingly, but their capabilities go far beyond the Reactive Streams specification: both provide operators that let you work with streams of values. Reactor types compose nicely: the output of one can serve as the input of another, and when a type needs to work with other streams of data, it relies on Publisher<T> instances.
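
As a small illustration of those operators, here is a hedged sketch using reactor-core; Flux.just, map, filter and Mono.just are standard Reactor operators, while the class name and the values are made up.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ReactorOperatorsDemo {

    public static void main(String[] args) {
        // A Flux of zero or more values, transformed and filtered before consumption.
        Flux<String> names = Flux.just("spring", "webflux", "reactor")
            .map(String::toUpperCase)
            .filter(name -> name.length() > 6);

        names.subscribe(System.out::println); // prints WEBFLUX, REACTOR

        // A Mono of at most one value.
        Mono<Integer> length = Mono.just("backpressure")
            .map(String::length);

        length.subscribe(System.out::println); // prints 12
    }
}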

Both Mono<T> and Flux<T> implement Publisher<T>. Our recommendation: have your methods accept Publisher<T> instances but return Flux<T> or Mono<T>; this helps the client tell exactly what kind of data it is getting.

Suppose you are given a Publisher<T> and asked to render a user interface for it. Should you display a detail page for a single record, as you would when given a CompletableFuture<T>? Or an overview page with a list or grid, showing all the records page by page? It is hard to say.

Flux<T> and Mono<T>, by contrast, are very specific. You know you should render an overview page if you receive a Flux<T>, and a detail page for one (or no) record when you receive a Mono<T>.
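
In an API this distinction might look like the following sketch; CustomerService and Customer are hypothetical names, and only the Flux/Mono return types matter here.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical service interface: the return types alone tell the client
// whether to expect an overview (Flux) or a single-record view (Mono).
interface CustomerService {

    Flux<Customer> findAll();            // zero or more records: render an overview page

    Mono<Customer> findById(String id);  // zero or one record: render a detail page
}

// Minimal hypothetical record type used by the signatures above.
class Customer {

    private final String id;
    private final String name;

    Customer(String id, String name) {
        this.id = id;
        this.name = name;
    }
}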

Reactor is an open source project started by Pivotal; it has since become very popular. Facebook uses it in its reactive engine for remote procedure calls, also used in RSocket, led by RxJava creator Ben Christensen. Salesforce uses it in its reactive gRPC implementation. Reactor implements the Reactive Streams types, so it can interoperate with other technologies that support those types, such as RxJava 2 from Netflix, Akka Streams from Lightbend and the Vert.x project from the Eclipse Foundation. David Karnok, the lead of RxJava 2, has also worked closely with Pivotal on Reactor, making the project even better. And, of course, it has been present in one form or another in the Spring Framework since Spring Framework 4.0.

Reactive programming with Spring WebFlux

For all its usefulness, Reactor is just a foundation. Our applications need to talk to data sources. They need to support authentication and authorization. Spring provides all of that. If Reactor gives us the missing metaphor, Spring helps us all speak a common language.

Spring Framework 5.0 was released in September 2017. It builds on Reactor and the Reactive Streams specification, and it introduces a new reactive runtime and component model called Spring WebFlux.

Spring WebFlux does not depend on the Servlet API and does not require it in order to work. It ships with adapters that let it run on top of a Servlet engine if needed, but that is not required. It also provides a completely new Netty-based runtime called Spring WebFlux. Spring Framework 5, which works with Java 8 and Java EE 7 and up, now serves as the basis for most of the Spring ecosystem, including Spring Data Kay, Spring Security 5, Spring Boot 2 and Spring Cloud Finchley.
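
To give a feel for the programming model, here is a minimal sketch of an annotated WebFlux endpoint; the controller, path and greetings are made up, while @RestController, @GetMapping, Flux and delayElements are the standard Spring WebFlux and Reactor building blocks.

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import java.time.Duration;

// Hypothetical endpoint: WebFlux subscribes to the returned Flux and streams
// the elements to the client without blocking a thread per request.
@RestController
class GreetingsController {

    @GetMapping("/greetings")
    Flux<String> greetings() {
        return Flux.just("Hello", "Bonjour", "Hola")
            .delayElements(Duration.ofSeconds(1)); // emit one greeting per second
    }
}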
