WebSockets in Angular. Part 2. Product Solutions

  • Tutorial
image

In the previous article, we talked about the general solution for web sockets in Angular, where we built a bus with a reconnect and a service for use in components based on a WebSocketSubject. Such an implementation is suitable for most simple cases, for example, receiving and sending messages in a chat, etc., but its capabilities may not be enough where you need to build something more flexible and controllable. In this article, I will reveal some of the features when working with websockets and talk about the requirements that I faced myself and may have come across you.

Often, in large projects with high attendance, the front-end faces challenges that in other circumstances are more common to see on the backend. In the conditions of austerity of server resources, some of the problems migrate to the frontend territory, for this reason, the project lays the maximum for extensibility and control.

Here is a list of basic requirements for a web client client that will be covered in this article:

  • Automatic "smart" reconnect;
  • Debug mode;
  • Event subscription system based on RxJs;
  • Reception and parsing of binary data;
  • Projection (mapping) of the information received on the model;
  • Control over model changes as new events arrive;
  • Ignoring arbitrary events and canceling ignore.

Consider each item in more detail.

Reconnect / Debag


I wrote about reconnect in the previous article, so I will simply quote part of the text:

Reconnect, or the organization of reconnection to the server, is a primary factor when working with websockets, since Network breaks, server crashes, or other errors that cause a disconnection can cause the application to crash.
It is important to note that reconnection attempts should not be too frequent and should not continue indefinitely, since this behavior is capable of hanging the client.

By itself, a web socket does not know how to re-establish a connection during a break. Therefore, if the server has rebooted or dropped, or the Internet has been reconnected by the user, then the web socket will also need to be reconnected to continue.

In this article, reconnecting and debugging will use Reconnecting WebSocket , which contains the necessary functionality and other options, such as changing the web sock url between reconnects, choosing an arbitrary WebSocket designer, etc. Other alternative solutions will also work. Reconnect from the previous article is not suitable, because it is written under WebSocketSubject, which this time does not apply.

Event subscription system based on RxJs


To use webboxes in components, you need to subscribe to events and unsubscribe from them when required. To do this, we use the popular Pub / Sub design pattern .
"Subscriber-subscriber (publisher-subscriber or pub / pub) is a behavioral design pattern of messaging in which message senders, called publishers, are not directly tied to the subscribers' program code ). Instead, messages are divided into classes and do not contain information about their subscribers, if any. Similarly, subscribers deal with one or more classes of messages, abstracting from specific publishers. ”

The subscriber does not contact the publisher directly, but through an intermediate bus, a web socket service. It should also be possible to subscribe to several events with the same type of returned data. For each subscription, a separate Subject is created, which is added to the listeners object, which allows you to address the web socket events to the required subscriptions. When working with RxJs Subject, there are some difficulties with unsubscribing, so we will create a simple garbage collector that will remove objects from the object listeners when they have no observers.

Receive and parse binary data


WebSocket supports the transfer of binary data, files or streams, which is often used in large projects. It looks something like this:
0x80, <length - one or several bytes>, <message body>

In order not to create restrictions on the length of the transmitted message and at the same time not to spend bytes irrationally, the protocol developers used the following algorithm. Each byte in the length indication is considered separately: the most significant one indicates whether it is the last byte (0) or others follow it (1), and the lower 7 bits contain the data to be transmitted. Therefore, when the sign of the binary data frame 0x80 appears, the next byte is taken and deposited in a separate “piggy bank”. Then the next byte, if it has a high-order bit, is also transferred to the piggy bank, and so on, until a byte with a zero high-order bit is encountered. This byte is the last one in the length index and also folds into a piggy bank. Now the high bits are removed from the bytes in the "piggy bank", and the remainder is combined.

The mechanism of parsing and binary stream on the front end is complex and is associated with the mapping of data on the model. This can be a separate article. This time we will analyze a simple variant, and we will leave complex cases for the next publications, if there is interest in the topic.

