
Debezium – Transaction log miner

In the previous post the services used Eventuate to communicate. But how did it know to create events? How does the CDC module work? Behind the scenes it mines the transaction log and creates events from it. Enter Debezium. It is built on top of Kafka and records the changes made to the database in Kafka logs, from where they are consumed.

It has four connectors available to date:

  • MySQL
  • MongoDB
  • Oracle
  • PostgreSQL

This means you need to use one of these databases in order to be able to use it. You can follow the setup instructions in the Debezium documentation: you can use the Docker images to build your components, or you can use the embedded engine. Let’s break down the example from the previous post and see Debezium in action.
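With the Docker images, a connector is registered through the Kafka Connect REST API rather than configured in code. Here is a minimal sketch of such a registration, modelled on the Debezium tutorial; the host names, credentials and database name are illustrative assumptions, not values from this post:

 curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
   "name": "inventory-connector",
   "config": {
     "connector.class": "io.debezium.connector.mysql.MySqlConnector",
     "tasks.max": "1",
     "database.hostname": "mysql",
     "database.port": "3306",
     "database.user": "debezium",
     "database.password": "dbz",
     "database.server.id": "184054",
     "database.server.name": "dbserver1",
     "database.whitelist": "inventory",
     "database.history.kafka.bootstrap.servers": "kafka:9092",
     "database.history.kafka.topic": "dbhistory.inventory"
   }
 }'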

The eventuateio-local-cdc-service uses an embedded Debezium engine, which is configured in the EventTableChangesToAggregateTopicRelay class. On the command side of the CQRS setup, commands are applied through the AggregateRepository, which processes them into events; the events are persisted into the database.

 public CompletableFuture<Object> startCapturingChanges() throws InterruptedException {

    logger.debug("Starting to capture changes");

    cdcStartupValidator.validateEnvironment();

    // the producer used to publish the captured changes to Kafka
    producer = new EventuateKafkaProducer(kafkaBootstrapServers);

    String connectorName = "my-sql-connector";
    Configuration config = Configuration.create()
                                    /* begin engine properties */
            .with("connector.class",
                    "io.debezium.connector.mysql.MySqlConnector")

            .with("offset.storage", KafkaOffsetBackingStore.class.getName())
            .with("bootstrap.servers", kafkaBootstrapServers)
            .with("offset.storage.topic", "eventuate.local.cdc." + connectorName + ".offset.storage")

            .with("poll.interval.ms", 50)
            .with("offset.flush.interval.ms", 6000)
                                    /* begin connector properties */
            .with("name", connectorName)
            .with("database.hostname", jdbcUrl.getHost())
            .with("database.port", jdbcUrl.getPort())
            .with("database.user", dbUser)
            .with("database.password", dbPassword)
            .with("database.server.id", 85744)
            .with("database.server.name", "my-app-connector")
            //.with("database.whitelist", "eventuate")
            .with("database.history",
                    io.debezium.relational.history.KafkaDatabaseHistory.class.getName())
            .with("database.history.kafka.topic",
                    "eventuate.local.cdc." + connectorName + ".history.kafka.topic")
            .with("database.history.kafka.bootstrap.servers",
                    kafkaBootstrapServers)
            .build();

    CompletableFuture<Object> completion = new CompletableFuture<>();
    engine = EmbeddedEngine.create()
            .using((success, message, throwable) -> {
              if (success)
                completion.complete(null);
              else
                completion.completeExceptionally(new RuntimeException("Engine failed to start: " + message, throwable));
            })
            .using(config)
            .notifying(this::handleEvent)
            .build();

    // run the engine on a separate thread: engine.run() blocks until the engine is stopped
    Executor executor = Executors.newCachedThreadPool();
    executor.execute(() -> {
      try {
        engine.run();
      } catch (Throwable t) {
        t.printStackTrace();
      }
    });

    logger.debug("Started engine");
    return completion;
  }
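Each change captured by the engine is delivered as a Kafka Connect SourceRecord to the handleEvent callback registered via notifying(...) above. A minimal sketch of what such a callback could do; the column names of the events table and the eventToJson helper are assumptions for illustration, not the actual Eventuate implementation:

  // needs org.apache.kafka.connect.data.Struct and
  // org.apache.kafka.connect.source.SourceRecord on the classpath
  private void handleEvent(SourceRecord sourceRecord) {
    // Debezium wraps each row change in an envelope with "before"/"after" row states
    Struct value = (Struct) sourceRecord.value();
    Struct after = value.getStruct("after");
    if (after == null) {
      return; // a delete; only new rows in the events table are of interest
    }

    // assumed column names of the Eventuate events table
    String entityType = after.getString("entity_type");
    String entityId = after.getString("entity_id");

    // publish to the aggregate topic (e.g. com.so.aggregate.ItemAggregate),
    // keyed by entity id so all events of one aggregate land in one partition
    producer.send(entityType, entityId, eventToJson(after)); // eventToJson: hypothetical helper
  }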


Now the CDC service comes into play: using the Debezium engine in asynchronous mode, it reads the transaction log and pushes the events to Kafka topics. This is the producer. In order to see the topics you need to get onto the Kafka Docker machine

 docker run -i -t eventuateio/eventuateio-local-kafka bash

and run

./bin/kafka-topics.sh --zookeeper 10.200.10.1:2181 --list

The following topics should be present:

__consumer_offsets
com.so.aggregate.BidAggregate
com.so.aggregate.ItemAggregate
eventuate.local.cdc.my-sql-connector.history.kafka.topic
eventuate.local.cdc.my-sql-connector.offset.storage

We are interested in the aggregate topics, as this is where our events will appear. Let’s start listening on the ItemAggregate topic

./bin/kafka-console-consumer.sh --zookeeper 10.200.10.1:2181 --topic com.so.aggregate.ItemAggregate

and insert an item.

[Image: create_item – the create-item request]
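The request in the screenshot goes against the application’s REST API; it could look roughly like this, where the endpoint path and port are hypothetical:

 curl -X POST -H "Content-Type: application/json" http://localhost:8080/items -d '{"itemCode": "1", "reservePrice": 400}'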

We will see the event:

 
   "id":"0000015dec64a33f-0242ac1100070000",
   "entityId":"0000015dec64a340-0242ac1100070000",
   "entityType":"com.so.aggregate.ItemAggregate",
   "eventData":"{\"itemCode\":\"1\",\"reservePrice\":400}",
   "eventType":"com.so.events.item.CreateItemEvent"
}

The same applies to the BidAggregate topic

./bin/kafka-console-consumer.sh --zookeeper 10.200.10.1:2181 --topic com.so.aggregate.BidAggregate

[Image: create_bid3 – the create-bid request]
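Again, the request could look roughly like this (hypothetical endpoint):

 curl -X POST -H "Content-Type: application/json" http://localhost:8080/bids -d '{"itemCode": "1", "amount": 700}'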

And the event

 
   "id":"0000015dec7be0e8-0242ac1100070000",
   "entityId":"0000015dec7be0e9-0242ac1100070000",
   "entityType":"com.so.aggregate.BidAggregate",
   "eventData":"{\"itemCode\":\"1\",\"amount\":700}",
   "eventType":"com.so.events.bid.CreateBidEvent"
}

Cool. This is how Debezium contributes to event-sourced microservices.

The consumer is the query side of the CQRS setup, which listens on these topics and reacts by updating the materialized views.
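A minimal sketch of such a query-side handler, assuming the Eventuate Java client’s subscriber annotations (io.eventuate.EventSubscriber, io.eventuate.EventHandlerMethod, io.eventuate.DispatchedEvent); ItemViewRepository and ItemView are hypothetical names for the materialized-view storage, and the CreateItemEvent getters are inferred from the event payload above:

 @EventSubscriber(id = "itemQuerySideEventHandlers")
 public class ItemQuerySideEventHandler {

   private final ItemViewRepository itemViewRepository; // hypothetical view DAO

   public ItemQuerySideEventHandler(ItemViewRepository itemViewRepository) {
     this.itemViewRepository = itemViewRepository;
   }

   @EventHandlerMethod
   public void create(DispatchedEvent<CreateItemEvent> de) {
     CreateItemEvent event = de.getEvent();
     // keep the materialized view that the query API reads from in sync
     itemViewRepository.save(
             new ItemView(de.getEntityId().asString(), event.getItemCode(), event.getReservePrice()));
   }
 }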
