How to use DataStax driver 4 for performant concurrent Cassandra database queries in Kotlin?

I have a Kotlin application that serves data via a RESTful API. The data is stored in a Cassandra database. To fulfill a request, the application needs to perform n queries against Cassandra. I want the API to respond quickly, so I would like those n queries to execute in parallel. I also want to handle multiple concurrent users without performance degrading.

Libraries:

    implementation("com.datastax.oss:java-driver-core:4.13.0")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.4.3")

With DataStax driver 3, I have code that uses the synchronous execute method. I wrap each call in a coroutine on the IO dispatcher and await all of the requests. Here is sample code that queries the same row n times in a loop:

    val numbers: List<Int> = (1..NUMBER_OF_QUERIES).toList()
    val query = "SELECT JSON * FROM keyspace.table WHERE partition_key=X AND clustering_key=Y"
    val (result, elapsed1) = measureTimedValue {
        numbers.map { _ ->
            // Each blocking execute() call occupies a Dispatchers.IO thread until it returns
            CoroutineScope(Dispatchers.IO).async {
                session.execute(query).all().map { row ->
                    toJson(row.getString(0).toString())
                }
            }
        }.awaitAll()
    }
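For comparison, the same fan-out can be written with structured concurrency: one coroutineScope for the whole batch instead of a fresh CoroutineScope per query, so a failure in one call cancels its siblings and propagates to the caller. This is a minimal sketch, not the original code; blockingQuery is a hypothetical stand-in for session.execute(query).all() that sleeps to simulate latency.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for session.execute(query).all(): blocks the calling thread.
fun blockingQuery(n: Int): String {
    Thread.sleep(50) // simulate network latency
    return """{"query":$n}"""
}

// One structured scope for the whole fan-out: if any call throws, the sibling
// coroutines are cancelled and the exception is rethrown to the caller.
// (Calls already inside Thread.sleep still run until they return.)
suspend fun fetchAllBlocking(count: Int): List<String> = coroutineScope {
    (1..count).map { n ->
        async(Dispatchers.IO) { blockingQuery(n) } // each call occupies one IO thread
    }.awaitAll()
}

fun main() = runBlocking {
    val results = fetchAllBlocking(10)
    println(results.size)
}
```

Each in-flight query still pins one IO thread for its whole duration (Dispatchers.IO is limited to 64 threads by default), which is one reason this style can degrade when many users issue queries at once.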

Driver 3 also offers executeAsync, which returns a Guava ListenableFuture, but I couldn't get that to work inside a coroutine, even with https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-guava/index.html

For driver 4, I am trying to use the asynchronous API to achieve a similar result. My hope is that the asynchronous API performs better and uses fewer threads because it is non-blocking. However, when I run a similar test case, the code below runs slower than the synchronous API from driver 3, and its performance degrades as more concurrent users are added.

    val numbers: List<Int> = (1..NUMBER_OF_QUERIES).toList()
    val query = "SELECT JSON * FROM keyspace.table WHERE partition_key=X AND clustering_key=Y"
    val (result, elapsed1) = measureTimedValue {
        numbers.map { _ ->
            // executeAsync returns a CompletionStage<AsyncResultSet>; asDeferred() wraps it as a Deferred
            CoroutineScope(Dispatchers.IO).async {
                session.executeAsync(query).asDeferred()
            }
        }.awaitAll().awaitAll().map { rs -> toJson(rs).await() }
    }
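Because executeAsync is designed to return a CompletionStage immediately, wrapping it in async on Dispatchers.IO is not needed just to get parallelism. A minimal sketch of that idea without coroutines, using only CompletableFuture from the JDK: fakeExecuteAsync is a hypothetical stand-in for session.executeAsync(query), completed by a single scheduler thread after 50 ms to simulate latency.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Daemon thread that completes the simulated queries (stands in for the driver's I/O threads).
val scheduler = Executors.newSingleThreadScheduledExecutor { r ->
    Thread(r, "fake-driver-io").apply { isDaemon = true }
}

// Hypothetical stand-in for session.executeAsync(query): returns immediately,
// and the stage completes about 50 ms later without blocking the caller.
fun fakeExecuteAsync(n: Int): CompletionStage<String> {
    val future = CompletableFuture<String>()
    scheduler.schedule(Runnable { future.complete("""{"query":$n}""") }, 50, TimeUnit.MILLISECONDS)
    return future
}

// Starts every request first, then combines them into one future of all results.
fun fetchAll(count: Int): CompletableFuture<List<String>> {
    val futures = (1..count).map { fakeExecuteAsync(it).toCompletableFuture() }
    return CompletableFuture.allOf(*futures.toTypedArray())
        // join() does not wait here: allOf only fires once every future is complete
        .thenApply { _ -> futures.map { f -> f.join() } }
}

fun main() {
    val results = fetchAll(100).get() // block once, at the very end
    println(results.size)
}
```

All 100 simulated requests are in flight at the same time while only one scheduler thread exists, which illustrates why a non-blocking API does not need a thread per query.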

Is there a better way to run tasks that return CompletionStage<T> in parallel in Kotlin?
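One possible coroutine-based approach, sketched under assumptions: start every query inside a single coroutineScope and suspend on each CompletionStage with await() from kotlinx-coroutines-jdk8 (package kotlinx.coroutines.future), so no Dispatchers.IO thread is held while a query is outstanding. The shared Semaphore is a hypothetical cap on requests in flight across all users, and fakeExecuteAsync again stands in for session.executeAsync(query), which in driver 4 returns CompletionStage<AsyncResultSet>.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Daemon thread that completes the simulated queries (stands in for the driver's I/O threads).
val scheduler = Executors.newSingleThreadScheduledExecutor { r ->
    Thread(r, "fake-driver-io").apply { isDaemon = true }
}

// Hypothetical stand-in for session.executeAsync(query).
fun fakeExecuteAsync(n: Int): CompletionStage<String> {
    val future = CompletableFuture<String>()
    scheduler.schedule(Runnable { future.complete("""{"query":$n}""") }, 50, TimeUnit.MILLISECONDS)
    return future
}

// Hypothetical limit on requests in flight, shared by every caller of fetchAll.
val inFlight = Semaphore(permits = 64)

// Starts all queries concurrently and suspends (without blocking a thread) until
// every result is available; results keep the order of the inputs.
suspend fun fetchAll(count: Int): List<String> = coroutineScope {
    (1..count).map { n ->
        async {
            inFlight.withPermit { fakeExecuteAsync(n).await() }
        }
    }.awaitAll()
}

fun main() = runBlocking {
    // No Dispatchers.IO: await() suspends, so runBlocking's single thread is enough.
    val results = fetchAll(200)
    println(results.size)
}
```

The permit count of 64 is arbitrary; whether a client-side cap helps depends on the cluster and on the driver's own request limits, so it is worth measuring rather than assuming.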



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
