Async Symphony: JavaFX Tasks and Netty Sockets

Original author: Carl
  • Transfer
Good Friday everyone!

We finally got our hands on the book about Netty, which was recommended to us including by grateful readers of our habroblog.



Frankly, for a long time nothing narrow-topic on Java came out. But the Netty topic arouses the most lively interest on Habré, so we decided to post a review on it (the author got the idea of ​​a post from this book) and arrange the most indicative survey. Come in, speak out!


This article describes how to integrate the Netty client / server framework into a JavaFX application. Although it is possible to write a distributed application using simple protocols that work according to the request / response model, for example, HTTP or RMI, these protocols are often inefficient and insufficiently functional for applications that require constant server updates, push notifications that perform long-term operations. Netty uses an efficient network implementation based on asynchronous processing and state-dependent connections. This structure allows you to do without additional tricks, for example, does not require a survey to update the client code.

By integrating Netty with JavaFX, you need to ensure that you interact with the UI in the interaction FX stream without blocking the UI. Thus, you need to wrap Netty calls in the Task class from FX. The FX Task class provides a thread for long-term operations, and in most cases you can let Netty just wait for a response ( wait()). This is done by calling sync (), which provides a lock but does not freeze the application.

This example is based on a program for exchanging echo requests between a client and a server, which I found in the book " Netty in Action " by Norman Maurer and Marvin Allen Wolftal. After the connection is established, the client collects the string java.lang.Stringand sends it to the server. The server converts this string usingtoUpperCase()and sends the received string back to the client. The client displays a string in the user interface.

All the code for this post is on GitHub .

Project

For convenience, I packed all the server and client code into one Maven project. The following UML class diagram shows which classes are in our program.



The class diagram of the echo client on FX

in EchoServerand EchoClientcontains functions main(), is the entry point for the server and client processes. It EchoServercontains Netty code for loading, linking, and creating a pipeline with a special handler EchoServerHandler. EchoClientcreates a user interface objectEchoClientControllerwhich contains Netty code for creating a connection, disconnecting, sending, and receiving. The controller EchoClientControlleralso creates a client pipeline using EchoClientHandler.

The diagram shows the connection / send / receive / disconnect sequence. It is not normalized, therefore some operations (“Enter Text”, “Netty Connect”) are nominal and are absent in the code. Data exchange in the program is mainly implemented using the standard binding of JavaFX and Netty Futures.



So, here is how our sequence looks schematically.

  1. The user clicks the Connect button.
  2. The controller EchoClientControllerbootstraps and connects to EchoServer.
  3. The user enters text and clicks the Send button.
  4. An operation is called in the channel writeAndFlush(). Methods channelRead()and channelReadComplete()handlers are EchoServerHandlercalled.
  5. The channelRead() handler method EchoServerHandlerexecutes its own method write(), and the method channelReadComplete()executes flush().
  6. EchoClientHandler receives data
  7. EchoClientHandlersets the property StringPropertyassociated with the UI. The field TextFieldin the UI is automatically updated .
  8. The user clicks the Disconnect button.
  9. The EchoClientController closes its Channel and disables the EventGroup (missing from the diagram).


Client Code

Since all the code is on GitHub, I will focus on the interaction of client JavaFX and Netty interaction in this article. I omit the trivial subclass of EchoClient JavaFX Application that creates the Stage and loads the EchoClient.fxml file. The client code that interests us is in the class EchoClientController.

connect ()

The method connect()takes the host and port from the UI and creates a Netty channel, which is then saved as a field EchoClientController.

From EchoClientController.java

@FXML
HBox hboxStatus;
@FXML
ProgressIndicator piStatus;
@FXML
Label lblStatus;
private BooleanProperty connected = new SimpleBooleanProperty(false);
private StringProperty receivingMessageModel = new SimpleStringProperty("");
private Channel channel;
@FXML
public void connect() {
 if( connected.get() ) {
  return;  // соединение уже установлено; предотвратить и отключить
 }
 String host = tfHost.getText();
 int port = Integer.parseInt(tfPort.getText());
 group = new NioEventLoopGroup();
 Task task = new Task() {
  @Override
  protected Channel call() throws Exception {
   updateMessage("Bootstrapping");
   updateProgress(0.1d, 1.0d);
   Bootstrap b = new Bootstrap();
   b
   .group(group)
   .channel(NioSocketChannel.class)
   .remoteAddress( new InetSocketAddress(host, port) )
   .handler( new ChannelInitializer() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
     ch.pipeline().addLast(new EchoClientHandler(receivingMessageModel));
    }
   });
   ChannelFuture f = b.connect();
   Channel chn = f.channel();
   updateMessage("Connecting");
   updateProgress(0.2d, 1.0d);
   f.sync();
   return chn;
  }
  @Override
  protected void succeeded() {
   channel = getValue();
   connected.set(true);
  }
  @Override
  protected void failed() {
   Throwable exc = getException();
   logger.error( "client connect error", exc );
   Alert alert = new Alert(AlertType.ERROR);
   alert.setTitle("Client");
   alert.setHeaderText( exc.getClass().getName() );
   alert.setContentText( exc.getMessage() );
   alert.showAndWait();
   connected.set(false);
  }
 };
 hboxStatus.visibleProperty().bind( task.runningProperty() );
 lblStatus.textProperty().bind( task.messageProperty() );
 piStatus.progressProperty().bind(task.progressProperty());
 new Thread(task).start();
}


Netty calls for boot and connection are wrapped in a JavaFX task. The task is a key concept when programming in JavaFX, and I have a rule: put in the task any code that could potentially run longer than a second. Thus, almost everything falls into my tasks, with the exception of manipulating Java objects in RAM.

The challenge provides several properties: runningProperty, messageProperty, progressProperty. I associate them with UI elements: HBox container, Label, ProgressIndicator indicator. JavaFX binding eliminates the need to register listeners and invoke setter () methods in user interface controls.

Methodcall()returns the channel. In this implementation, I don’t care about the asynchronous behavior of Netty - after all, I'm already working in the new one Thread()- so I can wait until the call returns sync(). The returned channel value is set in the method field succeeded(). If Netty throws an exception, the method is called failed(), the message is logged and displayed to the user in the dialog box.

Methods of succeeded(), failed(), updateMessage()and updateProgress()are performed in the FX flow and call()- not. The method call()should in no way update the UI. The call () method should only deal with the long-term operation of Netty.

send ()

The method send()uses the stored object Channelto call the method writeAndFlush(). This one writeAndFlush()will be launched using the delegate.EchoClientHandlerthrough the Netty framework.

Also from EchoClientController.java

@FXML
public void send() {
 if( !connected.get() ) {
  return;
 }
 final String toSend = tfSend.getText();
 Task task = new Task() {
  @Override
  protected Void call() throws Exception {
   ChannelFuture f = channel.writeAndFlush( Unpooled.copiedBuffer(toSend, CharsetUtil.UTF_8) );
   f.sync();
   return null;
  }
  @Override
  protected void failed() {
   Throwable exc = getException();
   logger.error( "client send error", exc );
   Alert alert = new Alert(AlertType.ERROR);
   alert.setTitle("Client");
   alert.setHeaderText( exc.getClass().getName() );
   alert.setContentText( exc.getMessage() );
   alert.showAndWait();
   connected.set(false);
  }
 };
 hboxStatus.visibleProperty().bind( task.runningProperty() );
 lblStatus.textProperty().bind( task.messageProperty() );
 piStatus.progressProperty().bind(task.progressProperty());
 new Thread(task).start();
}


Note the resemblance to connect(). The newly minted task is associated with the same three progress objects. The method succeeded()is not, and the method failed()contains the same logic as the handler implementation errors connect().

The task does not return anything (return type Void). In an optimistic scenario, the call should work immediately, but if it did not work, then you should wait for the error. Since call()I already have the method in the new thread, I can afford to wait in the method sync().

disconnect ()

The method disconnect()works with the Task task in the same way as the two previous methods. The other two methods use the same updateMessage / Progress pair. In this method, wrapping a connection to Netty takes place in two separate steps. To fulfillsync()close () doesn’t need much time. The method shutdownGracefully()takes much longer. However, in my experiments, the UI never hung.

@FXML
public void disconnect() {
 if( !connected.get() ) {
  return;
 }
 Task() {
  @Override
  protected Void call() throws Exception {
   updateMessage("Disconnecting");
   updateProgress(0.1d, 1.0d);
   channel.close().sync();     
   updateMessage("Closing group");
   updateProgress(0.5d, 1.0d);
   group.shutdownGracefully().sync();
   return null;
  }
  @Override
  protected void succeeded() {
   connected.set(false);
  }
  @Override
  protected void failed() {
   connected.set(false);
   Throwable t = getException();
   logger.error( "client disconnect error", t );
   Alert alert = new Alert(AlertType.ERROR);
   alert.setTitle("Client");
   alert.setHeaderText( t.getClass().getName() );
   alert.setContentText( t.getMessage() );
   alert.showAndWait();
  }
 };
 hboxStatus.visibleProperty().bind( task.runningProperty() );
 lblStatus.textProperty().bind( task.messageProperty() );
 piStatus.progressProperty().bind(task.progressProperty());
 new Thread(task).start();
}


Read

Reading from the server is mediated through the object EchoClientHandler. When creating this object, a reference is made to the property StringProperty, which is an element of the model, with which the user interface is also associated. I could pass the UI elements directly to the handler, but this violates the principle of separation of responsibilities and it becomes more difficult to apply this notification to several views at once. Thus, the property StringPropertycan be associated with any number of UI elements, and when an update arrives from the handler, all these UI elements are updated.

Here is the EchoClientHandler.java code. Pay attention to the protection of FX Thread in the method channelRead0().

@Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler {
 private Logger logger = LoggerFactory.getLogger( EchoClientHandler.class );
 private final StringProperty receivingMessageModel;
 public EchoClientHandler(StringProperty receivingMessageModel) {
  this.receivingMessageModel = receivingMessageModel;
 }
 @Override
 protected void channelRead0(ChannelHandlerContext arg0, ByteBuf in) 
              throws Exception {
  final String cm = in.toString(CharsetUtil.UTF_8);
  Platform.runLater( () -> receivingMessageModel.set(cm) );
 }
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  logger.error( "error in echo client", cause );
  ctx.close();
 } 
}


One final note about the binding sequence ... we don’t know when it will be called
channelRead0()(in this case, we rely on Netty asynchronous), but when such a call occurs, we will set up the model object. I am finishing updating the model object, providing some protection for FX Thread. FX - as this framework for binding - will update all the elements of the UI, for example TextField.

Closing remarks about client code

When integrating Netty with JavaFX, the most important thing is to use tasks. Thanks to the tasks, the UI does not freeze; thanks to the provided properties, all work can be tracked visually. Thanks to tasks, there is no need for asynchronous processing by Netty (at least at the application level), so tasks can be blocked as much as necessary without blocking the user interface. When receiving notifications of new data, try using the JavaFX binding mediated through the selected model object and update the UI in this way, rather than making separate calls to specific objects.

Server-

side code I just bring all the server-side code here without comment, as this article focuses on client aspects of Netty. A very similar example is in the book Manning

From EchoServer.java

public class EchoServer {
 private final int port;
 public EchoServer(int port) {
  this.port = port;
 }
 public static void main(String[] args) throws Exception {
  if( args.length != 1 ) {
   System.err.println("usage: java EchoServer port");
   System.exit(1);
  }
  int port = Integer.parseInt(args[0]);
  new EchoServer(port).start();
 }
 public void start() throws Exception {
  final EchoServerHandler echoServerHandler = new EchoServerHandler();
  EventLoopGroup group = new NioEventLoopGroup();
  try {
   ServerBootstrap b = new ServerBootstrap();
   b
    .group(group)
    .channel(NioServerSocketChannel.class)
    .localAddress(new InetSocketAddress(port))
    .childHandler(new ChannelInitializer() {
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline().addLast( echoServerHandler );
     }     
    });
   ChannelFuture f = b.bind().sync();
   f.channel().closeFuture().sync();
  } finally {
   group.shutdownGracefully().sync();
  }
 }
}
Из EchoServerHandler.java
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
 private Logger logger = LoggerFactory.getLogger( EchoServerHandler.class );
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  ByteBuf in = (ByteBuf)msg;
  String in_s = in.toString(CharsetUtil.UTF_8);
  String uc = in_s.toUpperCase();
  if( logger.isInfoEnabled() ) {
   logger.info("[READ] read " + in_s + ", writing " + uc);
  }
  in.setBytes(0,  uc.getBytes(CharsetUtil.UTF_8));
  ctx.write(in);  // записывает байты обратно к адресату (не сбрасывает)
 }
 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
  if( logger.isDebugEnabled() ) {
   logger.debug("[READ COMPLETE]");
  }
  ctx.flush();
 }
 @Override
 public void channelActive(ChannelHandlerContext ctx) throws Exception {
  super.channelActive(ctx);
  if(logger.isDebugEnabled() ) {
   logger.debug("[CHANNEL ACTIVE]");
  }
  ctx.channel().closeFuture().addListener(f -> logger.debug("[CLOSE]"));
 }
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  logger.error( "error in echo server", cause);
  ctx.close();
 }
}


Although modern high-speed computers may well spend some of the cycles polling, thanks to the effective network level, your application will respond quickly and turn out to be dynamic, as well as save the server from unnecessary work.

Only registered users can participate in the survey. Please come in.

Book about Netty

  • 84.6% This 55 is of interest
  • 9.2% Not Interested 6
  • 6.1% Interested in another 4

Also popular now: