PHP and Erlang interaction through RabbitMQ


The more you program in php, the more often you come across tasks for which you need a daemon on the server. Yes, of course there is phpDaemon, cron or crutches, which, with every nth script run, cause a certain set of operations. But when we talk about projects with a load more than on a regular site, we begin to get upset.

In one of the projects, we decided to use a bunch of php + RabbitMQ + erlang to solve this problem. The necessary functionality was already written in php, we only had to distribute the calls in time and on different machines. Specifically, the task was as follows: write a user parser from an external data store and, most importantly, maintain the data up-to-date, and in case of change, send notifications.

Initial data

- Rest-function in php, adding a user to favorites. From this list in the future, users will be formed whose information we will maintain up to date, and in case of change, send notifications to the client.
- An application on erlange, which should coordinate which users we are currently processing, etc. Information processing from an external source is carried out in two ways - this is parsing html pages or, where possible, api requests. To process responses, a rest function written in php is used.
- We must support the ability to easily scale to a large number of servers. The list of users to parse are in the queues formed by RabbitMQ.

Task number 1 - configure php extension to work with RabbitMQ

First of all, we install additional software packages:

apt-get install gcc
apt-get install php5-dev

Further, the installation itself, found on the Internet:

#download and install the rabbitmq c amqp lib
tar -zxvf rabbitmq-c-0.5.1.tar.gz
cd rabbitmq-c-0.5.1/
sudo make install
cd ..
#download and compile the amqp
tar -zxvf amqp-1.4.0.tgz
cd amqp-1.4.0/
phpize && ./configure --with-amqp && make && sudo make install
#Add amqp extension to php mods-availabile directory
echo "" > /etc/php5/mods-available/amqp.ini
#Enabled it in cli
cd /etc/php5/cli/conf.d/
ln -s ../../mods-available/amqp.ini 20-amqp.ini
php -m | grep amqp
#Enabled it in cli
cd /etc/php5/apache2/conf.d/
ln -s ../../mods-available/amqp.ini 20-amqp.ini
#restart Apache and than check phpinfo on web
service apache2 restart

If you are lucky, then everything is set correctly.

Task number 2 - install RabbitMQ and its web-based control panel

sudo apt-get install rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl stop
rabbitmq-server -detached

Then at this address you get access to queue management, by default login (guest) and password (guest)
ip.addres : 15672 /

Task number 3 - through php influence RabbitMQ queues

//создание точки обмена
$rabbit = new AMQPConnection(array('host' => '', 'port' => '5672', 'login' => 'guest', 'password' => 'guest'));
$testChannel = new AMQPChannel($rabbit);
$exchange = new AMQPExchange($testChannel);
//создания очереди
$testChannel = new AMQPChannel($rabbit);
$queue = new AMQPQueue($testChannel); 
//привязываем очередь к точки обмена
$testChannel = new AMQPChannel($rabbit);
$queue = new AMQPQueue($testChannel); 
//отправить сообщение на точку
$testChannel = new AMQPChannel($rabbit);
$exchange = new AMQPExchange($testChannel);

Task number 4 - Create an application on erlange using rebar

Install rebar
apt-get install rebar
mkdir 1
rebar create template=simpleapp   srvid=my_server46
rebar create template=simplesrv   srvid=my_server46

Task number 5 - Run the application on erlange

First install CMake:

apt-get install make

In the Makefile, write the following:

	rebar compile
	ERL_LIBS=deps erl +K true -name myapp_app@ -boot start_sasl -pa ebin -s myapp_app -sasl errlog_type error

The line -pa ebin -s myapp_app means that we run ebin / myapp_app.erl and in it the function myapp_app: start ().
ERL_LIBS = deps means that we load all the libraries that are located in the deps folder.

Task number 6 - to connect the necessary libraries for communication RabbitMQ and Erlang

In rebar.config we put the following:

{deps, [
	{rabbit_common, ".*", {git, "git://", {tag, "rabbitmq-3.0.2"}}}
{erl_opts, [
%  warnings_as_errors

Run rebar get-deps, which pulls up dependencies. Further, difficulties arose with the remaining libraries, so I had to use what was written on the official RabbitMQ website. But before these we install the necessary packages:

apt-get install xsltproc
apt-get install zip

After we go into the deps folder that rebar created and, using git, download everything, and then install:

cd deps
git clone
git clone
git clone
cd rabbitmq-erlang-client

Task number 7 - from Erlang to receive messages from RabbitMQ queues

We leave the myapp_app.erl file a little bit editable so that we can run it from the makefile, which we wrote:

-export([start/0,start/2, stop/1]).
start() ->
start(_StartType, _StartArgs) ->
stop(_State) ->

The file myapp_sup.erl, which is responsible for monitoring processes, appends the call to our module from init:

-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
    {ok, { {one_for_one, 5, 10}, [{my_server46_0, {my_server46, start_link, []},permanent, brutal_kill, worker, [my_server46]}]} }.

The module responsible for communicating with RabbitMQ:

-define(SERVER, ?MODULE).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).
start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
init(Args) ->
  {ok, Args}.
main() ->
  {ok, Connection} =
    amqp_connection:start(#amqp_params_network{host = "localhost"}),
  {ok, Channel} = amqp_connection:open_channel(Connection),
  io:format(" [*] Waiting for messages. To exit press CTRL+C~n"),
  amqp_channel:call(Channel, #'basic.qos'{prefetch_count = 1}),
  amqp_channel:subscribe(Channel, #'basic.consume'{queue = <<"yyyyyyy2">>},self()),
    #'basic.consume_ok'{} -> io:fwrite(" _rec_main_ok_ "),ok
  io:fwrite("begin~n", []).
    {#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{payload = Body}} ->
      Dots = length([C || C <- binary_to_list(Body), C == $.]),
      io:format(" [x] Received Body ~p~n", [Body]),
        Dots*1000 -> io:format(" _loop_rec_after_ ~p",[0]), ok
      amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
      io:format(" [x] Done 3~n")
handle_call(_Request, _From, State) ->
    {reply, ok, State}.
handle_cast(_Msg, State) ->
    {noreply, State}.
handle_info(_Info, State) ->
    {noreply, State}.
terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

Everything is quite simple here, we subscribe to the queue "yyyyyyy2":

amqp_channel:subscribe(Channel, #'basic.consume'{queue = <<"yyyyyyy2">>},self())

Then we inform RabbitMQ that the message was processed successfully:

amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag})

Also popular now: