Asynchronous data exchange with a remote application via SSH

    Good day, friends and colleagues. My name is still Dmitry Smirnov, and, to my great pleasure, I am still a developer at ISPsystem. Some time ago I started working on a completely new project, which inspired me a lot, because "new" in our case means no legacy code and no need to support old compilers. Hello, Boost, C++17 and all the other joys of modern development.

    It so happened that all my past projects were multithreaded, so I had very little experience with asynchronous solutions. That, along with the powerful modern tools, was the most enjoyable part of this development for me.

    One of the most recent tasks in this area was writing a wrapper around the libssh2 library for an asynchronous application that uses Boost.Asio and may spawn no more than two threads. That is what I will talk about.



    Note: the author assumes that the reader is familiar with the basics of asynchronous development and boost::asio.

    Task


    In general terms, the task was as follows: connect to a remote server using an RSA key or a login and password; upload a script to the remote machine and run it; read its responses and send it commands over the same connection. All of this, of course, without blocking the thread (which is half of the total available pool).

    Disclaimer: I know that Poco implements SSH support, but I did not find a way to marry it to Asio, and writing something of my own was more interesting :-).

    Initialization


    To initialize and deinitialize the library, I decided to use an ordinary singleton:

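    Init()
    #include <libssh2.h>
    #include <iostream>
    #include <stdexcept>

    class LibSSH2 {
    public:
      // The function-local static is constructed on the first call only.
      static void Init() {
         static LibSSH2 instance;
      }
    private:
      explicit LibSSH2() {
         if (libssh2_init(0) != 0) {
            throw std::runtime_error("libssh2 initialization failed");
         }
      }
      // Runs at program exit, when static objects are destroyed.
      ~LibSSH2() {
         std::cout << "shutdown libssh2" << std::endl;
         libssh2_exit();
      }
    };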



    This solution, of course, has its pitfalls too, according to my favorite reference book, "A Thousand and One Ways to Shoot Yourself in the Foot in C++". If someone spawns a thread and forgets to join it, and the main thread finishes earlier, interesting special effects may well arise. But here I will not take that possibility into account.
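
    The lifetime behavior this singleton relies on can be sketched without libssh2 at all. The block below is only an illustration: fake::library_init and fake::library_exit are hypothetical stand-ins for libssh2_init and libssh2_exit, and LibraryGuard and DemoInitOnce are names made up for the example.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical stand-ins for libssh2_init()/libssh2_exit(); they only count calls.
namespace fake {
int init_calls = 0;
int exit_calls = 0;
int library_init() { ++init_calls; return 0; }
void library_exit() { ++exit_calls; }
}  // namespace fake

class LibraryGuard {
public:
   static void Init() {
      // Constructed on the first call only (thread-safe since C++11),
      // destroyed when static objects are torn down at program exit.
      static LibraryGuard instance;
      (void)instance;
   }
   LibraryGuard(const LibraryGuard &) = delete;
   LibraryGuard &operator=(const LibraryGuard &) = delete;
private:
   LibraryGuard() {
      if (fake::library_init() != 0) {
         throw std::runtime_error("library initialization failed");
      }
   }
   ~LibraryGuard() { fake::library_exit(); }
};

// Usage sketch: repeated calls initialize the library exactly once.
void DemoInitOnce() {
   LibraryGuard::Init();
   LibraryGuard::Init();
   assert(fake::init_calls == 1);
}
```

    The deinitialization call runs only during static destruction, which is exactly why a thread that is still using the library at that moment is a problem.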

    Main entities


    After analyzing the libssh2 example, it becomes clear that our small library needs three simple entities: a socket, a session, and a channel. Since it does no harm to have synchronous tools as well, we will leave Asio aside for now.

    Let's start with a simple socket:

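    Socket
    class Socket {
    public:
      explicit Socket() : m_sock(socket(AF_INET, SOCK_STREAM, 0)) {
         if (m_sock == -1) {
            throw std::runtime_error("failed to create socket");
         }
      }
      ~Socket() { close(m_sock); }
      // Raw descriptor accessor; the asynchronous connection uses it later.
      int GetSocket() const { return m_sock; }
    private:
      int m_sock = -1;
    };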


    Now the session:

    Session
    class Session {
    public:
      explicit Session(const bool enable_compression) : m_session(libssh2_session_init()) {
         if (m_session == nullptr) {
            throw std::runtime_error("failed to create libssh2 session");
         }
         // Switch the session to non-blocking mode.
         libssh2_session_set_blocking(m_session, 0);
         if (enable_compression) {
            libssh2_session_flag(m_session, LIBSSH2_FLAG_COMPRESS, 1);
         }
      }
      ~Session() {
         const std::string desc = "Shutting down libssh2 session";
         libssh2_session_disconnect(m_session, desc.c_str());
         libssh2_session_free(m_session);
      }
    private:
      LIBSSH2_SESSION *m_session;
    };
    


    Now that we have a socket and a session, it would be nice to write a function that waits on the socket in libssh2 terms:

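    Waiting for socket
    // sock and session stand for the connection's raw descriptor and LIBSSH2_SESSION*.
    int WaitSocket() const {
      pollfd fds{};
      fds.fd = sock;
      fds.events = 0;
      // Ask libssh2 which direction(s) the session is currently blocked on.
      const int directions = libssh2_session_block_directions(session);
      if ((directions & LIBSSH2_SESSION_BLOCK_INBOUND) != 0) {
         fds.events |= POLLIN;
      }
      if ((directions & LIBSSH2_SESSION_BLOCK_OUTBOUND) != 0) {
         fds.events |= POLLOUT;
      }
      // Wait for at most 10 ms.
      return poll(&fds, 1, 10);
    }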


    In fact, this is practically no different from the libssh2 example, except that the example uses select where this version uses poll.
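
    The mapping from blocked directions to poll events can be tried out on a local socket pair, without an SSH session. This is only a sketch under stated assumptions: kBlockInbound and kBlockOutbound are hypothetical stand-ins for LIBSSH2_SESSION_BLOCK_INBOUND and LIBSSH2_SESSION_BLOCK_OUTBOUND, and WaitFd is a name invented for the example.

```cpp
#include <cassert>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical stand-ins for the libssh2 direction flags.
constexpr int kBlockInbound = 0x1;
constexpr int kBlockOutbound = 0x2;

// Translate a direction mask into poll() events and wait up to timeout_ms.
// Returns poll()'s result: > 0 when ready, 0 on timeout, -1 on error.
int WaitFd(int fd, int directions, int timeout_ms) {
   assert(fd >= 0);
   pollfd fds{};
   fds.fd = fd;
   if ((directions & kBlockInbound) != 0) {
      fds.events |= POLLIN;
   }
   if ((directions & kBlockOutbound) != 0) {
      fds.events |= POLLOUT;
   }
   return poll(&fds, 1, timeout_ms);
}
```

    On a freshly created socketpair, waiting for the inbound direction times out until the peer writes something, while waiting for the outbound direction returns at once because the send buffer is empty.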

    What remains is the channel. libssh2 has several kinds of channels: simple, SCP, direct TCP. We are interested in the simplest, basic channel:

    Channel
    class SimpleChannel {
    public:
      explicit SimpleChannel(LIBSSH2_SESSION *session) {
         // In non-blocking mode the open call may report EAGAIN; retry until it settles.
         while ((m_channel = libssh2_channel_open_session(session)) == nullptr &&
                GetSessionLastError() == LIBSSH2_ERROR_EAGAIN) {
            WaitSocket();
         }
         if (m_channel == nullptr) {
            throw std::runtime_error("Critical error while opening simple channel");
         }
      }
      void SendEof() {
         while (libssh2_channel_send_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) {
            WaitSocket();
         }
         while (libssh2_channel_wait_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) {
            WaitSocket();
         }
      }
      ~SimpleChannel() {
         CloseChannel();
      }
    private:
      void CloseChannel() {
         int rc;
         while ((rc = libssh2_channel_close(m_channel)) == LIBSSH2_ERROR_EAGAIN) {
            WaitSocket();
         }
         libssh2_channel_free(m_channel);
      }
      LIBSSH2_CHANNEL *m_channel;
    };


    Now that all the basic tools are ready, it remains to establish a connection with the host and perform the manipulations we need. Asynchronous and synchronous writes to the channel will, of course, differ greatly, but the process of establishing a connection will not.

    Therefore, we write the base class:

    Base connection
    class BaseConnectionImpl {
    protected:
      explicit BaseConnectionImpl(const SshConnectData &connect_data) ///< Any convenient structure holding the connection information
            : m_session(connect_data.enable_compression)
            , m_connect_data(connect_data) {
         LibSSH2::Init();
         ConnectSocket();
         HandShake();
         ProcessKnownHosts();
         Auth();
      }
      /// The next three methods will come in handy a little later
      bool CheckSocket(int type) const {
         pollfd fds{};
         fds.fd = m_sock.GetSocket();
         fds.events = type;
         return poll(&fds, 1, 0) == 1;
      }
      bool WantRead() const {
         return CheckSocket(POLLIN);
      }
      bool WantWrite() const {
         return CheckSocket(POLLOUT);
      }
      /* I was not sure that the connection implementation, which is taken almost
       * entirely from the example, would be of interest to anyone.
       */
      void ConnectSocket() {...}
      void HandShake() {...}
      void Auth() {...}
      class Socket m_sock;
      class Session m_session;
      class SimpleChannel;
      SshConnectData m_connect_data;
    };


    Now we are ready to write the simplest class for connecting to a remote host and executing some command on it:

    Synchronous connection
    class Connection::Impl : public BaseConnectionImpl {
    public:
      explicit Impl(const SshConnectData &connect_data)
            : BaseConnectionImpl(connect_data) {}
      template <typename Begin>
      void WriteToChannel(LIBSSH2_CHANNEL *channel, Begin ptr, size_t size) {
         do {
            int rc;
            // Retry while libssh2 reports that the call would block.
            while ((rc = libssh2_channel_write(channel, ptr, size)) == LIBSSH2_ERROR_EAGAIN) {
               WaitSocket();
            }
            if (rc < 0) {
               break;
            }
            // A write may be partial: advance past the bytes that were accepted.
            size -= rc;
            ptr += rc;
         } while (size != 0);
      }
      // Returns everything the command printed to its standard output.
      std::string ExecuteCommand(const std::string &command, const std::string &in = "") {
         SimpleChannel channel(*this);
         int return_code;
         // In non-blocking mode exec may also ask to be retried.
         while ((return_code = libssh2_channel_exec(channel, command.c_str())) == LIBSSH2_ERROR_EAGAIN) {
            WaitSocket();
         }
         if (return_code != 0) {
            throw std::runtime_error("Critical error while executing ssh command");
         }
         if (!in.empty()) {
            WriteToChannel(channel, in.c_str(), in.size());
            channel.SendEof();
         }
         std::string response;
         for (;;) {
            int rc;
            do {
               std::array<char, 4096> buffer{};
               rc = libssh2_channel_read(channel, buffer.data(), buffer.size());
               if (rc > 0) {
                  boost::range::copy(boost::adaptors::slice(buffer, 0, rc), std::back_inserter(response));
               } else if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN) {
                  throw std::runtime_error("libssh2_channel_read error (" + std::to_string(rc) + ")");
               }
            } while (rc > 0);
            if (rc == LIBSSH2_ERROR_EAGAIN) {
               WaitSocket();
            } else {
               break;
            }
         }
         return response;
      }
    };
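
    The retry loop in WriteToChannel can be exercised without a server by swapping the channel for a fake. The following is a sketch only: kAgain is a hypothetical stand-in for LIBSSH2_ERROR_EAGAIN, and WriteAll, WriteSome and FakeChannel are names invented for the illustration, not part of libssh2 or of the wrapper itself.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

constexpr long kAgain = -37;  // hypothetical stand-in for LIBSSH2_ERROR_EAGAIN

// Mimics libssh2_channel_write: returns the number of bytes accepted (> 0),
// kAgain when the call would block, or another negative value on error.
using WriteSome = std::function<long(const char *, std::size_t)>;

// Keeps writing until every byte is accepted. Returns false on a hard error.
bool WriteAll(const WriteSome &write_some, const std::function<void()> &wait,
              const char *ptr, std::size_t size) {
   assert(ptr != nullptr || size == 0);
   while (size != 0) {
      long rc;
      while ((rc = write_some(ptr, size)) == kAgain) {
         wait();  // in the article this role is played by WaitSocket()
      }
      if (rc < 0) {
         return false;
      }
      size -= static_cast<std::size_t>(rc);
      ptr += rc;
   }
   return true;
}

// A fake channel that blocks on every other call and accepts at most 3 bytes.
struct FakeChannel {
   std::string received;
   int calls = 0;
   long Write(const char *ptr, std::size_t size) {
      if (++calls % 2 == 1) {
         return kAgain;
      }
      const std::size_t n = size < 3 ? size : 3;
      received.append(ptr, n);
      return static_cast<long>(n);
   }
};
```

    Passing FakeChannel::Write through a lambda shows both behaviors at once: the inner loop absorbs the would-block results, and the outer loop stitches the partial writes back into the full message.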


    Up to this point, everything we wrote simply brought the libssh2 examples into a more civilized form. Now that we have all the simple tools for writing data to the channel synchronously, we can move on to Asio.

    Having a standard socket is good, but not very practical if you need to wait asynchronously until it is ready for reading or writing while you get on with your own work. This is where boost::asio::ip::tcp::socket comes in, with its wonderful method:

    async_wait(wait_type, WaitHandler)

    It can be constructed perfectly well from an ordinary socket, on which we have already established a connection, and a boost::asio::io_context, the execution context of our application.

    Asynchronous connection constructor
    class AsyncConnection::Impl : public BaseConnectionImpl,
                                  public std::enable_shared_from_this<AsyncConnection::Impl> {
    public:
      Impl(boost::asio::io_context &context, const SshConnectData &connect_data)
            : BaseConnectionImpl(connect_data)
            , m_tcp_socket(context, tcp::v4(), m_sock.GetSocket()) {
         m_tcp_socket.non_blocking(true);
      }
    };



    Now we need to start executing some command on the remote host and, as data arrives from it, pass that data to a callback.

    void AsyncRun(const std::string &command, CallbackType &&callback) {
      m_read_callback = std::move(callback);
      auto ec = libssh2_channel_exec(*m_channel, command.c_str());
      TryRead();
    }

    Thus, having started the command, we hand control over to the TryRead() method.

    void TryRead() {
      if (m_read_in_progress) {
         return;
      }
      m_tcp_socket.async_wait(tcp::socket::wait_read, [this, self = shared_from_this()](auto ec) {
         if (WantRead()) {
            ReadHandler(ec);
         }
         if (m_complete) {
            return;
         }
         TryRead();
      });
    }
    

    First of all, we check whether a read has already been started by a previous call. If not, we start waiting for the socket to become readable. The wait handler is an ordinary lambda that captures shared_from_this().

    Note the WantRead() call. As it turned out, async_wait has its flaws too and can simply return on a timeout. To avoid doing unnecessary work in that case, I decided to check through poll with a zero timeout whether the socket really has something to read right now. If not, we simply call TryRead() again and keep waiting. Otherwise, we immediately proceed to reading the data and passing it to the callback.
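
    The zero-timeout check can be tried in isolation on a local socket pair. The sketch below is an illustration under assumptions: IsReady plays the role of CheckSocket, and OnWake is a hypothetical hook standing in for the body of the wait handler.

```cpp
#include <cassert>
#include <functional>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

// Non-blocking readiness probe: a zero timeout makes poll() return immediately.
bool IsReady(int fd, short events) {
   assert(fd >= 0);
   pollfd fds{};
   fds.fd = fd;
   fds.events = events;
   return poll(&fds, 1, 0) == 1;
}

// Hypothetical wake-up hook: run the read handler only if the socket really
// has something to read. Returns true when the handler ran.
bool OnWake(int fd, const std::function<void()> &read_handler) {
   if (!IsReady(fd, POLLIN)) {
      return false;  // spurious wake-up: the caller just re-arms the wait
   }
   read_handler();
   return true;
}
```

    With nothing written to the peer end of a socketpair, OnWake skips the handler; after a single byte is written, it runs it.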

    void ReadHandler(const boost::system::error_code &error) {
      if (error != boost::system::errc::success) {
         return;
      }
      m_read_in_progress = true;
      int ec = LIBSSH2_ERROR_EAGAIN;
      std::array<char, 4096> buffer {};
      while ((ec = libssh2_channel_read(*m_channel, buffer.data(), buffer.size())) > 0) {
         std::string tmp;
         boost::range::copy(boost::adaptors::slice(buffer, 0, ec), std::back_inserter(tmp));
         if (m_read_callback != nullptr) {
            m_read_callback(tmp);
         }
      }
      m_read_in_progress = false;
    }
    

    This starts an endless asynchronous loop that reads from the running application. The next step is to send instructions to the application:

    void AsyncWrite(const std::string &data, WriteCallbackType &&callback) {
      m_input += data;
      m_write_callback = std::move(callback);
      TryWrite();
    }
    

    The data and the callback passed to the asynchronous write are stored inside the connection. Then we start another loop, this time for writing:

    Write cycle
    void TryWrite() {
      if (m_input.empty() || m_write_in_progress) {
         return;
      }
      m_tcp_socket.async_wait(tcp::socket::wait_write, [this, self = shared_from_this()](auto ec) {
         if (WantWrite()) {
            WriteHandler(ec);
         }
         if (m_complete) {
            return;
         }
         TryWrite();
      });
    }
    void WriteHandler(const boost::system::error_code &error) {
      if (error != boost::system::errc::success) {
         return;
      }
      m_write_in_progress = true;
      int ec = LIBSSH2_ERROR_EAGAIN;
      while (!m_input.empty()) {
         auto ptr = m_input.c_str();
         auto read_size = m_input.size();
         while ((ec = libssh2_channel_write(*m_channel, ptr, read_size)) > 0) {
            read_size -= ec;
            ptr += ec;
         }
         AssertResult(ec);
         m_input.erase(0, m_input.size() - read_size);
         if (ec == LIBSSH2_ERROR_EAGAIN) {
            break;
         }
      }
      if (m_input.empty() && m_write_callback != nullptr) {
         m_write_callback();
      }
      m_write_in_progress = false;
    }
    


    Thus, we keep writing data to the channel until all of it has been transmitted. Then we return control to the caller so that a new portion of data can be passed in. This way you can not only send instructions to an application on the host but also, for example, upload files of any size in small portions without blocking the thread, which is important.
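
    The buffering behind this write loop can be modeled without sockets. What follows is a simplified sketch, not the wrapper itself: PendingWriter, Queue, Drain and Pending are hypothetical names, kAgain stands in for LIBSSH2_ERROR_EAGAIN, and error handling is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

constexpr long kAgain = -37;  // hypothetical stand-in for LIBSSH2_ERROR_EAGAIN

class PendingWriter {
public:
   // Mimics libssh2_channel_write: bytes accepted (> 0) or a negative code.
   using WriteSome = std::function<long(const char *, std::size_t)>;

   explicit PendingWriter(WriteSome write_some) : m_write_some(std::move(write_some)) {}

   // Append data to the pending buffer and remember whom to notify when it drains.
   void Queue(const std::string &data, std::function<void()> on_done) {
      m_input += data;
      m_on_done = std::move(on_done);
   }

   // Call when the socket is writable: push as much as the channel accepts now.
   // Returns true once the pending buffer is empty.
   bool Drain() {
      if (m_input.empty()) {
         return true;
      }
      std::size_t sent = 0;
      long rc = 0;
      while (sent < m_input.size() &&
             (rc = m_write_some(m_input.data() + sent, m_input.size() - sent)) > 0) {
         sent += static_cast<std::size_t>(rc);
      }
      assert(sent <= m_input.size());
      m_input.erase(0, sent);  // keep only the part that was not accepted yet
      if (m_input.empty() && m_on_done) {
         m_on_done();
      }
      return m_input.empty();
   }

   std::size_t Pending() const { return m_input.size(); }

private:
   WriteSome m_write_some;
   std::string m_input;
   std::function<void()> m_on_done;
};
```

    Feeding a large file through such a buffer chunk by chunk, with the completion callback queuing the next chunk, is one way to get the small-portions behavior described above.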

    With the help of this library, I was able to successfully run a script on a remote server that monitors file system changes, while reading its output and sending it various commands. Overall, it was a very valuable experience in adapting a C-style library for a modern C++ project that uses Boost.

    I would be happy to read tips from more experienced users of Boost.Asio, to learn more and improve my solution :-).
