'Iterate rows and columns in Spark dataframe
I have the following Spark dataframe that is created dynamically:
val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)
val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)
val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)
val data = Seq(row1,row2,row3)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
Now, I need to iterate each row and column in sqlDF to print each column, this is my attempt:
sqlDF.foreach { row =>
row.foreach { col => println(col) }
}
row is type Row, but is not iterable that's why this code throws a compilation error in row.foreach. How to iterate each column in Row?
Solution 1:[1]
You can convert Row to Seq with toSeq. Once turned to Seq you can iterate over it as usual with foreach, map or whatever you need
sqlDF.foreach { row =>
row.toSeq.foreach{col => println(col) }
}
Output:
Berta
bbb
30
Joe
Andy
aaa
20
ccc
40
Solution 2:[2]
You should use mkString on your Row:
sqlDF.foreach { row =>
println(row.mkString(","))
}
But note that this will be printed inside the executors JVM's, so norally you won't see the output (unless you work with master = local)
Solution 3:[3]
sqlDF.foreach is not working for me but Approach 1 from @Sarath Avanavu answer works but it was also playing with the order of the records sometime.
I found one more way which is working
df.collect().foreach { row =>
println(row.mkString(","))
}
Solution 4:[4]
This worked fine for me
sqlDF.collect().foreach(row => row.toSeq.foreach(col => println(col)))
Solution 5:[5]
You should iterate over the partitions which allows the data to be processed by Spark in parallel and you can do foreach on each row inside the partition.
You can further group the data in partition into batches if need be
sqlDF.foreachPartition { partitionedRows: Iterator[Model1] =>
if (partitionedRows.take(1).nonEmpty) {
partitionedRows.grouped(numberOfRowsPerBatch).foreach { batch =>
batch.foreach { row =>
.....
Solution 6:[6]
simple collect result and then apply foreach
df.collect().foreach(println)
Solution 7:[7]
My solution using FOR because it was I need:
Solution 1:
case class campos_tablas(name:String, sector:String, age:Int)
for (row <- df.as[campos_tablas].take(df.count.toInt))
{
print(row.name.toString)
}
Solution 2:
for (row <- df.take(df.count.toInt))
{
print(row(0).toString)
}
Solution 8:[8]
Let's assume resultDF is the Dataframe.
val resultDF = // DataFrame //
var itr = 0
val resultRow = resultDF.count
val resultSet = resultDF.collectAsList
var load_id = 0
var load_dt = ""
var load_hr = 0
while ( itr < resultRow ){
col1 = resultSet.get(itr).getInt(0)
col2 = resultSet.get(itr).getString(1) // if column is having String value
col3 = resultSet.get(itr).getLong(2) // if column is having Long value
// Write other logic for your code //
itr = itr + 1
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | SCouto |
| Solution 2 | Raphael Roth |
| Solution 3 | Naresh Joshi |
| Solution 4 | Pooja Bhat |
| Solution 5 | skjagini |
| Solution 6 | GANESH CHOKHARE |
| Solution 7 | Andronicus |
| Solution 8 |
