Event Sourcing (3): Storing an Aggregate in EventStoreDB

Original link: https://teddy-chen-tw.blogspot.com/2022/06/3aggregateeventstoredb.html

June 29 05:39~09:08

[Screenshot: 2022-06-29, 9:06:33 AM]

▲ Notes from the ezKanban team's discussion of the Repository design

Foreword

Today I will store the Tag Aggregate written in the previous episode in EventStoreDB (https://eventstore.com), an open source database purpose-built for Event Sourcing.

***

Prepare the environment

First, install EventStoreDB. An official ready-made Docker image is available, so you can run it directly, without a separate installation, by executing the following command:

docker run --name esdb-node -it -p 2113:2113 -p 1113:1113 eventstore/eventstore:latest --insecure --run-projections=All --enable-external-tcp --enable-atom-pub-over-http --start-standard-projections

As shown in Figure 1, open the browser and enter the following URL to connect to the EventStoreDB management screen: http://localhost:2113/web/index.html#/dashboard

[Screenshot: 2022-06-29, 5:53:17 AM]

▲Figure 1: EventStoreDB management screen

Switch to the Stream Browser page, as shown in Figure 2. Aggregates stored in EventStoreDB will appear on this page.

[Screenshot: 2022-06-29, 5:57:05 AM]

▲Figure 2: Stream Browser page of EventStoreDB

Then install the client driver of EventStoreDB. Teddy uses Maven to manage project dependencies and adds the following dependencies to the pom.xml file:

<dependency>
    <groupId>com.eventstore</groupId>
    <artifactId>db-client-java</artifactId>
    <version>2.0.0</version>
</dependency>

At the time of writing, the latest Java driver for EventStoreDB is 3.0.0; Teddy's sample program uses version 2.0.0.

***

Repository design and implementation

In Domain-Driven Design (DDD), Aggregates are stored and read through the Repository design pattern. Figure 3 shows the AbstractRepository interface designed by Teddy. Every concrete Repository implements this interface.

[Screenshot: 2022-06-29, 6:24:26 AM]

▲Figure 3: AbstractRepository interface

Figure 4 shows the TagRepository interface, which extends AbstractRepository.

[Screenshot: 2022-06-29, 6:24:42 AM]

▲Figure 4: TagRepository interface
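
Because Figures 3 and 4 are screenshots, the following is a minimal sketch of what the two interfaces might look like. Only findById and save are named in this article; the generic parameters, the Optional return type, the simplified Tag stand-in, and the in-memory implementation are assumptions made for illustration and are not Teddy's actual code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical shape of the generic repository contract (cf. Figure 3).
interface AbstractRepository<T, ID> {
    Optional<T> findById(ID id);

    void save(T aggregate);
}

// Simplified stand-in for the Tag Aggregate from the previous episode.
final class Tag {
    private final String id;
    private final String name;

    Tag(String id, String name) {
        this.id = id;
        this.name = name;
    }

    String getId() { return id; }

    String getName() { return name; }
}

// The Aggregate-specific interface only fixes the type parameters (cf. Figure 4).
interface TagRepository extends AbstractRepository<Tag, String> {
}

// A trivial in-memory implementation, handy for fast unit tests.
final class InMemoryTagRepository implements TagRepository {
    private final Map<String, Tag> tags = new HashMap<>();

    @Override
    public Optional<Tag> findById(String id) {
        return Optional.ofNullable(tags.get(id));
    }

    @Override
    public void save(Tag tag) {
        tags.put(tag.getId(), tag);
    }
}
```

Code that depends only on TagRepository can switch between implementations by changing the single line that constructs the repository.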

Next, we discuss the implementation of TagRepository. Although the topic of this series is Event Sourcing, in practice many developers are accustomed to storing data in relational databases, whether because of company regulations or personal preference. Teddy therefore wants the implementation of TagRepository to support both Event Sourcing and State Sourcing plus Transactional Outbox.

The final design is shown in Figure 5. TagRepository has two implementations, namely TagEventSourcingRepository and TagOutboxRepository. The former supports Event Sourcing, the latter supports State Sourcing plus Transactional Outbox. Today, we will discuss the implementation of TagEventSourcingRepository, and we will look at TagOutboxRepository in the next episode.

[Screenshot: 2022-06-29, 6:35:32 AM]

▲Figure 5: Repository architecture design that supports Event Sourcing and State Sourcing plus Transactional Outbox

***

TagEventSourcingRepository Implementation

The TagEventSourcingRepository code is shown in Figure 6. It essentially delegates all of its work to GenericEventSourcingRepository.

[Screenshot: 2022-06-29, 7:01:36 AM]

▲Figure 6: TagEventSourcingRepository code

Figure 7 shows the implementation of GenericEventSourcingRepository. Start with findById on line 26, which uses eventSourcingStore to read all domain events from the database by the Aggregate's stream name (lines 28~29). Because Teddy applies Clean Architecture, data written to and read from the event sourcing database goes through one unified data structure, AggregateRootData.

Lines 33~34 convert the DomainEventData in AggregateRootData into DomainEvent objects. Line 36 then uses Java Reflection to create an instance of the specified Aggregate and passes it the domain events just read from the database. The Aggregate's constructor receives the domain events and replays them in order to compute the latest state. Finally, the Aggregate instance is returned, which completes reading the Aggregate from the event sourcing database.

The save method on line 48 is relatively simple. Line 51 converts the incoming Aggregate into AggregateData, and line 52 stores the AggregateData through eventSourcingStore. After saving, line 53 resets the Aggregate's version and line 54 clears the domain events on the Aggregate. Those events have already been written to the database, so they must be cleared; otherwise, saving the same Aggregate again would store duplicate domain events.

[Screenshot: 2022-06-29, 7:08:24 AM]

▲Figure 7: GenericEventSourcingRepository code
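
The findById and save flow described above can be sketched in a self-contained way. This is not the code in Figure 7: an in-memory map stands in for the event store, and every name here (EventSourcedTag, TagCreated, TagRenamed, SketchEventSourcingRepository, the pending-event methods) is a hypothetical stand-in. The sketch only illustrates the two ideas from the text: rebuilding an Aggregate by replaying events through a constructor located via reflection, and clearing pending events after saving.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Stand-in domain events (names are illustrative).
interface DomainEvent {}

final class TagCreated implements DomainEvent {
    final String tagId;
    final String name;
    TagCreated(String tagId, String name) { this.tagId = tagId; this.name = name; }
}

final class TagRenamed implements DomainEvent {
    final String name;
    TagRenamed(String name) { this.name = name; }
}

// Stand-in event-sourced Aggregate: its state is derived only from events.
final class EventSourcedTag {
    private String id;
    private String name;
    private long version = -1; // -1 means "never stored"
    private final List<DomainEvent> pendingEvents = new ArrayList<>();

    // Command-side constructor: creating a tag records a TagCreated event.
    EventSourcedTag(String id, String name) { raise(new TagCreated(id, name)); }

    // Replay constructor: rebuilds the latest state from stored events, in order.
    public EventSourcedTag(List<DomainEvent> history) {
        history.forEach(this::apply);
        version = history.size() - 1;
    }

    void rename(String newName) { raise(new TagRenamed(newName)); }

    private void raise(DomainEvent event) { apply(event); pendingEvents.add(event); }

    private void apply(DomainEvent event) {
        if (event instanceof TagCreated) {
            TagCreated created = (TagCreated) event;
            id = created.tagId;
            name = created.name;
        } else if (event instanceof TagRenamed) {
            name = ((TagRenamed) event).name;
        }
    }

    String getId() { return id; }
    String getName() { return name; }
    long getVersion() { return version; }
    void setVersion(long version) { this.version = version; }
    List<DomainEvent> getPendingEvents() { return new ArrayList<>(pendingEvents); }
    void clearPendingEvents() { pendingEvents.clear(); }
}

// Repository sketch: an in-memory map of streams stands in for the event store.
final class SketchEventSourcingRepository {
    private final Map<String, List<DomainEvent>> streams = new HashMap<>();

    Optional<EventSourcedTag> findById(String id) {
        List<DomainEvent> history = streams.get(id);
        if (history == null) return Optional.empty();
        try {
            // A generic repository would receive the Aggregate class as a
            // parameter; here it is fixed to EventSourcedTag for brevity.
            return Optional.of(EventSourcedTag.class.getConstructor(List.class)
                    .newInstance(new ArrayList<>(history)));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot rebuild aggregate " + id, e);
        }
    }

    void save(EventSourcedTag tag) {
        List<DomainEvent> stream = streams.computeIfAbsent(tag.getId(), key -> new ArrayList<>());
        stream.addAll(tag.getPendingEvents());
        tag.setVersion(stream.size() - 1); // version = index of the last stored event
        tag.clearPendingEvents();          // prevents duplicates on the next save
    }
}
```

In this sketch, calling save twice in a row on the same instance appends nothing the second time, because the pending events were cleared after the first save.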

***

EsdbStore implementation

Having read this far, readers may ask: “I still haven’t seen how the domain events are stored in EventStoreDB.” Because ezKanban supports different event sourcing databases, Repository implementations that use event-sourced storage operate the database through the EventSourcingStore interface (see Figure 5).

The EsdbStore class in Figure 8 is the EventSourcingStore implementation that ezKanban provides for EventStoreDB. The EventStoreDBClient class on line 14 is the official EventStoreDB driver, through which developers ultimately connect to EventStoreDB and read and write data. Readers who do not need to apply Clean Architecture and support multiple event sourcing databases, as ezKanban does, can use EventStoreDBClient to operate EventStoreDB directly in the Repository implementation class, which is much simpler.

Next, look at the save method on line 16, which accepts AggregateRootData as its input parameter. Teddy introduced this class above: to comply with Clean Architecture's rule for crossing layer boundaries, ezKanban converts an Aggregate into AggregateRootData before passing it to the database. Lines 21 to 28 convert the DomainEventData in AggregateRootData to EventData. EventData is a data structure defined by the EventStoreDB driver, and it is the object that is actually stored in EventStoreDB.

When working with a relational database, the developer specifies a table. EventStoreDB operates not on tables but on event streams (an event stream can be thought of as an append-only file). When opening an event stream, you can specify the opening mode, which the EventStoreDB driver represents with the AppendToStreamOptions class, as shown on lines 31 to 38 of Figure 8.

EventStoreDB supports optimistic locking. If aggregateRootData.getVersion() on line 32 returns -1, the Aggregate is newly created, and the expectedRevision parameter on line 34 is set to ExpectedRevision.ANY. In this mode, EventStoreDB performs no optimistic locking check when writing data. Otherwise, as shown on line 37, expectedRevision is set to new StreamRevision(aggregateRootData.getVersion()). StreamRevision is how the EventStoreDB driver represents the version of an event stream, and aggregateRootData.getVersion() can be passed to it directly. When a specific expectedRevision is set, EventStoreDB performs the optimistic locking check when writing data.

Finally, lines 39~41 write data to the database through the appendToStream method of the EventStoreDBClient class.

[Screenshot: 2022-06-29, 7:37:55 AM]

▲Figure 8: EsdbStore code
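
The expected-revision rule described above can be illustrated without the driver. The sketch below is an in-memory stand-in, not the EventStoreDB API: InMemoryStreamStore, RevisionConflictException, and the ANY sentinel value are hypothetical names, and a stream's revision is assumed to be the zero-based index of its last event.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Thrown when the stream's current revision differs from the expected one.
final class RevisionConflictException extends RuntimeException {
    RevisionConflictException(String stream, long expected, long actual) {
        super("Stream " + stream + ": expected revision " + expected + " but was " + actual);
    }
}

// In-memory stand-in for an event store with append-only streams.
final class InMemoryStreamStore {
    // Hypothetical sentinel meaning "skip the optimistic locking check".
    static final long ANY = Long.MIN_VALUE;

    private final Map<String, List<String>> streams = new HashMap<>();

    // Revision = index of the last event; -1 if the stream is empty or absent.
    synchronized long currentRevision(String stream) {
        return streams.getOrDefault(stream, List.of()).size() - 1L;
    }

    // Appends events if the expectation holds, then returns the new revision.
    synchronized long append(String stream, long expectedRevision, List<String> events) {
        long actual = currentRevision(stream);
        if (expectedRevision != ANY && expectedRevision != actual) {
            throw new RevisionConflictException(stream, expectedRevision, actual);
        }
        streams.computeIfAbsent(stream, key -> new ArrayList<>()).addAll(events);
        return currentRevision(stream);
    }

    // Mirrors the rule in the text: version -1 means a new Aggregate, so no check;
    // any other version is used as the expected revision.
    long save(String stream, long aggregateVersion, List<String> events) {
        long expected = aggregateVersion == -1 ? ANY : aggregateVersion;
        return append(stream, expected, events);
    }
}
```

If two writers load the same Aggregate at revision 0 and both try to save, the first append moves the stream to revision 1 and the second fails with a conflict instead of silently overwriting the first writer's change.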

Next, look at the load method on line 57 of Figure 8. It reads data from the event stream that belongs to the Aggregate instance (each Aggregate instance has a dedicated event stream that stores its own domain events). Lines 63 to 71 call the getResolvedEvents method (see Figure 9) to read the data, which comes back as instances of the ResolvedEvent class defined by the EventStoreDB driver.

Line 76 creates aggregateRootData, lines 77~79 convert the ResolvedEvent objects just read from the database to DomainEventData, and line 84 returns aggregateRootData.

[Screenshot: 2022-06-29, 8:29:03 AM]

▲Figure 9: The getResolvedEvents method of the EsdbStore class
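
The two conversions that EsdbStore performs (DomainEventData to a driver-level event on save, and back again on load) can be sketched with stand-in types. Nothing here is the driver's API or ezKanban's code: StoredEvent and EventDataMapper are hypothetical, and the fields of DomainEventData (an event type name plus a JSON string) are assumed for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Stand-in for the layer-neutral event DTO; the field names are assumptions.
final class DomainEventData {
    final String eventType;
    final String json;

    DomainEventData(String eventType, String json) {
        this.eventType = eventType;
        this.json = json;
    }
}

// Stand-in for a driver-level event: an id, a type name, and a raw payload.
final class StoredEvent {
    final UUID eventId;
    final String eventType;
    final byte[] payload;

    StoredEvent(UUID eventId, String eventType, byte[] payload) {
        this.eventId = eventId;
        this.eventType = eventType;
        this.payload = payload;
    }
}

final class EventDataMapper {
    private EventDataMapper() {}

    // Write side: one driver-level event per DomainEventData, in the same order.
    static List<StoredEvent> toStoredEvents(List<DomainEventData> events) {
        List<StoredEvent> stored = new ArrayList<>();
        for (DomainEventData event : events) {
            byte[] payload = event.json.getBytes(StandardCharsets.UTF_8);
            stored.add(new StoredEvent(UUID.randomUUID(), event.eventType, payload));
        }
        return stored;
    }

    // Read side: decode each stored event back into DomainEventData.
    static List<DomainEventData> toDomainEventData(List<StoredEvent> stored) {
        List<DomainEventData> events = new ArrayList<>();
        for (StoredEvent event : stored) {
            String json = new String(event.payload, StandardCharsets.UTF_8);
            events.add(new DomainEventData(event.eventType, json));
        }
        return events;
    }
}
```

Keeping the mapping in one place means the rest of the application sees only DomainEventData and never depends on the driver's types, which is the point of routing everything through EventSourcingStore.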

***

Execute test cases

With the TagRepository implementation complete, revise the test case from the previous episode, "Event Sourcing (2): Implementing Event Sourced Aggregate", replacing the original InMemoryTagRepository with TagEventSourcingRepository, as shown in Figure 10.

[Screenshot: 2022-06-29, 8:43:50 AM]

▲Figure 10: Replace the tagRepository in the test case with the TagEventSourcingRepository implementation

After running the test case, open the EventStoreDB management screen and you will see a new event stream, as shown in Figure 11.

[Screenshot: 2022-06-29, 8:53:37 AM]

▲Figure 11: The newly added event stream in the EventStoreDB management screen

Click on this event stream and you will see that it contains one event of type TagEvents$TagCreated, whose data is stored in JSON format, as shown in Figure 12.

[Screenshot: 2022-06-29, 8:53:46 AM]

▲Figure 12: Viewing the content of the event stream in the EventStoreDB management screen

***

Next episode preview

The next episode introduces the implementation of TagRepository with State Sourcing and Transactional Outbox.

***

Youzo’s inner monologue: This article took so long to write.
