Help! I want to change my event store

There are many TV series that include some form of time travelling where the events of the past can be altered. There are always many dangers involved, including the erasure of your own existence. Lucky for us, changing your event store or its contents does not carry that particular risk, and we can do it with the proper care. This blog post dives into the kinds of changes you might want to make and the methods available to carry them out.

Not all changes are equal

There are many ways in which you might want to change your event store. In this blog post, we will go into the following types of changes:

  • Changing a single event (or a small set of them) in place. For example, to correct a mistake.
  • Changing the location of the event store. For example, to migrate from an RDBMS to Axon Server or vice versa.
  • Changing the contents or format of all events. For example, to migrate from XStream to Jackson.
  • Encrypting the personal data of your events to be compliant with GDPR.

As you can imagine, not every method suits every type of change. We will first go through all the methods, and then discuss which one suits which type of change.

Migration methods

You can change your event store in a few ways. Some update the event store seamlessly in place, while others require a more involved migration. There are Event Transformation, Event Replication and our own Migration tool. Let's dive into them all and see what you can use!

Event Transformation

When using an RDBMS or NoSQL database, changing the events of the past is very easy. You just change the row or document containing the event, and all is fine again. Since Axon Server is an immutable event store, it doesn't allow this... or does it?

Event Transformation is also available for Axon Server since version 2023.1.0. Let's take a look at how it works. First, you have to use the new connector version in your application, as the current Axon Framework version still ships with a 4.6.x version:

<dependency>
    <groupId>io.axoniq</groupId>
    <artifactId>axonserver-connector-java</artifactId>
    <version>2023.1.0</version>
</dependency>

Then, we can define a transformation by acquiring a connection from the AxonServerConnectionManager and defining the change. It works like this:

  1. You define a range of events to rewrite by sequence number
  2. You filter these events on one or multiple conditions
  3. You transform the filtered events into something better
  4. You tell Axon Server to execute it

Let's take a look at the code! This particular sample filters and rewrites the entire event store:

public void doEventTransformation() throws ExecutionException, InterruptedException {
    AxonServerConnectionManager manager = ...; // Acquire your manager
    AxonServerConnection connection = manager.getConnection();

    long startIndex = 0L;
    long endIndex = connection.eventChannel().getLastToken().get();

    EventSources
            .range(connection::eventChannel, startIndex, endIndex) // 1
            .filter(this::filter) // 2
            .transform("My Change", this::transform) // 3
            .execute(connection::eventTransformationChannel) // 4
            .get(); // Block until the transformation has been executed
}

public boolean filter(EventWithToken event) {
    return true; // Filter on something
}

public void transform(EventWithToken event, Appender appender) {
    // Do something here, such as appender.deleteEvent(...) or appender.replaceEvent(...)
}

Once all selected events have been streamed and the changes have been determined by the filter and transformation, they are executed. This makes a copy of the existing event store with the changes applied. Axon Server then automatically switches from the old store to the new one when the transformation is done. The original store is kept until you remove it. You can read more about removing it in the AxonIQ documentation about Event Transformation.

Let's take a look at the different things you can achieve with this.

Deleting events

You can easily delete all events of a single type by defining the following filter and transform methods:

public boolean filter(EventWithToken event) {
    // Select only events of the type we want to delete
    String payloadType = event.getEvent().getPayload().getType();
    return payloadType.equals("com.mydomain.MyEvent");
}

public void transform(EventWithToken event, Appender appender) {
    appender.deleteEvent(event.getToken());
}

Easy, right? You can of course also make more advanced filters to suit your needs. You have all the event data available to make a decision.
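As an illustration of a more advanced filter, the following is a minimal, self-contained sketch that combines two conditions with plain java.util.function.Predicate. EventView, EventFilters, payloadTypeIn and olderThan are hypothetical names made up for this sample and are not part of the connector; in a real transformation you would read the same information from the EventWithToken inside your filter method.

```java
import java.time.Instant;
import java.util.Set;
import java.util.function.Predicate;

final class EventFilters {

    /** Hypothetical, simplified view of the fields a filter might inspect. */
    record EventView(String payloadType, Instant timestamp) {
    }

    private EventFilters() {
    }

    /** Matches events whose payload type is one of the given fully qualified names. */
    static Predicate<EventView> payloadTypeIn(Set<String> payloadTypes) {
        return event -> payloadTypes.contains(event.payloadType());
    }

    /** Matches events that were stored strictly before the cutoff. */
    static Predicate<EventView> olderThan(Instant cutoff) {
        return event -> event.timestamp().isBefore(cutoff);
    }
}
```

Combining them with `payloadTypeIn(Set.of("com.mydomain.MyEvent")).and(olderThan(cutoff))` selects only the old events of that type, so newer events of the same type are left alone.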

Change serialization format

You can change the serialization format using this method as well. Let's take a look at the following code:

Serializer xStreamSerializer = ...;
Serializer jacksonSerializer = ...;
EventUpcasterChain upcasterChain = ...;

private void transform(EventWithToken eventWithToken, Appender appender) {
    var initialEventRepresentation = new InitialEventRepresentation(
            new GrpcBackedDomainEventData(eventWithToken.getEvent()),
            xStreamSerializer
    );
    var upcastedEvents = upcasterChain
            .upcast(Stream.of(initialEventRepresentation))
            .toList();

    if (upcastedEvents.isEmpty()) {
        // The upcaster chain dropped the event, so drop it from the store as well
        appender.deleteEvent(eventWithToken.getToken());
        return;
    }

    if (upcastedEvents.size() > 1) {
        throw new IllegalArgumentException("More than one event resulting from an upcaster is not supported!");
    }

    // Deserialize with the old serializer and check if we know the event. If not, delete
    Object deserializedObject = xStreamSerializer.deserialize(upcastedEvents.get(0).getData());
    if (deserializedObject instanceof UnknownSerializedType) {
        // This class is not in the JVM. We cannot migrate it
        appender.deleteEvent(eventWithToken.getToken());
        return; // Do not fall through to the replace below
    }

    // Serialize again using Jackson
    var jacksonSerialized = jacksonSerializer.serialize(deserializedObject, byte[].class).getData();
    var eventWithJackson = Event.newBuilder()
            .mergeFrom(eventWithToken.getEvent())
            .setPayload(SerializedObject.newBuilder()
                    .mergeFrom(eventWithToken.getEvent().getPayload())
                    .setData(ByteString.copyFrom(jacksonSerialized))
                    .build())
            .build();
    appender.replaceEvent(eventWithToken.getToken(), eventWithJackson);
}

As you can see, converting the serialization format is more difficult because we work with the low-level gRPC messages of Axon Server. We also need to account for the fact that we might not be able to deserialize an event, since the class might have been moved or deleted. But it's still perfectly doable!

You do need your system to be down for this, ideally. Changing the serialization format is a serious job. You could potentially do this with the system up by creating your own Serializer which can handle both old and new formats and writes in the new format. Then you can migrate only the old events using Event Transformation.
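To illustrate the dual-format idea, here is a minimal sketch of the detection step such a Serializer would need. It assumes the old payloads are XML (as XStream writes by default) and the new payloads are JSON objects or arrays (as Jackson writes), so the first non-whitespace byte tells them apart. PayloadFormatDetector, PayloadFormat and detect are hypothetical names, not part of Axon Framework. A real implementation would wrap two Serializer instances, pick the one to deserialize with based on the detected format, and always serialize with the new one.

```java
final class PayloadFormatDetector {

    /** The formats this sketch can tell apart. */
    enum PayloadFormat { XML, JSON, UNKNOWN }

    private PayloadFormatDetector() {
    }

    /**
     * Guesses the format from the first non-whitespace byte of the payload.
     * A leading byte order mark or any other prefix yields UNKNOWN.
     */
    static PayloadFormat detect(byte[] payload) {
        for (byte b : payload) {
            char c = (char) (b & 0xFF);
            if (Character.isWhitespace(c)) {
                continue; // Skip leading whitespace
            }
            if (c == '<') {
                return PayloadFormat.XML;
            }
            if (c == '{' || c == '[') {
                return PayloadFormat.JSON;
            }
            return PayloadFormat.UNKNOWN;
        }
        return PayloadFormat.UNKNOWN; // Empty or whitespace-only payload
    }
}
```

Treating UNKNOWN as an error rather than guessing keeps a half-migrated store from being read with the wrong serializer.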

Elimination of upcasters

Upcasters are very lightweight, but can still take just that tiny bit of performance that you want to get back. You might want to get rid of them once you get quite a few.

Good news! We can use Event Transformation for this as well. It works almost identically to the previous sample. Since the transformation runs every event through the UpcasterChain, the event is brought up to date before it is stored. But since we don't want to change the serialization format, we just serialize it with the original Serializer again. Let's take a look:

Serializer serializer = ...;
EventUpcasterChain upcasterChain = ...;

private void transform(EventWithToken eventWithToken, Appender appender) {
    var initialEventRepresentation = new InitialEventRepresentation(
            new GrpcBackedDomainEventData(eventWithToken.getEvent()),
            serializer
    );
    var upcastedEvents = upcasterChain
            .upcast(Stream.of(initialEventRepresentation))
            .toList();

    if (upcastedEvents.isEmpty()) {
        // The upcaster chain dropped the event, so drop it from the store as well
        appender.deleteEvent(eventWithToken.getToken());
        return;
    }

    if (upcastedEvents.size() > 1) {
        throw new IllegalArgumentException("More than one event resulting from an upcaster is not supported!");
    }

    // Deserialize and check if we know the event. If not, delete
    Object deserializedObject = serializer.deserialize(upcastedEvents.get(0).getData());
    if (deserializedObject instanceof UnknownSerializedType) {
        // This class is not in the JVM. We cannot migrate it
        appender.deleteEvent(eventWithToken.getToken());
        return; // Do not fall through to the replace below
    }

    // Serialize again using the original serializer
    var serialized = serializer.serialize(deserializedObject, byte[].class).getData();
    var upcastedEvent = Event.newBuilder()
            .mergeFrom(eventWithToken.getEvent())
            .setPayload(SerializedObject.newBuilder()
                    .mergeFrom(eventWithToken.getEvent().getPayload())
                    .setData(ByteString.copyFrom(serialized))
                    .build())
            .build();
    appender.replaceEvent(eventWithToken.getToken(), upcastedEvent);
}

Then, you do the rewrite and blast those upcasters right into git history! Since you are not changing the format, this can be done without any downtime.

Keep in mind that one-to-many upcasters are not supported. When using Event Transformation, the global sequence numbers have to stay intact. When deleting an event in Axon Server, the event entry technically still exists, but only as an empty pointer. If you want more customizability, read on for the Event Replication method.

Still, it is very powerful to get rid of your upcasters. Nifty!

The Axon Migration tool

AxonIQ provides a tool to migrate from an RDBMS or Mongo to Axon Server easily. It's a JAR you can run, and it will read the event store from one place and port it to another. You can find it on GitHub together with all its options. It can run just once, keep tailing, and even filter on event types.

However, if you want to write a more precise migration, such as changing the serialized format of events, or filtering events based on a condition, then you need to resort to another method I call Event Replication.

Event Replication

The beauty of Axon Framework is that any component implementing the EventStore interface can be used for event streaming and event sourcing. The underlying storage mechanism used does not matter for Axon Framework (but will for its performance). So by streaming events from one store and storing them in a different one, we can migrate the events in any way we would like.

This is quite easy to achieve using just Axon Framework and an @EventHandler method. For example, this method publishes events into a secondary event store as-is:

@EventHandler
public void handle(TrackedEventMessage<?> message) {
secondStore.publish(message);
}

In this way, you can migrate from any source to any destination. You can migrate Postgres to Axon Server, you can migrate Axon Server to MongoDB, from context A to context B in Axon Server, and all else! There's no need to be locked into a vendor, so try out our Axon Server! ;)

There is one caveat to this method: At some point in time, you will have to switch the event store of your application and this needs to be done with care! You want all the writes done to the original event store to have been propagated to the second before switching over. So, you will have to wait until the processor is done, eliminate any writes coming in, bring the application down and bring it up with the new configuration.

With this in mind, let's jump into the various use-cases.

Change of serialization format

Migrating to another event format is quite easy as well. In this case, you provide a different Serializer to your secondStore's builder, which it uses to serialize the events. For example, for Axon Server it would look like this:

EventStore secondStore = AxonServerEventStore.builder()
        .defaultContext(destinationContext)
        .platformConnectionManager(manager)
        .configuration(configuration)
        // Use a Jackson Serializer, instead of XStream
        .eventSerializer(JacksonSerializer.builder().build())
        .build();

You can do two migrations in one go. If your application uses events in XML format in Postgres, you can migrate them to Axon Server and Jackson at the same time.

Changing events

We can also change or delete events while writing to the secondary event store. However, keep in mind that the sequence numbers of the aggregate need to remain intact! So when you remove an event from an aggregate, every following event of that aggregate needs a sequence number that is one lower. To do that, we keep a map of the sequence numbers per aggregate. One of the ways you can achieve this is shown in the following sample, which renumbers the events while writing to Axon Server:

private final Map<String, Long> aggregateIndex = new HashMap<>();

@EventHandler
public void handleFormatChange(TrackedEventMessage<?> message) {
    if (message instanceof DomainEventMessage<?> dem) {
        // Aggregate ID involved. Sequences need to stay correct.
        Long currentIndex = aggregateIndex.compute(dem.getAggregateIdentifier(), (k, v) -> {
            // First event seen for this aggregate: continue after what the destination already holds
            Long index = v == null ? getCurrentSequence(dem) : v;
            return index + 1;
        });
        var newMessage = new GenericDomainEventMessage<>(dem.getType(),
                                                         dem.getAggregateIdentifier(),
                                                         currentIndex,
                                                         dem.getPayload(),
                                                         dem.getMetaData(),
                                                         dem.getIdentifier(),
                                                         dem.getTimestamp());
        secondStore.publish(newMessage);
    } else {
        // No aggregate ID. Can publish as-is
        secondStore.publish(message);
    }
}

private Long getCurrentSequence(DomainEventMessage<?> dem) {
    try {
        return manager.getConnection(destinationContext)
                      .eventChannel()
                      .findHighestSequence(dem.getAggregateIdentifier())
                      .get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // Preserve the interrupt flag
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        throw new RuntimeException(e);
    }
}

Now you can do any filtering or changes you like before publishing the event, such as deleting an event of a certain type:

@EventHandler
public void handle(TrackedEventMessage<?> message) {
String payloadType = message.getPayloadType().getName();
if(payloadType.equals("io.axoniq.inspector.api.QueryBusConfigurationUpdated")) {
return;
}
// ... other code
}
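To see what filtering and renumbering do to concrete sequence numbers, the following self-contained sketch replays a small stream without any Axon dependency. StoredEvent, Resequencer and copyWithout are hypothetical stand-ins for the domain event message and the handlers above, and the sketch assumes the destination store starts empty, so every aggregate starts at sequence number 0.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class Resequencer {

    /** Hypothetical stand-in for a domain event: aggregate id, sequence number and payload type. */
    record StoredEvent(String aggregateId, long sequenceNumber, String payloadType) {
    }

    private Resequencer() {
    }

    /** Copies the source stream in order, skipping deleted events and closing the sequence gaps they leave. */
    static List<StoredEvent> copyWithout(List<StoredEvent> source, Predicate<StoredEvent> shouldDelete) {
        Map<String, Long> eventsWritten = new HashMap<>();
        List<StoredEvent> destination = new ArrayList<>();
        for (StoredEvent event : source) {
            if (shouldDelete.test(event)) {
                continue; // Deleted: never reaches the destination
            }
            // Count the events written per aggregate; the new sequence number is that count minus one
            long sequence = eventsWritten.merge(event.aggregateId(), 1L, Long::sum) - 1;
            destination.add(new StoredEvent(event.aggregateId(), sequence, event.payloadType()));
        }
        return destination;
    }
}
```

If aggregate A has events 0 to 3 and event 1 is deleted, the destination holds events 0 to 2 for A, while the events of every other aggregate keep their own numbering.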

Elimination of upcasters

When following the previous example, you will have already eliminated upcasters. Before payloads are handed to the @EventHandler method, upcasting has taken place. By publishing this event to another store, the new representation is saved! Now you can get rid of those upcasters.

Additional migration concerns

When you are changing the sequence of events or their store, you need to change the tracking tokens in your database. The AxonServerEventStore uses a GlobalSequenceTrackingToken, while the EmbeddedEventStore, which you use with other databases, uses a GapAwareTrackingToken. Besides having a different format, the positions might have changed.

For example, the tracking token might have to change from the GapAwareTrackingToken used by JDBC:

{"index": 523232, "gaps": [523222, 523231]}

to a GlobalSequenceTrackingToken used by Axon Server:

{"globalIndex": 50111}

As you can see, the position changed as well! An RDBMS reserves a batch of sequence numbers at once, and some of them might never be used (or are used out of order). Axon Server guarantees the sequence, so the new position will most probably be lower.
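To make the difference in positions concrete, here is a minimal sketch of how a position in a gapless destination could be derived from the index of a gap-aware source token. It is not part of Axon Framework: TokenTranslator and toGaplessPosition are hypothetical names. The sketch assumes that the events were copied in global-index order without filtering, that the destination numbers its events from 0, and that every gap in the source token is a sequence number that was never used.

```java
import java.util.List;

final class TokenTranslator {

    private TokenTranslator() {
    }

    /**
     * @param sourceGlobalIndexes the global indexes of all events that exist in the source store
     * @param tokenIndex          the "index" field of the source tracking token
     * @return the zero-based position in the gapless destination, or -1 if nothing was handled yet
     */
    static long toGaplessPosition(List<Long> sourceGlobalIndexes, long tokenIndex) {
        // Every existing event at or below the token index has been handled by the processor
        long handled = sourceGlobalIndexes.stream()
                .filter(index -> index <= tokenIndex)
                .count();
        return handled - 1;
    }
}
```

For a source store holding the global indexes 1, 2, 5, 6 and 9 and a token at index 6, four events were handled, so the sketch yields position 3. If a gap in the token could still belong to an event that exists in the source, a single position cannot represent the token exactly, and you would have to choose a position before that event and accept some reprocessing.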

Make sure to plan for this, as updating the tokens requires downtime. This is not necessary when using Event Transformation, as it modifies existing events in place, but it is necessary for all other methods of migration.

Which method should I use?

Use your own best judgement when choosing the method for your changes.

When you are using Axon Server, I highly recommend using Event Transformation as the effort required is very low. It saves the hassle of changing and repositioning the tokens.

When migrating to Axon Server, it makes sense to use our migration tool! Take a look at its GitHub page for more information.

For all other cases, write your custom migration by streaming one event store into another. This provides an insane amount of flexibility, as long as you watch your aggregate sequence numbers.

 

Conclusion

History is not set in stone. You are not locked into a vendor. You are not forever bound by one serialization format. You can change your event store, in many ways. Let me know if you have any questions by reaching out through our Discuss. Good luck with your migration!

Mitchell Herrijgers
Solutions Architect for Axon Framework at AxonIQ Mitchell is a software craftsman, passionate coder, and eager to learn. He likes complex challenges and he doesn't get discouraged easily. He is interested in the following topics; Kotlin/Java, Event-Sourcing / CQRS / Axon Framework, Cloud/AWS/Infrastructure as Code, and performance tuning. After his study, in Computer Science at the Rotterdam University of Applied Sciences, he worked as a software engineer at ING Bank, Codecentric, and the Port of Rotterdam.