PHP-AMQP version 2
In the article New ideas on API RabbitMQ AMQP for PHP , a sketch on the PHP-AMQP API was published.
In continuation of the previously published ideas, I present their implementation, which is more OOP than the first version.
extension code can be found here the description of the project and svn while the old versions (1.0), translated into English a
brief description of version 2.0:
APMQConection :: APMQConection ([array params])
Parameters (all optional):
The exception is there is no logical or physical connection.
Example:
creation of exchange, if the name is set otherwise the initialization of the class
AMQPExchange :: AMQPExchange (APMQConection cnn, string [name])
name - exchange name
Example:
proto bool AMQPEexchange :: declare ([string name], [string type = direct], [bit params]);
name - exchange name
type - exchange type, types are allowed: direct, topic & fanout
params - parameters:
returns the result of the operation
Example:
proto bool AMQPExchange :: delete ([string name], [bit params]);
name - exchange name
params - parameters:
returns the result of the operation
Example:
proto bool AMQPExchange :: bind (string queueName, string routingKey);
queueName - queue name
key - routing-key, route key, string
returns the result of the operation
Example:
proto bool AMQPExchange :: publish (string msg, [string key], bit [parms]);
Publication of a message with the key key for the type of exchange topic or direct
msg - message, line
key - routing-key, route key, line
params - parameters:
returns the result of the operation
Example:
AMQPQueue :: AMQPQueue (AMQPConnection cnn, string [name])
name - the name of the queue
proto int AMQPQueue :: declare (string [name], bit [params])
name - queue name
params - parameters:
returns the number of elements in the queue if the queue already exists.
Example:
proto bool AMQPQueue :: delete (string [name], bit [params])
name - queue name
params - parameters:
returns the result of the operation
Example:
proto bool AMQPQueue :: bind (string exchangeName, string routingKey);
name - exchange name
routingkey - route key
Example:
proto array AMQPQueue :: consume (int n);
get an array of n-messages from the queue (all others are discarded)
n - number of received messages
params - parameters:
Attention!
the number of received messages cannot exceed the total number of messages in the queue, otherwise the API will wait for all messages from the broker to receive.
If you specify the number of messages that are less than currently in the queue, then all unselected messages will be marked as selected, that is, they will be lost when re-reading from the queue if the AMQP_NOACK flag is not set.
Example:
proto bool AMQPQueue :: unbind (string exchangeName, string routingKey);
disconnects the current queue from cheating exchangeName for the routing key routingKey
name - exchange name
routingkey - the routing key
returns the result of the operation
proto bool AMQPQueue :: purge (string [name])
All messages in the queue are discarded, the queue itself remains.
name - the queue name
returns the result of the operation
Example:
proto array AMQPQueue :: get (string [name], bit [params])
name - queue name
params - parameters:
returns an associative array:
In continuation of the previously published ideas, I present their implementation, which is more OOP than the first version.
extension code can be found here the description of the project and svn while the old versions (1.0), translated into English a
brief description of version 2.0:
Class AMQPConnection - opening a logical connection, including channel connection.
Constructor:
APMQConection :: APMQConection ([array params])
Parameters (all optional):
- host = [localhost]
- port = [5672]
- login = [guest]
- psw = [guest]
- vhost = [/]
The exception is there is no logical or physical connection.
Example:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1', 'vhost'=>'v1' ) );
AMQPExchange Exchange Class
constructor:
creation of exchange, if the name is set otherwise the initialization of the class
AMQPExchange :: AMQPExchange (APMQConection cnn, string [name])
name - exchange name
Example:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn, 'ex_name');
Exchange Ad:
proto bool AMQPEexchange :: declare ([string name], [string type = direct], [bit params]);
name - exchange name
type - exchange type, types are allowed: direct, topic & fanout
params - parameters:
- AMQP_PASSIVE
- AMQP_DURABLE
- AMQP_AUTODELETE
- AMQP_INTERNAL
returns the result of the operation
Example:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn);
$exchange->declare('ex_name', 'topic',AMQP_DURABLE );
Delete exchange:
proto bool AMQPExchange :: delete ([string name], [bit params]);
name - exchange name
params - parameters:
- AMQP_IFUNUSED
returns the result of the operation
Example:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn, 'ex_name');
$res = $exchange->delete();
// если имя не задано - удаляется с текущим именем, объявленного в конструкторе класса.
Queuing an exchange:
proto bool AMQPExchange :: bind (string queueName, string routingKey);
queueName - queue name
key - routing-key, route key, string
returns the result of the operation
Example:
$msg = "моя новость, раздел СПб...";
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn, 'ex_name');
$exchange->bind('mylogin','spb.news')
$res = $exchange->publish( $msg, 'spb.news');
Publication:
proto bool AMQPExchange :: publish (string msg, [string key], bit [parms]);
Publication of a message with the key key for the type of exchange topic or direct
msg - message, line
key - routing-key, route key, line
params - parameters:
- AMQP_MANDATORY
- AMQP_IMMEDIATE
returns the result of the operation
Example:
$msg = "новости из СПб...";
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$exchange = new AMQPExchange($cnn, 'ex_name');
$res = $exchange->publish( $msg, 'spb.news');
AMQPQueue Queue Class
constructor - class initialization
AMQPQueue :: AMQPQueue (AMQPConnection cnn, string [name])
name - the name of the queue
Queue Announcement
proto int AMQPQueue :: declare (string [name], bit [params])
name - queue name
params - parameters:
- AMQP_AUTODELETE (default)
- AMQP_DURABLE
- AMQP_PASSIVE
- AMQP_EXCLUSIVE
returns the number of elements in the queue if the queue already exists.
Example:
$cnn = new APMQConection(array ('port'=>5673, 'login' => 'sector1' ));
$queue = new AMQPQueue($cnn,'chat_12');
$queue->declare('chat_12', AMQP_AUTEDELETE | AMQP_DURABLE);
Delete queue
proto bool AMQPQueue :: delete (string [name], bit [params])
name - queue name
params - parameters:
- AMQP_IFUNUSED
- AMQP_IFEMPTY
returns the result of the operation
Example:
$queue = new AMQPQueue(new APMQConection(),'chat_12');
$queue->delete();
Linking a queue to an exchange
:proto bool AMQPQueue :: bind (string exchangeName, string routingKey);
name - exchange name
routingkey - route key
Example:
// Привязка очереди 'mylogin' к обмену 'ex_estate' через ключ '*.spb'
$queue = new AMQPQueue(APMQConection(), 'mylogin');
$queue->declare();
$queue->bind('ex_estate','*.spb');
Subscribe
proto array AMQPQueue :: consume (int n);
get an array of n-messages from the queue (all others are discarded)
n - number of received messages
params - parameters:
- AMQP_NOLOCAL
- AMQP_NOACK
- AMQP_EXCLUSIVE
Attention!
the number of received messages cannot exceed the total number of messages in the queue, otherwise the API will wait for all messages from the broker to receive.
If you specify the number of messages that are less than currently in the queue, then all unselected messages will be marked as selected, that is, they will be lost when re-reading from the queue if the AMQP_NOACK flag is not set.
Example:
$i=0;
$queue = new AMQPQueue(APMQConection());
$n = $queue->declare('mylogin');
$queueMessages = $queue->consume( $n );
foreach($queueMessages as $item){
$i++;
echo "$i.$item";
}
Unsubscribe
proto bool AMQPQueue :: unbind (string exchangeName, string routingKey);
disconnects the current queue from cheating exchangeName for the routing key routingKey
name - exchange name
routingkey - the routing key
returns the result of the operation
Queue Reset
proto bool AMQPQueue :: purge (string [name])
All messages in the queue are discarded, the queue itself remains.
name - the queue name
returns the result of the operation
Example:
$queue = new AMQPQueue(new APMQConection());
$queue->purge('chat_12');
Get Queue Item
proto array AMQPQueue :: get (string [name], bit [params])
name - queue name
params - parameters:
- AMQP_NOASK (default)
returns an associative array:
- msg - current message
- count - the number of messages remaining in the queue, if:
- count = 0 - this message is the last in the queue
- count = -1 the queue is empty, the msg key is missing
When comparing performance, the consume () method is faster than get (),
but the get method is more reliable.