Camel pull or Camel integration. Part 2
Apache Camel Integration Scenarios

How many application integration patterns (EIP) do you know? How many of them can you use?
The pretty "camel" is here again, which means I present to you the continuation of a series of articles about Apache Camel. In this article, you will find both the most necessary and very interesting integration patterns. I’ll tell you how they fall on our integration.
If you are familiar with templates, but decide whether to contact the “camel”, then our examples will help you figure it out. If you are interested in the path from usage scenarios to the implementation of integration, then this article is just about that. I ask for cat.
Let me remind you that we built our service bus on Apache Camel. The background was described in the previous part.. Now Camel is already a given for us to fight against. In our “zoo” has been added, in addition to the latter, we have two systems. The first is our main system, which is built on the classic three-tier “client-server” architecture. It is a BPMS system, the complexity of which is due to the long-term process of “finishing” small “Wishlist”. The second is a small and simple, like boots, system of the same architecture. We will call it the sales office. We will analyze the use cases just by her example.
Questionnaire system with the need for integration.
Our sales office is an application that does not have complex logic. It implements the process of registering customers and entering the documents necessary for the order. This system is open to customers after registration, so the requirements for response time and bandwidth are of a higher order than for the main system. Limitations of security, bandwidth, independent configuration and the need to maintain a separate development cycle led to the separation of the sales office in a standalone application. To service the sales process, we needed its integration into our customer’s IT infrastructure.
We will begin our acquaintance with the integration with the use of the sales office. We identified two main roles in the business process of our application: the buyer and the administrator. It is these user groups that work with him.

Detailed use case diagram

The diagram shows the main use cases. Pay attention to the right side of the diagram, several precedents go beyond the sales office and require certain actions from the main system. It is these precedents that we will analyze further.
To form a holistic view of our office, we will analyze the scheme. Our buyers apply for auctions. Holding auctions is the main process of the sales office. To participate in them, buyers must register and fill out applications by filling out a large number of boring registration forms. The administrator performs the functions of preparing and conducting auctions. To do this, a special application template is prepared for each auction - a questionnaire, so the class of such systems is sometimes called questionnaire. The auction process ends for the sales office by transferring information on submitted applications to the main system to select the winner and complete the transaction.
Qualitative requirements for integration processes:
- Losses of reference information are permissible and should not lead to disruption of the functioning of the subsystems;
- Contacting external systems should not interfere with the auction process in the event of failure of external systems and / or communication channels;
- Applications must be guaranteed to be transmitted and delivered between subsystems;
- Files must be guaranteed to be transferred and delivered between subsystems;
- The user load on the auctions should fall entirely on the sales office, offloading the main system. Therefore, interactions between systems should be minimized;
- The sales office user should not have mechanisms for influencing the main system.
When parsing the indicated use cases, the following architectural problems arose:
- Organization of transport;
- Distribution of functions between subsystems;
- Directory synchronization;
- Transmission of dependent entities;
- Monitoring the exchange process;
- File transfer.
Let us analyze the solutions to these architectural problems using usage scenarios as examples and correlate them with EIP templates.
Organization of transport.
The transport includes both of the above systems and Camel. A lot has already been said about the latter in the previous part of the article, so let's move on.
All three systems are connected by ActiveMQ broker via AMQP protocol.
Let me remind you, systems exchange packets using JMS. They decided to make the payload of these packages XML and serialize the objects of one of the systems into it using JAXB. But which objects should be taken as a basis, while minimizing the time required to create integration? There are two systems, which means there can be two formats. We decided to dwell on the objects of the sales office, this system is more lightweight, the domain area objects are connected with other architectural layers of this application only with JPA annotations. Allocation of transport objects on its basis was not difficult. An alternative solution (to use the objects of the main system) seemed almost impossible due to the presence of a large amount of metadata and complex relationships with other business entities that went beyond the boundaries of the process of servicing the sales office we were interested in. Another remaining option is the creation of new transport facilities. He was not even considered, since he demanded the implementation of the procedure for both import and export in both systems, which in the first case could be abandoned for the subsystem of the sales office.

Perhaps you were surprised that we chose XML as the payload of our packages? But, I assure you, there were reasons for this. Serialization using the JAXB standard is included in the JVM - this simplifies working with it and does not require additional modules. At the time of the integration development, we already had experience working with JAXB, so no overhead was required for familiarization. Another “bun” is that XML is a text format, which means that in case of emergency, you can intervene in its structure and make the necessary adjustments. But there were drawbacks: it is known that the structure of the XML format noticeably increases the amount of data. However, the information that we planned to exchange, according to initial estimates, did not exceed 100 mb for reference information, and 1 mb for applications for applications. These, you see, are not very big numbers. Besides,
A few details about the architecture of the formation and analysis of messages.

Before that, we described the mechanisms for transmitting and working with messages, without directly touching Camel. It's time to tell what functions were assigned to him, and how he was built into the integration system.
So the functions are:
- Independent configuration of message routing;
- Messaging from office queues to main system queues and vice versa;
- Harmonization of formats for unification of endpoints.
To simplify the binding of systems by a single message broker, the names of queues and topics must be unified. We use such an agreement:
[наименование системы].[наименование функционального блока].[направление передачи данных в канале]
However, it soon turned out that the fewer points in a complex system, the easier the setup. All the functionality for parsing messages and routing is easily transferred to the service bus. So for our main system, very soon there was one endpoint left for sending data:
bpms.office.export
Distribution of functions between subsystems.
Let's look at this distribution in terms of integration architecture. The sales office is responsible for:
- processing, configuration and placement of lots;
- all tasks of generating and checking packet data with information about applications;
- preparation and transfer of additional files of registered applications;
- synchronization of reference information.
Service Bus:
- transfer of reference information;
- transfer of information about lots and auctions;
- transfer of application data.
The main system:
- preparation and sending of reference information;
- initiation and submission of an auction task;
- building a further business process for processing applications;
Go ahead, now real examples.
Directory synchronization.
The process of synchronizing directories is based on the precedents given above. The scheme is as follows:


The diagram shows that two routes are used in the service bus. Messages pass through a single output channel of the main system (BPMS), are filtered and transmitted to a specialized channel responsible for working with reference information. Here, the message format is consistent with the office system, they are combined into catalogs (aggregated) and transferred to the sales office system through another JMS channel.
The diagram uses several EIP patterns :
- Message channel - message channel;
- Endpoint - the endpoint;
- Message filter - message filter;
- Message translator - a template for the message format conversion component;
- Aggregator is a template that allows you to combine several messages into one.
Why is it so hard? Let's go in order. The use of a message filter is necessary to simplify the configuration and appeared here after combining the output endpoints into one. Cons of the approach:
- mutual blocking of message transmission is possible. High messaging of one of the types served (for example, with reference information) can overflow the buffer of a single channel and block the transmission of messages of another type. It turns out that the exchange of messages of one type depends on messages of another type.
This possibility cannot be completely ruled out, but the effect can be reduced by creating additional buffer channels for each type of message. The ActiveMQ broker supports separate configuration of resource allocation for each such channel, so you can take care of the required memory and disk space, taking into account the exchange rate.
Let us return to our route, then the messages are transmitted just through such a buffer JMS channel. The next interesting point is the component that combines packages. It is necessary due to the fact that synchronization requires a complete set of directory elements. The easiest way to ensure completeness is to collect all the elements in one message. The critical minus of message aggregation is the large amount of message received, which can become a problem for both the broker and the service bus.
Fortunately, our directories are not so large, and simplicity came first. But this problem should not be underestimated, in the broker's settings for each channel a limited amount of RAM is allocated, exceeding which can lead to a critical error.
Route example:
from("jms:topic:bpms.office.request").routeId("catalog-synchronization-filter")
.filter( header("destination").isEqualTo( "portal.export.catalog" ) )
.setHeader("catalogCode", simple("${header.catalog}"))
.inOnly("jms:topic:catalog.synchronized");
from("jms:topic:catalog.synchronized").routeId("catalog-synchronization-topic")
.filter( header("catalogCode").in(
...
"GRNTI","OKATO","OKFS","OKOGU","OKOPF","OKVED",
… ) )
.setHeader("catalogCode")
.groovy( "switch( request.getHeaders().get('catalogCode') ){" +
...
" case \"GRNTI\": return \"GRNTICatalog\"" +
" case \"OKATO\": return \"okatoCatalog\"" +
" case \"OKFS\": return \"okfsCatalog\"" +
" case \"OKOGU\": return \"okoguCatalog\"" +
" case \"OKOPF\": return \"okopfCatalog\"" +
" case \"OKVED\": return \"okvedCatalog\"" +
...
"}")
.inOnly( "direct:office.synchronization");
from("direct:office.synchronized").routeId("catalog-import-office-filter")
.aggregate(header("catalogCode"), new CatalogItemAggregationStrategy()).completionTimeout(3000)
.inOnly("jms:topic:office.catalog.synchronization?timeToLive=200000");
the details
private static class CatalogItemAggregationStrategy implements AggregationStrategy
{
public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
{
String newBody = newExchange.getIn().getBody(String.class);
if (oldExchange != null)
{
String oldCode = (String)oldExchange.getIn().getHeader("catalogCode");
String newCode = (String)newExchange.getIn().getHeader("catalogCode");
if( StringUtils.equals( oldCode, newCode) )
{
String oldBody = oldExchange.getIn().getBody(String.class);
oldExchange.getIn().setBody(
StringUtils.substringBeforeLast( oldBody, "\n") +
StringUtils.substringAfter( newBody, "?>") + "\n" );
return oldExchange;
}
}
StringBuilder builder = new StringBuilder( newBody );
builder.insert( builder.indexOf("\n") + 1, "\n");
builder.append( " \n" );
newExchange.getIn().setBody( builder.toString());
return newExchange;
}
}
Transmission of dependent entities.
To begin with, we will draw up a diagram of the process of publishing a lot with an eye on the use-case diagram.

As you can see, the transfer process is relatively simple. You can transfer the process scheme to EIP, you get the scheme already known from the previous part:

There is a weakness in simplicity; pay attention to the detailed scheme for publishing lots in BPMN notation.

So it turns out that you can unload the lot only after the auction is created. The process of publishing an auction is shown in gray in the first diagram of this section. To solve the problem of related processes, two solutions were proposed:
- Implement support for distributed transactions and wrap a group of messages with a transaction;
- Transfer all data in one message.
The second option seemed to be simpler and faster in the implementation, and we stopped at it. We needed to create stubs and transmit minimal information about missing objects. Example messages:
fs000000000is3ro0d708me9ms 2011-1.5-051-001
Тема Цель тестового лота fs000000000gtk9f5oa05h426o Название аукциона
An example shows that the message contains information about the lot and minimum information about the auction.
As a result, the solution we used to transfer related objects to preserve the completeness of changes became a good alternative to distributed transactions. Move on.
Monitoring the exchange process.
We will analyze this task using the example of the process of obtaining and recording results, which remains key for both integration and the sales office. The process diagram is shown in the figure:

The process begins in the main system by sending a request for proposals by lot, this part is not presented in the diagram because a similar route was already discussed in the previous section. Further, when the request arrives at the remote office, the collection and verification of the applications submitted by customers by lot begins. Only fully filled and verified applications are processed at the time of receipt of the request from the main system. Applications are divided into fragments. Splitting into fragments allows you to use less memory and speed up the processing process. When the package with applications falls into the main system, it begins to be parsed and the entities of the main system created on its basis (import). We did not experiment with parallelism here, so as not to encounter database consistency problems in parallel transactions.
Scheme of transferring applications in EIP patterns.

Route tincture example:
from("jms:topic:bpms.office.request").routeId("bpms-request-order")
.filter( header("destination").isEqualTo( "office.order.request" ) )
.inOnly("jms:topic:bpms-to-office.order.request");
…
from("jms:queue:office.order.export").routeId("bpms-responce-order")
.log("going to bpms import: ${headers.JMSDestination}")
.wireTap("direct:order.audit")
.choice()
.when( header("customer").isEqualTo("bpms.import"))
.log("filling conumer trying recieve: ${headers.JMSDestination}, ${headers.importType}")
.inOnly("jms:queue:from.office.to.bpms.order.import")
.otherwise()
.to("log:office?multiline=true");
The process of transmitting a request is generally the same as in other cases. The new template used in it is " WireTap ". This component allows you to add an observer to the messaging process; in our example, it forwards the package with the application to the audit channel. Example:
from("direct:order.audit")
.split( xpath("//*[local-name()='demand']") )
.process( new Processor() {
@Override
public void process(Exchange item ) throws Exception {
Message in = item.getIn();
Message out = item.getOut();
out.setHeaders( in.getHeaders() );
out.setHeader("cliendId", in.getMessageId());
out.setHeader("level", "DEBUG");
out.setBody( String.format(
"Получена заявка №%s\n " +
"Передаётся на:%s\n " +
"Зарегистрированный callback:%s\n\n",
xpath("//*[local-name()='fullNumber']/text()").evaluate(item, String.class),
in.getHeader("customer"), in.getHeader("callbackUUID")) );
}
})
.inOnly("jms:topic:system.audit");
This route is much more interesting than the previous one, the messages here are applications serialized in XML. We split the incoming package into separate applications using a special component splitter and xpath expression. Further, we no longer need the entire application, so we leave only the information necessary for the audit and forward the package to the JMS general audit channel
jms:topic:system.audit
. In this channel, notifications about the status of the transfer and import of applications, and about all emergency situations are accumulated. Messages are returned to the main system and associated with the initiator of receiving results by the “ callbackUUID ” property . Example of a route returning part of audit messages to the main system:from("jms:topic:system.audit")
.filter( PredicateBuilder.and(
header("callbackUUID").isNotNull(),
header("fcntp.audit").isNull() ) )
.setHeader("system.audit", simple( "true", Boolean.class ))
.inOnly("fcntpJms:topic:fcntp.audit?timeToLive=10000");
Advantages of the approach:
- all messages are accumulated in one channel and processed uniformly.
- only one channel needs to be configured - this means less settings in the message broker.
Newly Used EIP Templates:
- Wire Tap - routing a copy of a message;
- Splitter is a component that allows you to split a large message into fragments.
File transfer.
File transfer is not a very simple task, especially if solved by means of JMS. Let's analyze this case. There was nothing formalized in the requirements for this task, it was necessary to transmit data of unlimited volume. In Camel there was no ready-made solution in order to overtake a file from one server to another. The implementation of splitting files into fixed pieces with subsequent gluing met a number of difficulties:
- it was necessary to control the order of the fragments;
- implement your own mechanism for restoring a file from fragments;
- debug the transfer.
We decided to go the other way: in Camel, copying files between local folders is simple and flexible, this mechanism suited us. The complete file processing scheme is as follows: a file is placed by one system in a network folder, Camel finds it and copies it to another folder, and after copying it tells the main system that the file was transferred. Here is the diagram:

For completeness, here is the route we use:
RouteDefinition fileExport = (RouteDefinition)from("file:{{office.transport.dir}}?delete=true&exclude=.*\\.tmp")
.onException(IOException.class)
.maximumRedeliverise(1)
.handled(true)
.useOriginalMessage()
.wireTap("jms:topic:system.audit")
.transform(exceptionMessage())
.end()
.to("file:{{office.failed.dir}}").end()
.process( TimsetampProcsseor.newInstance("sendtimsetamp") )
.wireTap("jms:queue:office.files.import")
.newExchangeHeader("fileName", simple("${headers.CamelFileName}"))
.newExchangeBody( constant("progress") )
.end()
.to("file:{{bpms.transport.dir}}");
// notify bpms by topic:office.files.import
fileExport
.transform().constant("ok")
.delay( 20 )
.process( TimsetampProcessor.newInstance("sendtimsetamp") )
.setHeader("fileName", simple("${headers.CamelFileName}"))
// send notify
.inOnly("jms:queue:office.files.import");
As you can see in the picture above, files are copied in three stages. On the first - files are copied from the file storage to the transport folder of the sales office. Routing in the service bus is explained by the route; it is launched only when files become available in the transport folder of the remote office subsystem. The service bus constantly scans this folder and, as soon as the file gets there, immediately moves it to the transport folder of the main system. Next, the service bus creates a notification about the file’s movement and sends it to the queue
office.files.import
. In this route we use an exception handling mechanism, it ensures that if the file is in the transport folder of the sales office, the main system will receive an alert regardless of the success or failure of the file transfer. It's time to take stock.
The results.
Using one of our systems as an example, we became acquainted with the main integration scenarios. All examples of excerpts from a real application given here are used by us now. Our examples show how easy Camel routes are for integration patterns. The main architectural tasks presented in the article are covered by the means offered by Camel. Even difficult tasks such as copying files can be resolved using Camel. Let me remind you that the article focuses on message-based integration. I hope that, after reading the article, you were able to make sure that “Camel” is really good.
I plan to write about the errors and how to solve them in the next article (a kind of “To be continued ...”). Therefore, until we meet again.