How can I scale Kotlin coroutine workers?

I'm trying to learn about Kotlin coroutines and use them to execute async tasks. I currently have a Spring web application where I need to read the contents of some files and insert them into DynamoDB.

To speed things up, I read and buffer 25 lines into a list, then do a batchWrite to DynamoDB. I run the batchWrite in an async function like this:

val fileData = file.readAllBytes()
val batchSaveList = mutableListOf<ClientDTO>()

fileData.inputStream().bufferedReader()
    .forEachLine { line ->
        batchSaveList.add(parseClient(line)) // parseClient: my mapping from a line to a ClientDTO
        if (batchSaveList.size == 25) {
            saveClientListAsync(clientRepository, batchSaveList.toList())
            batchSaveList.clear()
        }
    }

private fun saveClientListAsync(
    clientRepository: ClientRepository,
    batchSaveList: List<ClientDTO>
) {
    GlobalScope.async {
        launch {
            clientRepository.saveList(batchSaveList)
            // saveList runs a simple dynamoDBMapper.batchSave(batchSaveList)
        }
    }
}

The file I'm reading has ~500,000 lines; grouping them into lists of 25 items for the batchWrite means about 20,000 calls to DynamoDB.

The current behavior is that my code reads all 500k lines really fast, in a couple of minutes, but the async DynamoDB saves are really slow. It's not the DynamoDB call itself: each dynamoDBMapper.batchSave() takes about 15 ms. The application seems to use only 2 workers to process this "async list", so it takes about 20 minutes to finish them all.

Is there a way to improve this flow? Maybe scale up the workers somehow? Or is there a better way to do this, since the DynamoDB calls don't look like the problem?



Solution 1:

I don't use Spring so I can comment only on the parts I recognize.

  1. You're doing unnecessary list copies. That can be avoided by iterating a sequence of the lines and using chunked().

  2. You're creating nested coroutines for no reason by calling async and then calling launch inside it. There's no reason to use async if you aren't doing anything with the returned result.
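To illustrate the distinction, here is a minimal sketch (the println stands in for the repository call):

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    // launch is fire-and-forget: it returns a Job you can join, nothing more.
    val job = launch {
        println("saving batch") // stand-in for clientRepository.saveList(...)
    }
    job.join()

    // async is for when you need a value back; otherwise prefer launch.
    val itemsWritten: Deferred<Int> = async { 25 }
    println(itemsWritten.await())
}
```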

Fixing the above issues avoids some unnecessary work, but I don't know if it will be significant in your case.

I don't know if your repo's saveList is a blocking write function. If it is, then you should specify Dispatchers.IO when you launch the coroutine.
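Assuming it is blocking, that would look something like this (a sketch with a simulated blocking write in place of the real repository):

```kotlin
import kotlinx.coroutines.*

// Simulated blocking write, standing in for dynamoDBMapper.batchSave(...)
fun blockingSave(batch: List<String>) {
    Thread.sleep(10) // pretend network latency
}

fun main() = runBlocking {
    // Dispatchers.IO maintains a thread pool sized for blocking work
    // (64 threads by default), so these writes don't starve the
    // CPU-core-sized default dispatcher.
    val jobs = (1..8).map { i ->
        launch(Dispatchers.IO) { blockingSave(listOf("item-$i")) }
    }
    jobs.joinAll()
    println("all batches saved")
}
```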

You might also try creating a FileInputStream and reading from that instead of reading all the data up front.

And you might also try batching more than 25 lines at a time; 25 seems trivial to me. Try 1000.

file.inputStream().bufferedReader()
    .useLines { lineSequence ->
        lineSequence.chunked(1000)
            .forEach { saveClientListAsync(clientRepository, it) }
        // (mapping each line to a ClientDTO is elided here, as in the question)
    }

private fun saveClientListAsync(
    clientRepository: ClientRepository,
    batchSaveList: List<ClientDTO>
) {
    GlobalScope.launch(Dispatchers.IO) {
        clientRepository.saveList(batchSaveList)
        // saveList runs a simple dynamoDBMapper.batchSave(batchSaveList)
    }
}
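On the "scale the workers" part of the question: GlobalScope.launch gives you no backpressure and nothing to wait on. One way to cap and await the in-flight writes is a coroutineScope plus a Semaphore; a sketch (the counter stands in for the repository call, and the 32-permit cap is an arbitrary number to tune):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import java.util.concurrent.atomic.AtomicInteger

fun main() = runBlocking {
    val saved = AtomicInteger(0)
    val permits = Semaphore(32) // at most 32 concurrent writes; tune for your table

    coroutineScope {            // suspends until every child coroutine completes
        repeat(100) {
            launch(Dispatchers.IO) {
                permits.withPermit {
                    saved.incrementAndGet() // stand-in for clientRepository.saveList(batch)
                }
            }
        }
    }
    println("batches saved: ${saved.get()}")
}
```

Since kotlinx-coroutines 1.6 you can also cap concurrency with Dispatchers.IO.limitedParallelism(32) instead of a semaphore.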

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow