An arbitrary message dispatcher based on Google Protocol Buffers

I had a free day and decided to play around with the google::protobuf library, which encodes and decodes structured data. On top of it I will build a simple dispatcher that can process arbitrary messages. What makes this dispatcher unusual is that it does not know the types of the messages passed through it; it processes them only through registered handlers.

Short description of protobuf library


First, a brief look at the google::protobuf library. It ships as two components:
  • the library itself, together with its header files;
  • the *.proto file compiler, which generates a class from a message description (it can also generate code for other programming languages: Java, Python, etc.).
A message description lives in a separate file, from which the class is generated. The syntax is very simple:
package sample.proto;
message ServerStatusAnswer {
    optional int32 threadCount = 1;
    repeated string listeners = 2;
}
This describes the ServerStatusAnswer message, which has two fields, neither of which has to be present:
  • threadCount - an optional integer field
  • listeners - a string field that may be repeated any number of times (including zero)
For example, the following message satisfies this description:
ServerStatusAnswer {
    threadCount = 3
    listeners = {
        "one",
        "two"
    }
}
The actual protobuf encoding is binary; the message is shown here in a readable form only for clarity.
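To make the binary form concrete, here is a minimal sketch that encodes the message above by hand, following the documented protobuf wire format (a varint for int32, a length-delimited record for each string). The helpers appendVarint, appendTag and encodeServerStatusAnswer are hypothetical names written only for this illustration; they are not part of the protobuf API, and in real code the generated class does this work.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helpers for illustration only; not part of the protobuf API.

// Append an unsigned integer as a base-128 varint:
// 7 bits per byte, least significant group first, high bit set on all but the last byte.
void appendVarint(std::string & out, std::uint64_t value)
{
    while (value >= 0x80)
    {
        out.push_back(static_cast<char>((value & 0x7F) | 0x80));
        value >>= 7;
    }
    out.push_back(static_cast<char>(value));
}

// Append a field key: (field number << 3) | wire type.
void appendTag(std::string & out, std::uint32_t fieldNumber, std::uint32_t wireType)
{
    assert(fieldNumber > 0 && wireType < 8);
    appendVarint(out, (static_cast<std::uint64_t>(fieldNumber) << 3) | wireType);
}

// Encode ServerStatusAnswer: threadCount is field 1 (int32), listeners is field 2 (repeated string).
std::string encodeServerStatusAnswer(std::int32_t threadCount,
                                     const std::vector<std::string> & listeners)
{
    std::string out;
    appendTag(out, 1, 0); // wire type 0: varint
    // An int32 is sign-extended to 64 bits before varint encoding.
    appendVarint(out, static_cast<std::uint64_t>(static_cast<std::int64_t>(threadCount)));
    for (std::size_t i = 0; i < listeners.size(); ++i)
    {
        appendTag(out, 2, 2); // wire type 2: length-delimited
        appendVarint(out, listeners[i].size());
        out += listeners[i];
    }
    return out;
}
```

For threadCount = 3 and the listeners "one" and "two", this produces 12 bytes: 08 03 12 03 6F 6E 65 12 03 74 77 6F.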

The compiler automatically generates C++ code for serializing and deserializing such messages. The protobuf library also provides additional features: serialization to a file, to a stream, or to a buffer.

I use CMake as the build system, and it already supports protobuf:
cmake_minimum_required(VERSION 2.8)
project(ProtobufTests)
find_package(Protobuf REQUIRED)
include_directories(${PROTOBUF_INCLUDE_DIRS})
include_directories(${CMAKE_CURRENT_BINARY_DIR})
#...
set (ProtobufTestsProtoSources
    Message.proto
    ServerStatus.proto
    Echo.proto
)
#...
PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS ${ProtobufTestsProtoSources})
add_executable(ProtobufTests ${ProtobufTestsSources} ${PROTO_SRCS} ${PROTO_HDRS})
target_link_libraries(ProtobufTests
    #...
    ${PROTOBUF_LIBRARY}
)
The PROTOBUF_GENERATE_CPP macro calls the protoc compiler for each *.proto file and generates the corresponding .cpp and .h files, which are added to the build.
Everything happens automatically, with no extra steps required (on *nix you may additionally need the Threads package and the corresponding linker flag).

Dispatcher description


I decided to try writing a message dispatcher that receives a message, calls the appropriate handler, and sends back a response. The dispatcher must not know the types of the messages passed to it. This matters if handlers are added or removed at run time (for example, by loading an extension module: *.dll, *.so).

To process arbitrary messages, we need a class that handles an abstract message. If the message descriptions live in *.proto files, the compiler generates the corresponding classes for us, but all they have in common is that they inherit from google::protobuf::Message. Through that base class alone it is awkward to extract the data from a message (it can be done in principle, but it means a lot of unnecessary work), and we would not know how to build the answer.
A well-known saying comes to the rescue: "Any problem can be solved by introducing an additional level of abstraction, except for the problem of too many levels of abstraction."
We need to separate the identification of the message type from the message itself. This can be done as follows:
package sample.proto;
message Message {
    required string id = 1;
    optional bytes data = 2;
}
We will pack our message inside another message:
  • the required id field contains a unique identifier of the message type
  • the optional data field contains our serialized message
The dispatcher then looks up the matching message handler by the id field:
#ifndef MESSAGEDISPATCHER_H
#define MESSAGEDISPATCHER_H
#include <map>
#include <stdexcept>
#include <string>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include "message.pb.h"
class MessageProcessingError: public std::runtime_error
{
public:
    MessageProcessingError(const std::string & e): std::runtime_error(e)
    {
    }
};
class MessageProcessorBase: private boost::noncopyable
{
public:
    virtual ~MessageProcessorBase()
    {
    }
    virtual std::string id() const = 0;
    virtual sample::proto::Message process(const sample::proto::Message & query) = 0;
};
typedef boost::shared_ptr<MessageProcessorBase> MessageProcessorBasePtr;
class MessageDispatcher
{
public:
    MessageDispatcher();
    void addProcessor(MessageProcessorBasePtr processor);
    sample::proto::Message dispatch(const sample::proto::Message & query);
    // Maps a message id to the handler registered for it.
    typedef std::map<std::string, MessageProcessorBasePtr> DispatcherImplType;
    const DispatcherImplType & impl() const;
private:
    DispatcherImplType mImpl;
};
#endif // MESSAGEDISPATCHER_H
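The header only declares the dispatcher, so here is a minimal sketch of how its lookup logic might be implemented. To keep it compilable without protobuf or boost, it makes several assumptions: Envelope is a hypothetical stand-in for sample::proto::Message, std::shared_ptr (C++11) replaces boost::shared_ptr, removeProcessor is an addition not present in the header above, and PingProcessor is a made-up handler used only to exercise the code.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for sample::proto::Message.
struct Envelope
{
    std::string id;
    std::string data;
};

class ProcessorBase
{
public:
    virtual ~ProcessorBase() {}
    virtual std::string id() const = 0;
    virtual Envelope process(const Envelope & query) = 0;
};

typedef std::shared_ptr<ProcessorBase> ProcessorBasePtr;

class Dispatcher
{
public:
    // Register a handler under its own id; a handler already using that id is replaced.
    void addProcessor(ProcessorBasePtr processor)
    {
        assert(processor.get() != 0);
        mImpl[processor->id()] = processor;
    }

    // Unregister a handler, e.g. before unloading the module that provides it.
    bool removeProcessor(const std::string & id)
    {
        return mImpl.erase(id) > 0;
    }

    // Find the handler by the id field and delegate the message to it.
    Envelope dispatch(const Envelope & query)
    {
        std::map<std::string, ProcessorBasePtr>::const_iterator it = mImpl.find(query.id);
        if (it == mImpl.end())
        {
            throw std::runtime_error("No processor registered for id: " + query.id);
        }
        return it->second->process(query);
    }

private:
    std::map<std::string, ProcessorBasePtr> mImpl;
};

// Made-up handler: answers every "Ping" query with "pong".
class PingProcessor: public ProcessorBase
{
public:
    virtual std::string id() const { return "Ping"; }
    virtual Envelope process(const Envelope & query)
    {
        Envelope answer;
        answer.id = query.id;
        answer.data = "pong";
        return answer;
    }
};
```

The dispatcher touches only the id field and never inspects data, which is what allows handlers to come and go without the dispatcher being recompiled.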
However, each handler now has to unpack the sample::proto::Message envelope into its own message type, and that code would be duplicated in every handler. To avoid the duplication, we use the Type Erasure pattern. It hides the type of the entity being processed behind a common interface, while each handler works with a concrete type known only to it.

So the implementation is very simple:
template <typename ProtoQueryT, typename ProtoAnswerT>
class ProtoMessageProcessor: public MessageProcessorBase
{
public:
    virtual sample::proto::Message process(const sample::proto::Message & query)
    {
        ProtoQueryT underlyingQuery;
        if (!underlyingQuery.ParseFromString(query.data()))
        {
            throw MessageProcessingError("Failed to parse query: " +
                query.ShortDebugString());
        }
        ProtoAnswerT underlyingAnswer = doProcessing(underlyingQuery);
        sample::proto::Message a;
        a.set_id(query.id());
        if (!underlyingAnswer.SerializeToString(a.mutable_data()))
        {
            throw MessageProcessingError("Failed to prepare answer: " +
                underlyingAnswer.ShortDebugString());
        }
        return a;
    }
private:
    virtual ProtoAnswerT doProcessing(const ProtoQueryT & query) = 0;
};
We implement the virtual function process, and also add a virtual function doProcessing, which works with our concrete message types. The technique relies on template instantiation: the types are substituted when the template is actually used, not when it is declared. Since the class inherits from MessageProcessorBase, its descendants can safely be passed to our dispatcher. Note also that this class serializes and deserializes the concrete messages and throws an exception on failure.
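The same idea can be tried without protobuf at all. The sketch below is an illustration under stated assumptions, not the article's actual code: Envelope stands in for sample::proto::Message, and EchoQuery and EchoAnswer are hand-written stand-ins that mimic only the two generated methods the adapter relies on (ParseFromString and SerializeToString). TypedProcessor and EchoProcessor are hypothetical names.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for sample::proto::Message.
struct Envelope
{
    std::string id;
    std::string data;
};

// Stand-ins for generated classes; their "wire format" is just the raw string.
struct EchoQuery
{
    std::string msg;
    bool ParseFromString(const std::string & bytes) { msg = bytes; return true; }
};

struct EchoAnswer
{
    std::string echo;
    bool SerializeToString(std::string * out) const
    {
        assert(out != 0);
        *out = echo;
        return true;
    }
};

// Untyped interface: the only thing a dispatcher needs to see.
class ProcessorBase
{
public:
    virtual ~ProcessorBase() {}
    virtual std::string id() const = 0;
    virtual Envelope process(const Envelope & query) = 0;
};

// Typed adapter: unpacks the query, calls the typed hook, packs the answer.
template <typename QueryT, typename AnswerT>
class TypedProcessor: public ProcessorBase
{
public:
    virtual Envelope process(const Envelope & query)
    {
        QueryT typedQuery;
        if (!typedQuery.ParseFromString(query.data))
        {
            throw std::runtime_error("Failed to parse query for id: " + query.id);
        }
        AnswerT typedAnswer = doProcessing(typedQuery);
        Envelope answer;
        answer.id = query.id;
        if (!typedAnswer.SerializeToString(&answer.data))
        {
            throw std::runtime_error("Failed to prepare answer for id: " + query.id);
        }
        return answer;
    }

private:
    // Concrete handlers implement only this typed function.
    virtual AnswerT doProcessing(const QueryT & query) = 0;
};

class EchoProcessor: public TypedProcessor<EchoQuery, EchoAnswer>
{
public:
    virtual std::string id() const { return "Echo"; }

private:
    virtual EchoAnswer doProcessing(const EchoQuery & query)
    {
        EchoAnswer answer;
        answer.echo = query.msg; // return the request unchanged
        return answer;
    }
};
```

A caller holding only a ProcessorBase reference can pass an Envelope with id "Echo" and some data to process and get the same data back, without ever naming EchoQuery or EchoAnswer.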

Finally, here is an example of using the dispatcher. Suppose we have two kinds of messages:
package sample.proto;
message ServerStatusQuery {
}
message ServerStatusAnswer {
    optional int32 threadCount = 1;
    repeated string listeners = 2;
}

package sample.proto;
message EchoQuery {
    required string msg = 1;
}
message EchoAnswer {
    required string echo = 1;
}
As the descriptions suggest, the first pair requests the server's internal state (ServerStatus), and the second simply returns the received request (Echo). The handlers themselves are trivial, so I will show only the ServerStatus one:
#ifndef SERVERSTATUSMESSAGEPROCESSOR_H
#define SERVERSTATUSMESSAGEPROCESSOR_H
#include "MessageDispatcher.h"
#include "ServerStatus.pb.h"
class ServerStatusMessageProcessor:
        public ProtoMessageProcessor<sample::proto::ServerStatusQuery,
                                     sample::proto::ServerStatusAnswer>
{
public:
    typedef sample::proto::ServerStatusQuery query_type;
    typedef sample::proto::ServerStatusAnswer answer_type;
    ServerStatusMessageProcessor(MessageDispatcher * dispatcher);
    virtual std::string id() const;
private:
    MessageDispatcher * mDispatcher;
    virtual answer_type doProcessing(const query_type & query);
};
#endif // SERVERSTATUSMESSAGEPROCESSOR_H
The implementation itself:
#include "ServerStatusMessageProcessor.h"
using namespace sample::proto;
ServerStatusMessageProcessor::ServerStatusMessageProcessor(MessageDispatcher * dispatcher)
    : mDispatcher(dispatcher)
{
}
std::string ServerStatusMessageProcessor::id() const
{
    return "ServerStatus";
}
ServerStatusAnswer ServerStatusMessageProcessor::doProcessing(const ServerStatusQuery & query)
{
    ServerStatusAnswer s;
    s.set_threadcount(10);
    typedef MessageDispatcher::DispatcherImplType::const_iterator md_iterator;
    const MessageDispatcher::DispatcherImplType & mdImpl = mDispatcher->impl();
    for (md_iterator it = mdImpl.begin(); it != mdImpl.end(); ++it)
    {
        s.add_listeners(it->first);
    }
    return s;
}
Here's how it works:
#include "MessageDispatcher.h"
#include "ServerStatusMessageProcessor.h"
#include "EchoMessageProcessor.h"
#include <iostream>
#include <boost/make_shared.hpp>
using namespace sample::proto;
int main()
{
    try
    {
        MessageDispatcher md;
        md.addProcessor(boost::make_shared<ServerStatusMessageProcessor>(&md));
        md.addProcessor(boost::make_shared<EchoMessageProcessor>());
        Message q;
        q.set_id("ServerStatus");
        Message ans = md.dispatch(q);
        std::cout << "query:  " << q.DebugString() << std::endl;
        std::cout << "answer: " << ans.DebugString() << std::endl;
    }
    catch (const std::exception & e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}

P.S. The following tools were used while writing this article:
  • gcc-4.4.5-linux
  • cmake-2.8.2
  • boost-1.42
  • protobuf-2.3.0

The example is posted on GitHub.
