Reactive programming with JAX-RS

Original author: Mert Çaliskan
  • Transfer
Hello!

The latest Java Enterprise Developer course this year has been successfully launched and we have the latest material on this topic that we want to share with you about using asynchronous approach and styling to develop responsive reactive applications.

Go.

Reactive programming first sounds like the name of the emerging paradigm, but in fact, refers to a programming method that uses an event-oriented approach to work with asynchronous data streams. Based on constantly running data, reactive systems respond to them by performing a series of events.
Reactive programming follows the “Observer” design pattern, which can be defined as follows: if a state change occurs in one object, all other objects are notified and updated accordingly. Therefore, instead of polling events for changes, events are pushed asynchronously so that observers can process them. In this example, observers are functions that are executed when an event is sent. And the mentioned data stream is the actual observable.

Almost all languages ​​and frameworks use this approach in their ecosystem, and the latest versions of Java are no exception. In this article, I will explain how you can apply reactive programming using the latest version of JAX-RS in Java EE 8 and Java 8 functionality.



Reactive Manifesto

The Reactive Manifesto lists four fundamental aspects that an application needs to be more flexible, loosely coupled and simple to scale, and therefore also capable of being reactive. It says that the application must be responsive, flexible (and therefore scalable), robust and message-driven.

The underlying goal is a truly responsive application. Suppose there is an application in which one large stream deals with the processing of user requests, and after performing the work, this stream sends the answers back to the original requesters. When an application receives more requests than it can handle, this thread becomes a bottleneck, and the application loses its former responsiveness. To maintain responsiveness, the application must be scalable and sustainable. Sustainable can be considered an application in which there is functionality for auto-recovery. According to the experience of most developers, only the message-driven architecture allows the application to be scalable, stable and responsive.

Reactive programming was introduced in Java 8 and Java EE 8. The Java language introduced concepts such asCompletionStage, and its implementation CompletableFuture, and Java began to use these functions in specifications such as the Reactive Client API in JAX-RS.

JAX-RS 2.1 Reactive Client API

Let's see how reactive programming can be used in Java EE 8 applications. To understand the process, you need a basic knowledge of the Java EE API.

JAX-RS 2.1 introduced a new way to create a REST client with support for reactive programming. The default invoker implementation offered by JAX-RS is synchronous, which means that the client being created will send a blocking call to the server’s endpoint. An example implementation is presented in Listing 1.

Listing 1

Response response =
        ClientBuilder.newClient()
            .target("http://localhost:8080/service-url")
            .request()
            .get();

Starting with version 2.0, JAX-RS provides support for creating an asynchronous invoker on the client API using a simple method call async(), as shown in Listing 2.

Listing 2

Future<Response> response =
        ClientBuilder.newClient()
            .target("http://localhost:8080/service-url")
            .request()
            .async()
            .get();

Using asynchronous invoker on the client returns an instance Future with a type javax.ws.rs.core.Response. This may lead to polling a response, with a call future.get(), or registering a callback, which will be called when there is an available HTTP response. Both implementations are suitable for asynchronous programming, but are usually complicated if you want to group callbacks or add conditional cases to these asynchronous execution minima.

JAX-RS 2.1 provides a reactive way to overcome these problems with the new JAX-RS Reactive Client API for building a client. This is as simple as calling a rx()method during client assembly. In Listing 3, the rx()method returns the reactive invoker, which exists during the execution of the client, and the client returns a response with the typeCompletionStage.rx(), which allows the transition from synchronous invoker to asynchronous with a simple call.

Listing 3

CompletionStage<Response> response =
        ClientBuilder.newClient()
            .target("http://localhost:8080/service-url")
            .request()
            .rx()
            .get();

CompletionStage<Т>- A new interface introduced in Java 8. It represents a calculation, which can be a step in a larger calculation, as the name suggests. This is the only representative of Java 8 reactivity that got into JAX-RS.
After receiving the response instance, I can call AcceptAsync()where I can provide a snippet of code that will be executed asynchronously when the answer becomes available, as shown in Listing 4.

Listing 4

response.thenAcceptAsync(res -> {
    Temperature t = res.readEntity(Temperature.class);
    //do stuff with t
});

Adding reactivity to the endpoint REST The

reactive approach is not limited to the client side of JAX-RS; it can also be used on the server side. For example, first I’ll create a simple script where I can query the list of locations for one destination. For each position, I will make a separate call with the location data to another point to get the temperature values. The interaction of the destination points will be as shown in Figure 1.


Figure 1. Interaction between the destination points

First, I simply define the model of the definition area and then the services for each model. Listing 5 shows how a class is defined Forecastthat wraps the classes Location and Temperature.

Listing 5

