RabbitMQ - SQL Server

Original author: Niels Berglund
A week or two ago, I saw a message on the RabbitMQ Users forum asking how to get messages from SQL Server to RabbitMQ. Since we work closely with this at Derivco, I left some suggestions there and also said that I was writing a blog post about how it can be done. That last part was not entirely true, at least until now (sorry, Bro, I have been very busy).

SQL Server is awesome. Putting information into a database is very easy, and retrieving it with a query is just as easy. Getting only the data that has just been inserted or updated is a little harder. Think about real-time events: a purchase is made, and someone needs to be notified the moment it happens. Some will say that such data should not be pushed out of the database but from somewhere else. That is true, but quite often we simply have no choice.

We had exactly this task: send events from the database to the outside for further processing. The question was how to do it.

SQL Server and external communications


Over the lifetime of SQL Server there have been several attempts at communication outside the database: SQL Server Notification Services (NS), which appeared in SQL Server 2000, and SQL Server Service Broker (SSB), which appeared in SQL Server 2005. I described both in the book A First Look at SQL Server 2005 for Developers, written together with Bob Beauchemin and Dan Sullivan. NS was redesigned in the beta version of SQL Server 2005, but it was cut completely from the release-to-manufacturing (RTM) version of SQL Server 2005.
Note: If you read the book, you will find a number of features there that did not make it into the RTM version.
SSB survived, and Microsoft introduced the Service Broker External Activator (EA) in the SQL Server 2008 Feature Pack. It allows SSB to interact with the world outside the local database. In theory that sounds good, but in practice it is cumbersome and convoluted. We ran several tests and quickly realized that it did not do what we needed. In addition, SSB did not give us the performance we needed, so we had to come up with something else.

SQLCLR


What we ended up with is based on SQLCLR. SQLCLR is the .NET runtime hosted inside the SQL Server engine, and with its help you can execute .NET code inside the engine. Since we are executing .NET code, we can do almost everything a regular .NET application can do.
Note: I wrote "almost" above because there are some limitations. In this context, those restrictions have little effect on what we are going to do.
SQLCLR works as follows: the code is compiled into a DLL, and that library is then registered with SQL Server:

Creating an assembly

CREATE ASSEMBLY [RabbitMQ.SqlServer]
AUTHORIZATION rmq
FROM 'F:\some_path\RabbitMQSqlClr4.dll'
WITH PERMISSION_SET = UNSAFE;
GO

Code snippet 1: Creating an assembly from an absolute path

The code performs the following actions:

  • CREATE ASSEMBLY - creates an assembly with the specified name (the name itself can be anything).
  • AUTHORIZATION - indicates the owner of the assembly. In this case, rmq is a previously created SQL Server role.
  • FROM - specifies where the assembly is loaded from. The FROM clause can also take a UNC path or the binary representation of the assembly. The installation files for this project use the binary representation.
  • WITH PERMISSION_SET - sets the permissions. UNSAFE is the least restrictive setting and is required in this case.
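
A practical consequence of UNSAFE is that both the instance and the database have to allow it. The following is a minimal sketch of one common way to prepare them; it is not necessarily what the installation scripts of this project do. It assumes that CLR integration is still disabled on the instance and that the target is the RabbitMQTest test database created during installation. With this approach the database owner also needs the UNSAFE ASSEMBLY permission; signing the assembly and granting that permission to a login based on the signature is the stricter alternative.

```sql
-- Assumption: CLR integration is still off (the default) on this instance.
EXEC sp_configure 'clr enabled', 1;
RECONFIGURE;
GO

-- Assumption: the TRUSTWORTHY route is acceptable for a test database.
-- RabbitMQTest is the test database used by the installation scripts.
ALTER DATABASE RabbitMQTest SET TRUSTWORTHY ON;
GO
```

TRUSTWORTHY ON is convenient for a test database, but it widens what code in that database is allowed to do, so it deserves a review before being used anywhere else.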

Note: Whether a role or a login is used in the AUTHORIZATION clause, the owner determines which appdomain the assembly is loaded into: assemblies with the same owner are loaded into the same appdomain. It is recommended to keep assemblies in separate appdomains so that when one assembly fails and takes its appdomain down, the others are not affected. However, assemblies that depend on each other cannot be split across appdomains.
Once the assembly is created, we create wrappers for the .NET methods in it:

CREATE PROCEDURE rmq.pr_clr_PostRabbitMsg @EndpointID int, @Message nvarchar(max)
AS EXTERNAL NAME [RabbitMQ.SqlServer].[RabbitMQSqlClr.RabbitMQSqlServer].[pr_clr_PostRabbitMsg];
GO

Code snippet 2: wrapping the .NET method

The code performs the following actions:

  • Creates a T-SQL stored procedure named rmq.pr_clr_PostRabbitMsg that takes two parameters: @EndpointID and @Message.
  • Instead of a procedure body, an external source is used, which consists of:
    • The assembly named RabbitMQ.SqlServer, i.e. the assembly we created in code snippet 1.
    • The full type name (namespace and class): RabbitMQSqlClr.RabbitMQSqlServer.
    • The method in that namespace and class: pr_clr_PostRabbitMsg.

When the procedure rmq.pr_clr_PostRabbitMsg is executed, the method pr_clr_PostRabbitMsg is called.
Note: when creating a procedure, the assembly name is not case-sensitive, unlike the full type name and the method name. The name of the procedure does not have to match the name of the method. However, the parameter data types must match.
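
To check what was actually registered, the catalog views can be queried. This is only a sketch under the assumption that the assembly and the wrapper were created with the names from code snippets 1 and 2; it lists each wrapper procedure next to the class and method it maps to.

```sql
-- List the registered assembly and the T-SQL modules bound to it.
SELECT a.name                          AS assembly_name,
       a.permission_set_desc           AS permission_set,
       OBJECT_SCHEMA_NAME(m.object_id) AS schema_name,
       OBJECT_NAME(m.object_id)        AS procedure_name,
       m.assembly_class,
       m.assembly_method
FROM sys.assemblies AS a
JOIN sys.assembly_modules AS m
  ON m.assembly_id = a.assembly_id
WHERE a.name = N'RabbitMQ.SqlServer';
```

An empty result means that either the assembly name differs or no wrapper procedures have been created for it yet.
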
As I said earlier, we at Derivco need to send data out of SQL Server, so we use SQLCLR and RabbitMQ (RMQ).

RabbitMQ


RMQ is an open source message broker that implements the Advanced Message Queuing Protocol (AMQP) and is written in Erlang.

Since RMQ is a message broker, AMQP client libraries are required to connect to it. The application references the client libraries and uses them to open a connection and send messages, much as ADO.NET is used to call SQL Server. But unlike ADO.NET, where a connection is most likely opened each time the database is accessed, the RabbitMQ connection stays open for the lifetime of the application.

Thus, to talk to RabbitMQ from the database we need an application and the .NET client library for RabbitMQ.
Note: The rest of this article contains fragments of RabbitMQ code, but without detailed explanations of what they do. If you are new to RabbitMQ, I suggest working through the RabbitMQ tutorials to understand what the code is for. The C# Hello World tutorial is a good start. One difference between the tutorials and the code here is that the examples do not declare exchanges. They are assumed to exist already.

RabbitMQ.SqlServer


RabbitMQ.SqlServer is an assembly that uses the .NET client library for RabbitMQ and makes it possible to send messages from the database to one or several RabbitMQ endpoints (VHosts and exchanges). The code can be downloaded or forked from my RabbitMQ-SqlServer repository on GitHub. It contains the source code of the assembly as well as the installation files (i.e. you do not have to compile anything yourself).
Note: this is just an example to show how SQL Server can interact with RabbitMQ. This is NOT a finished product and not even a part of one. If this code breaks something, don't blame me; it is just an example.

Functionality


When the assembly is loaded, either through an explicit initialization call or indirectly through a call to one of the wrapper procedures, it loads the connection string to the local database in which it is installed, as well as the RabbitMQ endpoints it connects to:

Connection

internal bool InternalConnect()
{
  try
  {
    connFactory = new ConnectionFactory();
    connFactory.Uri = connString;
    // let the client restore the connection and its topology after a failure
    connFactory.AutomaticRecoveryEnabled = true;
    connFactory.TopologyRecoveryEnabled = true;
    RabbitConn = connFactory.CreateConnection();
    // pre-create a pool of channels (IModel) used for publishing
    for (int x = 0; x < channels; x++)
    {
      var ch = RabbitConn.CreateModel();
      rabbitChannels.Push(ch);
    }
    return true;
  }
  catch (Exception)
  {
    // connecting failed; report it through the return value
    return false;
  }
}

Code snippet 3: connection to the endpoint

While connecting to the endpoint, the code also creates IModels (channels) on the connection; these are used when sending (enqueuing) messages:

Sending a message

internal bool Post(string exchange, byte[] msg, string topic)
{
  IModel value = null;
  int channelTryCount = 0;
  try
  {
    // wait for a free channel: up to 100 attempts, 50 ms apart
    while ((!rabbitChannels.TryPop(out value)) && channelTryCount < 100)
    {
      channelTryCount += 1;
      Thread.Sleep(50);
    }
    // value is still null when no channel could be taken from the pool
    if (value == null)
    {
      var errMsg = $"Channel pool blocked when trying to post message to Exchange: {exchange}.";
      throw new ApplicationException(errMsg);
    }
    value.BasicPublish(exchange, topic, false, null, msg);
    // return the channel to the pool
    rabbitChannels.Push(value);
    return true;
  }
  catch (Exception)
  {
    // hand the channel back even when publishing failed
    if (value != null)
    {
      rabbitChannels.Push(value);
    }
    throw;
  }
}

Code snippet 4: sending a message

The Post method is called from the method pr_clr_PostRabbitMsg(int endPointId, string msgToPost), which was exposed as a procedure by the CREATE PROCEDURE statement in code snippet 2:

Post call method

public static void pr_clr_PostRabbitMsg(int endPointId, string msgToPost)
{
  try
  {
    if(endPointId == 0)
    {
      throw new ApplicationException("EndpointId cannot be 0");
    }
    if (!isInitialised)
    {
      pr_clr_InitialiseRabbitMq();
    }
    var msg = Encoding.UTF8.GetBytes(msgToPost);
    if (endPointId == -1)
    {
      foreach (var rep in remoteEndpoints)
      {
        var exch = rep.Value.Exchange;
        var topic = rep.Value.RoutingKey;
        foreach (var pub in rabbitPublishers.Values)
        {
          pub.Post(exch, msg, topic);
        }
      }
    }
    else
    {
      RabbitPublisher pub;
      if (rabbitPublishers.TryGetValue(endPointId, out pub))
      {
        pub.Post(remoteEndpoints[endPointId].Exchange, msg, remoteEndpoints[endPointId].RoutingKey);
      }
      else
      {
        throw new ApplicationException($"EndpointId: {endPointId}, does not exist");
      }
    }
  }
  catch
  {
    throw;
  }
}  

Code snippet 5: Method representation in the form of a procedure.

When the method is executed, the caller is expected to pass the identifier of the endpoint the message should go to, and the message itself. If -1 is passed as the endpoint identifier, we iterate over all endpoints and send the message to each of them. The message arrives as a string, from which we get the bytes with Encoding.UTF8.GetBytes. In a production environment, the call to Encoding.UTF8.GetBytes should be replaced with proper serialization.
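
From the T-SQL side, the two cases might look like the calls below. This is a sketch that assumes the wrapper from code snippet 2 has been created and that an endpoint with identifier 1 exists; both the identifier and the JSON payload are made up for illustration.

```sql
-- Send to one endpoint; 1 is an assumed endpoint identifier.
EXEC rmq.pr_clr_PostRabbitMsg @EndpointID = 1,
                              @Message = N'{"Id":101}';

-- Send to every loaded endpoint by passing -1.
EXEC rmq.pr_clr_PostRabbitMsg @EndpointID = -1,
                              @Message = N'{"Id":101}';
```

Passing 0, or an identifier that is not loaded, makes the method throw, which surfaces in T-SQL as an error raised by the procedure call.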

Installation


To install and run the example, you need all the files in the folder src\SQL. To install, follow these steps:

  • Run the script 01.create_database_and_role.sql. It creates:
    • the test database RabbitMQTest, where the assembly will be created.
    • the role rmq, which will be assigned as the owner of the assembly.
    • a schema, also called rmq, in which the various database objects are created.

  • Run the script 02.create_database_objects.sql. It creates:
    • the table rmq.tb_RabbitSetting, in which the local database connection string is stored.
    • the table rmq.tb_RabbitEndpoint, in which one or more RabbitMQ endpoints are stored.

  • In the file 03.create_localhost_connstring.sql, change the value of the variable @connString to the correct connection string for the RabbitMQTest database created in step 1, and run the script.

Before proceeding, you need a running instance of the RabbitMQ broker and a VHost (the default VHost is /). We usually have several VHosts, simply for isolation. The VHost also needs an exchange; in the example we use amq.topic. When your RabbitMQ broker is ready, edit the parameters of the procedure rmq.pr_UpsertRabbitEndpoint in the file 04.upsert_rabbit_endpoint.sql:

RabbitMQ endpoint

EXEC rmq.pr_UpsertRabbitEndpoint @Alias = 'rabbitEp1',
                                 @ServerName = 'RabbitServer',
                                 @Port = 5672,
                                 @VHost = 'testHost',
                                 @LoginName = 'rabbitAdmin',
                                 @LoginPassword = 'some_secret_password',
                                 @Exchange = 'amq.topic',
                                 @RoutingKey = '#',
                                 @ConnectionChannels = 5,
                                 @IsEnabled = 1

Code Snippet 6: Creating an Endpoint in RabbitMQ

At this stage, it is time to deploy the assemblies. Deployment differs between SQL Server versions prior to SQL Server 2014 (2005, 2008, 2008R2, 2012) and versions 2014 and higher. The difference lies in the supported version of the CLR. Before SQL Server 2014, the .NET Framework ran on CLR version 2, whereas SQL Server 2014 and higher use version 4.
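
If it is unclear which set of scripts applies to a given instance, the hosted CLR version can be queried directly. A small sketch, assuming the sys.dm_clr_properties dynamic management view is available on your version and that you have permission to read it: a value that starts with v2.0 would point to the CLR 2 scripts, one that starts with v4.0 to the CLR 4 scripts.

```sql
-- CLR version hosted by this instance (needs VIEW SERVER STATE permission).
SELECT name, value
FROM sys.dm_clr_properties
WHERE name = N'version';

-- Product version of the instance, for comparison with the script names.
SELECT SERVERPROPERTY('ProductVersion') AS product_version;
```

Checking the runtime rather than the product name avoids guessing when an instance has been upgraded in place.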

SQL Server 2005 - 2012


Let's start with the SQL Server versions that run on CLR 2, since they have some peculiarities. We need to deploy the assembly we created and, together with it, the RabbitMQ .NET client library (RabbitMQ.Client), which our assembly references. Since we are targeting CLR 2, both our assembly and RabbitMQ.Client must be compiled against .NET 3.5. This is where the problems start.

All recent versions of the RabbitMQ.Client library are compiled for CLR 4, so they cannot be used with our assembly. The last version of the client library built for CLR 2 is 3.4.3. But even if we try to deploy that assembly, we get an error message:


Figure 1: System.ServiceModel assembly is missing

This version of RabbitMQ.Client references an assembly that is not part of the SQL Server CLR. It is a WCF assembly, and this is one of the SQLCLR limitations I mentioned above: this particular assembly is meant for the kind of work that is not allowed inside SQL Server. The later versions of RabbitMQ.Client do not have this dependency, so they could be used without any problems, were it not for the annoying requirement of CLR 4. What to do?

As you know, RabbitMQ is open source, and we are developers, right? ;) So let's recompile! In a version of RabbitMQ.Client older than the recent releases (i.e. version < 3.5.0), I removed the references to System.ServiceModel and recompiled. I had to change a couple of lines of code that used System.ServiceModel functionality, but these were minor changes.

In this example, I did not use client version 3.4.3; instead I took the stable release 3.6.6 and recompiled it against .NET 3.5 (CLR 2). It almost worked :), except that later releases of RabbitMQ.Client use Task, which is not originally part of .NET 3.5.

Fortunately, there is a version of System.Threading.dll for .NET 3.5 that includes Task. I downloaded it, set up the references, and everything worked! The main point here is that System.Threading.dll must be installed together with the assembly.
Note: the source code of RabbitMQ.Client from which I compiled the .NET 3.5 version is in my GitHub repository RabbitMQ Client 3.6.6 .NET 3.5. The binary DLL, along with System.Threading.dll for .NET 3.5, is also in the lib\NET3.5 directory of the (RabbitMQ-SqlServer) repository.
To install the necessary assemblies (System.Threading, RabbitMQ.Client and RabbitMQ.SqlServer), run the installation scripts from the directory src\sql in the following order:

  1. 05.51.System.Threading.sql2k5-12.sql - System.Threading
  2. 05.52.RabbitMQ.Client.sql2k5-12.sql - RabbitMQ.Client
  3. 05.53.RabbitMQ.SqlServer.sql2k5-12.sql - RabbitMQ.SqlServer

SQL Server 2014+


In SQL Server 2014 and later, the assembly is compiled against .NET 4.x (4.5.2 in my example), and you can reference any of the recent versions of RabbitMQ.Client, which can be obtained through NuGet. In my example I use RabbitMQ.Client 4.1.1, which is also in the lib\NET4 directory of the (RabbitMQ-SqlServer) repository.

To install, run the scripts from the directory src\sql in the following order:

  1. 05.141.RabbitMQ.Client.sql2k14+.sql - RabbitMQ.Client
  2. 05.142.RabbitMQ.SqlServer.sql2k14+.sql - RabbitMQ.SqlServer

SQL method wrappers


To create the procedures that expose the methods of our assembly (3.5 or 4), run the script 06.create_sqlclr_procedures.sql. It creates T-SQL procedures for three .NET methods:

  • rmq.pr_clr_InitialiseRabbitMq calls pr_clr_InitialiseRabbitMq. Used to load and initialize the RabbitMQ.SqlServer assembly.
  • rmq.pr_clr_ReloadRabbitEndpoints calls pr_clr_ReloadRabbitEndpoints. Loads the various RabbitMQ endpoints.
  • rmq.pr_clr_PostRabbitMsg calls pr_clr_PostRabbitMsg. Used to send a message to RabbitMQ.

The script also creates a plain T-SQL procedure, rmq.pr_PostRabbitMsg, which calls rmq.pr_clr_PostRabbitMsg. This is a wrapper procedure that knows what to do with the data, handles exceptions, and so on. In a production environment, we have several similar procedures that handle different types of messages. More about this below.
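
To give an idea of what such a wrapper might contain, here is a hypothetical sketch. It is not the rmq.pr_PostRabbitMsg procedure from the repository: the name rmq.pr_PostRabbitMsg_Sketch, the empty-message check and the error text are assumptions made for illustration. Only the call to rmq.pr_clr_PostRabbitMsg and its two parameters come from the article.

```sql
-- Hypothetical wrapper; the real rmq.pr_PostRabbitMsg may look different.
CREATE PROCEDURE rmq.pr_PostRabbitMsg_Sketch
  @Message nvarchar(max),
  @EndpointID int
AS
BEGIN
  SET NOCOUNT ON;
  BEGIN TRY
    -- Assumed rule: an empty message is silently skipped.
    IF @Message IS NULL OR LEN(@Message) = 0
      RETURN;

    -- Hand the message over to the SQLCLR procedure.
    EXEC rmq.pr_clr_PostRabbitMsg @EndpointID = @EndpointID,
                                  @Message = @Message;
  END TRY
  BEGIN CATCH
    -- Re-raise with some context so the caller can decide what to do.
    DECLARE @errMsg nvarchar(2048);
    SET @errMsg = ERROR_MESSAGE();
    RAISERROR('Failed to post to RabbitMQ endpoint %d: %s', 16, 1, @EndpointID, @errMsg);
  END CATCH
END
GO
```

Whether a failed send should raise an error, as in this sketch, or only be logged is a design decision: raising makes the calling procedure aware of the problem, while logging keeps the business transaction independent of the broker.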

Using


From all of the above, it is clear that to send messages to RabbitMQ we call rmq.pr_PostRabbitMsg or rmq.pr_clr_PostRabbitMsg, passing the endpoint identifier and the message itself as a string. That is all well and good, but it would be nice to see how it works in practice.

What we do in production is this: in the stored procedures that process data that needs to go to RabbitMQ, we collect the data to send, and at a hook point we call a procedure similar to rmq.pr_PostRabbitMsg. The following is a very simplified example of such a procedure:

Data processing procedure

ALTER PROCEDURE dbo.pr_SomeProcessingStuff @id int
AS
BEGIN
  SET NOCOUNT ON;
  BEGIN TRY
    -- create a variable for the endpoint
    DECLARE @endPointId int;
    -- create a variable for the message
    DECLARE @msg nvarchar(max) = '{';
    -- do the necessary processing and collect the data for the message
    SET @msg = @msg + '"Id":' + CAST(@id AS varchar(10)) + ',';
    -- do something else
    SET @msg = @msg + '"FName":"Hello",';
    SET @msg = @msg + '"LName":"World"';
    SET @msg = @msg + '}';
    -- do some more work
    -- get the endpoint identifier from somewhere, based on some conditions
    SELECT @endPointId = 1;
    -- this is the hook point
    -- call the procedure that sends the message
    EXEC rmq.pr_PostRabbitMsg @Message = @msg, @EndpointID = @endPointId;
  END TRY
  BEGIN CATCH
    DECLARE @errMsg nvarchar(max);
    DECLARE @errLine int;
    SELECT @errMsg = ERROR_MESSAGE(), @errLine = ERROR_LINE();
    RAISERROR('Error: %s at line: %d', 16, -1, @errMsg, @errLine);
  END CATCH
END

Code snippet 7: a simplified data processing procedure

In code snippet 7, we see how the necessary data is collected and processed in the procedure and then sent. To create this procedure, run the script 07.create_processing_procedure.sql from the directory src\SQL.

Let's run it all


At this point, you should be ready to send a few messages. Before testing, make sure that RabbitMQ has queues bound to the exchange of the endpoint stored in rmq.tb_RabbitEndpoint.

To get started, do the following:
Open the file 99.test_send_message.sql.
Execute

EXEC rmq.pr_clr_InitialiseRabbitMq;

to initialize the assembly and load the RabbitMQ endpoints. This step is not required, but it is recommended to preload the assembly after it has been created or modified.

Execute

EXEC dbo.pr_SomeProcessingStuff @id = 101

(you can use any other identifier you like).

If everything ran without errors, a message should appear in the RabbitMQ queue. You have just used SQLCLR to send a message to RabbitMQ.

Congratulations!
