RabbitMQ tutorials in C++
The tutorials section on rabbitmq.com provides example implementations in various languages, but C++ is not among them. Below the cut you will find links to the translated tutorials and related materials, with the code collected under spoilers.
If you prefer to browse the code through the GitHub interface, you can go straight to the repository.
This material uses the AMQP-CPP client library, with POCO C++ handling the socket.
"RabbitMQ tutorial 1 - Hello World"
“RabbitMQ tutorial 2 - Task Queue”
“RabbitMQ tutorial 3 - Publish / Subscribe”
"RabbitMQ tutorial 4 - Routing"
"RabbitMQ tutorial 5 - Topics"
"RabbitMQ tutorial 6 - Remote Procedure Call"
UPD 2015.04.09, fixes: the prefetch count is now set correctly; tutorial 2 now works; the code builds with g++ 4.7.
If you prefer to browse the code through the GitHub interface, you can go straight to the repository.
This material uses the AMQP-CPP client library, with POCO C++ handling the socket.
"RabbitMQ tutorial 1 - Hello World"
receive.cpp
#include<iostream>#include"SimplePocoHandler.h"intmain(void){
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.declareQueue("hello");
channel.consume("hello", AMQP::noack).onReceived(
[](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
std::cout <<" [x] Received "<<message.message() << std::endl;
});
std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
handler.loop();
return0;
}
send.cpp
#include<iostream>#include"SimplePocoHandler.h"intmain(void){
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.onReady([&]()
{
if(handler.connected())
{
channel.publish("", "hello", "Hello World!");
std::cout << " [x] Sent 'Hello World!'" << std::endl;
handler.quit();
}
});
handler.loop();
return0;
}
“RabbitMQ tutorial 2 - Task Queue”
worker.cpp
#include<iostream>#include<algorithm>#include<thread>#include<chrono>#include"SimplePocoHandler.h"intmain(void){
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.setQos(1);
channel.declareQueue("task_queue", AMQP::durable);
channel.consume("task_queue").onReceived(
[&channel](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
constauto body = message.message();
std::cout<<" [x] Received "<<body<<std::endl;
size_t count = std::count(body.cbegin(), body.cend(), '.');
std::this_thread::sleep_for (std::chrono::seconds(count));
std::cout<<" [x] Done"<<std::endl;
channel.ack(deliveryTag);
});
std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
handler.loop();
return0;
}
new_task.cpp
#include<iostream>#include"SimplePocoHandler.h"#include"tools.h"intmain(int argc, constchar* argv[]){
conststd::string msg =
argc > 1 ? join(&argv[1], &argv[argc], " ") : "Hello World!";
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
AMQP::QueueCallback callback =
[&](conststd::string &name, int msgcount, int consumercount)
{
AMQP::Envelope env(msg);
env.setDeliveryMode(2);
channel.publish("", "task_queue", env);
std::cout<<" [x] Sent '"<<msg<<"'\n";
handler.quit();
};
channel.declareQueue("task_queue", AMQP::durable).onSuccess(callback);
handler.loop();
return0;
}
“RabbitMQ tutorial 3 - Publish / Subscribe”
receive_logs.cpp
#include<iostream>#include"SimplePocoHandler.h"intmain(void){
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
auto receiveMessageCallback = [](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
std::cout <<" [x] "<<message.message() << std::endl;
};
AMQP::QueueCallback callback =
[&](conststd::string &name, int msgcount, int consumercount)
{
channel.bindQueue("logs", name,"");
channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
};
AMQP::SuccessCallback success = [&]()
{
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
};
channel.declareExchange("logs", AMQP::fanout).onSuccess(success);
std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
handler.loop();
return0;
}
emit_log.cpp
#include<iostream>#include"SimplePocoHandler.h"#include"tools.h"intmain(int argc, constchar* argv[]){
conststd::string msg =
argc > 1 ? join(&argv[1], &argv[argc], " ") : "info: Hello World!";
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.declareExchange("logs", AMQP::fanout).onSuccess([&]()
{
channel.publish("logs", "", msg);
std::cout << " [x] Sent "<<msg<< std::endl;
handler.quit();
});
handler.loop();
return0;
}
"RabbitMQ tutorial 4 - Routing"
receive_logs_direct.cpp
#include<iostream>#include<algorithm>#include"SimplePocoHandler.h"intmain(int argc, constchar* argv[]){
if(argc==1)
{
std::cout<<"Usage: "<<argv[0]<<" [info] [warning] [error]"<<std::endl;
return1;
}
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.declareExchange("direct_logs", AMQP::direct);
auto receiveMessageCallback =
[](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
std::cout <<" [x] "
<<message.routingKey()
<<":"
<<message.message()
<< std::endl;
};
AMQP::QueueCallback callback = [&](conststd::string &name,
int msgcount,
int consumercount)
{
std::for_each(&argv[1],
&argv[argc],
[&](constchar* severity)
{
channel.bindQueue("direct_logs","", severity);
channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
});
};
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
handler.loop();
return0;
}
emit_log_direct.cpp
#include<iostream>#include"SimplePocoHandler.h"#include"tools.h"intmain(int argc, constchar* argv[]){
conststd::string severity = argc > 2 ? argv[1] : "info";
conststd::string msg =
argc > 2 ? join(&argv[2], &argv[argc], " ") : "Hello World!";
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.declareExchange("direct_logs", AMQP::direct).onSuccess([&]()
{
channel.publish("direct_logs", severity, msg);
std::cout << " [x] Sent "<<severity<<":"<<msg<< std::endl;
handler.quit();
});
handler.loop();
return0;
}
"RabbitMQ tutorial 5 - Topics"
receive_logs_topic.cpp
#include<iostream>#include<algorithm>#include"SimplePocoHandler.h"intmain(int argc, constchar* argv[]){
if(argc==1)
{
std::cout<<"Usage: "<<argv[0]<<" [binding_key]..."<<std::endl;
return1;
}
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.declareExchange("topic_logs", AMQP::topic);
auto receiveMessageCallback =
[](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
std::cout <<" [x] "
<<message.routingKey()
<<":"
<<message.message()
<< std::endl;
};
AMQP::QueueCallback callback = [&](conststd::string &name,
int msgcount,
int consumercount)
{
std::for_each(&argv[1],
&argv[argc],
[&](constchar* bindingKeys)
{
std::cout<<bindingKeys<<std::endl;
channel.bindQueue("topic_logs",name, bindingKeys);
channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
});
};
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
handler.loop();
return0;
}
emit_log_topic.cpp
#include<iostream>#include"SimplePocoHandler.h"#include"tools.h"intmain(int argc, constchar* argv[]){
conststd::string msg =
argc > 1 ? join(&argv[2], &argv[argc], " ") : "Hello World!";
conststd::string routing_key = argc > 1 ? argv[1] : "anonymous.info";
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
channel.declareExchange("topic_logs", AMQP::topic).onSuccess([&]()
{
channel.publish("topic_logs", routing_key, msg);
std::cout << " [x] Sent "<<routing_key<<":"<<msg<< std::endl;
handler.quit();
});
handler.loop();
return0;
}
"RabbitMQ tutorial 6 - Remote Procedure Call"
rpc_server.cpp
#include<iostream>#include<algorithm>#include<thread>#include<chrono>#include"SimplePocoHandler.h"intfib(int n){
switch (n)
{
case0:
return0;
case1:
return1;
default:
return fib(n - 1) + fib(n - 2);
}
}
// Tutorial 6 RPC server: consumes requests from "rpc_queue", computes fib()
// of the number in the request body and publishes the result to the queue
// named in the request's reply-to field, echoing the correlation id.
int main(void)
{
    SimplePocoHandler handler("localhost", 5672);
    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
    AMQP::Channel channel(&connection);

    // NOTE(review): presumably a prefetch count of one — confirm against
    // AMQP-CPP's setQos().
    channel.setQos(1);
    channel.declareQueue("rpc_queue");

    // Consume from the queue by its explicit name instead of an empty name.
    channel.consume("rpc_queue").onReceived([&channel](const AMQP::Message &message,
                                                       uint64_t deliveryTag,
                                                       bool /*redelivered*/)
    {
        const auto body = message.message();
        std::cout << " [.] fib(" << body << ")" << std::endl;

        // The body comes from a remote client. std::stoi throws on
        // non-numeric or out-of-range text, and an exception escaping this
        // callback would take the whole server down, so a malformed request
        // is logged, acknowledged and dropped without a reply.
        int n = 0;
        try
        {
            n = std::stoi(body);
        }
        catch (const std::exception &error)
        {
            std::cerr << " [!] Ignoring malformed request: " << error.what() << std::endl;
            channel.ack(deliveryTag);
            return;
        }

        AMQP::Envelope env(std::to_string(fib(n)));
        env.setCorrelationID(message.correlationID());
        channel.publish("", message.replyTo(), env);
        channel.ack(deliveryTag);
    });

    std::cout << " [x] Awaiting RPC requests" << std::endl;
    handler.loop();
    return 0;
}
rpc_client.cpp
#include<iostream>#include"tools.h"#include"SimplePocoHandler.h"intmain(int argc, constchar* argv[]){
conststd::stringcorrelation(uuid());
SimplePocoHandler handler("localhost", 5672);
AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
AMQP::Channel channel(&connection);
AMQP::QueueCallback callback = [&](conststd::string &name,
int msgcount,
int consumercount)
{
AMQP::Envelope env("30");
env.setCorrelationID(correlation);
env.setReplyTo(name);
channel.publish("","rpc_queue",env);
std::cout<<" [x] Requesting fib(30)"<<std::endl;
};
channel.declareQueue(AMQP::exclusive).onSuccess(callback);
auto receiveCallback = [&](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
if(message.correlationID() != correlation)
return;
std::cout<<" [.] Got "<<message.message()<<std::endl;
handler.quit();
};
channel.consume("", AMQP::noack).onReceived(receiveCallback);
handler.loop();
return0;
}
UPD 2015.04.09, fixes: the prefetch count is now set correctly; tutorial 2 now works; the code builds with g++ 4.7.