Implementing webhooks: how third-party services interact with online cash registers
I asked our marketing team to draw an illustration and spent a long time explaining to them what webhooks are.
Not long ago, I was tasked with implementing webhooks in the Dreamkas Cabinet, the personal account for cash register owners. As it turned out, there are almost no descriptions or tutorials online on how to do this. I will explain how we implemented it without heavy cron jobs polling the database.
This article will be useful for mid-level Node.js developers.
Where to use webhooks
To understand the specifics, I have to start with some background.
Dreamkas produces online cash registers and the cloud service Dreamkas Cabinet. All cash registers send sales data to the tax office over the Internet in real time, as required by the new law. By connecting to the Cabinet, the cash register's owner gets remote access to these sales statistics and other tools.
From the web interface of the Dreamkas Cabinet, the owner can monitor sales, work with reports, create and edit the product catalog and automatically upload it to all cash registers, and connect external inventory systems.
We needed webhooks when we connected online stores to the Cabinet. Online trading also requires a cash register; it just does not print a paper receipt. We decided to build a tool that lets online stores fiscalize sales from ordinary JSON with purchase data and transmit them to the OFD (fiscal data operator).
Fiscalization can take longer than a typical HTTP request allows, so we needed a way for the store to find out the status of a receipt. Repeatedly polling the Cabinet for the receipt status benefits neither us nor the online store. Webhooks kill two birds with one stone: the Cabinet makes the request only once, and the online store receives the receipt as soon as it is ready.
When we started implementing webhooks, we decided to open this functionality to integrator services as well. Through webhooks, third-party services connected to the Cabinet receive notifications about sales, shifts being opened and closed, products being created and edited, and cash being deposited and withdrawn. We have not stopped there, and we move every important new event to webhooks right away.
Our Webhook Requirements
- Webhooks must be called when certain events occur;
- It must be possible to "subscribe" to only some events instead of receiving all of them;
- Webhooks must repeat requests until the third-party service accepts them;
- A webhook is deleted if it has not been accepted within 24 hours;
- Repeated requests must be spaced out in time. If the third-party service does not respond, we assume it has gone down, and a stream of new requests could bring it down again, so we retry at set intervals.
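On the third-party side, "accepting" a webhook means answering with a 2xx status; the worker shown later treats any other status, or no answer within its 20-second timeout, as a failure. Below is a minimal receiver sketch using only Node's built-in `http` module. It assumes JSON bodies of the form `{ action, type, data }`; `createWebhookReceiver` and the `onEvent` callback are hypothetical names. It answers immediately and processes the event afterwards, so slow processing cannot run into the sender's timeout.

```javascript
const http = require('http');

// Hypothetical receiver: replies 200 as soon as the JSON body parses,
// then hands the event to onEvent asynchronously.
function createWebhookReceiver(onEvent) {
  return http.createServer((req, res) => {
    if (req.method !== 'POST') {
      res.statusCode = 405;
      res.end();
      return;
    }
    let raw = '';
    req.setEncoding('utf8');
    req.on('data', (chunk) => { raw += chunk; });
    req.on('end', () => {
      let event;
      try {
        event = JSON.parse(raw);
      } catch (err) {
        // Any non-2xx answer counts as "not accepted" on the sender's side
        res.statusCode = 400;
        res.end();
        return;
      }
      res.statusCode = 200;
      res.end();
      // Process after answering, so slow work cannot hit the sender's timeout
      setImmediate(() => onEvent(event));
    });
  });
}
```

The design choice is to acknowledge first and work later: the sender only needs to know the event arrived, and a receiver that does heavy work before replying risks being retried for an event it already handled.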
Current backend stack
We write in Node.js and use koa as the web framework. We have two databases: PostgreSQL with sequelize, which stores strongly related data such as cash registers and users, and MongoDB, which stores unrelated, immutable data such as receipts and shifts. We also make extensive use of RabbitMQ queues to smooth out load spikes, plus Redis for caching.
Webhook Implementation
Defining events for calling webhooks
First, we identify the places where webhooks should be called. At the model level, we can use hooks in mongoose and, in most cases, in sequelize.
For historical reasons, our sequelize model does not let us create a product with its data in one step: we create an empty product and then immediately update it. So we had to add webhook calls manually in every controller.
Without this problem, everything is quite simple. Here is an example from a mongoose model:
schema.static('setStatus', async function (_id, status, data) {
  // status change logic
  const res = await this.update({ _id }, { … });
  await Webhook.send({ ... });
  return res;
});
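The same pattern without mongoose, as a minimal sketch: a hypothetical in-memory `receipts` store stands in for the model, and a stub `sendWebhook` stands in for `Webhook.send`. What it shows is the ordering: the notification is queued only after the write has succeeded, so a failed update never produces a webhook.

```javascript
// Hypothetical in-memory stand-ins for the model and for Webhook.send
const receipts = new Map([['r1', { _id: 'r1', status: 'PENDING' }]]);
const sentWebhooks = [];

async function sendWebhook(event) {
  sentWebhooks.push(event);
}

async function setStatus(_id, status) {
  const doc = receipts.get(_id);
  if (!doc) {
    // Nothing was changed, so there is nothing to report
    throw new Error(`Receipt ${_id} not found`);
  }
  doc.status = status;
  // Notify only after the write has gone through
  await sendWebhook({ type: 'RECEIPT', action: 'UPDATE', itemId: _id });
  return doc;
}
```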
Event Subscriptions
To implement subscriptions to specific events, we use bit masks.
On the backend, we store all the event-type flags in a single number, and we send a ready-made JSON object to the frontend:
{
  "types": {
    "products": true,
    "receipts": false,
    "shifts": true,
    "encashments": false,
    "devices": false,
    "operations": true
  }
}
To convert the number to JSON and back, we create a virtual attribute in sequelize with a getter and a setter. Virtual fields are computed on the fly from the table's columns (and write back to them), but are not themselves stored in the database.
// Static methods, stored in a separate file
import _ from 'lodash';

export const scopeBits = {
  products: 0,
  receipts: 1,
  shifts: 2,
  encashments: 3,
  devices: 4,
  operations: 5,
};

/**
 * This mapping exists because we wanted webhooks to carry
 * the model name and the operation type in UPPER CASE and in the singular.
 */
/* eslint-disable key-spacing */
const typeToTypes = {
  PRODUCT:    { products: true },
  RECEIPT:    { receipts: true },
  SHIFT:      { shifts: true },
  ENCASHMENT: { encashments: true },
  DEVICE:     { devices: true },
  OPERATION:  { operations: true },
};
/* eslint-enable key-spacing */

// { products: true, shifts: true, ... } -> integer mask
export function formMask(scope) {
  if (_.isEmpty(scope)) {
    return 0;
  }
  return _.reduce(Object.keys(scope), (mask, key) => {
    // Ignore unknown keys: 1 << undefined would silently set bit 0 (products)
    if (scope[key] && _.has(scopeBits, key)) {
      mask |= 1 << scopeBits[key];
    }
    return mask;
  }, 0);
}

// integer mask -> { products: true, receipts: false, ... }
export function formEvents(mask) {
  return _.reduce(scopeBits, (memo, bit, scope) => {
    memo[scope] = Boolean(mask & (1 << bit));
    return memo;
  }, {});
}
// In the model definition:
subscribes: {
  type: DataTypes.INTEGER,
  allowNull: false,
},
types: {
  // Not stored: computed from (and written back to) the subscribes column
  type: DataTypes.VIRTUAL(DataTypes.INTEGER, ['subscribes']),
  get() {
    return this.constructor.formEvents(this.get('subscribes'));
  },
  set(types) {
    this.setDataValue('subscribes', this.constructor.formMask(types));
  },
},
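For readers without lodash, here is the same packing and unpacking in plain JavaScript, as a sketch that mirrors `scopeBits`. With products at bit 0, shifts at bit 2 and operations at bit 5, the JSON example from the start of this section packs into 1 + 4 + 32 = 37.

```javascript
// Bit positions, mirroring scopeBits
const scopeBits = {
  products: 0,
  receipts: 1,
  shifts: 2,
  encashments: 3,
  devices: 4,
  operations: 5,
};

// { products: true, shifts: true, ... } -> integer mask
function formMask(scope) {
  let mask = 0;
  for (const [key, enabled] of Object.entries(scope || {})) {
    // Skip unknown keys instead of letting them set bit 0 by accident
    if (enabled && Object.prototype.hasOwnProperty.call(scopeBits, key)) {
      mask |= 1 << scopeBits[key];
    }
  }
  return mask;
}

// integer mask -> { products: true, receipts: false, ... }
function formEvents(mask) {
  const events = {};
  for (const [scope, bit] of Object.entries(scopeBits)) {
    events[scope] = (mask & (1 << bit)) !== 0;
  }
  return events;
}

const mask = formMask({ products: true, shifts: true, operations: true });
console.log(mask); // 37
console.log(formEvents(mask).receipts); // false
```

Six event types use only six bits of the integer column, so there is room for many more event types before the storage format has to change.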
CRUD for managing webhooks
Users manage webhooks from the web interface or through the API, so we need standard CRUD operations for this model.
import _ from 'lodash';

// Fields the client is allowed to set
const editCols = ['url', 'types', 'isActive'];

export async function create(ctx) {
  const fields = _.pick(ctx.request.body.fields, editCols);
  // The owner always comes from the session, never from the request body
  fields.userId = ctx.state.user.id;
  const webhook = await Webhook.create(fields);
  ctx.body = { id: webhook.id };
  ctx.status = 201;
}
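The remaining operations follow the same shape. Here is a sketch of `update`, assuming a hypothetical in-memory `webhooks` map in place of the sequelize model and a plain object in place of koa's `ctx` (with `ctx.params.id` as a router would provide it). The details worth copying are that the lookup is scoped to `ctx.state.user.id`, so one user cannot edit another user's webhook, and that only the whitelisted `editCols` are applied.

```javascript
// Hypothetical stand-in for the Webhook table: id -> row
const webhooks = new Map();
const editCols = ['url', 'types', 'isActive'];

// Same role as _.pick: copy only the whitelisted keys that are present
function pick(source, keys) {
  const result = {};
  for (const key of keys) {
    if (source && Object.prototype.hasOwnProperty.call(source, key)) {
      result[key] = source[key];
    }
  }
  return result;
}

async function update(ctx) {
  const row = webhooks.get(Number(ctx.params.id));
  // Treat someone else's webhook exactly like a missing one
  if (!row || row.userId !== ctx.state.user.id) {
    ctx.status = 404;
    return;
  }
  Object.assign(row, pick(ctx.request.body.fields, editCols));
  ctx.body = { id: row.id };
  ctx.status = 200;
}
```

Answering 404 rather than 403 for a foreign webhook avoids revealing which ids exist.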
Call preparation
We do not call webhooks directly in the static method of the Webhook class; this saves the main site's resources. Background tasks are the workers' job, so they run without interfering with the REST API.
When an event occurs on the site, we notify the workers:
import _ from 'lodash';
import { getClient } from '../../storage/redis';
import { stringify, getChannel } from '../../storage/rabbitmq';
// formMask, typeToTypes and the Webhook model are assumed to be in scope (see the helpers above)

/**
 * Find the webhooks of this user that must be called for this event
 * types: { products: true, devices: false, ... }
 */
async function search({ userId, types }) {
  const mask = formMask(types);
  /**
   * Check the event flag with a bitwise AND and compare the result with the mask
   * itself. This leaves room for the future, in case we want to search
   * by several flags at once.
   *
   * I did not find in the documentation how to build this query with sequelize,
   * so a raw SQL query is used here.
   */
  return Webhook.sequelize.query(`SELECT id, url, subscribes
    FROM "Webhook" WHERE (subscribes & ?) = ?
    AND "userId" = ?
    AND "isActive" = TRUE`,
    {
      type: Webhook.sequelize.QueryTypes.SELECT,
      replacements: [mask, mask, userId],
    },
  );
}

/**
 * Put a "call the webhook" task into the queue
 * type=PRODUCT|DEVICE|ENCASHMENT|RECEIPT|OPERATION|...
 * action=CREATE|UPDATE|DELETE
 */
export async function send({ userId, type, action, itemId }) {
  // Look in Redis first; a single GET avoids a race between EXISTS and GET
  const client = getClient();
  const key = `webhooks:${userId}:${type}`;
  const cached = await client.getAsync(key);
  let webhooks;
  if (cached === null) {
    // Cache miss: search Postgres
    const types = typeToTypes[type];
    webhooks = await search({ userId, types });
    // Cache in Redis for 10 seconds, even if nothing was found
    await client.setAsync(key, JSON.stringify(webhooks), 'EX', 10);
  } else {
    webhooks = JSON.parse(cached);
  }
  _.each(webhooks, (w) => {
    /**
     * Queue the task with the time, the URL to call and the properties:
     * entity type (product, receipt, cash register, ...), operation type
     * (create, delete, ...) and entity id
     */
    const payload = stringify({
      url: w.url,
      itemId,
      action,
      type,
      timestamp: Date.now(),
    });
    getChannel().sendToQueue('kab-webhooks-delayed-0', payload, { persistent: true });
  });
}
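The query's condition, the bitwise AND compared with the mask itself, means "every bit of the mask is set". A plain JavaScript version of the same predicate, with hypothetical rows, shows why the comparison matters once a mask has more than one bit: checking for a non-zero result instead would match webhooks subscribed to any one of the events.

```javascript
// Same condition as the SQL: all bits of `mask` are present in `subscribes`
const hasAll = (subscribes, mask) => (subscribes & mask) === mask;
// For comparison: at least one bit of `mask` is present
const hasAny = (subscribes, mask) => (subscribes & mask) !== 0;

const PRODUCTS = 1 << 0;
const SHIFTS = 1 << 2;

// Hypothetical rows as the query might return them
const rows = [
  { id: 1, subscribes: PRODUCTS },
  { id: 2, subscribes: PRODUCTS | SHIFTS },
  { id: 3, subscribes: SHIFTS },
];

const both = PRODUCTS | SHIFTS;
console.log(rows.filter((r) => hasAll(r.subscribes, both)).map((r) => r.id)); // [ 2 ]
console.log(rows.filter((r) => hasAny(r.subscribes, both)).map((r) => r.id)); // [ 1, 2, 3 ]
```

For a single-flag mask, as in `send`, both predicates return the same rows; they only diverge for multi-flag masks.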
In short: we look up in the database all of this user's webhooks that are subscribed to the current event. We cache the result even if nothing was found; otherwise, when a user uploads a large batch of products, every product would cause an extra database query. For each webhook found, we put a task into the queue with a timestamp, the URL, the entity identifier and the event type.
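The caching step can be sketched without Redis: an in-memory map with a 10-second expiry stands in for `SET ... EX 10`, and a hypothetical `loadFromDb` function stands in for the Postgres query. An empty result is cached like any other, so after the first lookup, further events for a user without webhooks within those 10 seconds are served from the cache.

```javascript
// Hypothetical in-memory replacement for the Redis cache used in send()
function createWebhookCache(loadFromDb, ttlMs = 10 * 1000, now = Date.now) {
  const entries = new Map(); // key -> { value, expiresAt }

  return async function getWebhooks(userId, type) {
    const key = `webhooks:${userId}:${type}`;
    const hit = entries.get(key);
    if (hit && hit.expiresAt > now()) {
      return hit.value;
    }
    const value = await loadFromDb(userId, type);
    // Cache empty arrays too: "no webhooks" is also an answer worth remembering
    entries.set(key, { value, expiresAt: now() + ttlMs });
    return value;
  };
}
```

The short expiry is the trade-off: a webhook created or disabled by the user takes effect within at most 10 seconds, in exchange for not hitting the database on every event.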
There is a nuance: to save the site's resources, we put only the object's identifier in the queue. If possible, it is better to queue the object itself. If you create an object and immediately delete it, two tasks end up in the queue, and when the first one runs, it can no longer load the object's body from the database. If you queue the whole object, this problem does not arise.
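One way to follow this advice is to let the task carry an optional snapshot and fall back to a lookup by id only when the snapshot is missing. A sketch with hypothetical names (`buildTask`, `resolveBody`, `loadById`); it also shows what happens in the create-then-delete case when only the id was queued.

```javascript
// Hypothetical task builder: embed the object when the caller has it at hand
function buildTask({ url, type, action, item, itemId }) {
  return {
    url,
    type,
    action,
    itemId: item ? item.id : itemId,
    data: item || null, // snapshot taken at the moment of the event
    timestamp: Date.now(),
  };
}

// Hypothetical worker-side resolver
async function resolveBody(task, loadById) {
  if (task.data) {
    return task.data; // the snapshot survives even if the row is gone
  }
  // Only the id was queued: the row may already have been deleted
  return loadById(task.type, task.itemId);
}
```

The cost of the snapshot is a larger message in RabbitMQ; the benefit is that the webhook describes the object as it was at the moment of the event.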
Webhook calls and retries
Our stack uses message queues. We chose five retry intervals and created a separate queue for each of them, in addition to the queue for the first attempt. If a call fails, the webhook moves on to the next queue. When a worker receives a task, it delays the call until the required time has passed since the event, from 0 milliseconds up to a day. After 24 hours, we call the webhook for the last time and delete it.
An example of a webhook that is not accepted for a whole day.
A later task in a queue cannot be due earlier than the current one, because it was added to the queue later. Therefore, when a worker takes a task from the queue and sees that it is too early to call the webhook, it keeps the task unacknowledged while it waits, so that it does not receive the next one.
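import Bluebird from 'bluebird';
import request from 'request';
import { parse, getChannel, stringify } from '../../lib/storage/rabbitmq';

// Once promisified, request.post resolves with the response object
const requestPostAsync = Bluebird.promisify(request.post);

// Delay of each queue, measured from the moment the event occurred
const times = {
  0: 0,
  '5sec': 5 * 1000,
  '1min': 1 * 60 * 1000,
  '1hour': 1 * 60 * 60 * 1000,
  '3hours': 3 * 60 * 60 * 1000,
  '1day': 24 * 60 * 60 * 1000,
};

const getBodyById = async ({ itemId, type, action }) => {
  /** Load the current state of the entity from the database */
};

const handle = async (channel, msg, waitENUM, nextQueue) => {
  const task = parse(msg);
  const { url, itemId, type, action, timestamp } = task;
  // The task keeps its original timestamp when it moves between queues,
  // so every delay is counted from the event, not from the previous attempt
  const elapsed = Date.now() - new Date(timestamp).getTime();
  const wait = times[waitENUM];
  if (elapsed < wait) {
    // Too early: the task stays unacknowledged while we wait,
    // so prefetch limits how many more tasks this consumer receives
    await Bluebird.delay(wait - elapsed);
  }
  try {
    // Load the body only after waiting, so the payload is as fresh as possible
    const data = await getBodyById({ itemId, type, action });
    const response = await requestPostAsync(url, {
      body: { action, type, data },
      headers: { 'content-type': 'application/json' },
      json: true,
      timeout: 20 * 1000,
    });
    // Any non-2xx status counts as "not accepted"
    if (response.statusCode < 200 || response.statusCode >= 300) {
      throw new Error(`Webhook responded with ${response.statusCode}`);
    }
    channel.ack(msg);
  } catch (err) {
    // Move the task to the next retry queue; after the last queue it is dropped
    if (nextQueue) {
      getChannel().sendToQueue(nextQueue, stringify(task), { persistent: true });
    }
    channel.nack(msg, false, false);
  }
};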
/* eslint-disable no-multi-spaces */
export default function startConsume(channel) {
  channel.prefetch(2);
  channel.consume('kab-webhooks-delayed-0', msg => handle(channel, msg, 0,
    'kab-webhooks-delayed-1'), { noAck: false });
  channel.consume('kab-webhooks-delayed-1', msg => handle(channel, msg, '5sec',
    'kab-webhooks-delayed-2'), { noAck: false });
  channel.consume('kab-webhooks-delayed-2', msg => handle(channel, msg, '1min',
    'kab-webhooks-delayed-3'), { noAck: false });
  channel.consume('kab-webhooks-delayed-3', msg => handle(channel, msg, '1hour',
    'kab-webhooks-delayed-4'), { noAck: false });
  // The key must match the `times` table exactly ('3hours'), otherwise the delay is skipped
  channel.consume('kab-webhooks-delayed-4', msg => handle(channel, msg, '3hours',
    'kab-webhooks-delayed-5'), { noAck: false });
  channel.consume('kab-webhooks-delayed-5', msg => handle(channel, msg, '1day',
    ''), { noAck: false });
}
/* eslint-enable no-multi-spaces */
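Writing out six `consume` calls by hand makes it easy for a queue name and its delay key to drift apart, and a typo in the key fails silently: the lookup in `times` returns `undefined`, the comparison with it is always false, and the task is sent without any delay. One alternative, sketched under the assumption that the queues keep the `kab-webhooks-delayed-N` naming, is to derive the whole chain from a single list of delays. `buildRetryChain` is a hypothetical helper, and this `handle` is assumed to take the delay in milliseconds rather than a key.

```javascript
// Hypothetical helper: one list of delays drives queue names and retry order
const RETRY_DELAYS_MS = [0, 5e3, 60e3, 3600e3, 3 * 3600e3, 24 * 3600e3];

function buildRetryChain(delays, prefix = 'kab-webhooks-delayed-') {
  return delays.map((waitMs, i) => ({
    queue: `${prefix}${i}`,
    waitMs,
    // The last queue has no successor: a failure there drops the task
    nextQueue: i + 1 < delays.length ? `${prefix}${i + 1}` : null,
  }));
}

// Wiring with an amqplib-style channel and a handle(channel, msg, waitMs, nextQueue)
function startConsume(channel, handle) {
  channel.prefetch(2);
  for (const { queue, waitMs, nextQueue } of buildRetryChain(RETRY_DELAYS_MS)) {
    channel.consume(queue, (msg) => handle(channel, msg, waitMs, nextQueue), { noAck: false });
  }
}
```

Adding or changing an interval then means editing one array, and the queue names and successors follow automatically.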
Four more points
- Sometimes tasks end up in a queue out of order, so the next task is due earlier than the current one. This is not critical for us: the difference between them is never more than 20 seconds, which is our request timeout.
- We chose the following intervals: 0 seconds, 5 seconds, 1 minute, 1 hour, 3 hours and 24 hours.
- Requests that are not accepted within 24 hours are neither logged nor stored. If a third-party service is down for 24 hours, the missed webhooks are not what matters: its problems are on a different scale.
- If a worker neither acknowledges nor rejects a received message and simply disconnects, RabbitMQ puts the message back into the queue. Thus, even if the application crashes, the webhook itself is not lost.
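The arithmetic behind the first two points can be checked with a small sketch. Because the worker re-queues a task with its original `timestamp`, each queue's delay is counted from the event, not from the previous attempt. Assuming every failed request takes the full 20-second timeout, the hypothetical `simulateAttempts` shows the second attempt slipping from 5 to 20 seconds after the event, while the later ones still land at 1 minute, 1 hour, 3 hours and 24 hours. Time spent waiting behind other tasks in the queue is ignored.

```javascript
// Queue delays in milliseconds, matching the intervals listed above
const QUEUE_DELAYS_MS = [
  0,
  5 * 1000,
  60 * 1000,
  60 * 60 * 1000,
  3 * 60 * 60 * 1000,
  24 * 60 * 60 * 1000,
];

// What the worker still has to wait, given the event time and the queue delay
function remainingDelay(eventTimestamp, queueDelayMs, now) {
  return Math.max(0, queueDelayMs - (now - eventTimestamp));
}

// Attempt times (ms after the event) for a webhook that is never accepted,
// assuming each failed request takes requestMs before the task is re-queued
function simulateAttempts(requestMs, delays = QUEUE_DELAYS_MS) {
  const attempts = [];
  let now = 0; // the event happens at t = 0
  for (const delay of delays) {
    now += remainingDelay(0, delay, now);
    attempts.push(now);
    now += requestMs; // time spent on the failed request
  }
  return attempts;
}

console.log(simulateAttempts(20 * 1000).map((t) => t / 1000));
```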