Iterate rows and columns in a Spark DataFrame

I have the following Spark DataFrame, which is created dynamically:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)

val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)

val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)

val data = Seq(row1,row2,row3)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
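
For reference, sqlDF.show() would render the three rows along these lines:

+-----+------+---+
| name|sector|age|
+-----+------+---+
| Andy|   aaa| 20|
|Berta|   bbb| 30|
|  Joe|   ccc| 40|
+-----+------+---+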

Now I need to iterate over each row and column in sqlDF to print each column. This is my attempt:

sqlDF.foreach { row =>
  row.foreach { col => println(col) }
}

row is of type Row, but it is not iterable, which is why this code throws a compilation error at row.foreach. How can I iterate over each column in a Row?
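
As background: Row is not a Scala collection, but it does expose positional access via row.length and row.get(i), so a minimal index-based sketch (using the schema above) could look like this:

sqlDF.foreach { row =>
  // Row supports positional access even though it is not iterable
  for (i <- 0 until row.length) {
    println(row.get(i))
  }
}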

Solution 1:[1]

You can convert a Row to a Seq with toSeq. Once converted to a Seq, you can iterate over it as usual with foreach, map, or whatever you need:

sqlDF.foreach { row =>
  row.toSeq.foreach { col => println(col) }
}

Output (the order is nondeterministic, since foreach runs in parallel on the executors):

Berta
bbb
30
Joe
Andy
aaa
20
ccc
40
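
The same toSeq idea works with map if you want values back instead of side effects; a small sketch (collect() here is an assumption that the data fits in driver memory):

// Format each row as a single string on the driver
val lines = sqlDF.collect().map(row => row.toSeq.mkString(" | ")) // Array[String]
lines.foreach(println)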

Solution 2:[2]

You should use mkString on your Row:

sqlDF.foreach { row =>
  println(row.mkString(",")) 
}

But note that this will be printed inside the executors' JVMs, so normally you won't see the output (unless you run with master = local).
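
If you do need the output on the driver, one option (a sketch, not part of the original answer) is toLocalIterator, which streams rows to the driver one partition at a time:

import scala.collection.JavaConverters._

// Streams rows partition by partition instead of collecting everything at once
sqlDF.toLocalIterator().asScala.foreach { row =>
  println(row.mkString(","))
}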

Solution 3:[3]

sqlDF.foreach did not work for me, but Approach 1 from @Sarath Avanavu's answer did, although it sometimes shuffled the order of the records.

I found one more way that works:

df.collect().foreach { row =>
   println(row.mkString(","))
}
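
If a stable record order matters, a sketch that sorts before collecting (using the example "name" column from the question's schema):

// Sorting first makes the printed order deterministic
df.orderBy("name").collect().foreach { row =>
  println(row.mkString(","))
}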

Solution 4:[4]

This worked fine for me:

sqlDF.collect().foreach(row => row.toSeq.foreach(col => println(col)))

Solution 5:[5]

You should iterate over the partitions, which allows Spark to process the data in parallel, and you can then run foreach on each row inside a partition.

You can further group the data within each partition into batches if need be:

val numberOfRowsPerBatch = 100 // hypothetical batch size; tune to your workload
sqlDF.foreachPartition { partitionedRows: Iterator[Row] =>
  if (partitionedRows.hasNext) { // take(1) would consume a row from the iterator; hasNext does not
    partitionedRows.grouped(numberOfRowsPerBatch).foreach { batch =>
      batch.foreach { row =>
        // ..... process each row here
      }
    }
  }
}

Solution 6:[6]

Simply collect the result and then apply foreach:

df.collect().foreach(println)

Solution 7:[7]

My solution uses a for loop, since that is what I needed:

Option 1:

import spark.implicits._

case class campos_tablas(name: String, sector: String, age: Int)

for (row <- df.as[campos_tablas].take(df.count.toInt)) {
  print(row.name)
}

Option 2:

for (row <- df.take(df.count.toInt)) {
  print(row(0).toString)
}
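
Building on Option 1, a sketch of typed iteration over the whole result via collect, assuming the campos_tablas case class and import spark.implicits._ from above:

// Each element is a campos_tablas instance rather than an untyped Row
df.as[campos_tablas].collect().foreach { p =>
  println(s"${p.name}, ${p.sector}, ${p.age}")
}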

Solution 8:[8]

Let's assume resultDF is the DataFrame.

val resultDF = // DataFrame //
val resultRow = resultDF.count
val resultSet = resultDF.collectAsList // java.util.List[Row], collected to the driver

// Placeholders for whatever downstream logic needs them
var load_id = 0
var load_dt = ""
var load_hr = 0

var itr = 0
while (itr < resultRow) {
  val col1 = resultSet.get(itr).getInt(0)    // if the column holds an Int value
  val col2 = resultSet.get(itr).getString(1) // if the column holds a String value
  val col3 = resultSet.get(itr).getLong(2)   // if the column holds a Long value

  // Write other logic for your code //

  itr = itr + 1
}
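
For comparison, a more idiomatic sketch of the same loop without the mutable counter, assuming the same three column types:

resultDF.collect().foreach { row =>
  val col1 = row.getInt(0)    // Int column
  val col2 = row.getString(1) // String column
  val col3 = row.getLong(2)   // Long column
  // Write other logic for your code //
}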

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 SCouto
Solution 2 Raphael Roth
Solution 3 Naresh Joshi
Solution 4 Pooja Bhat
Solution 5 skjagini
Solution 6 GANESH CHOKHARE
Solution 7 Andronicus
Solution 8