Event Sourcing (17): Reading the events stored in a hand-rolled Event Store

July 7 18:09~19:21; 21:15~23:23; July 8 13:45~16:27


▲Store Checkpoints in the Event Store


Although using EventStoreDB, a database specially designed for Event Sourcing and CQRS, can save a lot of development work, in practice developers may be limited to relational databases due to company requirements or project constraints. In that case, you must simulate the Event Store with a relational database yourself.

In <Event Sourcing (4): Storing Aggregate in Outbox Store>, Teddy introduced how to use a relational database to store traditional ORM table data and domain events at the same time, but he has not yet talked about how to read these events back. This episode introduces that issue.


Guaranteeing the storage order of events

Teddy uses Message DB ( https://github.com/message-db/message-db ), the open source software used by ezKanban, as an example of how to guarantee the order of all events at write time. Figure 1 shows the command Message DB uses to create the event storage table. Each time a new row is inserted, the global_position field on line 7 takes the field's current maximum value plus 1 (you can simply think of it as an auto-increment field), and this maintains the global order of events.

Yes, it’s that simple.


▲Figure 1: The Message DB command that creates the event storage table
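The idea behind global_position can be sketched with SQLite (an illustrative stand-in only; Message DB's real DDL targets PostgreSQL, and the column set here is trimmed down from Figure 1):

```python
import sqlite3

# Illustrative sketch, not Message DB's actual schema: SQLite's
# AUTOINCREMENT column plays the role of global_position, assigning each
# new event the current maximum value plus 1, so the write order of all
# events across all streams is preserved.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE messages (
        global_position INTEGER PRIMARY KEY AUTOINCREMENT,
        stream_name     TEXT NOT NULL,
        type            TEXT NOT NULL,
        data            TEXT NOT NULL DEFAULT '{}'
    )
""")

for stream, event_type in [("card-1", "CardCreated"),
                           ("board-9", "BoardCreated"),
                           ("card-1", "CardMoved")]:
    conn.execute("INSERT INTO messages (stream_name, type) VALUES (?, ?)",
                 (stream, event_type))

# Reading back ordered by global_position reproduces the original write order.
rows = conn.execute("SELECT global_position, type FROM messages "
                    "ORDER BY global_position").fetchall()
```

Reading `rows` back yields the three events in exactly the order they were written, regardless of which stream each belongs to.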


At least once

Message DB itself only contains the code required to use PostgreSQL as an Event Store; it does not provide a client program, which users must write themselves, so there is no direct support for at-least-once delivery. Message DB was originally a module (sub-project) of the Eventide Project. Eventide is open source software for Event Sourcing and Pub/Sub that supports the Ruby language; for usage, please refer to Eventide's official documentation.

Eventide’s Consumer is developed in Ruby, and Teddy has never used it. From its official documentation (Figure 2), it is not clear whether it provides at-least-once delivery. Point 4 in Figure 2 mentions that the Consumer periodically and automatically writes the position the client has read back to the backend. Suppose the client has read some data but has not yet processed it when the Consumer writes that read position to the backend (marking the data as consumed); a failure at that moment could cause message loss.


▲Figure 2: It is not clear from the Eventide documentation whether at least once is supported


Regardless of whether the Ruby client provided by Eventide, Message DB’s “native family”, supports at-least-once, Teddy explains a common way to achieve it. The approach described in ”Enterprise Integration Patterns” is to use the Transactional Client design pattern. Its concept is the same as the Pulsar approach Teddy introduced in the previous episode <Event Sourcing (16): Event Semantics of Distributed Systems and Idempotence>: after confirming that an event has been processed, the client sends an ack to the server, similar to a database commit command, to complete the transaction, as shown in Figure 3.


▲Figure 3: After the Consumer sends an ack to the Server, the read event is deleted from the Topic


If you implement your own driver that reads the event table in the database, how can you achieve a similar effect? The practice is not difficult: simply store a checkpoint for each Consumer to represent how many events it has read so far. When the Consumer reads an event and finishes processing it, it adds 1 to the checkpoint (for batch processing, add N at a time), which marks the event as read. Events are never actually deleted from the database; the checkpoint value records each Consumer’s read position.

Basically, a checkpoint is like the index used when reading an array. An event stream can support multiple Consumers reading at the same time, and each Consumer’s reading progress (position) differs. Therefore, each Consumer should take a unique name, and that name is used as the checkpoint’s name to record each Consumer’s reading progress.

With this approach, if you want to reread the entire stream, just delete the checkpoint. Simple, right?
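These mechanics fit in a few lines. A minimal sketch (all names are illustrative, not taken from Message DB): events live in a shared list, and each named Consumer’s checkpoint is just an index into it.

```python
# Events are never deleted; each Consumer's checkpoint is an index into the
# shared event list, keyed by the Consumer's unique name.
events = ["e1", "e2", "e3", "e4", "e5"]
checkpoints = {}  # consumer name -> position of the next event to read

def read_next(consumer, batch_size=1):
    pos = checkpoints.get(consumer, 0)
    return events[pos:pos + batch_size]

def ack(consumer, n=1):
    # Advance the checkpoint only after processing succeeds (at-least-once).
    checkpoints[consumer] = checkpoints.get(consumer, 0) + n

# Two consumers progress independently over the same stream.
assert read_next("projector") == ["e1"]
ack("projector")
assert read_next("projector") == ["e2"]
assert read_next("mailer", batch_size=3) == ["e1", "e2", "e3"]
ack("mailer", n=3)

# Rereading the whole stream = just delete the checkpoint.
del checkpoints["projector"]
assert read_next("projector") == ["e1"]
```

Note that if a consumer crashes after processing but before `ack`, it will re-read the same events on restart; that is exactly the at-least-once guarantee, and why the next episode’s topic, idempotence, matters.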

Now the question is: where should this checkpoint live? Referring to the official documentation of EventStoreDB, checkpoints can be stored on either the server side or the client side, forming two different kinds of subscription (Consumer):

  • Persistent Subscription : stores the checkpoint on the server side. As shown in Figure 4, line 113 calls the ack method to mark which event has been processed and update the checkpoint value on the server side. However, because EventStoreDB's Persistent Subscription supports the Competing Consumers pattern introduced in the previous episode and can automatically retry (re-send events), event order is not guaranteed (the order in which the Consumer receives events may differ from the order in which they are stored in the database). The EventStoreDB documentation recommends using its Catch-up Subscription if ordering must be guaranteed.
  • Catch-up Subscription : does not store checkpoints on the server side; Consumers must save their checkpoints themselves. When a Consumer connects to the database, it tells the database which event stream to read and where to start reading, as shown in Figure 5.
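The difference between the two styles comes down to who owns the checkpoint. A minimal sketch of the two ownership models (class and method names are illustrative, not the EventStoreDB API):

```python
class EventStore:
    """Holds the event log; for Persistent Subscriptions it also keeps
    checkpoints on the server side."""
    def __init__(self, events):
        self.events = events
        self.checkpoints = {}                      # server-side checkpoints

    def read(self, position, batch_size):
        return self.events[position:position + batch_size]

    def ack(self, subscription_name, position):
        # Persistent Subscription style: the client acks, the server stores
        # the checkpoint (the ordering caveats above apply).
        self.checkpoints[subscription_name] = position


class CatchUpConsumer:
    """Catch-up style: the consumer keeps its own checkpoint and tells the
    server where to start reading on every call."""
    def __init__(self, store, start_position=0):
        self.store = store
        self.position = start_position             # client-side checkpoint

    def poll(self, batch_size=10):
        batch = self.store.read(self.position, batch_size)
        self.position += len(batch)                # advance after processing
        return batch


store = EventStore(["e1", "e2", "e3"])
consumer = CatchUpConsumer(store)
```

With the catch-up style, the server stays stateless with respect to readers, which is why events arrive in strict storage order: there is no competing-consumer redelivery on the server side.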


▲Figure 4: The Persistent Subscription of EventStoreDB saves the checkpoint on the server side.


▲Figure 5: Example from the EventStoreDB official website of a Catch-up Subscription specifying the read position


Implement Persistent Consumer

Having said all that, the next step is to write a Persistent Consumer that stores its checkpoint in Message DB. Because this is an Event Sourcing system, the checkpoint on the database side is also represented as an event stream for the Consumer. In Figure 6, the value of the stream_name field is $$Checkpoint-ezKanban-11, where the prefix ”$$Checkpoint-” marks it as a system-generated stream for storing checkpoints, and ezKanban-11 is the Consumer name. The value of the type field, $System$Checkpointed, marks it as an event that creates or updates a checkpoint. Finally, the data field stores {“position”: 5}, the checkpoint’s value, meaning this Consumer has currently read up to the fifth position of the event stream.

In theory, because this is an Event Sourcing system, each update of the checkpoint should append a new event, and the last record in the checkpoint stream would then be the latest read position. Here Teddy instead takes the traditional CRUD approach, storing only the latest checkpoint data. That is to say, updating the checkpoint does not write a new event but directly overwrites the data field of the original event.
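This CRUD-style checkpoint record can be sketched as follows (the stream_name, type, and data values follow Figure 6; the table layout and helper names are illustrative, and SQLite stands in for PostgreSQL):

```python
import json
import sqlite3

# One row per Consumer: updating the checkpoint overwrites the data field
# of the existing record instead of appending a new event.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE messages (stream_name TEXT, type TEXT, data TEXT)")

CHECKPOINT_STREAM = "$$Checkpoint-ezKanban-11"   # "$$Checkpoint-" + consumer name

def save_checkpoint(position):
    data = json.dumps({"position": position})
    updated = conn.execute(
        "UPDATE messages SET data = ? WHERE stream_name = ?",
        (data, CHECKPOINT_STREAM)).rowcount
    if updated == 0:   # first time: create the checkpoint record
        conn.execute(
            "INSERT INTO messages VALUES (?, '$System$Checkpointed', ?)",
            (CHECKPOINT_STREAM, data))

def load_checkpoint():
    row = conn.execute("SELECT data FROM messages WHERE stream_name = ?",
                       (CHECKPOINT_STREAM,)).fetchone()
    return json.loads(row[0])["position"] if row else None

save_checkpoint(0)    # newly created consumer starts at position 0
save_checkpoint(5)    # later updates overwrite the same record
```

After both calls the stream still contains a single record, whose data field reads {"position": 5}.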


▲Figure 6: Checkpoint is stored in the event stream representing Consumer

Figure 7 shows the code that creates the PersistentConsumer. Line 50 determines whether the Consumer is being created for the first time; if so, the _writeMessage method on line 51 is called to generate a new event stream and write a checkpoint=0 record.


▲Figure 7: Code that creates the PersistentConsumer

In order to read events from the event stream, the PersistentConsumer must periodically query the Event Store, as shown in Figure 8. Lines 35 to 41 get the checkpoint value, line 43 starts a while(true) loop, and line 45 reads events of the $all stream starting from the checkpoint position (the purpose of this Consumer is to monitor all system events). After the data is read, it can be processed; once processed, line 51 calls the ack method to write the new checkpoint position to the database. One implementation detail deserves attention: under normal circumstances, line 44 should set a batch size parameter limiting how many records are read at a time, in case the event stream contains a very large amount of data that could cause the Consumer to stall or even crash (possibly running out of memory).

If there are no new events in the monitored event stream, it simply sleeps for a period of time (the polling interval) and checks again.
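The loop just described can be sketched like this (function names and parameters are illustrative, not the article's actual PersistentConsumer code; max_polls exists only so the demo terminates):

```python
import itertools
import time

# Each iteration: read at most batch_size events from the checkpoint,
# process them, then ack the new checkpoint; sleep when nothing is new.
def run(read_events, handle, ack, get_checkpoint,
        batch_size=100, poll_interval=0.2, max_polls=None):
    polls = itertools.count() if max_polls is None else range(max_polls)
    for _ in polls:
        position = get_checkpoint()
        batch = read_events(position, batch_size)   # bounded read: a huge
        if not batch:                               # stream won't exhaust memory
            time.sleep(poll_interval)               # polling interval
            continue
        for event in batch:
            handle(event)
        ack(position + len(batch))                  # advance the checkpoint only
                                                    # after processing (at-least-once)

# In-memory demo: 7 events consumed 3 at a time.
events = list(range(7))
state = {"checkpoint": 0, "handled": []}
run(read_events=lambda pos, n: events[pos:pos + n],
    handle=state["handled"].append,
    ack=lambda pos: state.update(checkpoint=pos),
    get_checkpoint=lambda: state["checkpoint"],
    batch_size=3, poll_interval=0, max_polls=4)
```

Because the ack happens only after the whole batch is handled, a crash mid-batch means those events are re-read on restart, which is the at-least-once behavior this series is building toward.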


▲Figure 8: The run method of PersistentConsumer

Figure 9 shows the ack code. It first checks whether the event stream exists (lines 76~79), then whether the checkpoint position is greater than the position of the last event in the event stream (lines 81~84), and finally that the stream name is not a system built-in stream. If all checks pass, it writes a new checkpoint (line 90).
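The three validations can be sketched as follows (illustrative code, not the article's actual implementation; the streams/checkpoints dictionaries stand in for database tables):

```python
def ack(streams, checkpoints, consumer_name, stream_name, position):
    """Validate and then persist a new checkpoint, mirroring Figure 9's checks."""
    if stream_name not in streams:                   # the event stream must exist
        raise ValueError("stream does not exist")
    if position > len(streams[stream_name]):         # the checkpoint cannot point
        raise ValueError("position beyond last event")  # past the stream's last event
    if stream_name.startswith("$"):                  # reject system built-in streams
        raise ValueError("cannot ack a system stream")
    checkpoints[consumer_name] = position            # all checks passed: write it

streams = {"board-1": ["e1", "e2", "e3"]}
checkpoints = {}
ack(streams, checkpoints, "ezKanban-11", "board-1", 2)
```

Rejecting invalid positions up front keeps a buggy Consumer from silently recording a checkpoint beyond the stream's end and then skipping events it never processed.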


▲Figure 9: ack method


Next episode preview

Having introduced how to record a checkpoint on the server side to achieve at-least-once event delivery, the next episode will introduce how to implement idempotence.


Youzo’s inner monologue: the series is nearing its end.

This article is reprinted from https://teddy-chen-tw.blogspot.com/2022/07/17event-store.html
