How to scale event processors

Modern applications scale horizontally very easily, for which we can be grateful to cloud providers and container platforms like Kubernetes. Axon Framework, together with Axon Server, is very capable of scaling commands and queries this way. Scale your application horizontally and you can handle more commands and queries. However, one part does not scale automatically: Event Processor segments. Let's build a solution that does that!

Explainer: What are event processor segments?

A segment of an event processor is an independent part of your event stream that is being handled by the event processor. If you have 4 segments, and the split is even, each segment processes 25% of the event stream. This number can be as high as you want, so with 64 segments each processes 1.56% of the event stream. You can read more about how event processors work and how to tune them in my earlier blog.

The Admin API

The admin API introduced in Axon Server 4.6.0 is going to be our main tool for this job. We can use it to retrieve the number of connected applications and scale them accordingly. You can acquire the Admin API channel like this:

@Service
class AutoscalingService(
private val axonServerConnectionManager: AxonServerConnectionManager
) {

@Scheduled(fixedDelay = 60000)
fun doScale() {
val adminChannel = axonServerConnectionManager.connection.adminChannel()
}
}

As you can see, the sample contains a Kotlin-based Spring service that is invoked every minute through the Spring scheduling support. It acquires the admin channel, and now, we can get to work!

Creating our scaling mechanism

To determine if we need to split or merge segments based on the current scaling, we have to know:

  • How many segments we want per instance
  • How many instances are connected
  • How many segments we currently have

Let's first determine the amount of segments we want and let's call this the "Split Factor". This can be configurable via a Spring property as in the next example. Personally, I think it is nice to default to 2, but you can tune this further yourself. 

@Service
class AutoscalingService(
private val axonServerConnectionManager: AxonServerConnectionManager,
@Value("\${axon.autoscaling.split-factor:2}")
private val splitFactor: Int,
) {

Now we need to know how many instances are connected. Let's check this via the admin API! First, we need to retrieve the processor information we want. Let's create a method for this:

fun getProcessor(component: String, processorName: String, tokenStoreIdentifier: String): EventProcessor? {
    val adminChannel = axonServerConnectionManager.connection.adminChannel()
    val stream = adminChannel.eventProcessorsByComponent(component)
    var next = stream.next()

    while (next != null) {
        if (next.identifier.processorName == processorName
            && next.identifier.tokenStoreIdentifier == tokenStoreIdentifier) {
            return next
        }
        next = stream.next()
    }
    return null
}

This method loops through the stream and returns the event processor we are looking for. We can then use this to determine the amount of connected applications and segments in our main method:

@Scheduled(fixedDelay = 60000)
fun doScale() {

    eventProcessorConfiguration.eventProcessors().forEach { name, processor ->
        val tokenStore = eventProcessorConfiguration.tokenStore(name)
        val identifier = tokenStore.retrieveStorageIdentifier().orElseThrow()
        val processorAxonServer = getProcessor("my-component", processor.name, identifier)

        if(processorAxonServer != null) {
            val count = processorAxonServer.clientInstanceCount
            val segmentCount = processorAxonServer.clientInstanceList.sumOf { it.claimedSegmentCount}
        }
    }
}

Now we have all we need to scale! We multiple the instances by our scale factor and compare the amount of segments. Then, we either split, merge, or do nothing at all. Note that we only split/merge one segment at a time to scale gradually and give the processors time to claim the tokens.

val desiredCount = processorAxonServer.clientInstanceCount  * splitFactor
val segmentCount = processorAxonServer.clientInstanceList.sumOf { it.claimedSegmentCount}

if(desiredCount > segmentCount) {
    adminChannel.splitEventProcessor(processor.name, identifier)

} else if (segmentCount > desiredCount) {
    adminChannel.mergeEventProcessor(processor.name, identifier)
}

And that's it. Start up the service and test it out.

The full code

You can check out the full code down here. Feel free to adjust it in any way necessary for your specific use-case to work. 

@Service
class AutoscalingService(
    private val axonServerConnectionManager: AxonServerConnectionManager,
    @Value("\${axon.autoscaling.split-factor:2}")
    private val splitFactor: Int,
    private val eventProcessorConfiguration: EventProcessingConfiguration,
    @Value("\${spring.application.name}")
    private val componentName: String,
) {

    private val adminChannel = axonServerConnectionManager.connection.adminChannel()


    @Scheduled(fixedDelay = 60000)
    fun doScale() {
        eventProcessorConfiguration.eventProcessors().forEach { (name, processor) ->
            val tokenStore = eventProcessorConfiguration.tokenStore(name)
            val identifier = tokenStore.retrieveStorageIdentifier().orElseThrow()
            val processorAxonServer = getProcessor(componentName, processor.name, identifier)
            if(processorAxonServer != null) {
                val desiredCount = processorAxonServer.clientInstanceCount  * splitFactor
                val segmentCount = processorAxonServer.clientInstanceList.sumOf { it.claimedSegmentCount}
                if(desiredCount > segmentCount) {
                    adminChannel.splitEventProcessor(processor.name, identifier)
                } else if (segmentCount > desiredCount) {
                    adminChannel.mergeEventProcessor(processor.name, identifier)
                }
            }
        }
    }


    fun getProcessor(component: String, processorName: String, tokenStoreIdentifier: String): EventProcessor? {
        val stream = adminChannel.eventProcessorsByComponent(component)
        var next = stream.nextIfAvailable(10, TimeUnit.SECONDS)
        while (next != null) {
            if (next.identifier.processorName == processorName
                && next.identifier.tokenStoreIdentifier == tokenStoreIdentifier) {
                return next
            }
            next = stream.next()
        }
        return null
    }
}

 

Feedback

We are always looking for feedback on our products and possible enhancements on our products. Do you think that autoscaling event processor segments would be a good feature for Axon Server? Please reach out to us and let us know on the Discuss platform or Github.

 

Powerful API

The Admin API is very powerful and can be used to do many things. Feel free to explore and discover its API and design any solution you need. We are always curious as to what you are building, so please share it with us for inspiration.

 

Conclusion

The solution outlined in this blog will allow you to scale your event processor segments with the amount of instances currently running. This will in turn allow you to spread load over all available instances.

Mitchell Herrijgers
Solutions Architect for Axon Framework at AxonIQ Mitchell is a software craftsman, passionate coder, and eager to learn. He likes complex challenges and he doesn't get discouraged easily. He is interested in the following topics; Kotlin/Java, Event-Sourcing / CQRS / Axon Framework, Cloud/AWS/Infrastructure as Code, and performance tuning. After his study, in Computer Science at the Rotterdam University of Applied Sciences, he worked as a software engineer at ING Bank, Codecentric, and the Port of Rotterdam.
Mitchell Herrijgers

Share: