Update only fields that are not empty in Cassandra Sink

I'm trying to consume messages from Kafka and use them to update a Cassandra database with Flink.

The messages are modelled like this:

case class Message(userId: String, info: Info)
case class Info(property1: Option[Int], property2: Option[Int])

I'm using json4s to parse each Kafka message and extract it into a DataStream[Message]. The Kafka source is defined as:

val kafkaSource: KafkaSource[String] = KafkaSource.builder()
  .setBootstrapServers("localhost:29092")
  .setTopics("my-topic")
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build()

Now I want to update only the fields that are not None. For example, this message:

{
  "user_id": "abc-123",
  "info": {
    "property1": 1
  }
}

will produce a case class instance like:

Message(abc-123, Info(Some(1), None))

How can I make a CassandraSink update just this property for user abc-123?

I tried something like:

CassandraSink
  .addSink(dataStream)
  .setClusterBuilder(new ClusterBuilder() {
    override def buildCluster(builder: Cluster.Builder): Cluster = {
      builder
        .addContactPoint("127.0.0.1")
        .withPort(29042)
        .withCredentials("cassandra", "cassandra")
        .build()
    }
  })
  .setQuery("UPDATE user_info SET property1 = ?, property2 = ? WHERE id = ?")
  .build()

I also tried to manipulate the query outside the CassandraSink builder, but that wasn't possible.

Is there any way to update only the fields that aren't None?



Solution 1:[1]

Maybe you can implement a RichMapFunction and handle only the fields that aren't None in its map method:

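import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// IN and OUT stand for the input and output record types.
public class MyMapFunction<IN, OUT> extends RichMapFunction<IN, OUT> {
    @Override
    public void open(Configuration parameters) throws Exception {
        // One-time setup goes here.
    }

    @Override
    public OUT map(IN value) throws Exception {
        // Handle only the fields that are not None here.
        return xxx; // placeholder from the original answer
    }
}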
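As a rough illustration of what such a map step could compute, here is a minimal sketch in plain Java (no Flink or Cassandra dependencies) that builds an UPDATE statement containing only the properties that are present. The table and column names come from the query in the question; the names PartialUpdateSketch, Statement and buildUpdate are hypothetical and exist only for this example. How the resulting statement gets executed is left out: the sink in the question is configured with one fixed query through setQuery, so running a different statement per record would presumably need a custom sink, which is an assumption and not something the answer confirms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of Flink or the Cassandra driver.
final class PartialUpdateSketch {

    /** A CQL string plus the values for its ? placeholders, in order. */
    static final class Statement {
        final String cql;
        final List<Object> values;

        Statement(String cql, List<Object> values) {
            this.cql = cql;
            this.values = values;
        }
    }

    /**
     * Builds an UPDATE that sets only the properties that are present.
     * Returns Optional.empty() when no property is present, because
     * there is nothing to update in that case.
     */
    static Optional<Statement> buildUpdate(String userId,
                                           Optional<Integer> property1,
                                           Optional<Integer> property2) {
        List<String> assignments = new ArrayList<>();
        List<Object> values = new ArrayList<>();

        // Add one "column = ?" assignment per present property.
        property1.ifPresent(v -> { assignments.add("property1 = ?"); values.add(v); });
        property2.ifPresent(v -> { assignments.add("property2 = ?"); values.add(v); });

        if (assignments.isEmpty()) {
            return Optional.empty();
        }

        // The user id is bound last, to the placeholder in the WHERE clause.
        values.add(userId);
        String cql = "UPDATE user_info SET "
                + String.join(", ", assignments)
                + " WHERE id = ?";
        return Optional.of(new Statement(cql, values));
    }

    public static void main(String[] args) {
        // Mirrors Message(abc-123, Info(Some(1), None)) from the question.
        Statement s = buildUpdate("abc-123", Optional.of(1), Optional.empty()).get();
        System.out.println(s.cql);    // UPDATE user_info SET property1 = ? WHERE id = ?
        System.out.println(s.values); // [1, abc-123]
    }
}
```

Because the absent property never appears in the SET clause, nothing is written to that column at all, whereas binding a null to the fixed two-column query would overwrite it.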

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 ChangLi