Hangfire queue support

Hangfire is a library for .NET (Core) that lets you run code asynchronously on a "fire and forget" basis. Typical examples are sending email, processing video, or synchronizing with another system. Besides "fire and forget" jobs, it supports delayed tasks and recurring tasks scheduled in Cron format.


There are many similar libraries today. Hangfire has several advantages:


  • Easy configuration, user friendly API
  • Reliability. Hangfire ensures that the created task will be executed at least once
  • Ability to perform tasks in parallel and excellent performance
  • Extensibility (we will use it below)
  • Quite complete and understandable documentation
  • Dashboard on which you can see all the statistics about the tasks

I won't go into much detail, since there are already quite a few good articles about Hangfire and how to use it. In this article I will explain how to use multiple queues (or task pools), how to fix the standard retry functionality, and how to give each queue its own configuration.


Existing support for (pseudo) queues


Important note: the heading says pseudo-queues because Hangfire does not guarantee that tasks run in any specific order. That is, the "First In, First Out" principle does not apply, and we will not rely on it. Moreover, the author of the library recommends making tasks idempotent, i.e. resilient to unexpected repeated execution. From here on I will simply say "queue", since Hangfire itself uses the term "Queue".


Hangfire has simple queue support. Although it does not offer the flexibility of message queue systems such as RabbitMQ or Azure Service Bus, it is often enough for a wide range of tasks.


Each task has a "Queue" property, the name of the queue in which it should run. Unless specified otherwise, a task is sent to the queue named "default". Support for multiple queues lets us manage the execution of different types of tasks separately. For example, we may want video processing tasks to go to the "video_queue" queue and email-sending tasks to go to the "email_queue" queue. This way the two types of tasks are processed independently. If we want to move video processing to a dedicated server, we can do so by running a separate Hangfire server as a console application that processes only the "video_queue" queue.
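As a rough illustration of the dedicated-server idea, a console application along the following lines could process only the "video_queue" queue. This is a minimal sketch rather than a tested setup: it assumes the Hangfire.SqlServer storage package, and the connection string is a placeholder. Any storage shared with the application that creates the tasks should work the same way.

```csharp
using System;
using Hangfire;

public static class Program
{
    public static void Main()
    {
        // Assumption: SQL Server storage; replace the placeholder
        // with the connection string used by the main application.
        GlobalConfiguration.Configuration
            .UseSqlServerStorage("<connection string>");

        var options = new BackgroundJobServerOptions
        {
            // This server instance picks up tasks from "video_queue" only.
            Queues = new[] { "video_queue" }
        };

        // The server processes tasks until it is disposed.
        using (new BackgroundJobServer(options))
        {
            Console.WriteLine("Hangfire server started. Press Enter to exit.");
            Console.ReadLine();
        }
    }
}
```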


Let's go to practice


Setting up a Hangfire server in ASP.NET Core looks like this:


public void Configure(IApplicationBuilder app)
{
    app.UseHangfireServer(new BackgroundJobServerOptions
    {
        WorkerCount = 2,
        Queues = new[] { "email_queue", "video_queue" }
    });
}

Problem 1 - Retried tasks end up in the "default" queue


As mentioned above, Hangfire has a default queue named "default". If a task from another queue, for example "video_queue", fails and needs to be retried, it is sent to the "default" queue rather than "video_queue". As a result, the task will not run on the Hangfire server instance we intended, if it runs at all. I established this behavior experimentally; it is probably a bug in Hangfire itself.


Job filters


Hangfire lets us extend its functionality with so-called job filters, which are similar in principle to Action Filters in ASP.NET MVC. Internally, Hangfire is implemented as a state machine: an engine that moves tasks from one state to the next (for example, created -> enqueued -> processing -> succeeded). Filters allow us to "intercept" a task each time its state changes and to manipulate it. A filter is implemented as an attribute that can be applied to a particular method, to a class, or globally.
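To make the interception point more concrete, here is a minimal sketch of a filter that only logs each state transition before it is applied. The class name LogStateTransitionFilter is hypothetical, and writing to the console is just a stand-in for real logging.

```csharp
using System;
using Hangfire.Common;
using Hangfire.States;

// Hypothetical filter: prints every state transition a task is about to make.
public class LogStateTransitionFilter : JobFilterAttribute, IElectStateFilter
{
    public void OnStateElection(ElectStateContext context)
    {
        // CurrentState is null for a task that has just been created.
        var from = context.CurrentState ?? "(none)";
        var to = context.CandidateState.Name;

        Console.WriteLine("Job " + context.BackgroundJob.Id + ": " + from + " -> " + to);
    }
}
```

Such a filter could be placed on a single job method as [LogStateTransitionFilter] or registered for all tasks through GlobalJobFilters.Filters.Add(new LogStateTransitionFilter()).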


Job Parameters


An ElectStateContext object is passed to the filter method as an argument. This object contains complete information about the task currently being processed. Among other things, it has the methods GetJobParameter<>(...) and SetJobParameter<>(...). Job parameters let you store task-related information in the database. The name of the queue to which the task was originally sent is stored in the job parameters, but for some reason this information is ignored when the task is retried.


Solution


So, we have a task that failed with an error and should be retried in the right queue (the one assigned to it when it was first created). Retrying a failed task is a transition from the "failed" state to the "enqueued" state. To solve the problem, we create a filter that, when a task moves to the "enqueued" state, checks which queue the task was originally sent to and sets the queue to that value, using the "QueueName" job parameter to remember it:


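using Hangfire.Common;
using Hangfire.States;

public class HangfireUseCorrectQueueFilter
    : JobFilterAttribute, IElectStateFilter
{
    public void OnStateElection(ElectStateContext context)
    {
        // We only care about transitions into the "enqueued" state.
        if (context.CandidateState is EnqueuedState enqueuedState)
        {
            var queueName = context.GetJobParameter<string>("QueueName");
            if (string.IsNullOrWhiteSpace(queueName))
            {
                // First enqueue: remember the original queue.
                context.SetJobParameter("QueueName", enqueuedState.Queue);
            }
            else
            {
                // Retry: send the task back to its original queue.
                enqueuedState.Queue = queueName;
            }
        }
    }
}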

To apply the filter to all tasks by default (that is, globally), add the following code to the configuration:


GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 });

Another small snag is that the GlobalJobFilters collection contains an instance of the AutomaticRetryAttribute class by default. This is the standard filter responsible for retrying failed tasks. It also sends the task to the "default" queue, ignoring the original queue. For our home-grown solution to work, we need to remove this filter from the collection and let our own filter take over responsibility for retrying tasks. As a result, the configuration code looks like this:


var defaultRetryFilter = GlobalJobFilters.Filters
    .FirstOrDefault(f => f.Instance is AutomaticRetryAttribute);
if (defaultRetryFilter != null && defaultRetryFilter.Instance != null)
{
    GlobalJobFilters.Filters.Remove(defaultRetryFilter.Instance);
}
GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 });

Note that AutomaticRetryAttribute implements an automatically increasing interval between attempts (the interval grows with each subsequent attempt). By removing AutomaticRetryAttribute from the GlobalJobFilters collection, we give up this functionality (see the implementation of its ScheduleAgainLater method).
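To give a feel for what is lost, the sketch below shows one way an increasing delay could be computed from the attempt number. It is only an illustration: the RetryBackoff helper and its polynomial formula are made up for this example and are not a copy of the Hangfire implementation.

```csharp
using System;

public static class RetryBackoff
{
    // Hypothetical helper: the delay grows with the attempt number,
    // giving 16 s, 31 s, 96 s and 271 s for attempts 1 to 4.
    public static TimeSpan DelayFor(int retryAttempt)
    {
        if (retryAttempt < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(retryAttempt));
        }

        var seconds = Math.Pow(retryAttempt, 4) + 15;
        return TimeSpan.FromSeconds(seconds);
    }
}
```

A value computed this way could be passed to new ScheduledState(delay) in place of a fixed interval.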


So, our tasks can now run in different queues, which lets us manage their execution independently, including processing different queues on different machines. However, we no longer know how many times and at what intervals our tasks will be retried after an error, because we removed AutomaticRetryAttribute from the filter collection.


Problem 2 - Individual settings for each queue


We want to be able to configure the interval and the number of retries separately for each queue. If we have not explicitly specified values for a queue, default values should apply. To do this, we will implement another filter and name it HangfireRetryJobFilter.


Ideally, the configuration code should look something like this:


GlobalJobFilters.Filters.Add(new HangfireRetryJobFilter
{
    Order = 2,
    ["email_queue"] = new HangfireQueueSettings
    {
        DelayInSeconds = 120,
        RetryAttempts = 3
    },
    ["video_queue"] = new HangfireQueueSettings
    {
        DelayInSeconds = 60,
        RetryAttempts = 5
    }
});

Solution


To do this, first add a HangfireQueueSettings class that will serve as a container for our settings.


public sealed class HangfireQueueSettings
{
    public int RetryAttempts { get; set; }
    public int DelayInSeconds { get; set; }
}

Then we add the filter itself. When a task is retried after an error, the filter applies the settings configured for its queue and keeps track of the number of retries:


public class HangfireRetryJobFilter
        : JobFilterAttribute, IElectStateFilter, IApplyStateFilter
{
    private readonly HangfireQueueSettings _defaultQueueSettings =
        new HangfireQueueSettings { RetryAttempts = 3, DelayInSeconds = 10 };
    private readonly IDictionary<string, HangfireQueueSettings> _settings
        = new Dictionary<string, HangfireQueueSettings>();
    public HangfireQueueSettings this[string queueName]
    {
        get
        {
            // Fall back to the defaults if the queue name is unknown or missing.
            return queueName != null && _settings.TryGetValue(queueName, out HangfireQueueSettings queueSettings)
                ? queueSettings 
                : _defaultQueueSettings;
        }
        set
        {
            _settings[queueName] = value;
        }
    }
    public void OnStateElection(ElectStateContext context)
    {
        if (!(context.CandidateState is FailedState failedState))
        {
            // This filter accepts only failed job state.
            return;
        }
        var retryAttempt = context.GetJobParameter<int>("RetryCount") + 1;
        var queueName = context.GetJobParameter<string>("QueueName");
        if (retryAttempt <= this[queueName].RetryAttempts)
        {
            ScheduleAgainLater(context, retryAttempt, failedState, queueName);
        }
        else
        {
            TransitionToDeleted(context, failedState, queueName);
        }
    }
    public void OnStateApplied(
        ApplyStateContext context, 
        IWriteOnlyTransaction transaction)
    {
        if (context.NewState is ScheduledState &&
            context.NewState.Reason != null &&
            context.NewState.Reason.StartsWith("Retry attempt"))
        {
            transaction.AddToSet("retries", context.BackgroundJob.Id);
        }
    }
    public void OnStateUnapplied(
        ApplyStateContext context, 
        IWriteOnlyTransaction transaction)
    {
        if (context.OldStateName == ScheduledState.StateName)
        {
            transaction.RemoveFromSet("retries", context.BackgroundJob.Id);
        }
    }
    private void ScheduleAgainLater(
        ElectStateContext context,
        int retryAttempt,
        FailedState failedState,
        string queueName)
    {
        context.SetJobParameter("RetryCount", retryAttempt);
        var delay = TimeSpan.FromSeconds(this[queueName].DelayInSeconds);
        const int maxMessageLength = 50;
        var exceptionMessage = failedState.Exception.Message.Length > maxMessageLength
            ? failedState.Exception.Message.Substring(0, maxMessageLength - 1) + "…"
            : failedState.Exception.Message;
        // If attempt number is less than max attempts, we should
        // schedule the job to run again later.
        var reason = $"Retry attempt {retryAttempt} of {this[queueName].RetryAttempts}: {exceptionMessage}";
        context.CandidateState = delay == TimeSpan.Zero
            ? (IState)new EnqueuedState { Reason = reason }
            : new ScheduledState(delay) { Reason = reason };
    }
    private void TransitionToDeleted(
        ElectStateContext context, 
        FailedState failedState, 
        string queueName)
    {
        context.CandidateState = new DeletedState
        {
            Reason = this[queueName].RetryAttempts > 0
                ? "Exceeded the maximum number of retry attempts."
                : "Retries were disabled for this job."
        };
    }
}

A note on the code: the HangfireRetryJobFilter class is based on Hangfire's AutomaticRetryAttribute class, so the implementation of some methods partially coincides with the corresponding methods of that class.

Problem 3 - How to send a task for execution to a specific queue?


I managed to find two ways to assign a queue to a task: one documented and one not.


1st way - apply the corresponding attribute to the method


[Queue("video_queue")]
public void SomeMethod() { }
BackgroundJob.Enqueue(() => SomeMethod());

http://docs.hangfire.io/en/latest/background-processing/configuring-queues.html


2nd way (undocumented) - use the BackgroundJobClient class


var client = new BackgroundJobClient();
client.Create(() => MyMethod(), new EnqueuedState("video_queue"));

The advantage of the second method is that it does not create unnecessary dependencies on Hangfire and lets you decide at run time which queue a task should go to. Unfortunately, I did not find any mention of the BackgroundJobClient class or how to use it in the official documentation. I used the second method in my solution, so it has been tested in practice.
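As an illustration of choosing the queue at run time, the sketch below routes a task based on a file size. The VideoJobs and QueueRouter classes, the Process method, and the 100 MB threshold are all hypothetical and exist only for this example.

```csharp
using System;
using Hangfire;
using Hangfire.States;

public class VideoJobs
{
    // Hypothetical job method; the real work would go here.
    public void Process(string fileName) { }
}

public static class QueueRouter
{
    // Hypothetical routing rule: large files go to the dedicated queue.
    public static string ChooseQueue(long fileSizeInBytes)
    {
        const long largeFileThreshold = 100L * 1024 * 1024; // 100 MB, arbitrary
        return fileSizeInBytes > largeFileThreshold ? "video_queue" : "default";
    }

    // Creates the task in the queue chosen at run time and returns its id.
    public static string Enqueue(
        IBackgroundJobClient client,
        string fileName,
        long fileSizeInBytes)
    {
        var queue = ChooseQueue(fileSizeInBytes);
        return client.Create<VideoJobs>(
            jobs => jobs.Process(fileName),
            new EnqueuedState(queue));
    }
}
```

A call might look like QueueRouter.Enqueue(new BackgroundJobClient(), "clip.mp4", fileSize). Keeping the routing rule in a separate method makes it easy to check without a running Hangfire server.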


Conclusion


In this article, we used Hangfire's support for multiple queues to separate the processing of different types of tasks. We implemented our own retry mechanism for failed tasks with individual configuration for each queue, extending Hangfire with job filters, and we also learned how to send tasks to the queue we need.


I hope this article will be useful to someone. I would welcome your comments.


Useful links


Hangfire Documentation
Hangfire Source Code
Scott Hanselman - How to Run Background Tasks in ASP.NET

