How to form Spark Streaming startingOffsets dynamically in Scala
I have a DataFrame like the one below. It is dynamic and grows as more topics are added.
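```scala
// Assumes a SparkSession named `spark` is in scope (as in spark-shell)
import spark.implicits._

val ds = Seq(("T1", 0, 44),
             ("T1", 1, 54),
             ("T1", 2, 13),
             ("T2", 0, 13),
             ("T2", 1, 9),
             ("T2", 2, 7)).toDF("Topic", "PartitionId", "OffsetId")
```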
I want to turn the DataFrame above into a JSON string in the following format. As more rows are added, the JSON should grow accordingly.
```json
{"T1":{"0":44,"1":54,"2":13},"T2":{"0":13,"1":9,"2":7}}
```
I tried converting the DataFrame to a collection of JSON strings and appending the values to a StringBuilder with the code below.
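```scala
import com.google.gson.JsonArray

// Builds the startingOffsets JSON by appending to a StringBuilder
def formatEvents(array: JsonArray): String = {
  val iterable = array.iterator()
  val s = new StringBuilder
  s.append("{")
  // Topics whose opening "{" has already been written
  val processedTopicMap = collection.mutable.Map[String, Int]()
  while (iterable.hasNext) {
    val next = iterable.next()
    val jsonObject = next.getAsJsonObject
    // toString on a Gson JsonElement keeps the JSON quotes, e.g. "T1"
    val topic = jsonObject.get("Topic").toString
    val partition = jsonObject.get("PartitionId").getAsInt
    val offset = jsonObject.get("OffsetId").toString
    println(s"topic = $topic , partition = $partition, offset = $offset")
    // Opens an object for each unseen topic; the previous topic's object is never closed
    if (!processedTopicMap.keySet.contains(topic)) {
      s.append(s"$topic:{")
      processedTopicMap.put(topic, partition)
    }
    // Each entry is written as "partition":"offset",
    s.append(s"""\"$partition\":\"$offset\",""")
  }
  s.setLength(s.length() - 1) // drop the trailing comma
  s.append("}")
  s.append("}")
  s.toString()
}
```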
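However, it does not work as expected once a second topic is present: the first topic's object is never closed before the next topic starts, so the braces are unbalanced, and the offsets are written as quoted strings instead of numbers.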
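A minimal sketch of one way to build this string, assuming the rows have already been collected to the driver as (topic, partition, offset) tuples, for example with `ds.as[(String, Int, Int)].collect()` followed by widening the offset to `Long`. Instead of tracking state in a `StringBuilder`, it groups the rows by topic and lets `mkString` place the braces and commas, so every topic's object is always closed. The names `StartingOffsets`, `quote` and `toJson` are hypothetical.

```scala
object StartingOffsets {
  // Kafka topic names may only contain ASCII letters, digits, '.', '_' and '-',
  // so wrapping them in quotes without escaping yields a valid JSON key.
  private def quote(s: String): String = "\"" + s + "\""

  // Hypothetical helper: builds {"topic":{"partition":offset,...},...}
  // from (topic, partition, offset) rows.
  def toJson(rows: Seq[(String, Int, Long)]): String = {
    val topics = rows.map(_._1).distinct // keep first-seen topic order
    val byTopic = rows.groupBy(_._1)
    topics.map { topic =>
      val entries = byTopic(topic)
        .sortBy(_._2) // partitions in ascending order
        .map { case (_, partition, offset) => s"${quote(partition.toString)}:$offset" }
      s"${quote(topic)}:${entries.mkString("{", ",", "}")}"
    }.mkString("{", ",", "}")
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      ("T1", 0, 44L), ("T1", 1, 54L), ("T1", 2, 13L),
      ("T2", 0, 13L), ("T2", 1, 9L), ("T2", 2, 7L))
    // prints {"T1":{"0":44,"1":54,"2":13},"T2":{"0":13,"1":9,"2":7}}
    println(toJson(rows))
  }
}
```

The resulting string could then be passed to the Kafka source via `.option("startingOffsets", json)`. Offsets are emitted as bare numbers, and since a JSON parser does not care about key order, sorting the partitions is only for readability.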
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