publicclassTemperature{
    private Double temperature;
    private String scale;
    // getters & setters
}
publicclassLocation{
    String name;
    publicLocation(){}
    publicLocation(String name){
        this.name = name;
    }
    // getters & setters
}
publicclassForecast{
    private Location location;
    private Temperature temperature;
    publicForecast(Location location){
        this.location = location;
    }
    public Forecast setTemperature(
            final Temperature temperature){
        this.temperature = temperature;
        returnthis;
    }
    // getters
}

For wrapping a list of predictions, the class is ServiceResponse implemented in Listing 6.

Listing 6

publicclassServiceResponse{
    privatelong processingTime;
    private List<Forecast> forecasts = new ArrayList<>();
    publicvoidsetProcessingTime(long processingTime){
        this.processingTime = processingTime;
    }
    public ServiceResponse forecasts(List<Forecast> forecasts){
        this.forecasts = forecasts;
        returnthis;
    }
    // getters
}	

LocationResource, shown in Listing 7, defines three sample locations returned from a path /location.

Listing 7

@Path("/location")
publicclassLocationResource{
    @GET@Produces(MediaType.APPLICATION_JSON)
    public Response getLocations(){
        List<Location> locations = new ArrayList<>();
        locations.add(new Location("London"));
        locations.add(new Location("Istanbul"));
        locations.add(new Location("Prague"));
        return Response.ok(new GenericEntity<List<Location>>(locations){}).build();
    }
}

TemperatureResource, shown in Listing 8, returns a randomly generated temperature value between 30 and 50 for a given location. A 500 ms delay is added to the implementation to simulate the sensor readout.

Listing 8

@Path("/temperature")
publicclassTemperatureResource{
    @GET@Path("/{city}")
    @Produces(MediaType.APPLICATION_JSON)
    public Response getAverageTemperature(@PathParam("city") String cityName) {
        Temperature temperature = new Temperature();
        temperature.setTemperature((double) (new Random().nextInt(20) + 30));
        temperature.setScale("Celsius");
        try {
            Thread.sleep(500);
        } catch (InterruptedException ignored) {
            ignored.printStackTrace();
        }
        return Response.ok(temperature).build();
    }
}

First I show the synchronous implementation ForecastResource (see Listing 9), which gives all the locations. Then, for each position, it calls temperature service to get the values ​​in degrees Celsius.

Listing 9

@Path("/forecast")
publicclassForecastResource{
    @Uri("location")
    private WebTarget locationTarget;
    @Uri("temperature/{city}")
    private WebTarget temperatureTarget;
    @GET@Produces(MediaType.APPLICATION_JSON)
    public Response getLocationsWithTemperature(){
        long startTime = System.currentTimeMillis();
        ServiceResponse response = new ServiceResponse();
        List<Location> locations = locationTarget
                .request()
                .get(new GenericType<List<Location>>(){});
        locations.forEach(location -> {
            Temperature temperature = temperatureTarget
                .resolveTemplate("city", location.getName())
                .request()
                .get(Temperature.class);
            response.getForecasts().add(
                    new Forecast(location).setTemperature(temperature));
        });
        long endTime = System.currentTimeMillis();
        response.setProcessingTime(endTime - startTime);
        return Response.ok(response).build();
    }
}

When the forecast destination point is queried as /forecast, you will receive a conclusion similar to that shown in Listing 10. Note that the request processing time took 1.533 ms, which is logical, since a synchronous request for temperature values ​​from three different locations adds up to 1.5 ms.

Listing 10

{
  "forecasts": [
    {
      "location": {
        "name": "London"
      },
      "temperature": {
        "scale": "Celsius",
        "temperature": 33
      }
    },
    {
      "location": {
        "name": "Istanbul"
      },
      "temperature": {
        "scale": "Celsius",
        "temperature": 38
      }
    },
    {
      "location": {
        "name": "Prague"
      },
      "temperature": {
        "scale": "Celsius",
        "temperature": 46
      }
    }
  ],
  "processingTime": 1533
}

As long as everything goes according to plan. It is time to introduce reactive programming on the server side, where calls to each location can be executed in parallel after receiving all the locations. This clearly can improve the synchronous flow shown earlier. This is done in Listing 11, where the definition of the reactive version of the prediction service is shown.

Listing 11