Projecting (mapping) the information received on the model


Regardless of the type of transfer received, it is required to safely read and modify. There is no consensus on how to do this, I adhere to the theory of the data model , because I consider it logical and reliable for programming in OOP-style.
“A data model is an abstract, self-contained, logical definition of objects, operators, and other elements that together make up an abstract data access machine with which the user interacts. These objects allow you to model the data structure, and the operators - the behavior of the data. "

All sorts of popular tires that do not give an idea of ​​an object as a class in which behavior, structure, etc. are defined, create confusion, are less controlled and sometimes overgrown by what is not typical of them. For example, a dog class must describe a dog under any conditions. If the dog is perceived as a set of fields: the tail, color, muzzle, etc., then the dog may grow an extra paw, and another dog will appear instead of the head.

image

Control over model changes as new events arrive


At this point I will describe the task I encountered when working on the web interface of the mobile sports betting mobile application. The API of the application worked through the websockets through which it received: updating coefficients, adding and removing new types of bets, notifications about the beginning or end of the match, etc. - a total of about three hundred events of the website. During the match, the rates and information are continuously updated, sometimes 2-3 times per second, so the problem was that after them the interface was updated without intermediate controls.

When the user followed the bet from a mobile device, and at the same time lists were updated on his display, the bet disappeared from view, so the user had to look for the tracked bet again. This behavior was repeated for each update.

image

The solution required immobility for objects that were displayed on the screen, but the coefficients of the rates had to be changed, irrelevant rates became inactive, and new ones were not added until the user scrolls the screen. The outdated versions were not stored on the backend, so such lines needed to be memorized and marked with the flag “deleted”, for which an intermediate data store was created between the web socket and the subscription, which provided control over the changes.

In the new service we will also create a substitute layer and this time we will use Dexie.js  - a wrapper over the IndexedDB API, but any other virtual or browser database will do. Redux is allowed.

Ignore arbitrary events and cancel ignore


In one company, there are often several projects of the same type: mobile and web versions, versions with different settings for different groups of users, extended and reduced versions of the same application.

Often, they all use a single code base, so it is sometimes necessary to disable unnecessary events in runtime or during DI, without deleting the subscription and re-enable, i.e. ignore some of them, so as not to handle unnecessary events. This is a simple but useful feature that adds flexibility to the Pub / Sub bus.

Let's start with the description of the interfaces:

export interface IWebsocketService { // публичный интерфейс сервиса
    addEventListener<T>(topics: string[], id?: number): Observable<T>;
    runtimeIgnore(topics: string[]): void;
    runtimeRemoveIgnore(topics: string[]): void;
    sendMessage(event: string, data: any): void;
}
export interface WebSocketConfig { // конфиг при DI
    url: string;
    ignore?: string[];
    garbageCollectInterval?: number;
    options?: Options;
}
export interface ITopic<T> { // Топик для Pub/Sub
    [hash: string]: MessageSubject<T>;
}
export interface IListeners { // объект с топиками
    [topic: string]: ITopic<any>;
}
export interface IBuffer { // бинарный буфер из ws.message
    type: string;
    data: number[];
}
export interface IWsMessage { // ws.message
    event: string;
    buffer: IBuffer;
}
export interface IMessage { // Для демо
    id: number;
    text: string;
}
export type ITopicDataType = IMessage[] | number | string[]; // типизируем callMessage в сервисе

Inherit a Subject to create a garbage collector:

export classMessageSubject<T> extendsSubject<T> {
    constructor(
        private listeners: IListeners, // объект с топикамиprivate topic: string, // текущий топикprivate id: string // id сабжекта
    ) { 
        super();
    }
    /*
    * переопределяем стандартный next, 
    * теперь на очередное обращение при отсутствии подписок, 
    * будет вызываться garbageCollect
    */public next(value?: T): void {
        if (this.closed) {
            throw new ObjectUnsubscribedError();
        }
        if (!this.isStopped) {
            const {observers} = this;
            const len = observers.length;
            const copy = observers.slice();
            for (let i = 0; i < len; i++) {
                copy[i].next(value);
            }
            if (!len) {
                this.garbageCollect(); // выносим мусор
            }
        }
    }
    /*
    * garbage collector
    * */private garbageCollect(): void {
        delete this.listeners[this.topic][this.id]; // удаляем Subjectif (!Object.keys(this.listeners[this.topic]).length) { // удаляем пустой топик
            delete this.listeners[this.topic];
        }
    }
}

Unlike the previous implementation of websocket.events.ts, we will make it part of the web socket module

exportconst WS_API = {
    EVENTS: {
        MESSAGES: 'messages',
        COUNTER: 'counter',
        UPDATE_TEXTS: 'update-texts'
    },
    COMMANDS: {
        SEND_TEXT: 'set-text',
        REMOVE_TEXT: 'remove-text'
    }
};

To configure the module when connecting, create a websocket.config:

import { InjectionToken } from'@angular/core';
exportconst config: InjectionToken<string> = new InjectionToken('websocket');

Create a model for Proxy:

importDexie from 'dexie';
import { IMessage, IWsMessage } from './websocket.interfaces';
import { WS_API } from './websocket.events';
classMessagesDatabaseextendsDexie{ // это стандартное использование Dexie с typescript
    public messages!: Dexie.Table<IMessage, number>; // id is number in this case
    constructor() {
        super('MessagesDatabase'); // имя хранилищаthis.version(1).stores({  // модель стора
            messages: '++id,text'
        });
    }
}

A simple parser of models, in real conditions it is better to split it into several files:

exportconst modelParser = (message: IWsMessage) => {
    if (message && message.buffer) {
        /* парсим */const encodeUint8Array = String.fromCharCode
            .apply(String, newUint8Array(message.buffer.data));
        const parseData = JSON.parse(encodeUint8Array);
        let MessagesDB: MessagesDatabase; // IndexedDBif (message.event === WS_API.EVENTS.MESSAGES) { // IMessage[]if (!MessagesDB) {
                MessagesDB = new MessagesDatabase();
            }
                parseData.forEach((messageData: IMessage) => {
                    /* создаем транзакцию */
                    MessagesDB.transaction('rw', MessagesDB.messages, async () => {
                        /* создаем, если запись отсутствует */if ((await MessagesDB.messages
                            .where({id: messageData.id}).count()) === 0) {
                            const id = await MessagesDB.messages
                                .add({id: messageData.id, text: messageData.text});
                            console.log(`Addded message with id ${id}`);
                        }
                    }).catch(e => {
                        console.error(e.stack || e);
                    });
                });
            return MessagesDB.messages.toArray(); // возвращаем массив IMessage[]
        }
        if (message.event === WS_API.EVENTS.COUNTER) { // counterreturnnewPromise(r => r(parseData)); // промис с счетчиком
        }
        if (message.event === WS_API.EVENTS.UPDATE_TEXTS) { // textconst texts = [];
            parseData.forEach((textData: string) => {
                texts.push(textData);
            });
            returnnewPromise(r => r(texts)); // промис с массивом строк
        }
    } else {
        console.log(`[${Date()}] Buffer is "undefined"`);
    }
};

WebsocketModule:

@NgModule({
    imports: [
        CommonModule
    ]
})
export classWebsocketModule{
    publicstaticconfig(wsConfig: WebSocketConfig): ModuleWithProviders {
        return {
            ngModule: WebsocketModule,
            providers: [{provide: config, useValue: wsConfig}]
        };
    }
}

Let's start creating the service:

