Reactive Stream Log Processing with RxJava - Part 1



In a previous post, the author looked at using the ELK stack to collect logs.
Given the move towards microservices and application containerization, centralized log processing and storage is becoming the de facto standard.


Maybe we should try to take the next step and use the collected information more actively, in order to find the causes of a number of problems long before they occur. *


Footnote - in this translation, "streams" and "data streams" are used interchangeably. Likewise, the word "log" can also mean an event log (journal), although in most cases the text uses the plain word.


If we think of the event log as a data stream of what is happening on your system in real time, it becomes very interesting to analyze that data as it arrives and explore everything that can be done with it. For example, we could detect fraudulent behavior by aggregating several information streams while the "attack" is in progress and block the attacker immediately, instead of "traditionally" collecting logs and investigating after the incident.


Or, as another example, we can filter only the events of a certain type, group them (group by) by a common key such as userID, and count them within a time window, obtaining the number of events of that type a user produces in a given period of time.


 failedLogStream()
     .window(5, TimeUnit.SECONDS)
     .flatMap(window ->
                window
                .groupBy(propertyStringValue("remoteIP"))
                .flatMap(grouped -> grouped
                    .count()
                    .map( failedLoginsCount -> {
                          final String remoteIp = grouped.getKey();
                          return new Pair<>(remoteIp, failedLoginsCount);
                    }))
     )
     // assumes Pair exposes getKey()/getValue(); keep only IPs with more than 10 failures per window
     .filter(pair -> pair.getValue() > 10)
     .forEach(System.out::println);
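
To make the window / group / count idea concrete without any reactive library, here is a minimal plain-Java sketch over an in-memory list. The FailedLogin class, the suspiciousIps helper and the sample IP addresses are hypothetical names invented for illustration; the code only mimics what the operators above compute and is not RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class FailedLoginWindowSketch {

    /** Hypothetical event shape: when the failed login happened and from which IP. */
    static final class FailedLogin {
        final long timestampMillis;
        final String remoteIp;

        FailedLogin(long timestampMillis, String remoteIp) {
            this.timestampMillis = timestampMillis;
            this.remoteIp = remoteIp;
        }
    }

    /**
     * Splits events into fixed-size windows, counts events per IP inside each window
     * and reports "windowIndex:ip=count" for every count above the threshold.
     */
    static List<String> suspiciousIps(List<FailedLogin> events, long windowMillis, long threshold) {
        // window index -> (ip -> count); TreeMap keeps the report in a stable order
        Map<Long, Map<String, Long>> counts = new TreeMap<>();
        for (FailedLogin event : events) {
            long window = event.timestampMillis / windowMillis;
            counts.computeIfAbsent(window, w -> new TreeMap<>())
                  .merge(event.remoteIp, 1L, Long::sum);
        }
        List<String> report = new ArrayList<>();
        counts.forEach((window, perIp) -> perIp.forEach((ip, count) -> {
            if (count > threshold) {
                report.add(window + ":" + ip + "=" + count);
            }
        }));
        return report;
    }

    public static void main(String[] args) {
        List<FailedLogin> events = new ArrayList<>();
        for (int i = 0; i < 12; i++) {
            events.add(new FailedLogin(1_000 + i * 100, "10.0.0.1")); // 12 failures in window 0
        }
        events.add(new FailedLogin(2_000, "10.0.0.2")); // a single failure, below the threshold
        events.add(new FailedLogin(6_000, "10.0.0.1")); // falls into window 1
        System.out.println(suspiciousIps(events, 5_000, 10)); // prints [0:10.0.0.1=12]
    }
}
```

The difference from the reactive version is that this code needs the whole list up front, while the stream version produces a result at the end of every 5-second window as events keep arriving.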

We can issue requests to other systems and treat their responses as data streams that we subscribe to, working on them with the familiar operators that reactive streams frameworks provide.


Learning a new development paradigm


It would be nice to understand what reactive stream programming is, and for that we do not need to deploy something big such as Kafka Streams, Spark or Flink.


Reactive programming means non-blocking, event-driven applications that scale even with a small number of threads, with backpressure (a feedback mechanism ensuring that producers do not emit more data than consumers can handle).


The biggest thing Spring 5 will bring is support for Reactive Programming. The new spring-web-reactive module is a framework similar to spring-web-mvc that allows sending asynchronous (non-blocking) responses from REST services and includes a reactive web client, which makes it a candidate for microservice architectures. The concept of reactive streams is not specific to Spring: there is a common specification, reactive-streams-jvm, agreed upon by most reactive frameworks (for the time being the naming may not be identical everywhere, but the concepts should be similar enough that one framework can replace another).


Historically, the reactive streams model was introduced by Rx.NET and then ported to Java by Netflix under the name RxJava. Meanwhile the concept was also successfully implemented in other languages under the name Reactive Extensions. Since then, the companies involved have been moving in the same direction, towards the reactive streams specification. RxJava, being the pioneer, needs significant refactoring (rewriting of code), so version 2.x is the one that adheres more closely to the specification, while Spring Reactor, being the newcomer, could be written to follow the specification from the start. We recommend reading more about how they are related.


Doug Lea said that he wants to include reactive streams as the java.util.concurrent.Flow class, which means that reactive streams will ship as part of Java 9.
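
As a rough illustration of backpressure with the JDK's own java.util.concurrent.Flow interfaces (Java 9 and later), the sketch below uses a subscriber that asks for exactly one element at a time, so the publisher can never push more than it has requested. The class and method names (FlowBackpressureSketch, OneAtATimeSubscriber, publishAll) and the sample log lines are made up for this example; it is a sketch of the mechanism, not code from the original project.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowBackpressureSketch {

    /** Subscriber that signals demand for a single element at a time. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: one element
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next element only after handling this one
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes all lines to a one-at-a-time subscriber and returns what it received. */
    static List<String> publishAll(List<String> lines) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String line : lines) {
                publisher.submit(line); // blocks while the subscriber's buffer is full
            }
        } // close() signals onComplete once buffered items have been delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(publishAll(List.of("login failed", "login ok", "logout")));
    }
}
```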


Performance Benefits


Another buzzword these days is microservice architecture, which comes with the need to make requests to many different services. Ideally these requests are non-blocking: we do not wait for one response to arrive in full before issuing the next request. Instead of waiting for a service to return a large list of results, it may be worthwhile to send a new request to another system as soon as the first fragment is received.


Never block


If we treat the response from a remote request as a Stream (a data stream) whose subscription triggers an action when a response arrives, then instead of blocking a thread while waiting for the response we can get by with fewer threads overall, which in turn reduces resource costs (for example, CPU time for context switching between threads and memory for each thread's stack).


Thus, reactive programming allows us to process more log events than usual on standard hardware.


Example: a service such as Gmail needs to display a user's emails. Emails, in turn, can have many people in copy (CC). It would be nice to display a photo for those users who are in your contacts, which means calling a REST service, ContactService.


It turns out like this:


Future<List<Mail>> emailsFuture = mailstoreService.getUnreadEmails();
List<Mail> emails = emailsFuture.get(); // blocks the current thread
// possibly a long wait until the full list has been received
// could we start the next step as soon as the first fragment arrives?
Future<List<Contact>> contacts = getContactsForEmails(emails);
for (Mail mail : emails) {
  streamRenderEmails(mail, contacts); // push emails to the client
}

Part of the problem was solved in Java 8 with the arrival of CompletableFuture (with its thenCompose, thenCombine, thenAccept and some 50 more methods; you have to remember what all of them do, which does not help code readability).


CompletableFuture<List<Mail>> emailsFuture = mailstoreService.getUnreadEmails();
// assumes getContactsForEmails returns a CompletableFuture of an (emails, contacts) pair
CompletableFuture<Void> rendered = emailsFuture
  .thenCompose(emails -> getContactsForEmails(emails)) // we still have to wait for the full List<Mail>
  .thenAccept(emailsContactsPair -> streamRenderEmails(emailsContactsPair.getKey(), emailsContactsPair.getValue()));
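
For readers who want to run the composition end to end, here is a small self-contained sketch using only the JDK. The two services are replaced by hypothetical stubs (getUnreadSenders and getPhotosFor) that return hard-coded data, and emails are reduced to sender addresses; none of these names come from the original code.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class MailCompositionSketch {

    // Hypothetical stand-in for the mail store: completes with the senders of unread emails
    static CompletableFuture<List<String>> getUnreadSenders() {
        return CompletableFuture.supplyAsync(() -> List.of("alice@example.com", "bob@example.com"));
    }

    // Hypothetical stand-in for ContactService: maps every sender to a photo reference
    static CompletableFuture<Map<String, String>> getPhotosFor(List<String> senders) {
        return CompletableFuture.supplyAsync(() ->
                senders.stream().collect(Collectors.toMap(sender -> sender, sender -> "photo:" + sender)));
    }

    /** Chains the second call onto the first without blocking until the final join(). */
    static List<String> renderUnread() {
        return getUnreadSenders()
                // thenCompose starts the contact lookup only once the whole list is available
                .thenCompose(senders -> getPhotosFor(senders)
                        .thenApply(photos -> senders.stream()
                                .map(sender -> sender + " -> " + photos.get(sender))
                                .collect(Collectors.toList())))
                .join(); // blocking here only so the demo can print a result
    }

    public static void main(String[] args) {
        renderUnread().forEach(System.out::println);
    }
}
```

Even in this form the contact lookup cannot begin until the complete list of emails has arrived, which is exactly the limitation the text goes on to address.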

We can switch to an Iterator instead of a List, but it has no methods for telling it to perform an action when new values appear. SQL access offers something similar with ResultSet, where you call rs.next() instead of loading all the data into memory.


public interface Iterator<E> {
    /**
     * Returns {@code true} if the iteration has more elements.
     */
    boolean hasNext();
    /**
     * Returns the next element in the iteration.
     */
    E next();
}

But we still have to keep asking, "Do you have a new value?"


Iterable<Mail> emails = mailstoreService.getUnreadEmails();
Iterator<Mail> emailsIt = emails.iterator();
while (emailsIt.hasNext()) {
  Mail mail = emailsIt.next(); // even a non-blocking call still burns a lot of CPU time polling for new values
  if (mail != null) {
      ....
  }
}

What we need is a reactive iterator: a data type that we can subscribe to and that performs an action as soon as a new value arrives. This is where reactive stream programming begins.


So what is a Stream?


Everything is a stream


A Stream is simply a sequence of events ordered in time (event X occurs after event Y, so events are never concurrent with each other).


A Stream is modeled to emit 0..N events followed by one of two terminal events:


  • a completion event, which informs subscribers that no more data will be emitted
  • an error event, which informs subscribers that the stream has ended with an error (exception)

We can describe this visually with the help of 'marble diagrams'.


Marble diagram for observable


Thus, we can imagine that everything is a Stream, not just the event log. Even a single value can be expressed as a Stream that emits one value followed by a completion event.


Infinite stream - a stream that emits events but never a terminal event (completion | error).


RxJava defines the Observable<T> data type to model a Stream of events of type T. In Spring Reactor, the equivalent is the Flux type.

An Observable can be a stream of temperatures taken at various intervals.

An Observable can be a stream of products purchased from our web store.

An Observable<User> can represent a single User returned by a database query.

public Observable<User> findByUserId(String userId) {...}
// or, to make the single-value intent more obvious, use Single:
public Single<User> findByUserId(String userId) {...}

But an Observable is just a data type, so, as in the Publish/Subscribe design pattern, we need a Subscriber to handle the 3 types of events:

        Observable<CartItem> cartItemsStream = ...;
        Subscriber<CartItem> subscriber = new Subscriber<CartItem>() {
            @Override
            public void onNext(CartItem cartItem) {
                System.out.println("Cart Item added " + cartItem);
            }
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
        };
        cartItemsStream.subscribe(subscriber);

Reactive Operators


But this is just the Stream part, and so far we have not used anything out of the ordinary, only the classic Observer design pattern.


The Reactive part means that we can define Functions (operators) that will be executed when the stream emits an event.


Each operator creates another stream (streams are immutable), to which we can apply another operator, and so on.


Observable<CartItem> filteredCartStream = cartStream.filter(new Func1<CartItem, Boolean>() {
            @Override
            public Boolean call(CartItem cartItem) {
                return cartItem.isLaptop();
            }
        });
Observable<Long> laptopCartItemsPriceStream = filteredCartStream.map(new Func1<CartItem, Long>() {
            @Override
            public Long call(CartItem cartItem) {
                try {
                    return priceService.getPrice(cartItem.getId());
                } catch(PriceServiceException e) {
                    throw new RuntimeException(e);
                }
            }
        });

Since the operators (methods) of the Observable class (filter, map, groupBy, ...) return an Observable, we can chain them and, combined with lambda syntax, write something elegant.


Observable<BigDecimal> priceStream = cartStream
                        .filter((cartItem) -> cartItem.isLaptop())
                        .map((laptop) -> {
                             try {
                                  return priceService.getPrice(laptop.getId());
                            } catch(PriceServiceException e) {
                                 throw new RuntimeException(e);
                            }
                        });

Note that when priceStream is created above, nothing happens: priceService.getPrice() is not called until an element travels through the chain of operators. In other words, the rx operators only build a plan for how the data will be manipulated as it goes down the chain (the chain is only declared at this point).


When asked to explain reactive programming, people often half-jokingly point to Excel sheets, where a column holds formulas that are re-evaluated when a cell is updated, which in turn updates another cell, which updates yet another, and so on down the chain.


Just like rx operators, these formulas do nothing on their own: they only manipulate data, and each one gets its chance to do something as the data travels down the chain.


To better understand how events travel along a chain of operators, I found a useful analogy from Thomas Nield: moving from one house to another, where the movers act as operators and your belongings are passed along from one to the next.


His sample code is:


Observable<Item> mover1 = Observable.create(s -> {  
   while (house.hasItems()) {
    s.onNext(house.getItem());
   }
   s.onCompleted();
});
Observable<Item> mover2 = mover1.map(item -> putInBox(item));
Subscription mover3 = mover2.subscribe(box -> putInTruck(box),
   () -> closeTruck()); // this runs on the OnCompleted() event


"Mover 1 is on one side, at the source Observable. He creates the emissions by taking things out of the house. He calls Mover 2 through the onNext() method, which performs the map() operation. When his onNext() method is called, Mover 2 takes the item and puts it in a box. Then he calls Mover 3, the final Subscriber, through onNext(), which loads the box into the truck."


The magic of RxJava is its large set of available operators; your job is to combine them to control the flow of data.



The many Stream operators also form a vocabulary of terms for the actions performed on streams, implemented across the popular languages of the ReactiveX (Reactive Extensions) family (RxJava, RxJS, Rx.NET, etc.).


These concepts are worth knowing even if you use a different framework for reactive streams, such as Spring Reactor (in the hope that many operators are common to these frameworks).


So far, we have seen only simple operators, such as filtering:


** Filter **


This only lets through the elements that satisfy the filter condition (a mover will carry only the things that cost less than $100, instead of passing everything on to the next mover).


However, there are operators that can split a stream into many separate streams, Observable<Observable<T>> (a stream of streams). One such operator is groupBy.


> ** group by **


        Observable<Integer> values = Observable.just(1,4,5,7,8,9,10);
        Observable<GroupedObservable<String, Integer>> oddEvenStream = values.groupBy((number) -> number % 2 == 0 ? "even":"odd");
        Observable<Integer> remergedStream = Observable.concat(oddEvenStream);
        remergedStream.subscribe(number -> System.out.print(number +" "));

// Prints:
// 1 5 7 9 4 8 10

and a fairly simple operator, concat, which merges the even and odd streams back into a single stream that we then subscribe to.
> ** Concat **


We see that concat waits for one stream to complete before appending the next, again producing a single stream. That is why the odd numbers are printed first.
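
The ordering can be reproduced on a plain list, which may help to see why the odd numbers come out first. The sketch below is not RxJava; it uses a LinkedHashMap to stand in for groupBy (groups are kept in the order in which they were first seen) and a simple loop to stand in for concat. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupThenConcatSketch {

    /** Groups values into "odd"/"even" buckets, then appends the buckets one after another. */
    static List<Integer> groupThenConcat(List<Integer> values) {
        // LinkedHashMap remembers the order in which each group first appeared
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        for (Integer value : values) {
            String key = value % 2 == 0 ? "even" : "odd";
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
        // "concat": take one whole group before moving on to the next
        List<Integer> merged = new ArrayList<>();
        for (List<Integer> group : groups.values()) {
            merged.addAll(group);
        }
        return merged;
    }

    public static void main(String[] args) {
        // 1 is the first value, so the "odd" group is created (and emitted) first
        System.out.println(groupThenConcat(List.of(1, 4, 5, 7, 8, 9, 10))); // prints [1, 5, 7, 9, 4, 8, 10]
    }
}
```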


We can also combine several streams together, as the zip operator does
> ** Zip operator **


Zip is so named not because it works like an archiver but because, like a zipper (on a jacket), it joins events from two streams.


> Zipper (latch)


It takes one event from one stream and pairs it with an event from the other. Once the pair is formed, it applies the combining function before passing the result down the chain.


PS: this also works for more streams.


So even if one stream emits events faster, the listener only sees a combined event each time the slower stream emits.


Being able to “wait” for the responses of several remote calls, each delivered as a stream, is actually very useful.


The combineLatest operator, on the other hand, does not wait for a pair of events. Whenever either stream emits (once both have emitted at least once), it takes the latest event from the other stream, applies the combining function and passes the result down the chain.


> Combine latest
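
The difference between the two operators is easiest to see on a fixed timeline. The following sketch simulates both behaviours in plain Java over a list of events tagged with their source stream ('A' or 'B'); the Event class and both methods are hypothetical helpers written for this illustration and only mimic the pairing rules described above, they are not the RxJava operators themselves.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ZipVsCombineLatestSketch {

    /** One emission on the timeline: which stream it came from and its value. */
    static final class Event {
        final char source;
        final String value;

        Event(char source, String value) {
            this.source = source;
            this.value = value;
        }
    }

    /** zip: buffers events per stream and emits only when a full pair is available. */
    static List<String> zip(List<Event> timeline) {
        Deque<String> pendingA = new ArrayDeque<>();
        Deque<String> pendingB = new ArrayDeque<>();
        List<String> out = new ArrayList<>();
        for (Event event : timeline) {
            (event.source == 'A' ? pendingA : pendingB).add(event.value);
            if (!pendingA.isEmpty() && !pendingB.isEmpty()) {
                out.add(pendingA.poll() + pendingB.poll());
            }
        }
        return out;
    }

    /** combineLatest: remembers only the latest value per stream and emits on every new event. */
    static List<String> combineLatest(List<Event> timeline) {
        String latestA = null;
        String latestB = null;
        List<String> out = new ArrayList<>();
        for (Event event : timeline) {
            if (event.source == 'A') {
                latestA = event.value;
            } else {
                latestB = event.value;
            }
            if (latestA != null && latestB != null) {
                out.add(latestA + latestB);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Stream A is fast (1, 2, 3), stream B is slow (x, y)
        List<Event> timeline = List.of(
                new Event('A', "1"), new Event('A', "2"), new Event('A', "3"),
                new Event('B', "x"), new Event('B', "y"));
        System.out.println(zip(timeline));           // prints [1x, 2y]
        System.out.println(combineLatest(timeline)); // prints [3x, 3y]
    }
}
```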


Moving towards push-based thinking


Let's look at a few examples of how Observables are actually created. The most verbose way:


        log("Before create Observable");
        Observable<Integer> someIntStream = Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        log("Create");
                        emitter.onNext(3);
                        emitter.onNext(4);
                        emitter.onNext(5);
                        emitter.onComplete();
                        log("Completed");
                    }
                });
        log("After create Observable");
        log("Subscribing 1st");
        someIntStream.subscribe((val) -> LOGGER.info("received " + val));
        // we can omit the implementations of the other methods
        // (for onError and onComplete) if we don't want to do anything special there
        log("Subscribing 2nd");
        someIntStream.subscribe((val) -> LOGGER.info("received " + val));

Events are pushed to the subscriber as soon as it subscribes.
Note that we do not run this code ourselves; we only pass in a new ObservableOnSubscribe object that describes what to do when someone subscribes.


Until we subscribe to Observable, there is no output and nothing happens, the data does not move.


When someone subscribes, the subscribe() method is called and 3 messages are pushed down the chain, followed by a signal that the stream has completed.


We subscribed twice, so the code inside subscribe(...) is also called twice. It effectively replays the same values for every new subscriber, which produces the following output:


mainThread: Before create Observable  
mainThread: After create Observable  
mainThread: Subscribing 1st  
mainThread: Create  
mainThread: received 3  
mainThread: received 4  
mainThread: received 5  
mainThread: Completed  
mainThread: Subscribing 2nd  
mainThread: Create  
mainThread: received 3  
mainThread: received 4  
mainThread: received 5  
mainThread: Completed  

It is important to note that rx operators do not necessarily imply multithreading. RxJava does not introduce any concurrency between Observable and Subscriber by default, so all calls happen on the "main" thread.


Observables that start emitting only when someone subscribes are called cold observables. The other kind is hot observables, which can emit events even when nobody is subscribed.


  • Cold Observables only start emitting events when someone subscribes, and each subscriber receives the same events. Think of a CD that plays the same songs for anyone who puts it in a player.
  • Hot Observables emit events even when nobody has subscribed yet. Think of a radio station that keeps broadcasting songs even when nobody is tuned in; and just as when you switch on the radio, you miss the past events. A hot observable models events whose emission you cannot control, such as events being written to a log.
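
The CD and radio analogies can be written down in a few lines of plain Java. This is a deliberately simplified sketch: ColdSource and HotSource are hypothetical classes invented here to show the two behaviours, not RxJava types, and they ignore completion, errors and threading.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ColdVsHotSketch {

    /** Cold: the producing code runs from scratch for every subscriber (the CD). */
    static final class ColdSource {
        void subscribe(Consumer<String> subscriber) {
            subscriber.accept("track 1");
            subscriber.accept("track 2");
        }
    }

    /** Hot: events are pushed whether or not anyone listens (the radio station). */
    static final class HotSource {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void emit(String event) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(event);
            }
        }
    }

    static List<String> coldDemo() {
        List<String> heard = new ArrayList<>();
        ColdSource cd = new ColdSource();
        cd.subscribe(event -> heard.add("first:" + event));
        cd.subscribe(event -> heard.add("second:" + event)); // gets the same tracks again
        return heard;
    }

    static List<String> hotDemo() {
        List<String> heard = new ArrayList<>();
        HotSource radio = new HotSource();
        radio.emit("song 1");        // nobody is listening yet, so this event is lost
        radio.subscribe(heard::add);
        radio.emit("song 2");
        return heard;
    }

    public static void main(String[] args) {
        System.out.println(coldDemo()); // prints [first:track 1, first:track 2, second:track 1, second:track 2]
        System.out.println(hotDemo());  // prints [song 2]
    }
}
```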

Subjects are a special kind of Observable that is also an Observer (like a Subscriber, so you can push data into it by calling onNext()), which makes implementing hot Observables easier. There are several implementations, such as ReplaySubject, which keeps emitted events in a buffer and replays them on subscription (you can of course bound the buffer size to avoid an OutOfMemory error), and PublishSubject, which only passes on events that occur after the subscription.
And of course, there are many static methods for creating Observables from other sources.


Observable.just("This", "is", "something")  
Observable.from(Iterable<T> collection)  
Observable.from(Future<T> future) - emits the value once the `future` completes

Adding RabbitMQ to our ELK stack as a push-based event emitter


Traditionally, with the ELK stack we query ElasticSearch for event log data, so we can say this is a pull-based, polling style.


Can we have a push-based approach instead, where we are notified 'immediately' when an event appears in the log, further reducing the time between the moment an event happens and the moment we start processing it?


One of the many possible solutions is RabbitMQ, a battle-tested product with a very good reputation for performance and for its ability to handle a huge number of messages. On top of that, Logstash already has a RabbitMQ plugin (there is also a plugin for FluentD), so we can easily integrate it into our existing ELK stack and write logs to both ElasticSearch and RabbitMQ.


You may remember that Logstash can act as a controller, choosing how to process logged events and where to send or store them. This means we can filter the events we want to process or route them elsewhere, for example to other RabbitMQ queues.


There is even the option of sending data directly to RabbitMQ through a Logback appender if you want to skip Logstash.


By the way: the so-called AmqpAppender is, so far, rather a RabbitMQ-specific AMQP implementation (AMQP protocol versions 0-9-1 and 0-9).


For example, ActiveMQ (which also offers an AMQP connector) appears to implement AMQP 1.0, while the spring-amqp library speaks versions 0-9-1 and 0-9, which are completely different from 1.0, so you may run into errors such as 'org.apache.activemq.transport.amqp.AmqpProtocolException: Connection from client using unsupported AMQP attempted'.


However, our solution was to use logstash-logback-encoder and send the event log to Logstash as formatted JSON. We then redirect the Logstash output to a RabbitMQ exchange.


We will use docker-compose to start the logstash-rabbitmq cluster.
You can clone the repository and run


docker-compose -f docker-compose-rabbitmq.yml up
and then you can use
./event-generate.sh
to generate a number of random events that will be sent to Logstash.


To define where the data is sent, we use the Logstash configuration file. We use the rabbitmq output plugin:


output {  
    rabbitmq {
        exchange => logstash
        exchange_type => direct
        host => rabbitmq
        key => my_app
    }
}

RabbitMQ is not a classic JMS server; instead, it uses the AMQP protocol, which has a very different concept for queues.


amqp


The publisher sends messages to the named exchange point (exchange) and the consumer picks up messages from the queue.


A message carries a standard 'routing-key' header, which is used in a process called binding to route messages to queues. Queues can filter which messages they receive through the binding key, and you can also use wildcards in the binding, such as 'logstash.*'.


For a more detailed explanation of AMQP, you can read here and here. This is how we set up the Spring connection to RabbitMQ:


    @Bean
    ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory(host, port);
    }
    @Bean
    RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        rabbitAdmin.declareQueue(queue());
        rabbitAdmin.declareBinding(bindQueueFromExchange(queue(), exchange()));
        return rabbitAdmin;
    }
    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }
    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }
    DirectExchange exchange() {
        return new DirectExchange("logstash");
    }
    private Binding bindQueueFromExchange(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("my_app");
    }
    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver,
                new MessageConverter() {
            public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
                throw new RuntimeException("Unsupported");
            }
            public String fromMessage(Message message) throws MessageConversionException {
                try {
                    return new String(message.getBody(), "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    throw new RuntimeException("UnsupportedEncodingException");
                }
            }
        });
        messageListenerAdapter.setDefaultListenerMethod("receive"); // the method in our Receiver class
        return messageListenerAdapter;
    }
    @Bean
    Receiver receiver() {
        return new Receiver();
    }

We have declared the queue and bound it to the 'logstash' exchange to receive messages with the routing key 'my_app'. The MessageListenerAdapter above specifies that the 'receive' method should be called on the Receiver bean every time a new message arrives from the queue.


Since we expect a continuous stream of log events that we have no control over, we can think of it as a hot observable that pushes events to all subscribers from the moment they subscribe, so we use a PublishSubject.


public class Receiver {
    private PublishSubject<JsonObject> publishSubject = PublishSubject.create();
    public Receiver() {
    }
    /**
     * Method invoked by Spring whenever a new message arrives
     * @param message amqp message
     */
    public void receive(Object message) {
        log.info("Received remote message {}", message);
        JsonElement remoteJsonElement = gson.fromJson((String) message, JsonElement.class);
        JsonObject jsonObj = remoteJsonElement.getAsJsonObject();
        publishSubject.onNext(jsonObj);
    }
    public PublishSubject<JsonObject> getPublishSubject() {
        return publishSubject;
    }
}

We should be aware that SimpleMessageListenerContainer supports more than one thread consuming from the queue (and pushing events down the chain). However, the Observable contract says that we must not emit events concurrently (calls to onNext, onComplete and onError must be serialized):


// DON'T DO THIS
Observable.create(s -> {  
                    // Thread A
                    new Thread(() -> {
                        s.onNext("one");
                        s.onNext("two");
                    }).start();
                    // Thread B
                    new Thread(() -> {
                        s.onNext("three");
                        s.onNext("four");
                    }).start();
                });
// DON'T DO THIS (end of the bad example)
// DO THIS INSTEAD
Observable<String> obs1 = Observable.create(s -> {  
                    // Thread A
                    new Thread(() -> {
                        s.onNext("one");
                        s.onNext("two");
                    }).start();
                  });
Observable<String> obs2 = Observable.create(s -> {  
                    // Thread B
                    new Thread(() -> {
                        s.onNext("three");
                        s.onNext("four");
                    }).start();
                });
Observable<String> c = Observable.merge(obs1, obs2); 

We can get around this problem by calling Observable.serialize() or Subject.toSerialized(), but since we stay with the default of 1 thread in the ListenerContainer, there is no need to. You should also keep this in mind if you plan to use Subjects as an event bus fed from multiple threads. Read the detailed explanation.
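
To show what "serialized" means in practice, here is a minimal plain-Java sketch in which two threads push values through a wrapper that lets only one thread deliver at a time. SerializedEmitter and emitFromTwoThreads are hypothetical names, and a plain synchronized method is used purely for illustration; RxJava's own serialize() may well be implemented differently, this only demonstrates the contract.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SerializedEmitterSketch {

    /** Wraps a downstream consumer so that only one thread at a time can deliver an event. */
    static final class SerializedEmitter<T> {
        private final Consumer<T> downstream;

        SerializedEmitter(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        synchronized void onNext(T value) {
            downstream.accept(value);
        }
    }

    /** Two producer threads emit concurrently; returns how many events reached the downstream list. */
    static int emitFromTwoThreads(int perThread) {
        List<Integer> received = new ArrayList<>(); // deliberately not thread-safe
        SerializedEmitter<Integer> emitter = new SerializedEmitter<>(received::add);
        Runnable producer = () -> {
            for (int i = 0; i < perThread; i++) {
                emitter.onNext(i);
            }
        };
        Thread threadA = new Thread(producer);
        Thread threadB = new Thread(producer);
        threadA.start();
        threadB.start();
        try {
            threadA.join();
            threadB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.size();
    }

    public static void main(String[] args) {
        System.out.println(emitFromTwoThreads(10_000)); // prints 20000
    }
}
```

Without the synchronized wrapper, the two threads would write to the ArrayList at the same time and events could be lost or the list corrupted, which is the kind of failure the Observable contract is meant to rule out.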


Now you can take a look at the code in the repository, continue with Part 2 of this long post, or go to the Rx Playground, where you will find more examples.
Link to the translator's site

