Kotlin - Spring MVC queueing requests with channel
Why?
One common thing I have on my projects is when the application scales up some kind of async processing needs to be done.
The steps for this are usually:
- Check the request for proper Authority and Roles
 - Request is Validated
 - The request is put into some queue (JMS, RabbitMQ, Pubsub) and an uuid for this request is given to the client
 - Some other background process (or even application) reads from the queue and process it
 - The response is sent to a response queue
 - The client fetches the response using the uuid
 
I always hated is setting up the queueing system since it requires a lot of operation investment. Setting up a dedicated server, choosing the queueing system, installing it, etc.
In some workplaces/companies setting up this system could take MONTHS, even YEARS.
I decided to work on this simple POC just to see if it is possible to implement some dumb queue using channels.
There are many downsides of this:
- everything is in memory, if the system goes down you loose all the non processed messages!
 - no retry
 - no error mitigation
 
How?
I will use: Kotlin + Channels + Spring MVC
Create a Spring Component to hold the channel
@Component
class ChannelComponent {
    val channelMessage = Channel<String>(5)
    @PreDestroy
    fun preDestroy(){
        channelMessage.close()
        println("channel closed")
    }
}
Create a Spring Component to read from the channel
To emulate a slow and complex process I will add a Thread.sleep.
@Component
class ChannelListenerBean(val channelComponent: ChannelComponent): InitializingBean {
    override fun afterPropertiesSet() {
        GlobalScope.launch{
            for(message in channelComponent.channelMessage){
                // THIS IS SLOOOOOW, that is why we need an async process!
                Thread.sleep(5000)
                println("The message is $message")
            }
        }
    }
}
Create the Controller to receive messages
Yea, not really REST but this is good enough for testing.
@RestController
class TestController(val channelComponent: ChannelComponent) {
    @GetMapping("/async")
    fun ShouldProcessAsync(@RequestParam message: String): String {
        runBlocking {
            channelComponent.channelMessage.send("Sending Message!")
            println("Message sent!")
        }
        return "Enqueued!"
    }
}
Test it using curl… or postman… or the browser
curl http://localhost:8080/async?message=I_AM_AWESOME
You should see a couple of messages on the console stating “Message sent!”, and after 5 seconds “The message is I_AM_AWESOME”
If you trigger the curl more than 5 times, you should see that the request is stuck for a while - this indicates that the “Queue” is full.
Tuning up by setting more readers / workers in parallel
First create a class for workers and give it a name for tracing:
class ChannelWorkerReader(val channel: Channel<String>, val workerName:String){
    suspend fun doIt(){
        for (message in channel) {
            // THIS IS SLOOOOOW, that is why we need an async process!
            Thread.sleep(5000)
            println("${workerName}: The message is $message")
        }
    }
}
Second spawn one coroutine/thread per defined worker:
@Component
class ChannelListenerWorkersBean(val channelComponent: ChannelComponent,
                          @Value("\${channel.workers:3}") val numberOfWorkers:Int): InitializingBean {
    override fun afterPropertiesSet() {
        val workerPool = Executors.newFixedThreadPool(numberOfWorkers).asCoroutineDispatcher()
        for(i in 1..numberOfWorkers) {
            val workerName = "worker $i"
            println("Starting worker $workerName")
            val worker = ChannelWorkerReader(channelComponent.channelMessage, workerName)
            GlobalScope.launch(workerPool) {
                worker.doIt()
            }
        }
    }
}
Testing…. again
If we use curl really fast (not really really fast, just 8+ times before 5 seconds), you should see that on the 9th request, it will take a while to get the response.
This happens because the first 3 requests are read by workers thus freeing up the channel, then the next 5 requests fill up the channel which locks it on the channelComponent.channelMessage.send method on the controller.
Summary
Yep, it works - has many downsides, is not production ready but can be done.
Ideas to improve
There are many ways of improving this, but from the top of my head:
- Add some persistency so we don’t loose messages in case of malfunction
 - Tweak queue (channel) size
 - Expose the queue in some other faster protocol
 - Add a way of proper data streaming
 
Source code: Here