Event snapshots in Axon Framework 3: improving performance

Axon Framework overview


Axon Framework is a framework that implements several principles and design patterns, such as:

CQRS: separates the processing of requests that read data from requests that write data
Event Sourcing: the application state is stored as a chain of events
DDD Aggregate: a domain object that stores state

A drawback of storing application state as a chain of events is the number of events that have to be stored and processed. Fortunately, Axon Framework allows you to create a snapshot event, which contains the result of several domain events.

Snapshots of events


A snapshot event holds the resulting values of several domain events. This allows you to quickly recreate the state of an Aggregate. It is important to understand that a snapshot is created from the events that were applied to one specific Aggregate with a unique identifier.

For example (Fig. 1), let's configure a snapshot for every two events (threshold = 2, chosen for illustration only). In this case, once two events have changed the state of the Aggregate, one snapshot is created with the resulting values of those two events.


Fig. 1. A snapshot of two events (threshold = 2)

Consider a more complicated example (Fig. 2). The configuration again contains a threshold of 2, so a snapshot is created every two events. When two events change the state of the Aggregate, one snapshot is created. When two further events change the state of the Aggregate, no additional snapshot is created; instead, the existing one is updated.


Fig. 2. The result of a chain of events in one snapshot (threshold = 2)
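
The threshold behaviour described above can be sketched in plain Java. This is a minimal model and not Axon Framework code: the class SnapshotSketch, its fields, and the use of integer deltas as "events" are hypothetical and chosen only for illustration. The real trigger logic in Axon may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one Aggregate whose state is a single integer.
class SnapshotSketch {
    private final List<Integer> events = new ArrayList<>(); // event i has sequence number i
    private final int threshold;
    private int snapshotValue;          // state captured by the snapshot
    private int snapshotSequence = -1;  // last event covered by the snapshot; -1 means no snapshot yet
    int replayedOnLastLoad;             // how many events the last load() had to replay

    SnapshotSketch(int threshold) {
        this.threshold = threshold;
    }

    // Stores an event and replaces the snapshot once `threshold` events have accumulated since the last one.
    void append(int delta) {
        events.add(delta);
        int last = events.size() - 1;
        if (last - snapshotSequence >= threshold) {
            snapshotValue = load();   // resulting state of all events so far
            snapshotSequence = last;  // the single snapshot is updated, not duplicated
        }
    }

    // Recreates the state: start from the snapshot (if any) and replay only the later events.
    int load() {
        int value = snapshotSequence >= 0 ? snapshotValue : 0;
        replayedOnLastLoad = 0;
        for (int i = snapshotSequence + 1; i < events.size(); i++) {
            value += events.get(i);
            replayedOnLastLoad++;
        }
        return value;
    }

    public static void main(String[] args) {
        SnapshotSketch aggregate = new SnapshotSketch(2);
        for (int delta : new int[] {10, 5, -3, 8}) {
            aggregate.append(delta);
        }
        System.out.println("state = " + aggregate.load()
                + ", events replayed = " + aggregate.replayedOnLastLoad);
    }
}
```

With a threshold of 2 and four events, the model replaces its snapshot twice, and the final load replays no events because the snapshot already covers all of them.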

Performance


On the one hand, when the application accumulates a long chain of events, it takes time to read and process a large number of events to recreate the state of an Aggregate. On the other hand, if you take a snapshot, the state of the Aggregate is recreated quickly, but creating the snapshot itself takes time. It is necessary to find a balance between these two costs.
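
The trade-off can be made concrete with a rough cost model. The sketch below assumes, as a simplification, that a snapshot is taken after exactly every `threshold` events. The class and method names are hypothetical, and the numbers in main are illustrative only.

```java
// Simplified cost model; not part of Axon Framework.
class SnapshotCostModel {
    // Events that must be replayed when the Aggregate is loaded.
    static long eventsReplayedOnLoad(long totalEvents, long threshold) {
        if (threshold <= 0) {
            return totalEvents; // snapshots disabled: replay the whole chain
        }
        return totalEvents % threshold; // only events after the latest snapshot
    }

    // Number of times a snapshot had to be created or updated.
    static long snapshotCreations(long totalEvents, long threshold) {
        if (threshold <= 0) {
            return 0;
        }
        return totalEvents / threshold;
    }

    public static void main(String[] args) {
        long totalEvents = 136; // illustrative value
        for (long threshold : new long[] {0, 3, 50}) {
            System.out.println("threshold=" + threshold
                    + " replayed=" + eventsReplayedOnLoad(totalEvents, threshold)
                    + " snapshotCreations=" + snapshotCreations(totalEvents, threshold));
        }
    }
}
```

A small threshold keeps loads cheap but pays for many snapshot creations. A large threshold does the opposite.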


Fig. 3. Performance without creating a snapshot


Fig. 4. Performance with creating a snapshot (threshold = 3)

By default, a snapshot is created in the thread that called the scheduleSnapshot() method. This setting is not recommended for production environments (see Fig. 4).

Below is an example of code that uses a thread pool (ThreadPoolExecutor) to provide a separate thread for creating snapshots. In this case, the client will not notice a slowdown of the application caused by the time spent creating a snapshot.
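
Independently of the Axon configuration shown in the Code section, the effect of handing work to a separate thread can be illustrated in plain Java. In this minimal sketch the createSnapshot method is a hypothetical stand-in for snapshot creation and only reports which thread executed it. Waiting for the result is done here only to make the demonstration observable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncSnapshotSketch {
    // Hypothetical stand-in for snapshot creation: returns the name of the executing thread.
    static String createSnapshot() {
        return Thread.currentThread().getName();
    }

    // Submits the stand-in to the executor and returns the name of the thread that ran it.
    static String snapshotThreadName(ExecutorService executor) {
        Callable<String> task = AsyncSnapshotSketch::createSnapshot;
        try {
            return executor.submit(task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            System.out.println("caller thread:   " + Thread.currentThread().getName());
            System.out.println("snapshot thread: " + snapshotThreadName(executor));
        } finally {
            executor.shutdown(); // release the pool threads so the JVM can exit
        }
    }
}
```

The two printed names differ, which shows that the caller thread is not the one doing the snapshot work.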

Code


To activate snapshot creation, you need to make small changes to the application code. The @Aggregate annotation specifies the name of the repository bean defined in the configuration class. The configuration class specifies the threshold for creating snapshots, the snapshotter that creates them, the repository, and so on.

AxonConfig.java

@Autowired
private EventStore eventStore;

@Bean
public SpringAggregateSnapshotterFactoryBean springAggregateSnapshotterFactoryBean() {
   return new SpringAggregateSnapshotterFactoryBean();
}

// Snapshotter that creates snapshots on a separate thread pool instead of the calling thread
@Bean
public SpringAggregateSnapshotter snapshotter(ParameterResolverFactory parameterResolverFactory, EventStore eventStore, TransactionManager transactionManager) {
   Executor executor = Executors.newFixedThreadPool(10);
   return new SpringAggregateSnapshotter(eventStore, parameterResolverFactory, executor, transactionManager);
}

// Repository for the Reservation aggregate with an event-count snapshot trigger (threshold = 50)
@Bean("reservationRepository")
public EventSourcingRepository<Reservation> reservationRepository(Snapshotter snapshotter, ParameterResolverFactory parameterResolverFactory) {
   return new EventSourcingRepository<Reservation>(reservationAggregateFactory(), eventStore, parameterResolverFactory, new EventCountSnapshotTriggerDefinition(snapshotter, 50));
}

// Factory that creates Reservation instances from the prototype bean named "reservation"
@Bean(name = "reservationAggregateFactory")
public AggregateFactory<Reservation> reservationAggregateFactory() {
   SpringPrototypeAggregateFactory<Reservation> aggregateFactory = new SpringPrototypeAggregateFactory<>();
   aggregateFactory.setPrototypeBeanName("reservation");
   return aggregateFactory;
}

Reservation.java

@Aggregate(repository = "reservationRepository")
public class Reservation {
	//…
}

It is worth noting that the Google Groups discussion thread contains useful code examples and discussions.

Selecting a threshold for taking snapshots



5.1. Theoretical approach

Count the events that can be applied to an Aggregate in the EventListener class. Then estimate the average number of events applied to an Aggregate in a typical scenario, and set the snapshot threshold somewhat below that value. This approach is useful when the application is still being built and there is no real data to analyze.

5.2. Practical approach

Let's analyze data from the database, assuming that the database is MongoDB running inside a Docker container.

> docker exec -it <container-id> mongo 
> show dbs
admin          	0.000GB
axonframework	0.000GB
local          	0.000GB
> use axonframework
switched to db axonframework
> show collections
domainevents
sagas
snapshotevents
> db.domainevents.findOne()
{
 "_id" : ObjectId("5bb1dc8d4446d63bcc765feb"),
 "aggregateIdentifier" : "b1e320d5-58aa-4b9b-a667-aa724900592f",
 "type" : "Reservation",
 "sequenceNumber" : NumberLong(0),
 "serializedPayload" : "<com.example.ReservationStarted><reservationIdentifier>b1e320d5-58aa-4b9b-a667-aa724900592f</reservationIdentifier><duration resolves-to=\"java.time.Ser\"><byte>1</byte><long>2400</long><int>0</int></duration></com.example.ReservationStarted>",
 "timestamp" : "2018-10-01T08:36:29.434Z",
 "payloadType" : "com.example.ReservationStarted",
 "payloadRevision" : null,
 "serializedMetaData" : "<meta-data><entry><string>traceId</string><string>b090b86a-ec89-484b-ae9f-e4fa0f9bcd39</string></entry><entry><string>correlationId</string><string>b090b86a-ec89-484b-ae9f-e4fa0f9bcd39</string></entry></meta-data>",
 "eventIdentifier" : "f324f021-50b4-4e91-84d0-f8c4425f3eb9"
}

Each stored event contains an aggregateIdentifier field, so a simple query can count the number of events applied to each Aggregate:

db.domainevents.aggregate([ 
    {$group: {_id: "$aggregateIdentifier", count: {$sum: 1} } },
    {$sort : {count : -1} }
]);
{ "_id" : "0d84afd1-f199-45c8-b50e-7d9ebfa4c8fb", "count" : 136 }
{ "_id" : "49de7c32-38ea-435a-b837-ccdb61ec0baa", "count" : 136 }
{ "_id" : "12957b0b-af05-47c4-a3d8-968b75cf9ffb", "count" : 136 }
{ "_id" : "97a24559-ee3a-43e7-a6be-1eb6840b662a", "count" : 132 }
{ "_id" : "b6aeb1af-0620-4b02-8de3-c2446c2f7d83", "count" : 132 }
{ "_id" : "b385aaf4-3338-489f-8d1b-4600d5e088b9", "count" : 132 }
{ "_id" : "5970327f-9551-4945-94e9-3844c0cd3543", "count" : 132 }
...
{ "_id" : "0182239h-3948-3334-98t5-9643j4ld8346", "count" : 1 }

The snapshot threshold can be set below the average number of events per Aggregate, so that snapshots are actually created for typical Aggregates. In this case, a value of 50 is fine.
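
The same reasoning can be sketched in plain Java: group events by aggregate identifier, count them, and check a candidate threshold against the average. The class ThresholdCheck and the identifiers "a", "b", "c" are hypothetical. The input list stands in for the aggregateIdentifier values of stored events and does not reproduce the data above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class ThresholdCheck {
    // Counts events per aggregate identifier, like the $group stage with {$sum: 1}.
    static Map<String, Long> eventsPerAggregate(List<String> aggregateIdentifiers) {
        return aggregateIdentifiers.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    // Average number of events per Aggregate; 0.0 when there are no events.
    static double averageEvents(Map<String, Long> counts) {
        return counts.values().stream().mapToLong(Long::longValue).average().orElse(0.0);
    }

    // True when the candidate threshold lies below the average event count.
    static boolean isBelowAverage(int threshold, Map<String, Long> counts) {
        return threshold < averageEvents(counts);
    }

    public static void main(String[] args) {
        // One entry per stored event (illustrative data only).
        List<String> ids = Arrays.asList("a", "a", "a", "a", "b", "b", "c", "c", "c");
        Map<String, Long> counts = eventsPerAggregate(ids);
        System.out.println("counts = " + counts);
        System.out.println("average = " + averageEvents(counts));
        System.out.println("threshold 2 below average: " + isBelowAverage(2, counts));
    }
}
```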

Verification of snapshot activation


> mongo
> show dbs
admin          	0.000GB
axonframework	0.000GB
local          	0.000GB
> use axonframework
> show collections
domainevents
sagas
snapshotevents
> db.domainevents.count()
515
> db.snapshotevents.count()
7

If the snapshotevents collection is not empty, snapshot creation has been activated successfully.

Other snapshot capabilities


The documentation also mentions other triggers for creating snapshots, for example:

  • the number of events created since the last snapshot exceeds the threshold
  • the time to initialize the Aggregate exceeds a limit
  • a time delay, etc.
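
The trigger conditions listed above can be expressed as simple predicates. The sketch below is not Axon's API: the class SnapshotTriggerSketch and its methods are hypothetical and only illustrate how such conditions might be evaluated.

```java
import java.time.Duration;

// Hypothetical trigger conditions; the names do not come from Axon Framework.
class SnapshotTriggerSketch {
    // Trigger when more events than `threshold` were created since the last snapshot.
    static boolean byEventCount(int eventsSinceLastSnapshot, int threshold) {
        return eventsSinceLastSnapshot > threshold;
    }

    // Trigger when initializing (loading) the Aggregate took longer than `limit`.
    static boolean byLoadTime(Duration loadTime, Duration limit) {
        return loadTime.compareTo(limit) > 0;
    }

    // Trigger when at least `delay` has passed since the last snapshot.
    static boolean byTimeDelay(Duration sinceLastSnapshot, Duration delay) {
        return sinceLastSnapshot.compareTo(delay) >= 0;
    }

    public static void main(String[] args) {
        System.out.println(byEventCount(51, 50));
        System.out.println(byLoadTime(Duration.ofMillis(120), Duration.ofMillis(100)));
        System.out.println(byTimeDelay(Duration.ofMinutes(5), Duration.ofMinutes(10)));
    }
}
```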
