In the previous post the services used Eventuate to communicate. But how did it know to create events? How does the CDC module work? Behind the scenes it mines the database transaction log and creates events from it. Enter Debezium. It’s built on top of Kafka and records the changes made to the database in Kafka logs, from where they are consumed.
It has four connectors available at the time of writing:
- MySQL
- MongoDB
- Oracle
- PostgreSQL
This means you need to use one of these databases to be able to use it. You can follow the instructions described here to set it up. You can use the Docker images to build your components, or you can use the embedded version. Let’s break down the example from the previous post and see Debezium in action.
The eventuateio-local-cdc-service uses an embedded Debezium engine, which is configured in the EventTableChangesToAggregateTopicRelay class. The command side of CQRS passes each command to the AggregateRepository, which processes it into events. The events are persisted into the database.
public CompletableFuture<Object> startCapturingChanges() throws InterruptedException {
    logger.debug("Starting to capture changes");
    cdcStartupValidator.validateEnvironment();
    producer = new EventuateKafkaProducer(kafkaBootstrapServers);
    String connectorName = "my-sql-connector";
    Configuration config = Configuration.create()
            /* begin engine properties */
            .with("connector.class",
                    "io.debezium.connector.mysql.MySqlConnector")
            .with("offset.storage", KafkaOffsetBackingStore.class.getName())
            .with("bootstrap.servers", kafkaBootstrapServers)
            .with("offset.storage.topic", "eventuate.local.cdc." + connectorName + ".offset.storage")
            .with("poll.interval.ms", 50)
            .with("offset.flush.interval.ms", 6000)
            /* begin connector properties */
            .with("name", connectorName)
            .with("database.hostname", jdbcUrl.getHost())
            .with("database.port", jdbcUrl.getPort())
            .with("database.user", dbUser)
            .with("database.password", dbPassword)
            .with("database.server.id", 85744)
            .with("database.server.name", "my-app-connector")
            //.with("database.whitelist", "eventuate")
            .with("database.history",
                    io.debezium.relational.history.KafkaDatabaseHistory.class.getName())
            .with("database.history.kafka.topic",
                    "eventuate.local.cdc." + connectorName + ".history.kafka.topic")
            .with("database.history.kafka.bootstrap.servers",
                    kafkaBootstrapServers)
            .build();
    // Completed by the engine's completion callback below.
    CompletableFuture<Object> completion = new CompletableFuture<>();
    engine = EmbeddedEngine.create()
            .using((success, message, throwable) -> {
                if (success)
                    completion.complete(null);
                else
                    completion.completeExceptionally(new RuntimeException("Engine failed to start: " + message, throwable));
            })
            .using(config)
            .notifying(this::handleEvent)
            .build();
    // engine.run() blocks, so it is executed on a background thread.
    Executor executor = Executors.newCachedThreadPool();
    executor.execute(() -> {
        try {
            engine.run();
        } catch (Throwable t) {
            t.printStackTrace();
        }
    });
    logger.debug("Started engine");
    return completion;
}
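The threading pattern in that method, a blocking engine run on a background thread with its outcome reported through a CompletableFuture, can be tried out in isolation. The following is only a sketch under stated assumptions: FakeEngine and EngineRunnerSketch are hypothetical stand-ins that mimic the (success, message, throwable) callback shape, they are not Debezium classes, and the fake engine returns immediately instead of tailing a transaction log.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for an engine whose run() blocks until it stops.
// It is NOT Debezium's EmbeddedEngine; it only mimics the callback shape.
final class FakeEngine implements Runnable {

    // Same (success, message, error) shape as the lambda in the post.
    interface CompletionCallback {
        void handle(boolean success, String message, Throwable error);
    }

    private final boolean shouldFail;
    private final CompletionCallback callback;

    FakeEngine(boolean shouldFail, CompletionCallback callback) {
        this.shouldFail = shouldFail;
        this.callback = callback;
    }

    @Override
    public void run() {
        // A real engine would loop here; the fake one reports straight away.
        if (shouldFail) {
            callback.handle(false, "simulated failure", new IllegalStateException("boom"));
        } else {
            callback.handle(true, "stopped cleanly", null);
        }
    }
}

final class EngineRunnerSketch {

    // Runs the engine on a background thread and exposes its outcome as a future.
    static CompletableFuture<Object> start(boolean shouldFail) {
        CompletableFuture<Object> completion = new CompletableFuture<>();
        FakeEngine engine = new FakeEngine(shouldFail, (success, message, error) -> {
            if (success) {
                completion.complete(null);
            } else {
                completion.completeExceptionally(
                        new RuntimeException("Engine failed: " + message, error));
            }
        });
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        executor.shutdown(); // already-submitted work still runs; the JVM can exit afterwards
        return completion;
    }

    public static void main(String[] args) {
        start(false).join();
        System.out.println("engine finished without error");
        try {
            start(true).join();
        } catch (CompletionException e) {
            // join() wraps the RuntimeException passed to completeExceptionally
            System.out.println("engine reported: " + e.getCause().getMessage());
        }
    }
}
```

The caller decides whether to block on the returned future with join() or to attach a callback to it, which is presumably why the original method returns the future instead of waiting on it.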
Now the CDC service comes in: using the Debezium engine in async mode, it reads the transaction log and pushes the events to Kafka topics. This is the producer. To see the topics, open a shell in the Kafka Docker image
docker run -i -t eventuateio/eventuateio-local-kafka bash
and run
./bin/kafka-topics.sh --zookeeper 10.200.10.1:2181 --list
The following topics should be present:
__consumer_offsets
com.so.aggregate.BidAggregate
com.so.aggregate.ItemAggregate
eventuate.local.cdc.my-sql-connector.history.kafka.topic
eventuate.local.cdc.my-sql-connector.offset.storage
We are interested in the aggregate topics as this is the place where our events will appear. Let’s start listening on the ItemAggregate topic
./bin/kafka-console-consumer.sh --zookeeper 10.200.10.1:2181 --topic com.so.aggregate.ItemAggregate
and insert an item.

We will see the event
{
  "id":"0000015dec64a33f-0242ac1100070000",
  "entityId":"0000015dec64a340-0242ac1100070000",
  "entityType":"com.so.aggregate.ItemAggregate",
  "eventData":"{\"itemCode\":\"1\",\"reservePrice\":400}",
  "eventType":"com.so.events.item.CreateItemEvent"
}
The same applies for the BidAggregate topic
./bin/kafka-console-consumer.sh --zookeeper 10.200.10.1:2181 --topic com.so.aggregate.BidAggregate

And the event
{
  "id":"0000015dec7be0e8-0242ac1100070000",
  "entityId":"0000015dec7be0e9-0242ac1100070000",
  "entityType":"com.so.aggregate.BidAggregate",
  "eventData":"{\"itemCode\":\"1\",\"amount\":700}",
  "eventType":"com.so.events.bid.CreateBidEvent"
}
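Both events share the same envelope, and each one lands on a topic named after its entityType. Here is a minimal sketch of that mapping, assuming the envelope has exactly the five fields seen in the console output. PublishedEventSketch is a hypothetical class, not the one Eventuate uses, and its quote helper only escapes quotes and backslashes, so it is not a general JSON serializer.

```java
// Hypothetical envelope mirroring the five JSON fields shown in the console output.
final class PublishedEventSketch {
    final String id;
    final String entityId;
    final String entityType;
    final String eventData; // already-serialized JSON, carried as a plain string
    final String eventType;

    PublishedEventSketch(String id, String entityId, String entityType,
                         String eventData, String eventType) {
        this.id = id;
        this.entityId = entityId;
        this.entityType = entityType;
        this.eventData = eventData;
        this.eventType = eventType;
    }

    // Assumption drawn from the topic list: one topic per aggregate type.
    String topic() {
        return entityType;
    }

    // Builds the envelope by hand; eventData is embedded as an escaped string.
    String toJson() {
        return "{"
                + "\"id\":" + quote(id) + ","
                + "\"entityId\":" + quote(entityId) + ","
                + "\"entityType\":" + quote(entityType) + ","
                + "\"eventData\":" + quote(eventData) + ","
                + "\"eventType\":" + quote(eventType)
                + "}";
    }

    // Simplified escaping: handles only double quotes and backslashes.
    private static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\');
            }
            sb.append(c);
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        PublishedEventSketch event = new PublishedEventSketch(
                "0000015dec64a33f-0242ac1100070000",
                "0000015dec64a340-0242ac1100070000",
                "com.so.aggregate.ItemAggregate",
                "{\"itemCode\":\"1\",\"reservePrice\":400}",
                "com.so.events.item.CreateItemEvent");
        System.out.println(event.topic() + " <- " + event.toJson());
    }
}
```

Note that eventData is a JSON document nested as a string, which is why its quotes show up escaped in the consumer output.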
Cool. This is the way Debezium contributes to event-sourcing microservices.
The consumer is the query side of CQRS, which listens on these topics and reacts by updating the materialized views.
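To make the query side a bit more concrete, here is a rough sketch of an event handler that keeps an in-memory materialized view. Everything in it is an assumption for illustration: ItemViewSketch, ItemRow and the "highest bid per item" view are hypothetical, the payload is assumed to be deserialized already, and a real consumer would read from the Kafka topics and write to a proper store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical read model: reserve price and highest bid per item code.
final class ItemViewSketch {

    static final class ItemRow {
        final int reservePrice;
        int highestBid; // stays 0 until the first bid arrives

        ItemRow(int reservePrice) {
            this.reservePrice = reservePrice;
        }
    }

    private final Map<String, ItemRow> rows = new HashMap<>();

    // Dispatches on the eventType field of the consumed message.
    // 'value' is reservePrice for item events and amount for bid events.
    void handle(String eventType, String itemCode, int value) {
        switch (eventType) {
            case "com.so.events.item.CreateItemEvent":
                rows.put(itemCode, new ItemRow(value));
                break;
            case "com.so.events.bid.CreateBidEvent":
                ItemRow row = rows.get(itemCode);
                // Bids for unknown items are dropped in this sketch.
                if (row != null && value > row.highestBid) {
                    row.highestBid = value;
                }
                break;
            default:
                // Unknown event types are ignored in this sketch.
                break;
        }
    }

    ItemRow find(String itemCode) {
        return rows.get(itemCode);
    }

    public static void main(String[] args) {
        ItemViewSketch view = new ItemViewSketch();
        view.handle("com.so.events.item.CreateItemEvent", "1", 400);
        view.handle("com.so.events.bid.CreateBidEvent", "1", 700);
        ItemRow row = view.find("1");
        System.out.println("reserve=" + row.reservePrice + " highestBid=" + row.highestBid);
    }
}
```

Queries then read from the view only and never touch the event log, which is the point of separating the two sides.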