Event Sourcing (4): Store Aggregate to Outbox Store

Original link: https://teddy-chen-tw.blogspot.com/2022/07/4aggregateoutbox-store.html

June 30 10:41~12:02; 12:56~15:24

▲Figure 1: With the Outbox storage approach, the database contains the tables required by both State Sourcing and Transactional Outbox


This episode uses the traditional State Sourcing approach to store the Tag Aggregate in a PostgreSQL relational database. Besides storing the Tag Aggregate's data in a database table through an ORM tool, it also applies Transactional Outbox: within the same transaction that stores the Tag, the Tag's domain events are stored in the database's domain event table. We call this way of storing both the current state and the domain events in one transaction "Outbox" for short.


Prepare the environment

As shown in Figure 1, a database that uses Outbox needs a table, prepared in advance, for storing domain events. Teddy uses the open source Message DB (https://github.com/message-db/message-db ), which already provides a table designed for storing events (named messages); see its official website for details.

Figure 2 shows ezKanban's database when Message DB is used. The messages table is created by Message DB, while the other tables, such as board, board_content, board_member, and card, are generated automatically by the ORM (ezKanban uses JPA to generate them).

▲Figure 2: Database tables of ezKanban using Outbox (partial screenshot)


TagOutboxRepository implementation

The previous episode, <Event Sourcing (3): Storing Aggregate in EventStoreDB>, already explained how the Repository design supports both the Event Sourcing and Outbox storage methods. This episode goes straight to implementing the TagOutboxRepository class, shown in Figure 3. Its implementation is similar to TagEventSourcingRepository; the difference is that TagEventSourcingRepository delegates its work to GenericEventSourcingRepository, while TagOutboxRepository delegates to GenericOutboxRepository.

▲Figure 3: TagOutboxRepository code

Figure 4 shows the GenericOutboxRepository implementation. Start with line 11, where the class accepts two generic parameters: AggregateRoot and OutboxData. The former indicates which concrete Aggregate the GenericOutboxRepository serves, and the latter is the data that this concrete Aggregate stores in the database through Outbox.

▲Figure 4: GenericOutboxRepository class

Figure 5 shows the TagData class. It holds all the properties of the Tag Aggregate that need to be stored in the database (lines 17~25), plus JPA annotations such as @Id and @Column. The streamName on line 14 and the domainEventDatas on line 16 hold the domain event data, which is eventually stored in the messages table. The version on lines 26~28 is the attribute that supports optimistic locking.

▲Figure 5: TagData class
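
To make the shape of such an Outbox Data class easier to picture, here is a minimal plain-Java sketch. The real TagData appears only in the figure, so everything below is an assumption made for illustration: the Tag fields (tagId, boardId, name, color), the DomainEventData record, and the "Tag-<id>" stream name format. The JPA annotations are mentioned only in comments so that the sketch compiles without a JPA dependency.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder for one serialized domain event bound for the messages table.
record DomainEventData(String id, String eventType, String jsonBody) {}

// Plain-Java sketch of an Outbox Data class. In a JPA version the class would be an
// entity and the state fields would carry annotations such as @Id and @Column.
class TagDataSketch {
    // Domain event data: ends up in the messages table, not in the tag table.
    private final String streamName;
    private final List<DomainEventData> domainEventDatas = new ArrayList<>();

    // State of the Tag Aggregate (field names are assumptions).
    private final String tagId;   // would be the @Id column
    private final String boardId;
    private final String name;
    private final String color;

    // Version counter used for optimistic locking.
    private long version;

    TagDataSketch(String tagId, String boardId, String name, String color, long version) {
        this.tagId = tagId;
        this.boardId = boardId;
        this.name = name;
        this.color = color;
        this.version = version;
        this.streamName = "Tag-" + tagId; // assumed "<category>-<id>" stream naming
    }

    void addDomainEventData(DomainEventData eventData) { domainEventDatas.add(eventData); }

    String getStreamName() { return streamName; }
    List<DomainEventData> getDomainEventDatas() { return List.copyOf(domainEventDatas); }
    String getTagId() { return tagId; }
    String getBoardId() { return boardId; }
    String getName() { return name; }
    String getColor() { return color; }
    long getVersion() { return version; }
    void setVersion(long version) { this.version = version; }
}
```

The point of the sketch is the split inside one object: the state fields map to the Aggregate's own table, while streamName and domainEventDatas travel along only so that the store can write them to the messages table.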

Next, look at findById on line 23 of Figure 4. On line 25, it uses the OutboxStore interface to load, by Aggregate id, the Outbox Data object that represents the Aggregate from the database. For the Tag Aggregate, this Outbox Data object is a TagData. Line 27 then converts the Outbox Data object into an Aggregate through OutboxMapper and returns it; for Tag, the TagData is converted into a Tag, which is returned to the caller of findById.

Next comes the save method on line 33. Line 35 converts the incoming Aggregate into Outbox Data, and line 36 stores the Outbox Data in the database through the OutboxStore interface. After saving, line 37 resets the Aggregate's version, and line 38 clears the domain events on the Aggregate. Because the Aggregate's state has already been stored in the database, its domain events must be cleared; otherwise, saving the same Aggregate again would store duplicate domain events.
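
The findById and save flow described above can be sketched in plain Java as follows. This is not the ezKanban code, which appears only in the figures. The type names AggregateRoot, OutboxData, OutboxMapper, OutboxStore, and GenericOutboxRepository come from the article, but their method signatures (toData, toDomain, setVersion, clearDomainEvents, and so on), the simplified Tag and TagData, and the InMemoryOutboxStore are assumptions chosen to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Assumed minimal contracts; the real ezKanban types appear only in the figures.
interface AggregateRoot {
    String getId();
    long getVersion();
    void setVersion(long version);
    List<String> getDomainEvents();
    void clearDomainEvents();
}

interface OutboxData {
    String getId();
    long getVersion();
}

interface OutboxMapper<A extends AggregateRoot, D extends OutboxData> {
    D toData(A aggregate);
    A toDomain(D data);
}

interface OutboxStore<D extends OutboxData> {
    void save(D data);                 // stores state and events, bumps the data's version
    Optional<D> findById(String id);
}

class GenericOutboxRepository<A extends AggregateRoot, D extends OutboxData> {
    private final OutboxStore<D> store;
    private final OutboxMapper<A, D> mapper;

    GenericOutboxRepository(OutboxStore<D> store, OutboxMapper<A, D> mapper) {
        this.store = store;
        this.mapper = mapper;
    }

    Optional<A> findById(String id) {
        return store.findById(id).map(mapper::toDomain);   // Outbox Data -> Aggregate
    }

    void save(A aggregate) {
        D data = mapper.toData(aggregate);        // Aggregate -> Outbox Data
        store.save(data);                         // state + domain events
        aggregate.setVersion(data.getVersion());  // adopt the stored version
        aggregate.clearDomainEvents();            // avoid storing the same events twice
    }
}

// Hypothetical Tag aggregate, Outbox Data, mapper, and in-memory store for the demo.
class Tag implements AggregateRoot {
    private final String id;
    private final String name;
    private long version;
    private final List<String> domainEvents = new ArrayList<>();

    Tag(String id, String name, long version, boolean isNew) {
        this.id = id; this.name = name; this.version = version;
        if (isNew) domainEvents.add("TagCreated");
    }
    public String getId() { return id; }
    String getName() { return name; }
    public long getVersion() { return version; }
    public void setVersion(long version) { this.version = version; }
    public List<String> getDomainEvents() { return domainEvents; }
    public void clearDomainEvents() { domainEvents.clear(); }
}

class TagData implements OutboxData {
    final String id; final String name; long version; final List<String> domainEventDatas;
    TagData(String id, String name, long version, List<String> events) {
        this.id = id; this.name = name; this.version = version;
        this.domainEventDatas = new ArrayList<>(events);
    }
    public String getId() { return id; }
    public long getVersion() { return version; }
}

class TagMapper implements OutboxMapper<Tag, TagData> {
    public TagData toData(Tag tag) {
        return new TagData(tag.getId(), tag.getName(), tag.getVersion(), tag.getDomainEvents());
    }
    public Tag toDomain(TagData data) { return new Tag(data.id, data.name, data.version, false); }
}

class InMemoryOutboxStore implements OutboxStore<TagData> {
    final Map<String, TagData> tagTable = new HashMap<>();
    final List<String> messagesTable = new ArrayList<>();
    public void save(TagData data) {
        data.version++;
        tagTable.put(data.id, data);
        messagesTable.addAll(data.domainEventDatas);
    }
    public Optional<TagData> findById(String id) { return Optional.ofNullable(tagTable.get(id)); }
}
```

Saving a new Tag through a GenericOutboxRepository<Tag, TagData> built on this store leaves one entry in messagesTable; saving the same Tag instance a second time adds no further entry, because save cleared the Tag's domain events the first time.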

One important point here is the store.save() method on line 36. For TagData, it first stores the TagData in the tag table and then stores its domain events in the messages table. Both writes are executed in the same transaction, which ensures that the stored state of the Tag stays correct.
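
Why the shared transaction matters can be illustrated without a database. The following sketch uses two in-memory collections as stand-ins for the tag and messages tables and a hand-rolled all-or-nothing helper as a stand-in for a real database transaction. All names here (InMemoryOutboxDb, inTransaction, saveTag, the failOnEvents switch) are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InMemoryOutboxDb {
    final Map<String, String> tagTable = new HashMap<>();   // tag id -> tag name
    final List<String> messagesTable = new ArrayList<>();   // stored domain events

    // Runs the work; on any runtime exception, restores both tables to their prior contents.
    void inTransaction(Runnable work) {
        Map<String, String> tagSnapshot = new HashMap<>(tagTable);
        List<String> messagesSnapshot = new ArrayList<>(messagesTable);
        try {
            work.run();
        } catch (RuntimeException e) {
            tagTable.clear();
            tagTable.putAll(tagSnapshot);
            messagesTable.clear();
            messagesTable.addAll(messagesSnapshot);
            throw e;
        }
    }

    // Writes the state first and the domain events second, as one unit of work.
    void saveTag(String id, String name, List<String> events, boolean failOnEvents) {
        inTransaction(() -> {
            tagTable.put(id, name);
            if (failOnEvents) {
                throw new IllegalStateException("simulated failure while writing events");
            }
            messagesTable.addAll(events);
        });
    }
}
```

If the event write fails, the tag row written a moment earlier disappears as well, so the database never holds a state change without its domain events. A real database transaction provides the same guarantee.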


PostgresOutboxStore implementation

In ezKanban, the OutboxStore interface mentioned above is implemented by the PostgresOutboxStore class, shown in Figure 6. PostgresOutboxStore delegates its work to PostgresOutboxStoreClient, which contains the code that actually talks to the database.

▲Figure 6: PostgresOutboxStore class

Figure 7 shows the PostgresOutboxStoreClient code, which stores data in the ORM table and the messages table through OrmStoreClient and PostgresMessageStoreClient, respectively. ezKanban is built on the Spring Boot framework and leaves transaction management to Spring (the @Transactional annotations on lines 22 and 36).

The save method on line 23 first calls saveAndUpdateVersion to store the ORM data, and then, on line 25, calls saveDomainEventsWithoutVersion to store the domain events.

The delete method on line 36 first calls deleteById to delete the row from the ORM table, and then calls saveDomainEventsWithoutVersion to store the domain events.

▲Figure 7: PostgresOutboxStoreClient class
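
The ordering of the two calls in save and delete can be sketched with in-memory stand-ins. The method names saveAndUpdateVersion, deleteById, and saveDomainEventsWithoutVersion come from the article; their parameter types, the OutboxRow class, and the two InMemory clients are assumptions, and the sketch leaves out the transaction that the @Transactional annotation provides in ezKanban.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical Outbox Data: id, version, and the domain events waiting to be stored.
class OutboxRow {
    final String id;
    long version;
    final List<String> domainEvents;

    OutboxRow(String id, long version, List<String> domainEvents) {
        this.id = id;
        this.version = version;
        this.domainEvents = new ArrayList<>(domainEvents);
    }
}

// In-memory stand-in for the client that writes the ORM table.
class InMemoryOrmStoreClient {
    final Map<String, OutboxRow> rows = new HashMap<>();

    void saveAndUpdateVersion(OutboxRow row) {
        row.version++;
        rows.put(row.id, row);
    }

    void deleteById(String id) { rows.remove(id); }
}

// In-memory stand-in for the client that writes the messages table.
class InMemoryMessageStoreClient {
    final List<String> messages = new ArrayList<>();

    void saveDomainEventsWithoutVersion(OutboxRow row) { messages.addAll(row.domainEvents); }
}

class OutboxStoreClientSketch {
    private final InMemoryOrmStoreClient ormClient;
    private final InMemoryMessageStoreClient messageClient;

    OutboxStoreClientSketch(InMemoryOrmStoreClient ormClient,
                            InMemoryMessageStoreClient messageClient) {
        this.ormClient = ormClient;
        this.messageClient = messageClient;
    }

    // Annotated with @Transactional in ezKanban; no transaction in this sketch.
    void save(OutboxRow row) {
        ormClient.saveAndUpdateVersion(row);                 // 1. state
        messageClient.saveDomainEventsWithoutVersion(row);   // 2. domain events
    }

    // Annotated with @Transactional in ezKanban; no transaction in this sketch.
    void delete(OutboxRow row) {
        ormClient.deleteById(row.id);                        // 1. remove the state row
        messageClient.saveDomainEventsWithoutVersion(row);   // 2. still record the events
    }
}
```

Note that delete removes the row from the ORM table but still appends to the messages table, so the domain events raised by the deletion are kept even though the state row is gone.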

Finally, look at the implementations of OrmStoreClient and PostgresMessageStoreClient, shown in Figure 8 and Figure 9. The former extends Spring Data's CrudRepository, and its code is simple enough that Teddy will not explain it further.

▲Figure 8: OrmStoreClient class

PostgresMessageStoreClient is a Java client driver that ezKanban wrote for Message DB. Message DB itself only defines the schema of the messages table and provides several PostgreSQL functions through which client programs write and read domain events; it does not ship a Java driver that lets Java clients read and write the database directly. The writeMessage method on line 28 of Figure 9 is part of the driver ezKanban wrote for this purpose: calling it writes a domain event into Message DB.

▲Figure 9: PostgresMessageStoreClient class
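
For readers who cannot see Figure 9, here is a rough idea of what such a driver method might look like with plain JDBC. This is not ezKanban's writeMessage; the class name, method signature, and use of a random UUID as the message id are assumptions. The sketch calls Message DB's write_message server function with its first four arguments (id, stream_name, type, data), assumes the functions are installed in the default message_store schema, and omits the optional metadata and expected_version arguments.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.UUID;

class MessageDbWriterSketch {
    // Call to Message DB's write_message server function; the schema name is an assumption.
    static final String WRITE_MESSAGE_SQL =
        "SELECT message_store.write_message(?::varchar, ?::varchar, ?::varchar, ?::jsonb)";

    // Writes one event and returns the value reported by write_message (the stream position).
    static long writeMessage(Connection connection, String streamName,
                             String eventType, String jsonData) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(WRITE_MESSAGE_SQL)) {
            statement.setString(1, UUID.randomUUID().toString()); // message id
            statement.setString(2, streamName);                   // e.g. "Tag-<id>" (assumed format)
            statement.setString(3, eventType);                    // event type name
            statement.setString(4, jsonData);                     // event body as JSON text
            try (ResultSet result = statement.executeQuery()) {
                result.next();
                return result.getLong(1);
            }
        }
    }
}
```

Because the method takes an existing Connection, it takes part in whatever transaction that connection is already in, which is what allows the event write to share a transaction with the ORM write.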


Execute test cases

With TagOutboxRepository implemented, rewrite the test case from the previous episode, <Event Sourcing (3): Storing Aggregate in EventStoreDB>, so that TagOutboxRepository is injected as the TagRepository, as shown in Figure 10.

▲Figure 10: Replacing the tagRepository in the test case with the TagOutboxRepository implementation

After running the test case, open the PostgreSQL management screen: the tag table and the messages table each contain a new row, as shown in Figure 11.

▲Figure 11: The newly added rows as seen in the PostgreSQL management screen


Next episode preview

Both storage methods, Event Sourcing and Outbox, are now complete, but one small detail remains unexplained: how to store the “Event Type” in the database. The next episode discusses this issue.


Youzo’s inner monologue: There are many details to deal with.