private listeners: IListeners; // список топиковprivate uniqueId: number; // соль для id подпискиprivate websocket: ReconnectingWebSocket; // объект вебсокетаconstructor(@Inject(config)private wsConfig: WebSocketConfig) {
    this.uniqueId = -1;
    this.listeners = {};
    this.wsConfig.ignore = wsConfig.ignore ? wsConfig.ignore : [];
    // коннектимсяthis.connect();
}
ngOnDestroy() {
    this.websocket.close(); // убиваем вебсокет при дестрое
}

Connect method:

private connect(): void {
    // ReconnectingWebSocket configconst options = {
        connectionTimeout: 1000, // таймаут реконнекта, если не задано
        maxRetries: 10, // попытки реконнекта, если не задано
        ...this.wsConfig.options
    };
    // Коннектимсяthis.websocket = new ReconnectingWebSocket(this.wsConfig.url, [], options);
    this.websocket.addEventListener('open', (event: Event) => {
        // соединение открытоconsole.log(`[${Date()}] WebSocket connected!`);
    });
    this.websocket.addEventListener('close', (event: CloseEvent) => {
        // соединение закрытоconsole.log(`[${Date()}] WebSocket close!`);
    });
    this.websocket.addEventListener('error', (event: ErrorEvent) => {
        // ошибка соединенияconsole.error(`[${Date()}] WebSocket error!`);
    });
    this.websocket.addEventListener('message', (event: MessageEvent) => {
        // диспатчим события в подпискиthis.onMessage(event);
    });
    setInterval(() => {
        // дублируем сборщик мусораthis.garbageCollect();
    }, (this.wsConfig.garbageCollectInterval || 10000));
}

Duplicate the garbage collector, will check subscriptions by timeout:

privategarbageCollect(): void {
    for (consteventinthis.listeners) {
        if (this.listeners.hasOwnProperty(event)) {
            const topic = this.listeners[event];
            for (const key in topic) {
                if (topic.hasOwnProperty(key)) {
                    const subject = topic[key];
                    // удаляем Subject если нет подписокif (!subject.observers.length) {
                        delete topic[key];
                    }
                }
            }
            Удаляем топик, если пуст
            if (!Object.keys(topic).length) {
                delete this.listeners[event];
            }
        }
    }
}

We look at which subscription to send an event:

private onMessage(event: MessageEvent): void {
    const message = JSON.parse(event.data); 
    for (const name inthis.listeners) {
        if (this.listeners.hasOwnProperty(name) && !this.wsConfig.ignore.includes(name)) {
            const topic = this.listeners[name];
            const keys = name.split('/'); // если подписаны на несколько событийconst isMessage = keys.includes(message.event);
            const model = modelParser(message); // получаем промис с моделямиif (isMessage && typeof model !== 'undefined') {
                model.then((data: ITopicDataType) => {
                    // отправляем в Subjectthis.callMessage<ITopicDataType>(topic, data);
                });
            }
        }
    }
}

Helmet event in Subject:

private callMessage<T>(topic: ITopic<T>, data: T): void {
    for (const key in topic) {
        if (topic.hasOwnProperty(key)) {
            const subject = topic[key];
            if (subject) {
                // отправляем подписчику
                subject.next(data);
            } else {
                console.log(`[${Date()}] Topic Subject is"undefined"`);
            }
        }
    }
}

Create a Pub / Sub topic:

private addTopic<T>(topic: string, id?: number): MessageSubject<T> {
    const token = (++this.uniqueId).toString();
    const key = id ? token + id : token; // уникальный id для токенаconst hash = sha256.hex(key); // SHA256-хэш в качестве id топикаif (!this.listeners[topic]) {
        this.listeners[topic] = <any>{};
    }
    returnthis.listeners[topic][hash] = new MessageSubject<T>(this.listeners, topic, hash);
}

Subscribe to one or more events:

