Streaming data transfer from a REST service to an MQ queue

Hi, Habr!

In this article, I describe how to build a REST service that receives files and saves them to a messaging system in streaming mode, without storing the entire file on the service side. I also cover the reverse scenario, in which the client receives a file held in the messaging system as the response.

For clarity, I provide code examples of the service written for JEE7 on the IBM WebSphere Liberty application server, with IBM MQ acting as the messaging system.
Nevertheless, the method also suits other similar platforms: any JMS API provider can act as the messaging system, and any JEE server (for example, Apache Tomcat) can act as the application server.

Formulation of the problem


We needed a solution that could both receive large files (> 100 MB) from a client and transfer them to another, geographically remote system, and, in the opposite direction, return files from that system to the client as a response. Because the network channel between the client's network and the application's network is unreliable, a messaging system is used to guarantee delivery between them.

The high-level solution includes three components:

  1. REST service - gives the client a way to transfer a file (or to request one).
  2. MQ - carries messages between the different networks.
  3. Application - stores files and returns them on request.

image

In this article I describe how to implement the REST service, whose tasks are:

  • Receiving a file from the client.
  • Transferring the received file to MQ.
  • Returning a file from MQ to the client as a response.

Solution Method


Because the file is large, it cannot be held entirely in RAM. MQ imposes a further limit: a single message cannot exceed 100 MB. The solution is therefore based on the following principles:

  • Receiving a file and storing it in the MQ queue should be performed in streaming mode, without storing the entire file in memory.
  • In the MQ queue, the file will be placed as a set of small messages.

Graphically, the location of the file on the client, REST service and MQ is shown below:

image

On the client side, the file resides entirely on the file system; in the REST service, only one portion of the file is held in RAM at a time; and on the MQ side, each portion of the file is stored as a separate message.
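To make this layout concrete, here is a minimal sketch that uses only the Java standard library and no MQ at all. It models the idea rather than the service itself: a stream is consumed through one fixed-size buffer, and each portion becomes a separate "message" with a sequence number and a last-in-group flag. The names SegmentModel and Segment, and the in-memory list standing in for the queue, are hypothetical and exist only for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative model only: an in-memory list stands in for the MQ queue. */
final class SegmentModel {

    /** One "message": a portion of the file plus its position in the group. */
    static final class Segment {
        final int sequence;
        final boolean last;
        final byte[] payload;

        Segment(int sequence, boolean last, byte[] payload) {
            this.sequence = sequence;
            this.last = last;
            this.payload = payload;
        }
    }

    /** Reads the stream through a single buffer and emits one segment per read. */
    static List<Segment> split(InputStream in, int segmentSize) throws IOException {
        List<Segment> queue = new ArrayList<>();
        byte[] buffer = new byte[segmentSize];
        byte[] pending = null; // portion already read but not yet emitted
        int sequence = 0;
        int read;
        while ((read = in.read(buffer)) > 0) {
            if (pending != null) {
                queue.add(new Segment(++sequence, false, pending));
            }
            pending = Arrays.copyOf(buffer, read);
        }
        if (pending != null) {
            // The end of the stream is now known, so this portion is the final one.
            queue.add(new Segment(++sequence, true, pending));
        }
        return queue;
    }

    /** Reassembles the original bytes by concatenating segments in order. */
    static byte[] join(List<Segment> queue) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (Segment segment : queue) {
            out.write(segment.payload, 0, segment.payload.length);
        }
        return out.toByteArray();
    }
}
```

On the reading side of this model, only the buffer and one pending portion are held at any moment; everything else already sits in the "queue", which is the property the real service relies on.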

REST service development


To illustrate the proposed method, we will develop a demo REST service with two methods:

  • upload - receives a file from the client and writes it to the MQ queue, returns the message group identifier (in base64 format) as a response.
  • download - receives the message group identifier from the client (in base64 format) and returns the file stored in the MQ queue.

Method of receiving a file from the client (upload)


The task of the method is to receive the stream of the incoming file and then write it to the MQ queue.

Receiving the incoming file stream


To receive the incoming file from the client, the method takes as its input parameter an object implementing the com.ibm.websphere.jaxrs20.multipart.IMultipartBody interface, which gives access to the input stream of the uploaded file:

@PUT
@Path("upload")
public Response upload(IMultipartBody body){
	...
	IAttachment attachment = body.getAttachment("file");
	InputStream inputStream = attachment.getDataHandler().getInputStream();
	...
}

This interface (IMultipartBody) is in the JAR archive com.ibm.websphere.appserver.api.jaxrs20_1.0.21.jar, which ships with IBM Liberty Server in the folder <WLP_INSTALLATION_PATH>/dev/api/ibm.

Note:

  • WLP_INSTALLATION_PATH is the path to the WebSphere Liberty Profile directory.
  • It is expected that the client will transfer the file in a parameter named "file".
  • If you use another application server, you can use the alternative library from Apache CXF.

Streaming the file into MQ


The method receives as input a stream of the incoming file, the name of the MQ queue where the file should be written, and the identifier of the message group that will be used to link the messages. The group identifier is generated on the service side, for example, by the org.apache.commons.lang3.RandomStringUtils utility:

String groupId = RandomStringUtils.randomAscii(24);
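RandomStringUtils.randomAscii can return any printable ASCII character, including quotes and spaces. If an identifier made only of hexadecimal digits is preferred, a sketch along the following lines could be used instead. It assumes Java 8 or later (for java.util.Base64); the class GroupIds and its methods are hypothetical helpers, not part of the original service. The length is kept at 24 characters to match the call above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.UUID;

/** Hypothetical helper: generates group identifiers and converts them to and from base64. */
final class GroupIds {

    /** Returns 24 random hexadecimal characters taken from a random UUID. */
    static String newGroupId() {
        return UUID.randomUUID().toString().replace("-", "").substring(0, 24);
    }

    /** Encodes the identifier as base64, the form returned to the client. */
    static String encode(String groupId) {
        return Base64.getEncoder().encodeToString(groupId.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes a base64 string received from the client back into the identifier. */
    static String decode(String base64) {
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
    }
}
```

Specifying the charset explicitly on both sides avoids depending on the platform default when the identifier is turned into bytes and back.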

The algorithm for saving an incoming file in MQ consists of the following steps:

  1. Initializing MQ Connection Objects.
  2. Reading the incoming file portion by portion in a loop until it has been read completely:
    1. Each portion of file data is written to MQ as a separate message.
    2. Each message of the file has its own sequence number (the "JMSXGroupSeq" property).
    3. All messages of the file share the same group value (the "JMSXGroupID" property).
    4. The last message carries a flag marking it as final (the "JMS_IBM_Last_Msg_In_Group" property).
    5. The constant SEGMENT_SIZE holds the portion size, for example 1 MB.

public void write(InputStream inputStream, String queueName, String groupId) throws IOException, JMSException {
	try (
		Connection connection = connectionFactory.createConnection();
		Session session = connection.createSession();
		MessageProducer producer = session.createProducer(session.createQueue(queueName));
	) {
		byte[] buffer = new byte[SEGMENT_SIZE];
		BytesMessage message = null;
		for (int readBytesSize = 1, sequenceNumber = 1; readBytesSize > 0; sequenceNumber++) {
			readBytesSize = inputStream.read(buffer);
			// The previous message is sent only after the next read, so that the
			// final one can be flagged once the end of the stream is known.
			if (message != null) {
				if (readBytesSize < 1) {
					message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true);
				}
				producer.send(message);
			}
			if (readBytesSize > 0) {
				message = session.createBytesMessage();
				message.setStringProperty("JMSXGroupID", groupId);
				message.setIntProperty("JMSXGroupSeq", sequenceNumber);
				// Write only the bytes that were actually read into the buffer.
				message.writeBytes(buffer, 0, readBytesSize);
			}
		}
	}
}
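One detail worth keeping in mind: InputStream.read(byte[]) is allowed to return fewer bytes than the buffer holds even before the end of the stream, so the write method above may produce messages smaller than SEGMENT_SIZE in the middle of a file. That does not break reassembly, but if uniformly sized messages are preferred, a helper like the hypothetical fill below could replace the plain read call. It is a sketch, not part of the original service.

```java
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical helper for filling a buffer completely from a stream. */
final class StreamUtil {

    /**
     * Reads until the buffer is full or the stream ends.
     * Returns the number of bytes placed in the buffer (0 at end of stream).
     */
    static int fill(InputStream in, byte[] buffer) throws IOException {
        int total = 0;
        while (total < buffer.length) {
            int read = in.read(buffer, total, buffer.length - total);
            if (read < 0) {
                break; // end of stream reached
            }
            total += read;
        }
        return total;
    }
}
```

Because fill returns 0 rather than -1 at the end of the stream, it fits the existing loop conditions (readBytesSize > 0 and readBytesSize < 1) without further changes.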

Method of sending file to client (download)


The method receives the message group identifier in base64 format, uses it to read the messages from the MQ queue, and sends them back as a streaming response.

Retrieving message group id


As an input parameter, the method gets the ID of the message group.

@GET
@Path("download")
public Response download(@QueryParam("groupId") String groupId) {
	...
}

Streaming the response to the client


To send the client a file stored in MQ as a set of individual messages in streaming mode, create a class that implements the javax.ws.rs.core.StreamingOutput interface:

public class MQStreamingOutput implements StreamingOutput {
	private String groupId;
	private String queueName;
	public MQStreamingOutput(String groupId, String queueName) {
		super();
		this.groupId = groupId;
		this.queueName = queueName;
	}
	@Override
	public void write(OutputStream outputStream) throws IOException, WebApplicationException {
		try {
			new MQWorker().read(outputStream, queueName, groupId);
		} catch (NamingException | JMSException e) {
			e.printStackTrace();
			// Rethrow so that the failure is not silently swallowed.
			throw new IOException(e);
		} finally {
			outputStream.flush();
			outputStream.close();
		}
	}
}

In the class, we implement the write method, which receives a reference to the output stream into which the messages from MQ will be written. I also added two fields to the class: the name of the queue and the identifier of the group whose messages will be read.

An object of this class is passed as the entity when building the response to the client:

@GET
@Path("download")
public Response download(@QueryParam("groupId") String groupId) {
	ResponseBuilder responseBuilder;
	try {
		MQStreamingOutput streamingOutput = new MQStreamingOutput(new String(Utils.decodeBase64(groupId)), Utils.QUEUE_NAME);
		responseBuilder = Response.ok(streamingOutput);
	} catch (Exception e) {
		e.printStackTrace();
		// Build a fresh error response: the builder has not been assigned if we get here.
		responseBuilder = Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage());
	}
	return responseBuilder.build();
}

Streaming the file out of MQ


The algorithm for reading messages from MQ to the outgoing stream consists of the following steps:

  1. Initializing MQ Connection Objects.
  2. Reading messages from MQ in a loop until the message flagged as the last in the group (the "JMS_IBM_Last_Msg_In_Group" property) has been read:
    1. Before each read, a filter (messageSelector) is set that specifies the message group identifier and the sequence number of the message within the group.
    2. The content of the message is written to the output stream.


public void read(OutputStream outputStream, String queueName, String groupId) throws IOException, JMSException {
	try (
		Connection connection = connectionFactory.createConnection();
		Session session = connection.createSession();
	) {
		connection.start();
		Queue queue = session.createQueue(queueName);
		int sequenceNumber = 1;
		for (boolean isMessageExist = true; isMessageExist; ) {
			// Single quotes in the group id are doubled, as selector string literals require.
			String messageSelector = "JMSXGroupID='" + groupId.replace("'", "''") + "' AND JMSXGroupSeq=" + sequenceNumber++;
			try (MessageConsumer consumer = session.createConsumer(queue, messageSelector)) {
				BytesMessage message = (BytesMessage) consumer.receiveNoWait();
				if (message == null) {
					// No message with this sequence number is available: stop reading.
					isMessageExist = false;
				} else {
					byte[] buffer = new byte[(int) message.getBodyLength()];
					message.readBytes(buffer);
					outputStream.write(buffer);
					if (message.getBooleanProperty("JMS_IBM_Last_Msg_In_Group")) {
						isMessageExist = false;
					}
				}
			}
		}
	}
}
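The selector string is the one place in the read method where a client-supplied value ends up inside query syntax, so it can be useful to isolate its construction in a small function that is easy to check on its own. The class and method names below are hypothetical; the logic is the same quote-doubling used in the read method above.

```java
/** Hypothetical helper that builds the selector used to fetch one message of a group. */
final class Selectors {

    /** Returns a selector matching a single message by group identifier and sequence number. */
    static String forGroupMessage(String groupId, int sequenceNumber) {
        // Inside a selector string literal, a single quote is written as two single quotes.
        String escaped = groupId.replace("'", "''");
        return "JMSXGroupID='" + escaped + "' AND JMSXGroupSeq=" + sequenceNumber;
    }
}
```

For example, Selectors.forGroupMessage("ab'c", 2) yields JMSXGroupID='ab''c' AND JMSXGroupSeq=2. Note also that receiveNoWait returns null when the next message has not arrived yet, in which case the read loop ends and the client receives a truncated file; where that matters, a timed receive(timeout) call may be worth considering.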

Calling the REST service


To test the service, I will use the curl tool.

Sending a file


curl -X PUT -F file=@<path_to_file> http://localhost:9080/Demo/rest/service/upload

The response is a base64 string containing the message group identifier, which we pass to the next method to retrieve the file.

Receiving a file


curl -X GET http://localhost:9080/Demo/rest/service/download?groupId=<base64_message_group_id> -o <path_to_output_file>
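Base64 text may contain the characters +, / and =, which have special meaning in a query string (a + is typically decoded as a space). A client that builds the download URL programmatically would therefore normally percent-encode the identifier first. The sketch below shows one way to do that with the standard library; the class DownloadUrl is a hypothetical name, and the base URL is passed in by the caller.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical client-side helper for building the download URL. */
final class DownloadUrl {

    /** Appends the base64 group identifier as a percent-encoded query parameter. */
    static String build(String baseUrl, String base64GroupId) {
        try {
            return baseUrl + "?groupId=" + URLEncoder.encode(base64GroupId, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a charset every Java platform is required to support.
            throw new IllegalStateException(e);
        }
    }
}
```

On the service side, the @QueryParam value is decoded by the JAX-RS runtime by default, so no extra step is needed there.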

Conclusion


This article described an approach to building a REST service that streams large data in both directions: receiving it and storing it in a messaging system queue, and reading it from the queue to return it as a response. The approach reduces resource usage and thereby increases the throughput of the solution.

Additional materials


More information about the IMultipartBody interface, used to receive the incoming file stream: link.

An alternative library for streaming files in REST services: Apache CXF.

The StreamingOutput interface, used to stream the REST response back to the client: link.
