RabbitMQ - SQL Server
- Transfer
A week or two ago, I saw a message on the RabbitMQ Users forum asking how to send messages from SQL Server to RabbitMQ. Since we work closely with this at Derivco, I left some suggestions there and also said that I was writing a blog post about how it can be done. That last part was not entirely true, at least until now (sorry, Bro, I was very busy).
SQL Server is awesome. With it, it is very easy to put information into a database, and retrieving data with a query is just as easy. But getting just the updated or inserted data out is a little more difficult. Think about real-time events: a purchase is made, and someone needs to be notified the moment it happens. Some will say that such data should not be pushed out of the database but from somewhere else. That is true, of course, but quite often we simply have no choice.
We had a task: send events from the database out for further processing. The question was how to do it.
SQL Server and external communications
Over SQL Server's lifetime there have been several attempts to enable communication outside the database: SQL Server Notification Services (NS), which appeared in SQL Server 2000, and SQL Server Service Broker (SSB), which appeared in SQL Server 2005. I described them in my book A First Look at SQL Server 2005 for Developers, written with Bob Beauchemin and Dan Sullivan. NS appeared in SQL Server 2000, as I said, and was redesigned in the beta version of SQL Server 2005. However, NS was cut completely from the release (RTM) version of SQL Server 2005.
Note: If you read the book, you will find a number of features there that did not make it into the RTM version.
SSB survived, and Microsoft introduced the Service Broker External Activator (EA) in the SQL Server 2008 Feature Pack. It lets SSB interact outside the local database. In theory that sounds good, but in practice it is cumbersome and confusing. We ran several tests and quickly realized that it did not do what we needed. In addition, SSB did not give us the performance we needed, so we had to come up with something else.
SQLCLR
What we ended up with is based on SQLCLR. SQLCLR is the .NET runtime embedded in the SQL Server engine; with it you can execute .NET code inside the engine. Since we are executing .NET code, we can do almost everything a regular .NET application can.
Note: I wrote "almost" above because there are in fact some limitations. In this context, these restrictions have little effect on what we are going to do.
SQLCLR works as follows: the code is compiled into a DLL, and that DLL is then registered with SQL Server:
Creating an assembly
CREATE ASSEMBLY [RabbitMQ.SqlServer]
AUTHORIZATION rmq
FROM 'F:\some_path\RabbitMQSqlClr4.dll'
WITH PERMISSION_SET = UNSAFE;
GO
Code snippet 1: Creating an assembly from an absolute path
The code performs the following actions:
- CREATE ASSEMBLY - creates an assembly with the specified name (the name itself can be anything).
- AUTHORIZATION - indicates the owner of the assembly. In this case, rmq is a previously created SQL Server role.
- FROM - specifies where the original assembly is located. The FROM clause can also take a UNC path or the assembly in binary form. The installation files for this project use the binary representation.
- WITH PERMISSION_SET - sets the permissions. UNSAFE is the least restrictive and is required in this case.
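CREATE ASSEMBLY with PERMISSION_SET = UNSAFE usually needs some instance and database setup first. The sketch below shows one common way to do that on a test machine. It is not taken from the project's installation scripts (which may already cover this), and it assumes the RabbitMQTest database used in the installation steps. Setting TRUSTWORTHY is the quick option; signing the assembly is the stricter alternative.

```sql
-- Allow user CLR code to run on this instance (it is off by default).
EXEC sp_configure 'clr enabled', 1;
RECONFIGURE;
GO

-- Assumption: a throwaway test database. UNSAFE assemblies need either a
-- TRUSTWORTHY database whose owner has the UNSAFE ASSEMBLY permission, or an
-- assembly signed with a certificate/asymmetric key whose login has it.
ALTER DATABASE RabbitMQTest SET TRUSTWORTHY ON;
GO

-- After CREATE ASSEMBLY: check what is registered and with which permission set.
USE RabbitMQTest;
GO
SELECT name, permission_set_desc, create_date
FROM sys.assemblies
WHERE is_user_defined = 1;
```

If the assembly was registered as in code snippet 1, the query should list RabbitMQ.SqlServer with permission_set_desc showing UNSAFE_ACCESS.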
Note: Regardless of whether a role or a login name is used in the AUTHORIZATION clause, when the assembly is loaded an appdomain is created with that same name. It is recommended to keep assemblies in separate appdomains, with different names, so that when one assembly fails it does not bring the others down. However, assemblies that depend on each other cannot be split into different appdomains.
Once the assembly is created, we create wrappers for its .NET methods:
CREATE PROCEDURE rmq.pr_clr_PostRabbitMsg @EndpointID int, @Message nvarchar(max)
AS EXTERNAL NAME [RabbitMQ.SqlServer].[RabbitMQSqlClr.RabbitMQSqlServer].[pr_clr_PostRabbitMsg];
GO
Code snippet 2: wrapping the .NET method
The code performs the following actions:
- Creates a T-SQL stored procedure named rmq.pr_clr_PostRabbitMsg that takes two parameters: @EndpointID and @Message.
- Instead of a procedure body, an external source is used, which consists of:
  - The assembly named RabbitMQ.SqlServer, i.e. the assembly we created in code snippet 1.
  - The full type name (namespace and class): RabbitMQSqlClr.RabbitMQSqlServer.
  - The method in that namespace and class: pr_clr_PostRabbitMsg.
When the procedure rmq.pr_clr_PostRabbitMsg is executed, the method pr_clr_PostRabbitMsg is called.
Note: when creating a procedure, the assembly name is not case-sensitive, unlike the full type name and the method name. The procedure name does not have to match the method name. However, the parameter data types must match.
As I said earlier, we at Derivco need to send data out of SQL Server, so we use SQLCLR and RabbitMQ (RMQ).
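The note on procedure and method names can be illustrated with a second wrapper whose name differs from the method it binds to. This is only a sketch: rmq.pr_SendToRabbit is a hypothetical name that does not exist in the repository, and the code assumes the assembly from code snippet 1 is already registered.

```sql
-- Hypothetical wrapper: the procedure name is free-form, but the type and
-- method names in EXTERNAL NAME are case-sensitive, and the parameter types
-- must line up with the .NET signature (int -> int, nvarchar(max) -> string).
CREATE PROCEDURE rmq.pr_SendToRabbit @EndpointID int, @Message nvarchar(max)
AS EXTERNAL NAME [RabbitMQ.SqlServer].[RabbitMQSqlClr.RabbitMQSqlServer].[pr_clr_PostRabbitMsg];
GO

-- List the CLR modules in this database and the methods they are bound to.
SELECT OBJECT_SCHEMA_NAME(am.object_id) AS schema_name,
       OBJECT_NAME(am.object_id)        AS module_name,
       a.name                           AS assembly_name,
       am.assembly_class,
       am.assembly_method
FROM sys.assembly_modules AS am
JOIN sys.assemblies AS a ON a.assembly_id = am.assembly_id;
```

The catalog query is a quick way to confirm which T-SQL name maps to which .NET method when the two differ.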
RabbitMQ
RMQ is an open source message broker that implements the Advanced Message Queuing Protocol (AMQP) and is written in Erlang.
Since RMQ is a message broker, AMQP client libraries are required to connect to it. The application references the client libraries and uses them to open a connection and send messages, much as a call to SQL Server is made through ADO.NET. But unlike ADO.NET, where the connection is most likely opened each time the database is accessed, the RabbitMQ connection stays open for the entire lifetime of the application.
Thus, to interact with RabbitMQ from the database, we need an application and the .NET client library for RabbitMQ.
Note: The rest of this article contains fragments of RabbitMQ code, but without detailed explanations of what they do. If you are new to RabbitMQ, I suggest looking at the various RabbitMQ tutorials to understand the purpose of the code. The Hello World C# tutorial is a good start. One difference between the tutorials and the code examples here is that the examples do not declare exchanges; they are assumed to be predefined.
RabbitMQ.SqlServer
RabbitMQ.SqlServer is an assembly that uses the .NET client library for RabbitMQ and makes it possible to send messages from the database to one or more RabbitMQ endpoints (VHosts and exchanges). The code can be downloaded or forked from my RabbitMQ-SqlServer repository on GitHub. It contains the assembly source code and the installation files (i.e. you do not have to compile it yourself).
Note: this is just an example showing how SQL Server can interact with RabbitMQ. It is NOT a finished product, nor even part of one. If this code breaks your brain, don't blame me; it is just an example.
Functionality
When the assembly is loaded, either through an explicit call to initialize it or indirectly when one of the wrapper procedures is called, it loads the connection string for the local database in which it is installed, as well as the RabbitMQ endpoints to which it connects:
Connection
internal bool InternalConnect()
{
    try
    {
        connFactory = new ConnectionFactory();
        connFactory.Uri = connString;
        connFactory.AutomaticRecoveryEnabled = true;
        connFactory.TopologyRecoveryEnabled = true;
        RabbitConn = connFactory.CreateConnection();
        // Pre-create the configured number of channels and pool them.
        for (int x = 0; x < channels; x++)
        {
            var ch = RabbitConn.CreateModel();
            rabbitChannels.Push(ch);
        }
        return true;
    }
    catch (Exception)
    {
        return false;
    }
}
Code snippet 3: connection to the endpoint
While connecting to the endpoint, the code also creates IModels (channels) on the connection; they are used when sending (publishing) messages:
Sending a message
internal bool Post(string exchange, byte[] msg, string topic)
{
    IModel value = null;
    int channelTryCount = 0;
    try
    {
        // Borrow a channel from the pool, retrying for up to ~5 seconds (100 x 50 ms).
        while ((!rabbitChannels.TryPop(out value)) && channelTryCount < 100)
        {
            channelTryCount += 1;
            Thread.Sleep(50);
        }
        // No channel became available in time.
        if (value == null)
        {
            var errMsg = $"Channel pool blocked when trying to post message to Exchange: {exchange}.";
            throw new ApplicationException(errMsg);
        }
        value.BasicPublish(exchange, topic, false, null, msg);
        // Return the channel to the pool.
        rabbitChannels.Push(value);
        return true;
    }
    catch (Exception)
    {
        // Return the channel even when publishing failed, then rethrow.
        if (value != null)
        {
            rabbitChannels.Push(value);
        }
        throw;
    }
}
Code snippet 4: Sending a message
The Post method is called from the method pr_clr_PostRabbitMsg(int endPointId, string msgToPost), which was exposed as a procedure using the CREATE PROCEDURE statement in code snippet 2:
Post call method
public static void pr_clr_PostRabbitMsg(int endPointId, string msgToPost)
{
    try
    {
        if (endPointId == 0)
        {
            throw new ApplicationException("EndpointId cannot be 0");
        }
        // Lazily initialise on the first call.
        if (!isInitialised)
        {
            pr_clr_InitialiseRabbitMq();
        }
        var msg = Encoding.UTF8.GetBytes(msgToPost);
        if (endPointId == -1)
        {
            // -1 means: send to all endpoints.
            foreach (var rep in remoteEndpoints)
            {
                var exch = rep.Value.Exchange;
                var topic = rep.Value.RoutingKey;
                foreach (var pub in rabbitPublishers.Values)
                {
                    pub.Post(exch, msg, topic);
                }
            }
        }
        else
        {
            RabbitPublisher pub;
            if (rabbitPublishers.TryGetValue(endPointId, out pub))
            {
                pub.Post(remoteEndpoints[endPointId].Exchange, msg, remoteEndpoints[endPointId].RoutingKey);
            }
            else
            {
                throw new ApplicationException($"EndpointId: {endPointId}, does not exist");
            }
        }
    }
    catch
    {
        throw;
    }
}
Code snippet 5: The method exposed as a procedure
When the method is executed, the caller is expected to pass the identifier of the endpoint to which the message should be sent, and the message itself. If -1 is passed as the endpoint identifier, we iterate over all endpoints and send the message to each of them. The message arrives as a string, from which we get the bytes with Encoding.UTF8.GetBytes. In a production environment, the call to Encoding.UTF8.GetBytes should be replaced with serialization.
Installation
To install and run the example, you need all the files in the src\SQL folder. To install, follow these steps:
- Run the script 01.create_database_and_role.sql. It creates:
  - the test database RabbitMQTest, where the assembly will be created.
  - the role rmq, to be assigned as the owner of the assembly.
  - a schema, also called rmq, in which the various database objects are created.
- Run the file 02.create_database_objects.sql. It creates:
  - the table rmq.tb_RabbitSetting, in which the local database connection string is stored.
  - the table rmq.tb_RabbitEndpoint, in which one or more RabbitMQ endpoints are stored.
- In the file 03.create_localhost_connstring.sql, change the value of the variable @connString to the correct connection string for the RabbitMQTest database created in step 1, and run the script.
Before proceeding, you need a running RabbitMQ broker and a VHost (the default VHost is /). As a rule, we have several VHosts, just for isolation. The VHost also needs an exchange; in the example we use amq.topic. When your RabbitMQ broker is ready, edit the parameters of the call to the procedure rmq.pr_UpsertRabbitEndpoint in the file 04.upsert_rabbit_endpoint.sql:
RabbitMQ endpoint
EXEC rmq.pr_UpsertRabbitEndpoint @Alias = 'rabbitEp1',
@ServerName = 'RabbitServer',
@Port = 5672,
@VHost = 'testHost',
@LoginName = 'rabbitAdmin',
@LoginPassword = 'some_secret_password',
@Exchange = 'amq.topic',
@RoutingKey = '#',
@ConnectionChannels = 5,
@IsEnabled = 1
Code Snippet 6: Creating an Endpoint in RabbitMQ
Now it is time to deploy the assemblies. Deployment differs between SQL Server versions prior to SQL Server 2014 (2005, 2008, 2008 R2, 2012) and SQL Server 2014 and later. The difference is the supported CLR version: before SQL Server 2014, .NET ran on CLR version 2, while SQL Server 2014 and later use version 4.
SQL Server 2005 - 2012
Let's start with the versions of SQL Server that run on CLR 2, since there are some peculiarities there. We need to deploy the assembly we created, and along with it the RabbitMQ .NET client library (RabbitMQ.Client), which our assembly references. Since we are targeting CLR 2, both our assembly and RabbitMQ.Client must be compiled against .NET 3.5. This is where the problems start.
All recent versions of RabbitMQ.Client are compiled for CLR 4, so they cannot be used in our build. The last version of the client library for CLR 2 is 3.4.3. But even if we try to deploy that assembly, we get an error message:
Figure 1: System.ServiceModel assembly is missing
This version of RabbitMQ.Client references an assembly that is not part of the SQL Server CLR. It is a WCF assembly, and this is one of the SQLCLR limitations I mentioned above: this particular assembly is intended for the kinds of tasks that are not allowed in SQL Server. The latest versions of RabbitMQ.Client do not have this dependency, so they can be used without any problems, apart from the annoying CLR 4 requirement. What to do?
As you know, RabbitMQ is open source, and we are developers, right? ;) So let's recompile! In a version of RabbitMQ.Client before the latest releases (i.e. version < 3.5.0), I removed the references to System.ServiceModel and recompiled. I had to change a couple of lines of code that used System.ServiceModel functionality, but those were minor changes.
In this example, I did not use client version 3.4.3, but took the stable release 3.6.6 and recompiled it against .NET 3.5 (CLR 2). It almost worked :), except that later releases of RabbitMQ.Client use Tasks, which are not originally part of .NET 3.5. Fortunately, there is a version of System.Threading.dll for .NET 3.5 that includes Task. I downloaded it, set up the references, and everything worked! The main thing here is that System.Threading.dll must be installed along with the assembly.
Note: the source code of RabbitMQ.Client from which I compiled the .NET 3.5 version is in my GitHub repository RabbitMQ Client 3.6.6 .NET 3.5. The binary DLL, along with System.Threading.dll for .NET 3.5, is also in the lib\NET3.5 directory of the (RabbitMQ-SqlServer) repository.
To install the necessary assemblies (System.Threading, RabbitMQ.Client and RabbitMQ.SqlServer), run the installation scripts from the src\sql directory in the following order:
- 05.51.System.Threading.sql2k5-12.sql - System.Threading
- 05.52.RabbitMQ.Client.sql2k5-12.sql - RabbitMQ.Client
- 05.53.RabbitMQ.SqlServer.sql2k5-12.sql - RabbitMQ.SqlServer
SQL Server 2014+
In SQL Server 2014 and later, the assembly is compiled against .NET 4.x (4.5.2 in my example), and you can reference any recent version of RabbitMQ.Client, which can be obtained through NuGet. In my example I use RabbitMQ.Client 4.1.1, which is also in the lib\NET4 directory of the (RabbitMQ-SqlServer) repository.
To install, run the scripts from the src\sql directory in the following order:
- 05.141.RabbitMQ.Client.sql2k14+.sql - RabbitMQ.Client
- 05.142.RabbitMQ.SqlServer.sql2k14+.sql - RabbitMQ.SqlServer
SQL method wrappers
To create the procedures that wrap the methods in our assembly (3.5 or 4), run the script 06.create_sqlclr_procedures.sql. It creates T-SQL procedures for three .NET methods:
- rmq.pr_clr_InitialiseRabbitMq calls pr_clr_InitialiseRabbitMq. Used to load and initialize the RabbitMQ.SqlServer assembly.
- rmq.pr_clr_ReloadRabbitEndpoints calls pr_clr_ReloadRabbitEndpoints. Loads the RabbitMQ endpoints.
- rmq.pr_clr_PostRabbitMsg calls pr_clr_PostRabbitMsg. Used to send a message to RabbitMQ.
The script also creates a simple T-SQL procedure, rmq.pr_PostRabbitMsg, which calls rmq.pr_clr_PostRabbitMsg. This is a wrapper procedure that knows what to do with the data, handles exceptions, and so on. In a production environment, we have several similar procedures that handle different types of messages. Read more about this below.
Usage
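The body of rmq.pr_PostRabbitMsg is not reproduced in this post. As a rough idea of what a wrapper around rmq.pr_clr_PostRabbitMsg might look like, here is a minimal sketch. The name rmq.pr_PostRabbitMsg_Sketch, the validation rules and the error text are all assumptions made for illustration; the real procedure in the repository may differ.

```sql
-- Hypothetical sketch, not the repository's rmq.pr_PostRabbitMsg.
CREATE PROCEDURE rmq.pr_PostRabbitMsg_Sketch
  @Message nvarchar(max),
  @EndpointID int
AS
BEGIN
  SET NOCOUNT ON;

  -- Reject input that the CLR method cannot handle (it throws on endpoint 0).
  IF @Message IS NULL OR @EndpointID IS NULL OR @EndpointID = 0
  BEGIN
    RAISERROR('A message and a non-zero endpoint id are required.', 16, 1);
    RETURN;
  END

  BEGIN TRY
    EXEC rmq.pr_clr_PostRabbitMsg @EndpointID = @EndpointID, @Message = @Message;
  END TRY
  BEGIN CATCH
    -- Assumption: a failed publish should surface to the caller. Log or
    -- swallow it here instead if messaging must never fail the caller.
    DECLARE @err nvarchar(2048) = ERROR_MESSAGE();
    RAISERROR('Posting to RabbitMQ failed: %s', 16, 1, @err);
  END CATCH
END
GO
```

Keeping validation and error handling in T-SQL means the calling procedures only need a single EXEC, and the policy for failed publishes lives in one place.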
From all of the above, it is clear that to send messages to RabbitMQ we call rmq.pr_PostRabbitMsg or rmq.pr_clr_PostRabbitMsg, passing the endpoint identifier and the message itself as a string. That is all well and good, but it would be nice to see how it works in practice. What we do in production is this: in the stored procedures that process data which needs to go to RabbitMQ, we collect the data to send, and at a hook point we call a procedure similar to rmq.pr_PostRabbitMsg. The following is a very simplified example of such a procedure:
Data processing procedure
ALTER PROCEDURE dbo.pr_SomeProcessingStuff @id int
AS
BEGIN
  SET NOCOUNT ON;
  BEGIN TRY
    -- create a variable for the endpoint
    DECLARE @endPointId int;
    -- create a variable for the message
    DECLARE @msg nvarchar(max) = '{';
    -- do the necessary work and collect data for the message
    SET @msg = @msg + '"Id":' + CAST(@id AS varchar(10)) + ',';
    -- do something else
    SET @msg = @msg + '"FName":"Hello",';
    SET @msg = @msg + '"LName":"World"';
    SET @msg = @msg + '}';
    -- do some more work
    -- get the endpoint identifier from somewhere, based on some condition
    SELECT @endPointId = 1;
    -- this is the hook point
    -- call the procedure that sends the message
    EXEC rmq.pr_PostRabbitMsg @Message = @msg, @EndpointID = @endPointId;
  END TRY
  BEGIN CATCH
    -- nvarchar(4000) matches what ERROR_MESSAGE() returns and is a valid RAISERROR argument type
    DECLARE @errMsg nvarchar(4000);
    DECLARE @errLine int;
    SELECT @errMsg = ERROR_MESSAGE(), @errLine = ERROR_LINE();
    RAISERROR('Error: %s at line: %d', 16, -1, @errMsg, @errLine);
  END CATCH
END
Code snippet 7: Data processing procedure
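The string concatenation in the procedure above is fine for fixed values, but real data can contain quotes or other characters that must be escaped in JSON. As an alternative sketch, assuming SQL Server 2016 or later (FOR JSON is not available in earlier versions, including the CLR 2 versions discussed above), the message can be built by the engine. The variable values are made up for illustration; this is not how the repository's script builds its message.

```sql
-- Assumption: SQL Server 2016 or later.
DECLARE @id int = 101;
DECLARE @msg nvarchar(max);

-- FOR JSON escapes quotes, backslashes and control characters in the values;
-- WITHOUT_ARRAY_WRAPPER returns a single object instead of a one-element array.
SET @msg = (
  SELECT @id AS Id, N'Hello' AS FName, N'World "quoted"' AS LName
  FOR JSON PATH, WITHOUT_ARRAY_WRAPPER
);

-- Inspect the result before passing it to rmq.pr_PostRabbitMsg.
SELECT @msg AS Message;
```

The resulting @msg can be passed to rmq.pr_PostRabbitMsg exactly as in the procedure above.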
In code snippet 7 we see how the necessary data is collected and processed in the procedure and sent once processing is done. To use this procedure, run the script 07.create_processing_procedure.sql from the src\SQL directory.
Let's run it all
At this point, you should be ready to send some messages. Before testing, make sure you have queues in RabbitMQ that are bound to the exchange of the endpoint in rmq.tb_RabbitEndpoint. So, to get started, do the following:
- Open the file 99.test_send_message.sql.
- Execute EXEC rmq.pr_clr_InitialiseRabbitMq; to initialize the assembly and load the RabbitMQ endpoints. This is not a required step, but it is recommended to preload the assembly after it is created or modified.
- Execute EXEC dbo.pr_SomeProcessingStuff @id = 101 (you can use any other identifier you like).
If everything ran without errors, a message should appear in the RabbitMQ queue! You have now used SQLCLR to send a message to RabbitMQ.
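Beyond the test script, the CLR wrappers can also be called directly. The following is a small sketch under two assumptions: that rmq.pr_clr_ReloadRabbitEndpoints takes no parameters (the post does not show its signature), and that the message text, which is made up here, is acceptable to your consumers.

```sql
-- Assumption: no parameters. Re-reads rmq.tb_RabbitEndpoint after endpoints
-- have been added or changed.
EXEC rmq.pr_clr_ReloadRabbitEndpoints;

-- Post straight through the CLR wrapper; -1 sends the message to all endpoints.
EXEC rmq.pr_clr_PostRabbitMsg
  @EndpointID = -1,
  @Message = N'{"Id":102,"FName":"Hello","LName":"All"}';
```

Passing 0 as the endpoint identifier raises an error, as shown in code snippet 5.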
Congratulations!