public addEventListener<T>(topics: string | string[], id?: number): Observable<T> {
    if (topics) {
        // подписка на одно или несколько событийconst topicsKey = typeof topics === 'string' ? topics : topics.join('/');
        returnthis.addTopic<T>(topicsKey, id).asObservable();
    } else {
        console.log(`[${Date()}] Can't add EventListener. Type of event is "undefined".`);
    }
}

Here everything is intentionally simplified, but you can convert to binary entities, as is the case with the server. Sending commands to the server:

publicsendMessage(event: string, data: any = {}): void {
    // если соединение активно, шлем имя события и информациюif (event && this.websocket.readyState === 1) {
        this.websocket.send(JSON.stringify({event, data}));
    } else {
        console.log('Send error!');
    }
}

Add events to the ignorelist at runtime:

publicruntimeIgnore(topics: string[]): void {
    if (topics && topics.length) { 
        // добавляем в игнорлистthis.wsConfig.ignore.push(...topics);
    }
}

Delete events from the ignore list:

publicruntimeRemoveIgnore(topics: string[]): void {
    if (topics && topics.length) {
        topics.forEach((topic: string) => {
            // ищем событие в списке топиковconst topicIndex = this.wsConfig.ignore.findIndex(t => t === topic);
            if (topicIndex > -1) {
                // снова слушаем собтияthis.wsConfig.ignore.splice(topicIndex, 1);
            }
        });
    }
}

We connect the web socket module:

@NgModule({
    declarations: [
        AppComponent
    ],
    imports: [
        BrowserModule,
        ReactiveFormsModule,
        WebsocketModule.config({
            url: environment.ws, // или "ws://mywebsocketurl"// список игнорируемых событий
            ignore: [WS_API.EVENTS.ANY_1, WS_API.EVENTS.ANY_2],
            garbageCollectInterval: 60 * 1000, // интервал сборки мусора
            options: {
                connectionTimeout: 1000, // таймаут реконнекта
                maxRetries: 10// попытки реконнекта
            }
        })
    ],
    providers: [],
    bootstrap: [AppComponent]
})
export classAppModule{
}

We use in components:

@Component({
    selector: 'app-root',
    templateUrl: './app.component.html',
    styleUrls: ['./app.component.css']
})
export classAppComponentimplementsOnInit, OnDestroy {private messages$: Observable<IMessage[]>;
    private messagesMulti$: Observable<IMessage[]>;
    private counter$: Observable<number>;
    private texts$: Observable<string[]>;
    public form: FormGroup;
    constructor(
      private fb: FormBuilder, 
      private wsService: WebsocketService) {
    }
    ngOnInit() {
        this.form = this.fb.group({
            text: [null, [
                Validators.required
            ]]
        });
        // get messagesthis.messages$ = this.wsService
            .addEventListener<IMessage[]>(WS_API.EVENTS.MESSAGES);
        // get messages multithis.messagesMulti$ = this.wsService
            .addEventListener<IMessage[]>([
                WS_API.EVENTS.MESSAGES, 
                WS_API.EVENTS.MESSAGES_1
             ]);
        // get counterthis.counter$ = this.wsService
            .addEventListener<number>(WS_API.EVENTS.COUNTER);
        // get textsthis.texts$ = this.wsService
            .addEventListener<string[]>(WS_API.EVENTS.UPDATE_TEXTS);
    }
    ngOnDestroy() {
    }
    public sendText(): void {
        if (this.form.valid) {
            this.wsService
                .sendMessage(WS_API.COMMANDS.SEND_TEXT, this.form.value.text);
            this.form.reset();
        }
    }
    public removeText(index: number): void {
        this.wsService.sendMessage(WS_API.COMMANDS.REMOVE_TEXT, index);
    }
}

Service is ready to use.



The example from the article is not at all a universal solution for each project, but it does demonstrate one of the approaches to working with websockets in large and complex applications. You can take it on board and modify it depending on current tasks.

The full version of the service can be found on GitHub .

For all questions you can contact us in the comments, to me in Telegrams or on the Angular channel in the same place.

Also popular now: