Will the Spark executor kill the Lettuce thread while I'm writing a DataFrame into Redis?
I'm using Lettuce to insert a large Spark DataFrame into Redis. Will the executor kill itself before the Lettuce client finishes inserting into Redis? The following is the code I'm using to insert data into Redis.
First, I create a RedisConnectionProvider with a connection pool.
import java.util.function.Supplier

import io.lettuce.core.RedisURI
import io.lettuce.core.cluster.RedisClusterClient
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection
import io.lettuce.core.support.ConnectionPoolSupport
import org.apache.commons.pool2.impl.{GenericObjectPool, GenericObjectPoolConfig}
import scala.compat.java8.FunctionConverters._ // provides .asJava for the Supplier below

class RedisConnectionProvider {
  var pool: GenericObjectPool[StatefulRedisClusterConnection[String, String]] = _

  def getPool(url: String): GenericObjectPool[StatefulRedisClusterConnection[String, String]] = {
    if (pool == null) {
      val clusterClient: RedisClusterClient = RedisClusterClient.create(RedisURI.create(url, 6379))
      clusterClient.getPartitions // eagerly fetch the cluster topology
      val supplier: Supplier[StatefulRedisClusterConnection[String, String]] =
        (() => clusterClient.connect()).asJava
      val config = new GenericObjectPoolConfig[StatefulRedisClusterConnection[String, String]]
      config.setMaxTotal(30)
      pool = ConnectionPoolSupport.createGenericObjectPool(supplier, config)
    }
    pool
  }
}

object RedisConnectionProvider {
  val instance = new RedisConnectionProvider()
}
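As an aside, the `if (pool == null)` check in `getPool` isn't synchronized, so two tasks running concurrently on the same executor could both see `null` and build two pools, leaking one. A Scala `lazy val` gives thread-safe once-only initialization for free. A minimal, Redis-free sketch of that guarantee (the names `PoolHolder` and `LazySingletonDemo`, and the `String` standing in for the Lettuce pool, are mine):

```scala
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object PoolHolder {
  val creations = new AtomicInteger(0)
  // The Scala runtime synchronizes lazy val initialization, so even
  // racing first accesses run this initializer exactly once.
  lazy val pool: String = { creations.incrementAndGet(); "the-one-pool" }
}

object LazySingletonDemo {
  // Hammer the lazy val from 8 threads at once; returns how many
  // times the initializer actually ran.
  def raceInit(): Int = {
    val start = new CountDownLatch(1)
    val exec = Executors.newFixedThreadPool(8)
    (1 to 8).foreach { _ =>
      exec.submit(new Runnable {
        def run(): Unit = { start.await(); PoolHolder.pool }
      })
    }
    start.countDown() // release all 8 threads simultaneously
    exec.shutdown()
    exec.awaitTermination(10, TimeUnit.SECONDS)
    PoolHolder.creations.get()
  }

  def main(args: Array[String]): Unit =
    println(s"pool built ${raceInit()} time(s)") // pool built 1 time(s)
}
```

The same pattern would apply to the real provider: make the pool a `lazy val` keyed to a fixed URL, or guard `getPool` with `synchronized`.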
Then, inside each partition of my RDD, I get the pool and insert data using a connection borrowed from it. RedisConnectionProvider.instance lives in a Scala companion object, so I expect each Spark executor JVM to create only one pool.
import reactor.core.publisher.Flux
import scala.collection.JavaConverters._
import scala.compat.java8.FunctionConverters._

val rdd = spark.sparkContext.parallelize(Seq("a", "b", "c"))
rdd.foreachPartition(partitionRecords => {
  val pool = RedisConnectionProvider.instance.getPool("127.0.0.1")
  val connection = pool.borrowObject
  val commands = connection.reactive()
  commands.setAutoFlushCommands(false) // batch commands manually
  partitionRecords.grouped(500).foreach((group: Seq[String]) => {
    Flux.fromIterable(group.map(s => s.toString).asJava)
      .flatMap(((s: String) => commands.hset(s, value)).asJava) // `value` is a field/value map built elsewhere
      .subscribe()
    commands.flushCommands() // push the buffered batch to Redis
  })
  pool.returnObject(connection)
})
This code works most of the time, but I'm not sure it's robust. Could anyone please help?
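To illustrate my concern with plain Reactor (no Redis, so this is only a sketch of the timing issue, with `SubscribeVsBlock` and the simulated 5 ms "writes" being mine): `subscribe()` returns immediately while the work is still in flight, whereas `then().block()` holds the calling thread (standing in for the Spark task thread) until every element has completed.

```scala
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import reactor.core.publisher.{Flux, Mono}
import reactor.core.scheduler.Schedulers

object SubscribeVsBlock {
  // Simulate 100 slow async writes; returns how many had finished
  // by the time the calling thread moved on.
  def run(wait: Boolean): Int = {
    val done = new AtomicInteger(0)
    val work = Flux.range(1, 100)
      .flatMap((i: Integer) =>
        Mono.fromCallable(() => { Thread.sleep(5); done.incrementAndGet(); i })
          .subscribeOn(Schedulers.boundedElastic()))
    if (wait) work.then().block(Duration.ofSeconds(30)) // hold the caller until complete
    else work.subscribe()                               // fire and forget
    done.get()
  }

  def main(args: Array[String]): Unit = {
    println(s"fire-and-forget saw ${run(wait = false)} of 100 writes complete") // usually far fewer than 100
    println(s"blocking saw ${run(wait = true)} of 100 writes complete")         // always 100
  }
}
```

If the fire-and-forget behavior is the problem, one option I'm considering is replacing `.subscribe()` with `.then().block(...)` inside `foreachPartition` (with auto-flush left on, since blocking on unflushed commands would hang), so the task cannot return while writes are pending. I'd welcome confirmation that this is the right approach.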
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
