Problems of batch processing of requests and their solutions (part 1)

    Almost all modern software products consist of several services, and long response times over inter-service channels often become a source of performance problems. The standard solution is to pack several inter-service requests into a single one, which is called batching.

    If you use batch processing, you may still be unhappy with the result, either in terms of performance or of code clarity. This technique is not as easy for the caller as it may seem, and the right decision varies greatly with the purpose and the situation. Using concrete examples, I will show the pros and cons of several approaches.

    Demo project


    For clarity, let's look at one of the services in the application I am currently working on.

    An explanation of the platform choice for the examples
    The problem of poor performance is quite general and is not tied to any specific language or platform. This article uses Spring + Kotlin code examples to demonstrate the tasks and solutions. Kotlin is equally understandable (or incomprehensible) to Java and C# developers, and the code is more compact and readable than it would be in Java. To make things easier for pure Java developers, I will avoid Kotlin black magic and use only white magic (in the spirit of Lombok). There will be a few extension methods, but they are essentially familiar to all Java programmers as static methods, so this is a little sugar that will not spoil the taste of the dish.

    There is a document approval service. Someone creates a document and submits it for discussion; during the discussion edits are made, and eventually the document is agreed upon. The approval service itself knows nothing about documents: it is just a chat of approvers with a few extra functions that we will not consider here.

    So, there are chat rooms (corresponding to documents), each with a predefined set of participants. As in regular chats, messages contain text and files and can be replies or forwards. File and user references are links to other domains. In our case it looks like this: user data is stored in Keycloak and retrieved via REST, and the same goes for files: the files and their meta-information live in a separate file storage service.

    data class ChatMessage(
      // nullable because the id appears only after persist
      val id: Long? = null,
      /** Reference to the author */
      val author: UserReference,
      /** Message text */
      val message: String,
      /** References to attachments */
      // because of the JPA+DBMS combination it is easier to support both null and empty lists
      val files: List<FileReference>? = null,
      /** If this is a reply, the original message goes here */
      val replyTo: ChatMessage? = null,
      /** If this is a forward, the original message goes here */
      val forwardFrom: ChatMessage? = null
    )




    typealias FileReference = Long
    typealias UserReference = Long



    All calls to these services are heavy requests. This means that the overhead of transporting a request is much greater than the time the third-party service takes to process it. On our test stands, the typical call time for such services is 100 ms, so we will use this number from now on.

    We need to make a simple REST controller that returns the last N messages with all the necessary information. That is, we assume the frontend message model is almost the same and we need to send all the data. The difference in the frontend model is that files and users have to be presented in slightly decoded form so that they can be rendered as links. We need to implement the following (the UI postfix marks DTO models for the frontend, that is, what we return via REST):

    /** This is how entity references are returned to the frontend */
    data class ReferenceUI(
      /** Identifier for the URL */
      val ref: String,
      /** Link title visible to the user */
      val name: String
    )
    data class ChatMessageUI(
      val id: Long,
      /** Reference to the author */
      val author: ReferenceUI,
      /** Message text */
      val message: String,
      /** References to attachments */
      val files: List<ReferenceUI>,
      /** If this is a reply, the original message goes here */
      val replyTo: ChatMessageUI? = null,
      /** If this is a forward, the original message goes here */
      val forwardFrom: ChatMessageUI? = null
    )




    interface ChatRestApi {
      fun getLast(n: Int): List<ChatMessageUI>
    }




    It may seem surprising that we do not pass any chat identifier, and it does not appear in the ChatMessage / ChatMessageUI model either. I did this on purpose, so as not to clutter the example code (the chats are isolated, so we can assume we have only one).

    A philosophical digression
    Both the ChatMessageUI class and the ChatRestApi.getLast method use the List data type, although this is really an ordered Set. The JDK does not help here: there is no interface that declares element order (preserving it on insertion and removal), so it is common practice to use List where an ordered Set is needed (there is LinkedHashSet, but it is not an interface).
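
    As a small illustration of my own (not project code): LinkedHashSet gives both uniqueness and insertion order, but there is no interface that would let us declare that guarantee in an API, so List remains the usual compromise.

    // Illustrative only: LinkedHashSet preserves insertion order and uniqueness,
    // but the JDK has no "ordered set" interface to expose that guarantee in an API.
    fun main() {
      val orderedSet = linkedSetOf(3L, 1L, 2L, 1L)
      println(orderedSet)                  // [3, 1, 2]: order kept, duplicate dropped
      val asApiType: Set<Long> = orderedSet
      // declared as Set<Long>, the value no longer promises any ordering to the caller,
      // which is why APIs usually fall back to List when order matters
      println(asApiType)
    }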

    An important limitation: we assume there are no long chains of replies or forwards. That is, they do exist, but their length does not exceed three messages. A message chain must be delivered to the frontend in its entirety.

    To receive data from the external services we have the following APIs. Note that batch processing is already provided by the external services, and in both flavors: through Set (without preserving element order, with unique keys) and through List (duplicates allowed, order preserved).

    interface ChatMessageRepository {
      fun findLast(n: Int): List<ChatMessage>
    }
    data class FileHeadRemote(
      val id: FileReference,
      val name: String
    )
    interface FileRemoteApi {
      fun getHeadById(id: FileReference): FileHeadRemote
      fun getHeadsByIds(id: Set<FileReference>): Set<FileHeadRemote>
      fun getHeadsByIds(id: List<FileReference>): List<FileHeadRemote>
      fun getHeadsByChat(): List<FileHeadRemote>
    }
    data class UserRemote(
      val id: UserReference,
      val name: String
    )
    interface UserRemoteApi {
      fun getUserById(id: UserReference): UserRemote
      fun getUsersByIds(id: Set<UserReference>): Set<UserRemote>
      fun getUsersByIds(id: List<UserReference>): List<UserRemote>
    }




    Simple implementations


    Naive implementation


    In most cases, the first naive implementation of our REST controller would look something like the code below. Everything is perfectly clear, and that is a big plus. We use batch processing and receive data from an external service in batches. But what happens to performance? For each message, one call to UserRemoteApi is made to resolve the author field and one call to FileRemoteApi to get all attached files. That seems to be all. Assume that the forwardFrom and replyTo fields of ChatMessage are fetched in a way that requires no extra calls; but converting them to ChatMessageUI leads to recursion, so the number of calls can grow considerably. As noted earlier, let's say nesting is limited and a chain contains at most three messages.

    class ChatRestController(
      private val messageRepository: ChatMessageRepository,
      private val userRepository: UserRemoteApi,
      private val fileRepository: FileRemoteApi
    ) : ChatRestApi {
      override fun getLast(n: Int) =
        messageRepository.findLast(n)
          .map { it.toFrontModel() }

      private fun ChatMessage.toFrontModel(): ChatMessageUI =
        ChatMessageUI(
          id = id ?: throw IllegalStateException("$this must be persisted"),
          author = userRepository.getUserById(author).toFrontReference(),
          message = message,
          files = files?.let { files ->
            fileRepository.getHeadsByIds(files)
              .map { it.toFrontReference() }
          } ?: listOf(),
          forwardFrom = forwardFrom?.toFrontModel(),
          replyTo = replyTo?.toFrontModel()
        )
    }







    As a result, we get from two to six calls to external services per message, plus one JPA call for the whole batch of messages. The total number of calls ranges from 2 * N + 1 to 6 * N + 1. What does that mean in real units? Suppose we need 20 messages to render a page. Getting them takes from 4 to 10 seconds. Awful! We would like to fit into 500 ms. And since the frontend dreams of seamless scrolling, the performance requirements for this endpoint can be doubled.
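
    To make the arithmetic explicit, here is a tiny sketch of my own (not project code), assuming the ~100 ms per call figure from above and strictly sequential execution:

    // Back-of-the-envelope estimate only: counts external calls for the naive implementation.
    fun estimateCalls(messages: Int, callsPerMessage: Int) =
      messages * callsPerMessage + 1            // +1 for the single JPA call

    fun main() {
      println(estimateCalls(20, 2))             // best case:  41 calls
      println(estimateCalls(20, 6))             // worst case: 121 calls
      // at roughly 100 ms per call this lands firmly in the multi-second range,
      // far from the 500 ms we are aiming for
    }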

    Pros:

    1. The code is concise and self-documenting (a support team's dream).
    2. The code is simple, so there are almost no ways to shoot yourself in the foot.
    3. Batch processing does not look alien and fits organically into the logic.
    4. Logic changes are easy to make and stay local.

    Cons:

    Terrible performance, because the batches are very small.

    This approach is common in simple services and prototypes. If the speed of making changes matters most, it is hardly worth complicating the system. At the same time, for our very simple service the performance is terrible, so the applicability of this approach is very narrow.

    Naive parallel processing


    You can start processing all messages in parallel: this gets rid of the linear growth of time with the number of messages. It is not a particularly good way, though, because it creates a large peak load on the external services.

    Implementing parallel processing is very simple. With parallel processing of messages we get 300–700 ms in the ideal case, which is much better than the naive implementation but still not fast enough. With this approach, the requests to userRepository and fileRepository for a single message are still executed sequentially, which is not very efficient. Fixing that requires changing the call logic quite a lot, for example via CompletionStage (aka CompletableFuture):

    override fun getLast(n: Int) =
      messageRepository.findLast(n).parallelStream()
        .map { it.toFrontModel() }
        .collect(toList())






    private fun ChatMessage.toFrontModel(): ChatMessageUI =
      CompletableFuture.supplyAsync {
        userRepository.getUserById(author).toFrontReference()
      }.thenCombine(
        files?.let { files ->
          CompletableFuture.supplyAsync {
            fileRepository.getHeadsByIds(files).map { it.toFrontReference() }
          }
        } ?: CompletableFuture.completedFuture(listOf())
      ) { author, files ->
        ChatMessageUI(
          id = id ?: throw IllegalStateException("$this must be persisted"),
          author = author,
          message = message,
          files = files,
          forwardFrom = forwardFrom?.toFrontModel(),
          replyTo = replyTo?.toFrontModel()
        )
      }.get()!!


    It is clear that the initially simple mapping code has become less readable. This is because we had to separate the external service calls from the places where their results are used. That in itself is not bad, but the combination of calls does not look very elegant and resembles typical reactive "spaghetti".

    With coroutines everything looks more decent (the join helper used here is shown right after the snippet). In theory, with such parallel processing we get 200–400 ms, which is already close to our expectations. Unfortunately, such perfect parallelization does not happen, and the payback is rather cruel: with just a few users working at the same time, a flurry of requests hits the services; they will not actually be processed in parallel, and we are back to our sad 4 seconds.

    private fun ChatMessage.toFrontModel(): ChatMessageUI =
      join(
        { userRepository.getUserById(author).toFrontReference() },
        { files?.let { files -> fileRepository.getHeadsByIds(files)
            .map { it.toFrontReference() } } ?: listOf() }
      ).let { (author, files) ->
        ChatMessageUI(
          id = id ?: throw IllegalStateException("$this must be persisted"),
          author = author,
          message = message,
          files = files,
          forwardFrom = forwardFrom?.toFrontModel(),
          replyTo = replyTo?.toFrontModel()
        )
      }




    fun <A, B> join(a: () -> A, b: () -> B) =
      runBlocking(IO) {
        awaitAll(async { a() }, async { b() })
      }.let {
        it[0] as A to it[1] as B
      }






    My measured result with this service is 1300–1700 ms to process 20 messages. This is faster than the first implementation, but still does not solve the problem.

    Alternative use of parallel queries
    What if batch processing is not provided by the third-party services at all? For example, you can hide the missing batch implementation inside interface methods, as below. This makes sense if there is hope that batch processing will appear in future versions.

    interface UserRemoteApi {
      fun getUserById(id: UserReference): UserRemote
      fun getUsersByIds(id: Set<UserReference>): Set<UserRemote> =
        id.parallelStream()
          .map { getUserById(it) }.collect(toSet())
      fun getUsersByIds(id: List<UserReference>): List<UserRemote> =
        id.parallelStream()
          .map { getUserById(it) }.collect(toList())
    }



    Pros:

    1. Easy implementation of concurrent message processing.
    2. Good scalability.

    Cons:

    1. The need to separate fetching the data from processing it when querying different services in parallel.
    2. Increased load on the third-party services.

    The scope of applicability is clearly about the same as for the naive approach. Using parallel queries makes sense if you are willing to increase the performance of your service several times at the cost of mercilessly exploiting other services. In our example, performance increased 2.5 times, which is clearly not enough.

    Caching


    You can do JPA-style caching for external services, that is, store the received objects for the duration of a session so they do not have to be fetched again (including during batch processing). You can build such caches yourself, use Spring with its @Cacheable, or plug in a ready-made cache such as EhCache manually.
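
    As a minimal sketch (my own, with a hypothetical cache name, and assuming @EnableCaching and the kotlin-spring plugin are configured), a Spring cache around the user service could look roughly like this:

    import org.springframework.cache.annotation.Cacheable
    import org.springframework.stereotype.Service

    // Sketch only: the "users" cache and this wrapper bean are hypothetical, not project code.
    // Repeated getUserById calls for the same id within the cache lifetime are served
    // from the cache instead of going to the remote REST service.
    @Service
    class CachedUserRemoteApi(
      private val delegate: UserRemoteApi
    ) : UserRemoteApi by delegate {
      @Cacheable("users")
      override fun getUserById(id: UserReference): UserRemote =
        delegate.getUserById(id)
    }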

    The general problem is that caches only help if there are hits. In our case, hits on the author field are very likely (say, 50%), while there will be no hits on files at all. This approach will bring some improvement, but it will not change performance radically (and we need a breakthrough).

    Cross-session (long-lived) caches require complex invalidation logic. In general, the later you reach the point of solving performance problems with cross-session caches, the better.

    Pros:

    1. Caching can be added without changing the code.
    2. A performance increase of several times (in some cases).

    Cons:

    1. The possibility of reduced performance if used improperly.
    2. Large memory overhead, especially with long-lived caches.
    3. Complex invalidation, errors in which lead to hard-to-debug runtime problems.

    Very often caches are used only to quickly patch over design problems. This does not mean they should not be used. However, it is always worth treating them with caution: first evaluate the resulting performance gain, and only then make a decision.

    In our example, caches would give a performance increase of around 25%. At the same time, caches have plenty of drawbacks, so I would not use them here.

    Summary


    So, we looked at the naive implementation of a service that uses batch processing, and some simple ways to speed it up.

    The main advantage of all these methods is their simplicity, which brings many pleasant consequences.

    The common problem with these methods is poor performance, primarily due to the small size of the batches. So if these solutions do not suit you, it is worth considering more radical methods.

    There are two main areas in which you can look for solutions:

    • asynchronous work with data (it requires a paradigm shift, so it is not considered in this article);
    • enlarging the batches while keeping processing synchronous.

    Enlarging the batches will greatly reduce the number of external calls while keeping the code synchronous. The next part of the article will be devoted to this topic.
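
    Just to make the direction concrete, here is a rough sketch of my own (not yet the solution from the next part): collect the author identifiers for the whole page of messages first, then resolve them with a single batched call.

    // Rough sketch of the "bigger batches" idea, not the final solution from part 2:
    // gather all author ids for a page of messages and resolve them with one remote call.
    fun loadAuthors(
      messages: List<ChatMessage>,
      userRepository: UserRemoteApi
    ): Map<UserReference, UserRemote> =
      messages.map { it.author }.toSet()                      // one id set for the whole page
        .let { ids -> userRepository.getUsersByIds(ids) }     // a single call instead of N
        .associateBy { it.id }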
