Reactive programming principles illustrated with a simple RSS aggregator built on ReactiveX for Python


In recent years, reactive programming in general, and ReactiveX in particular, has become increasingly popular among developers. Some already take full advantage of this approach, while others have only “heard something” about it. In this article I will try to show how some reactive programming concepts can change the way you look at seemingly familiar things.

There are two fundamentally different ways of organizing large systems: around the objects and states that live in the system, or around the data flows that pass through it. The reactive programming paradigm makes data streams easy to express and propagates changes through them. For example, in imperative programming an assignment fixes its result once it is computed, whereas in reactive programming the result is recalculated whenever new input data arrives. The stream of values undergoes the series of transformations that a specific problem requires. Working with streams makes the system extensible and asynchronous, and reacting correctly to errors as they occur makes it fault tolerant.
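To make the assignment example concrete, here is a minimal plain-Python sketch with no ReactiveX involved. The `Cell` class and the `computed` helper are hypothetical names introduced only for illustration: a `Cell` notifies its dependents whenever its value changes, so a value derived with `computed` is recalculated on new input instead of being fixed at assignment time.

```python
class Cell:
    """A mutable value that notifies its listeners on change (hypothetical helper, not part of ReactiveX)."""

    def __init__(self, value):
        self._value = value
        self._listeners = []

    @property
    def value(self):
        return self._value

    def set(self, value):
        self._value = value
        for listener in self._listeners:
            listener()

    def subscribe(self, listener):
        self._listeners.append(listener)


def computed(fn, *sources):
    """Return a Cell holding fn(*source values), recomputed whenever any source changes."""
    result = Cell(fn(*(s.value for s in sources)))

    def recompute():
        result.set(fn(*(s.value for s in sources)))

    for source in sources:
        source.subscribe(recompute)
    return result


# Imperative assignment: a is evaluated once and then stays fixed
b, c = 1, 2
a = b + c
b = 10          # a is still 3

# Reactive "assignment": a_cell is recalculated when new input arrives
b_cell, c_cell = Cell(1), Cell(2)
a_cell = computed(lambda x, y: x + y, b_cell, c_cell)
b_cell.set(10)  # a_cell.value is now 12
```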

ReactiveX is a library for composing asynchronous and event-based programs using observable sequences. It extends the Observer pattern to support sequences of data and adds operators for combining them declaratively, freeing you from having to worry about synchronization, thread safety, shared data structures and non-blocking I/O.

One of the main differences between ReactiveX and functional reactive programming is that ReactiveX operates not on continuously changing values but on discrete values that are "emitted" over time.

It is worth saying a few words about Observer, Observable and Subject. An Observable is a data source that lets you handle streams of asynchronous events in much the same way you handle data collections such as arrays. It replaces callbacks, which makes the code more readable and less error-prone.

In ReactiveX, an Observer subscribes to an Observable and then reacts to the element or sequence of elements it emits. For each subscribed Observer, the Observable calls Observer.on_next() for every element of the data stream and may then terminate the sequence by calling either Observer.on_completed() or Observer.on_error(). Observables are often implemented so that they do not start emitting data until someone subscribes. This is so-called “lazy evaluation”: values are computed only when they are needed.
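The contract and the laziness described above can be sketched in plain Python. This is not the rx library's implementation: `SimpleObserver`, `ColdObservable` and `produce_numbers` are hypothetical names. The sketch delivers `on_next` for every element, then at most one of `on_completed` or `on_error`, and the producer runs only when `subscribe()` is called.

```python
class SimpleObserver:
    """Hypothetical wrapper: after on_completed or on_error, nothing more is delivered."""

    def __init__(self, on_next, on_error=None, on_completed=None):
        self._on_next = on_next
        self._on_error = on_error or (lambda error: None)
        self._on_completed = on_completed or (lambda: None)
        self._stopped = False

    def on_next(self, value):
        if not self._stopped:
            self._on_next(value)

    def on_error(self, error):
        if not self._stopped:
            self._stopped = True
            self._on_error(error)

    def on_completed(self):
        if not self._stopped:
            self._stopped = True
            self._on_completed()


class ColdObservable:
    """Hypothetical lazy Observable: the producer function runs only on subscribe()."""

    def __init__(self, producer):
        self._producer = producer

    def subscribe(self, on_next, on_error=None, on_completed=None):
        self._producer(SimpleObserver(on_next, on_error, on_completed))


log = []

def produce_numbers(observer):
    log.append("producer started")
    for n in (1, 2, 3):
        observer.on_next(n)
    observer.on_completed()
    observer.on_next(4)  # ignored: the sequence has already completed


numbers = ColdObservable(produce_numbers)
# Creating the Observable produced nothing yet: log is still []

received = []
numbers.subscribe(received.append, on_completed=lambda: received.append("done"))
# log == ["producer started"], received == [1, 2, 3, "done"]
```

Each new subscription runs the producer again; an Observable that behaves this way is usually called "cold".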


Some tasks require an object that is both an Observer and an Observable, so that it can receive event notifications and pass them on to its own subscribers. This is what Subject is for. Besides the standard Subject, there are several other implementations:

  • ReplaySubject caches all the values it receives; when a new subscriber appears, it first replays the entire cached sequence and then continues in normal mode.

  • BehaviorSubject stores only the most recent value and, like ReplaySubject, gives it to each new subscriber. It is created with a default value, which new subscribers receive if no value has been emitted yet.

  • AsyncSubject also stores the last value, but emits nothing until the sequence completes, and then emits only that last value.
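The differences between these Subjects are easiest to see side by side. The following is a simplified plain-Python sketch, not the rx library's classes: the class names mirror ReactiveX, but error handling (on_error) and subscribers that arrive after completion are left out, and the `demo` helper is hypothetical.

```python
# Simplified sketch: not the rx library's classes; on_error is omitted
class Subject:
    """Both an Observer (on_next/on_completed) and an Observable (subscribe)."""

    def __init__(self):
        self._observers = []
        self._done = False

    def subscribe(self, on_next, on_completed=None):
        self._observers.append((on_next, on_completed))

    def on_next(self, value):
        if not self._done:
            for callback, _ in list(self._observers):
                callback(value)

    def on_completed(self):
        if not self._done:
            self._done = True
            for _, callback in list(self._observers):
                if callback:
                    callback()


class ReplaySubject(Subject):
    """Caches every value and replays the whole history to each new subscriber."""

    def __init__(self):
        super().__init__()
        self._history = []

    def subscribe(self, on_next, on_completed=None):
        for value in self._history:
            on_next(value)
        super().subscribe(on_next, on_completed)

    def on_next(self, value):
        if not self._done:
            self._history.append(value)
        super().on_next(value)


class BehaviorSubject(Subject):
    """Keeps only the latest value (initially a default) and gives it to each new subscriber."""

    def __init__(self, default):
        super().__init__()
        self._latest = default

    def subscribe(self, on_next, on_completed=None):
        on_next(self._latest)
        super().subscribe(on_next, on_completed)

    def on_next(self, value):
        if not self._done:
            self._latest = value
        super().on_next(value)


class AsyncSubject(Subject):
    """Remembers the latest value but emits it only when the sequence completes."""

    _NOTHING = object()

    def __init__(self):
        super().__init__()
        self._latest = self._NOTHING

    def on_next(self, value):
        if not self._done:
            self._latest = value  # remember, but do not emit yet

    def on_completed(self):
        if not self._done and self._latest is not self._NOTHING:
            for callback, _ in list(self._observers):
                callback(self._latest)
        super().on_completed()


def demo(subject):
    """Subscribe 'early', emit 1 and 2, subscribe 'late', emit 3, then complete."""
    early, late = [], []
    subject.subscribe(early.append)
    subject.on_next(1)
    subject.on_next(2)
    subject.subscribe(late.append)
    subject.on_next(3)
    subject.on_completed()
    return early, late


print(demo(Subject()))           # ([1, 2, 3], [3])
print(demo(ReplaySubject()))     # ([1, 2, 3], [1, 2, 3])
print(demo(BehaviorSubject(0)))  # ([0, 1, 2, 3], [2, 3])
print(demo(AsyncSubject()))      # ([3], [3])
```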

Observable and Observer are just the beginning of ReactiveX. The real power lies in the operators, which let you transform, combine and manipulate the sequences of elements that Observables emit.

The ReactiveX documentation describes operators with marble diagrams. For example, this is how such diagrams represent an Observable and its transformations:

[Figure: marble diagram of an Observable]

The diagram below shows that the map operator transforms the elements emitted by an Observable by applying a function to each of them.

[Figure: marble diagram of the map operator]
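The behavior shown in the map diagram can be reproduced with a toy Observable in plain Python. `ListObservable` and its `from_list` and `map` methods are hypothetical names modelled on the operators used later in the article; they are not the rx library's implementation, and errors are ignored for brevity.

```python
class ListObservable:
    """Toy cold Observable defined by a subscribe function (hypothetical, not rx's class)."""

    def __init__(self, subscribe_fn):
        self._subscribe_fn = subscribe_fn

    def subscribe(self, on_next, on_completed=lambda: None):
        self._subscribe_fn(on_next, on_completed)

    @staticmethod
    def from_list(items):
        """Emit each item of the list, then complete."""
        def subscribe_fn(on_next, on_completed):
            for item in items:
                on_next(item)
            on_completed()
        return ListObservable(subscribe_fn)

    def map(self, fn):
        """Return a new Observable that applies fn to every element of this one."""
        def subscribe_fn(on_next, on_completed):
            self.subscribe(lambda value: on_next(fn(value)), on_completed)
        return ListObservable(subscribe_fn)


results = []
ListObservable.from_list([1, 2, 3]).map(lambda x: x * 10).subscribe(results.append)
# results == [10, 20, 30]
```

Because map returns a new Observable and leaves the source untouched, operators can be chained, which is how the aggregator's pipelines in this article are built.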

An RSS aggregator is a good illustration of what ReactiveX can do: it needs asynchronous data loading, filtering and transformation of values, and periodic updates to keep its state current.

The examples in this article that demonstrate the core principles of ReactiveX use the rx library (RxPY) for Python, in its 1.x API; later major versions of RxPY changed that API. For example, this is what an abstract observer looks like:

from abc import ABCMeta, abstractmethod

class Observer(metaclass=ABCMeta):
    @abstractmethod
    def on_next(self, value):
        return NotImplemented

    @abstractmethod
    def on_error(self, error):
        return NotImplemented

    @abstractmethod
    def on_completed(self):
        return NotImplemented

Our application exchanges messages with the browser in real time over WebSockets, which Tornado makes easy to implement.

The program begins by starting the server. When the browser connects to it, a WebSocket is opened.


import json
import os

import feedparser
from rx import config, Observable
from rx.subjects import Subject
from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient
from tornado.platform.asyncio import AsyncIOMainLoop
from tornado.web import Application, RequestHandler, StaticFileHandler, url
from tornado.websocket import WebSocketHandler

asyncio = config['asyncio']


class WSHandler(WebSocketHandler):
    urls = ['https://lenta.ru/rss/top7',
            'http://wsrss.bbc.co.uk/russian/index.xml']

    def open(self):
        print("WebSocket opened")
        # The main logic of the application goes here

    def on_message(self, message):
        obj = json_decode(message)
        # Push the query into the Subject; user_input receives it
        self.subject.on_next(obj['term'])

    def on_close(self):
        # Unsubscribe from the Observable; this stops every observable in the chain
        self.combine_latest_sbs.dispose()
        print("WebSocket closed")


class MainHandler(RequestHandler):
    def get(self):
        self.render("index.html")


def main():
    AsyncIOMainLoop().install()
    # Environment variables are strings, so convert the port to int
    port = int(os.environ.get("PORT", 8080))
    app = Application([
        url(r"/", MainHandler),
        (r'/ws', WSHandler),
        (r'/static/(.*)', StaticFileHandler, {'path': "."})
    ])
    print("Starting server at port: %s" % port)
    app.listen(port)
    asyncio.get_event_loop().run_forever()


if __name__ == "__main__":
    main()


To process the query entered by the user, we create a Subject. The stream built from it emits a default value (an empty string) as soon as it is subscribed to, and after that, at most once per second, emits the latest user input, provided that its length is 0 or greater than 2 and that it differs from the previous value.

        # A Subject is both an Observable and an Observer
        self.subject = Subject()
        user_input = self.subject.throttle_last(
            1000  # Emit the last value received within each 1000 ms window
        ).start_with(
            ''  # Emit the default value immediately upon subscription
        ).filter(
            lambda text: len(text) == 0 or len(text) > 2
        ).distinct_until_changed()  # Only if the value has changed
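To see which values the user_input chain lets through, its operators can be simulated on a list of timestamped keystrokes. This is a plain-Python approximation, not RxPY: the `throttle_last` function here keeps the last value in each fixed one-second window counted from subscription, whereas the real operator samples the stream on a scheduler in real time. The keystroke data and the `user_input_stream` name are hypothetical.

```python
def throttle_last(events, window):
    """Approximation of throttle_last: fixed windows instead of a real-time scheduler.
    events is a list of (seconds_since_subscribe, value); keep the last value per window."""
    last_in_window = {}
    for timestamp, value in events:
        last_in_window[int(timestamp // window)] = value
    return [last_in_window[w] for w in sorted(last_in_window)]


def user_input_stream(events, window=1.0):
    throttled = throttle_last(events, window)
    with_default = [""] + throttled                      # start_with('')
    filtered = [text for text in with_default
                if len(text) == 0 or len(text) > 2]       # filter(...)
    result = []
    for text in filtered:                                 # distinct_until_changed()
        if not result or result[-1] != text:
            result.append(text)
    return result


# Hypothetical keystrokes: (seconds since subscription, contents of the input box)
keystrokes = [(0.1, "p"), (0.3, "py"), (0.6, "pyt"),
              (1.2, "pyth"), (1.5, "python"),
              (2.4, "python"),
              (3.1, "py")]
print(user_input_stream(keystrokes))  # ['', 'pyt', 'python']
```

The short query "py" in the last window is dropped by the length filter, and the repeated "python" is dropped by distinct_until_changed, so the news is not reloaded needlessly.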

For periodic news updates, there is also an Observable that emits a value every 60 seconds; start_with(0) makes it emit once immediately as well.


        interval_obs = Observable.interval(
            60000  # Emits a value every 60 s (for periodic updates)
        ).start_with(0)

These two streams are combined with the combine_latest operator, and an Observable that fetches the list of news is plugged into the chain. Finally, a subscription to the resulting Observable is created; only at this moment does the whole chain start working.


        # combine_latest merges two streams, user queries and timer intervals,
        # and fires on every message from either stream
        self.combine_latest_sbs = user_input.combine_latest(
            interval_obs, lambda input_val, i: input_val
        ).do_action(  # Runs for every emitted element
            # Send the frontend a message to clear the list
            lambda x: send_response('clear')
        ).flat_map(
            # Plug the Observable that fetches the news list into the chain
            self.get_data
        ).subscribe(send_response, on_error)
        # The subscription is created; only now does the whole chain start working
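The combine_latest rule can be illustrated the same way, on an ordered list of events from the two streams. The function and stream names below are hypothetical and this is not RxPY's implementation; the sketch only models the rule that, once both streams have emitted at least once, every new element from either stream produces a combined value from the latest element of each.

```python
def combine_latest(events, combiner):
    """events: ordered (stream_name, value) pairs from exactly two streams, 'input' and 'tick'.
    Once both have emitted, every event yields combiner(latest_input, latest_tick)."""
    latest = {}
    output = []
    for stream, value in events:
        latest[stream] = value
        if "input" in latest and "tick" in latest:
            output.append(combiner(latest["input"], latest["tick"]))
    return output


events = [("input", ""),        # start_with('') on the user input
          ("tick", 0),          # start_with(0) on the interval
          ("input", "python"),  # the user typed a query
          ("tick", 1),          # 60 s later
          ("tick", 2)]          # another 60 s later
queries = combine_latest(events, lambda input_val, i: input_val)
print(queries)  # ['', 'python', 'python', 'python']
```

In the aggregator, each element of this stream triggers the do_action that sends 'clear' and a fresh get_data call, so both a changed query and every timer tick reload the news.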

Let us look more closely at the “Observable that fetches the list of news”. From the list of feed URLs we create a data stream whose elements are passed to a function that uses Tornado's AsyncHTTPClient to load each feed asynchronously. Each downloaded feed is parsed into its own stream of entries, which is filtered by the query the user entered. From each of these streams we take 5 news items and convert them into the format expected by the frontend.


    def get_rss(self, rss_url):
        http_client = AsyncHTTPClient()
        return http_client.fetch(rss_url, method='GET')

    def get_data(self, query):
        # Create an Observable from the list of URLs
        return Observable.from_list(
            self.urls
        ).flat_map(
            # For each URL, create an Observable that downloads the feed
            lambda url: Observable.from_future(self.get_rss(url))
        ).flat_map(
            # Parse the downloaded data and create an Observable of its entries
            lambda x: Observable.from_list(
                feedparser.parse(x.body)['entries']
            ).filter(
                # Keep entries whose title or summary contains the query
                lambda val, i: query in val.title or query in val.summary
            ).take(5)  # Take only 5 news items per URL
        ).map(
            # Convert each entry into the format sent to the frontend
            lambda x: {'title': x.title, 'link': x.link,
                       'published': x.published, 'summary': x.summary})
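The per-feed part of get_data (filter by query, take(5), then map) can be checked without the network. This sketch uses plain lists of dictionaries as hypothetical stand-ins for parsed feedparser entries; `select_news` and `FIELDS` are illustrative names. In the real chain, items from different feeds arrive interleaved in the order the HTTP responses complete, whereas here the feeds are processed one after another.

```python
from itertools import islice

FIELDS = ("title", "link", "published", "summary")


def select_news(feeds, query, per_feed=5):
    """feeds: one list of entry dicts per URL (hypothetical stand-in for parsed feeds).
    Mirrors the inner chain of get_data: filter, take(per_feed) per feed, then map."""
    news = []
    for entries in feeds:  # one inner stream per URL, as produced by flat_map
        matching = (e for e in entries
                    if query in e["title"] or query in e["summary"])
        for entry in islice(matching, per_feed):  # take(5) is applied per feed
            news.append({field: entry[field] for field in FIELDS})
    return news


# Hypothetical sample data standing in for two parsed feeds
feed_a = [{"title": "Rx news %d" % i, "link": "https://example.com/a/%d" % i,
           "published": "Mon, 01 Jan 2018", "summary": "..."} for i in range(7)]
feed_b = [{"title": "Python release", "link": "https://example.com/b/1",
           "published": "Tue, 02 Jan 2018", "summary": "Rx support added"},
          {"title": "Weather", "link": "https://example.com/b/2",
           "published": "Tue, 02 Jan 2018", "summary": "Sunny"}]

print(len(select_news([feed_a, feed_b], "Rx")))  # 6: five from feed_a, one from feed_b
print(len(select_news([feed_a, feed_b], "")))    # 7: an empty query matches everything
```

Because take(5) sits inside the inner flat_map, the limit applies to each feed separately, so two feeds can yield up to ten items in total.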


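Once the output stream is assembled, its subscriber receives the data element by element. The send_response function sends each received value to the frontend, which appends the news item to the list. Because send_response and on_error are local functions passed to the chain, they must be defined in open() before the subscription is created.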


        def send_response(x):
            # Serialize the value as JSON and push it to the browser
            self.write_message(json.dumps(x))

        def on_error(ex):
            # Log errors propagated down the chain
            print(ex)

In the feeder.js file:

        ws.onmessage = function(msg) {
            var value = JSON.parse(msg.data);
            // "clear" is a control message: empty the list before new results arrive
            if (value === "clear") {$results.empty(); return;}
            // Append the results
            $('<li>' + value.title +
              '<p>' + value.published + '</p>' +
              '<p>' + value.summary + '</p>' +
              '</li>').appendTo($results);
            $results.show();
        };
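The message protocol between server and browser is small: the string "clear", followed by one JSON object per news item. The round trip can be simulated in Python. In this sketch, `send_response` is a variant that takes a hypothetical `outbox` list in place of the WebSocket, and `on_message` is a Python stand-in for the feeder.js handler, not code from the project.

```python
import json


def send_response(x, outbox):
    """Server side (hypothetical variant): serialize as JSON; outbox stands in for the socket."""
    outbox.append(json.dumps(x))


def on_message(raw, results):
    """Client side: Python stand-in for the feeder.js onmessage handler."""
    value = json.loads(raw)
    if value == "clear":
        results.clear()  # "clear" control message: empty the list
        return
    results.append(value["title"])  # otherwise it is a news item


outbox = []
send_response("clear", outbox)
send_response({"title": "First", "link": "https://example.com/1",
               "published": "Mon, 01 Jan 2018", "summary": "..."}, outbox)
send_response("clear", outbox)  # a new query or timer tick
send_response({"title": "Second", "link": "https://example.com/2",
               "published": "Mon, 01 Jan 2018", "summary": "..."}, outbox)

results = ["stale item"]
for raw in outbox:
    on_message(raw, results)
print(results)  # ['Second']
```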


This implements push technology: data is pushed from the server to the frontend, which only sends the user's search query to the server.

In conclusion, consider what the implementation would look like with the conventional callback-based approach instead of Observables: without an easy way to combine data streams, without the ability to push data to the frontend consumer immediately, and with the need to track changes to the query string by hand. ReactiveX is still rarely used by Python developers, but I already see several opportunities to apply it in my current projects.

An example of using ReactiveX for Python, the RSS aggregator demo project, is available in a GitHub repository.
