How to build the Spark streaming startingOffsets option dynamically in Scala

I have a DataFrame like the one below. It is dynamic and grows as more topics are added.

// Assumes a SparkSession named spark; toDF needs its implicits in scope
import spark.implicits._

val ds = Seq(("T1",0,44),
    ("T1",1,54),
    ("T1",2,13),
    ("T2",0,13),
    ("T2",1,9),
    ("T2",2,7)).toDF("Topic","PartitionId","OffsetId")

I want to turn this DataFrame into a JSON string in the following format. As more rows are added, the JSON should grow accordingly.

{"T1":{"0":44,"1":54,"2":13},"T2":{"0":13,"1":9,"2":7}}
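One way to produce this format is to group the rows by topic instead of tracking state by hand. This is a minimal sketch that assumes the (topic, partition, offset) rows are small enough to collect to the driver, for example with ds.select($"Topic", $"PartitionId", $"OffsetId".cast("long")).as[(String, Int, Long)].collect().toSeq. The names StartingOffsets, toJson and sample are hypothetical. Topics and partitions are sorted so the output is deterministic, and offsets are written as JSON numbers. No escaping is done, which relies on Kafka topic names being limited to ASCII letters, digits, '.', '_' and '-'.

```scala
// Sketch: build a Kafka startingOffsets JSON string from (topic, partition, offset) rows
// that have already been collected to the driver. All names here are hypothetical.
object StartingOffsets {
  // No escaping: assumes topic names contain only Kafka's legal characters
  private def quote(s: String): String = "\"" + s + "\""

  def toJson(rows: Seq[(String, Int, Long)]): String =
    rows
      .groupBy(_._1)            // topic -> rows for that topic
      .toSeq
      .sortBy(_._1)             // sort topics for deterministic output
      .map { case (topic, topicRows) =>
        val partitions = topicRows
          .sortBy(_._2)         // sort partitions numerically
          .map { case (_, partition, offset) => quote(partition.toString) + ":" + offset }
          .mkString(",")        // offsets stay JSON numbers, not strings
        quote(topic) + ":{" + partitions + "}"
      }
      .mkString("{", ",", "}")

  // Same data as the DataFrame in the question, with offsets as Long
  val sample: Seq[(String, Int, Long)] = Seq(
    ("T1", 0, 44L), ("T1", 1, 54L), ("T1", 2, 13L),
    ("T2", 0, 13L), ("T2", 1, 9L), ("T2", 2, 7L))

  def main(args: Array[String]): Unit =
    println(toJson(sample)) // {"T1":{"0":44,"1":54,"2":13},"T2":{"0":13,"1":9,"2":7}}
}
```

Grouping means the rows can arrive in any order, and a new topic simply becomes another entry in the outer object.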

I tried converting the DataFrame to a collection of JSON strings and appending the values to a StringBuilder, using the code below.

import com.google.gson.JsonArray

def formatEvents(array: JsonArray): String = {
    val iterable = array.iterator()
    val s = new StringBuilder
    s.append("{")
    val processedTopicMap = collection.mutable.Map[String, Int]()
    while (iterable.hasNext) {
      val next = iterable.next()
      val jsonObject = next.getAsJsonObject
      // Gson's toString() returns the JSON form, so topic keeps its quotes: "T1"
      val topic = jsonObject.get("Topic").toString
      val partition = jsonObject.get("PartitionId").getAsInt
      val offset = jsonObject.get("OffsetId").toString
      println(s"topic = $topic , partition = $partition, offset = $offset")
      if (!processedTopicMap.keySet.contains(topic)) {
        // A new topic opens "{", but the previous topic's object is never closed with "},"
        s.append(s"$topic:{")
        processedTopicMap.put(topic, partition)
      }
      // The offset is written as a quoted string rather than a JSON number
      s.append(s"""\"$partition\":\"$offset\",""")
    }
    s.setLength(s.length - 1) // drop the trailing comma
    s.append("}")
    s.append("}")
    s.toString()
  }

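However, it does not work as expected when a second topic is present: nothing closes the first topic's object before the next topic is opened, so the output is not valid JSON.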
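If you prefer to keep the single-pass StringBuilder idea, a minimal corrected sketch follows. It works on (topic, partition, offset) tuples rather than Gson objects and assumes the rows are already ordered by topic (the main method sorts them first). The names StartingOffsetsBuilder, format and sampleRows are hypothetical. The key change is remembering the current topic: when a different topic begins, the previous topic's object is closed with "}," before the new one is opened, and the last object is closed after the loop.

```scala
// Sketch: single-pass StringBuilder version that closes each topic before opening the next.
// Assumes rows are already ordered by topic. All names here are hypothetical.
object StartingOffsetsBuilder {
  def format(rows: Seq[(String, Int, Long)]): String = {
    val sb = new StringBuilder("{")
    var currentTopic: Option[String] = None
    for ((topic, partition, offset) <- rows) {
      if (!currentTopic.contains(topic)) {
        if (currentTopic.isDefined) sb.append("},") // close the previous topic's object
        sb.append("\"").append(topic).append("\":{")
        currentTopic = Some(topic)
      } else {
        sb.append(",") // separator between partitions of the same topic
      }
      sb.append("\"").append(partition).append("\":").append(offset) // offset as a JSON number
    }
    if (currentTopic.isDefined) sb.append("}") // close the last topic's object
    sb.append("}").toString
  }

  // Deliberately unordered sample rows
  val sampleRows: Seq[(String, Int, Long)] = Seq(
    ("T2", 1, 9L), ("T1", 0, 44L), ("T2", 0, 13L),
    ("T1", 2, 13L), ("T2", 2, 7L), ("T1", 1, 54L))

  def main(args: Array[String]): Unit =
    // Sort by (topic, partition) first so that each topic's rows are contiguous
    println(format(sampleRows.sortBy(r => (r._1, r._2))))
}
```

Because the builder only compares each row with the previous one, input that is not grouped by topic (for example T1, T2, T1) would emit the T1 key twice; sorting first avoids that.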



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