@Path("/reactiveForecast")
publicclassForecastReactiveResource{
    @Uri("location")
    private WebTarget locationTarget;
    @Uri("temperature/{city}")
    private WebTarget temperatureTarget;
    @GET@Produces(MediaType.APPLICATION_JSON)
    publicvoidgetLocationsWithTemperature(@Suspended final AsyncResponse async){
        long startTime = System.currentTimeMillis();
        // Создать этап (stage) для извлечения местоположений
        CompletionStage<List<Location>> locationCS =
            locationTarget.request()
                .rx()
                .get(new GenericType<List<Location>>() {});
        // Создав отдельный этап на этапе местоположений,// описанном выше, собрать список прогнозов,//как в одном большом CompletionStagefinal CompletionStage<List<Forecast>> forecastCS =
            locationCS.thenCompose(locations -> {
                // Создать этап для получения прогнозов// как списка СompletionStage
                List<CompletionStage<Forecast>> forecastList =
                // Стрим местоположений и обработка каждого// из них по отдельности
                    locations.stream().map(location -> {
                        // Создать этап для получения// значений температуры только одного города// по его названиюfinal CompletionStage<Temperature> tempCS =
                            temperatureTarget
                                .resolveTemplate("city", location.getName())
                                .request()
                                .rx()
                                .get(Temperature.class);
                                // Затем создать CompletableFuture, в котором// содержится инстанс прогноза// с местоположением и температурным значениемreturn CompletableFuture.completedFuture(
                                        new Forecast(location))
                                            .thenCombine(tempCS,
                                                Forecast::setTemperature);
                            }).collect(Collectors.toList());
                    // Вернуть финальный инстанс CompletableFuture,// где все представленные объекты completable future// завершеныreturn CompletableFuture.allOf(
                            forecastList.toArray(
                                    new CompletableFuture[forecastList.size()]))
                            .thenApply(v -> forecastList.stream()
                                    .map(CompletionStage::toCompletableFuture)
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList()));
            });
        // Создать инстанс ServiceResponse,// в котором содержится полный список прогнозов// вместе со временем обработки.// Создать его future и объединить с// forecastCS, чтобы получить прогнозы// и вставить в ответ сервиса
        CompletableFuture.completedFuture(
            new ServiceResponse())
                .thenCombine(forecastCS,
                        ServiceResponse::forecasts)
                .whenCompleteAsync((response, throwable) -> {
                    response.setProcessingTime(
                            System.currentTimeMillis() - startTime);
                    async.resume(response);
                });
    }
}

A reactive implementation may seem complicated at first glance, but after a closer look, you will notice that it is quite simple. In the implementation, ForecastReactiveResourceI first create a client call to location services using the JAX-RS Reactive Client API. As I mentioned above, this is an add-on for Java EE 8, and it helps to create a reactive call simply by using a method rx().

Now I am creating a new location-based milestone to compile a list of predictions. They will be stored as a list of predictions in one large completion stage, called forecastCS. Ultimately, I will create a service call response using only forecastCS.

And now, let's collect the predictions in the form of a list of the completion stages defined in the variableforecastList. To create a completion stage for each forecast, I transmit the data by location, and then create a variable tempCS, again using the JAX-RS Reactive Client API, which calls the temperature service with the name of the city. Here I use the method to build the client resolveTemplate(), and this allows me to pass the name of the city to the collector as a parameter.

As the last step of streaming, I make a call CompletableFuture.completedFuture(), passing the new instance Forecast as a parameter. I combine this future with a tempCSstage so that I have a temperature value for the iterated locations.

The method CompletableFuture.allOf()in Listing 11 converts the completion stage list toforecastCS. Performing this step returns a large instance of a completable future when all the objects provided are completable future.

The service response is an instance of the class ServiceResponse, so I create a completed future, and then combine the forecastCScompletion stage with the list of predictions and calculate the response time of the service.

Of course, reactive programming forces only the server side to run asynchronously; the client side will be blocked until the server sends the response back to the requester. To overcome this problem, Server Sent Events (SSEs) can be used to partially send a response as soon as it is available, so that the temperature values ​​for each location are transmitted to the client one by one. ConclusionForecastReactiveResource will be similar to that presented in Listing 12. As shown in the output, the processing time is 515 ms, which is the ideal execution time for obtaining temperature values ​​from one location.

Listing 12

{
  "forecasts": [
    {
      "location": {
        "name": "London"
      },
      "temperature": {
        "scale": "Celsius",
        "temperature": 49
      }
    },
    {
      "location": {
        "name": "Istanbul"
      },
      "temperature": {
        "scale": "Celsius",
        "temperature": 32
      }
    },
    {
      "location": {
        "name": "Prague"
      },
      "temperature": {
        "scale": "Celsius",
        "temperature": 45
      }
    }
  ],
  "processingTime": 515
}

Conclusion

In the examples of this article, I first showed the synchronous method of obtaining forecasts using location and temperature services. Then, I switched to the reactive approach so that asynchronous processing was performed between service calls. When you use the JAX-RS Reactive Client API in Java EE 8 along with the classes CompletionStage and CompletableFutureavailable in Java 8, the power of asynchronous processing is pulled free, thanks to reactive programming.

Reactive programming is more than just an implementation of an asynchronous model from a synchronous one; it also simplifies working with concepts such as the nesting stage. The more it is used, the easier it will be to manage complex scenarios in parallel programming.

THE END

Thanks for attention. As always, we are waiting for your comments and questions.

Also popular now: