'Update only fields that are not empty in Cassandra Sink
I'm trying to receive messages from Kafka to update a Cassandra Database using Flink
Messages are like
case class Message(userId: String, info: Info)
case class Info(property1: Option[Int], property2: Option[Int])
I'm using json4s to parse the Kafka message and extracting it to a DataStream[Message]
val kafkaSource: KafkaSource[String] = KafkaSource.builder()
.setBootstrapServers("localhost:29092")
.setTopics("my-topic")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
Now, I just want to update the fields that are not None, like
{
"user_id": "abc-123",
"info":
{
"property1": 1
}
}
Will generate a case class like:
Message(abc-123, Info(Some(1), None))
How can I make a CassandraSink to be able to update just this property for user abc-123?
I was trying to use something like:
CassandraSink
.addSink(dataStream)
.setClusterBuilder(new ClusterBuilder() {
override def buildCluster(builder: Cluster.Builder): Cluster = {
builder
.addContactPoint("127.0.0.1")
.withPort(29042)
.withCredentials("cassandra", "cassandra")
.build()
}
})
.setQuery("UPDATE user_info SET property1 = ?, property2 = ? WHERE id = ?")
.build()
and trying to manipulate the query outside CassandraSink builder, but it wasn't possible.
Is there any way to just update fields who aren't None?
Solution 1:[1]
May be you can implement RichMapFunction, and update fields who aren't None in the map interface
public class MyMapFunction extends RichMapFunction<T> {
@Override
public void open(Configuration parameters) throws Exception {
}
@Override
public T map(...) throws Exception {
return xxx;
}
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | ChangLi |
