Making HTTP POST requests on Spark using foreachPartition

I need some help understanding the behaviour of the code below in Spark (using Scala and Databricks).

I have a DataFrame (read from S3, if that matters) and want to send its data through HTTP POST requests in batches of at most 1000 records. So I repartitioned the DataFrame to make sure each partition has no more than 1000 records. I also created a json column for each row (so later I only need to put them in an array).
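The batching described above does not have to rely on repartitioning alone: inside each partition, Scala's Iterator.grouped can cut the rows into chunks of at most 1000 and wrap each chunk in the array the API expects. A minimal plain-Scala sketch, assuming the json column already holds one serialized JSON object per row; `JsonBatcher` and `toPayloads` are hypothetical names, and the `{"people": [...]}` envelope is the one used in the question's code.

```scala
// Hypothetical helper: turns one partition's pre-serialized JSON rows into
// request bodies of the form {"people": [...]} with at most `batchSize` rows each.
object JsonBatcher {
  def toPayloads(jsonRows: Iterator[String], batchSize: Int = 1000): Iterator[String] = {
    require(batchSize > 0, "batchSize must be positive")
    jsonRows
      .grouped(batchSize) // lazily cuts the iterator into chunks of <= batchSize rows
      .map(batch => batch.mkString("""{"people": [""", ",", "]}"))
  }
}
```

In a Spark job the same function could be fed with something like rows.map(_.getAs[String]("json")) inside foreachPartition or mapPartitions. One consequence of grouping inside the partition is that an empty partition produces no request at all, whereas the question's makeHttpCall would post {"people": []} for it.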

The trouble is in making the requests. I created the following Serializable object:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.http.client.methods.HttpPost
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.HttpHeaders
import org.apache.http.entity.StringEntity
import org.apache.commons.io.IOUtils

object postObject extends Serializable{
  val client = HttpClientBuilder.create().build()
  val post = new HttpPost("https://my-cool-api-endpoint")
  post.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
  def makeHttpCall(row: Iterator[Row]) = {
      val json_str = """{"people": [""" + row.toSeq.map(x => x.getAs[String]("json")).mkString(",") + "]}"      
      post.setEntity(new StringEntity(json_str))
      val response = client.execute(post)
      val entity = response.getEntity()
      println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
      println(IOUtils.toString(entity.getContent()))
  }
}

Now when I try the following:

postObject.makeHttpCall(data.head(2).toIterator)

It works like a charm. The requests go through, there is some output on the screen, and my API gets that data.

But when I put it inside foreachPartition:

data.foreachPartition { x => 
  postObject.makeHttpCall(x)
}

Nothing happens: no output on screen, and nothing arrives in my API. If I rerun it, almost all stages are skipped. I believe that, for some reason, it is lazily evaluating my requests but never actually performing them. I don't understand why, or how to force it.



Solution 1:[1]

postObject has two fields, client and post, which have to be serialized.

I'm not sure client is serialized properly, and the post object is potentially mutated by several partitions running concurrently on the same worker, so many things could go wrong here.

I propose trying to remove postObject and inlining its body directly into foreachPartition.
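The idea behind inlining is to share only thread-safe state and to build a fresh request for every batch instead of mutating one shared HttpPost. A sketch of that pattern, swapping in the JDK 11+ java.net.http client rather than Apache HttpClient so it needs no extra dependency; `BatchPoster`, `buildRequest`, `send` and the endpoint URL are hypothetical placeholders.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Hypothetical poster: the JDK HttpClient is immutable and thread-safe, so one
// instance can be shared; each request is built fresh and never mutated.
class BatchPoster(endpoint: String) {
  // Created on first use, i.e. on whichever JVM actually sends something.
  private lazy val client: HttpClient = HttpClient.newHttpClient()

  // Build a new, immutable POST request for one batch body.
  def buildRequest(jsonBody: String): HttpRequest =
    HttpRequest.newBuilder(URI.create(endpoint))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
      .build()

  // Send one batch and return the HTTP status code (blocks until the response arrives).
  def send(jsonBody: String): Int =
    client.send(buildRequest(jsonBody), HttpResponse.BodyHandlers.ofString()).statusCode()
}
```

In the Spark job, a BatchPoster would be constructed as the first line of the foreachPartition closure, so neither the non-serializable HttpClient nor a shared request object has to be shipped from the driver. The same shape should carry over to Apache HttpClient: a CloseableHttpClient is generally documented as safe to reuse across threads, while an HttpPost is mutable and is better created per call.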

Addition:

I tried running it myself:

sc.parallelize((1 to 10).toList).foreachPartition(rows => {
  // Client and request are created inside the partition, so nothing has to be serialized
  val client = HttpClientBuilder.create().build()
  try {
    val post = new HttpPost("https://google.com")
    post.addHeader(HttpHeaders.CONTENT_TYPE, "application/json")
    val json_str = """{"people": [""" + rows.map(x => x.toString).mkString(",") + "]}"
    post.setEntity(new StringEntity(json_str))
    val response = client.execute(post)
    try {
      // println runs on the executor, so this output goes to the worker logs
      println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
      println(IOUtils.toString(response.getEntity().getContent()))
    } finally response.close()
  } finally client.close()
})

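I ran it both locally and on a cluster. It completes successfully and prints 405 errors to the worker logs, so the requests definitely hit the server. Note that println inside foreachPartition runs on the executors, which is why its output ends up in the worker logs rather than on screen.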

foreachPartition returns nothing (Unit), so there is no result to inspect on the driver. To debug your issue, you can change it to mapPartitions and collect the results:

val responseCodes = sc.parallelize((1 to 10).toList).mapPartitions(rows => {
  val client = HttpClientBuilder.create().build()
  try {
    val post = new HttpPost("https://google.com")
    post.addHeader(HttpHeaders.CONTENT_TYPE, "application/json")
    val json_str = """{"people": [""" + rows.map(x => x.toString).mkString(",") + "]}"
    post.setEntity(new StringEntity(json_str))
    val response = client.execute(post)
    try {
      println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
      println(IOUtils.toString(response.getEntity().getContent()))
      // Return the status code to the driver instead of only printing it on the worker
      Iterator.single(response.getStatusLine.getStatusCode)
    } finally response.close()
  } finally client.close()
}).collect()

println(responseCodes.mkString(", "))

This code returns the response codes to the driver so you can analyze them. For me it prints 405, 405, as expected.
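One caveat when moving the requests into mapPartitions: Scala's Iterator.map is lazy, so a body such as rows.map(r => send(r)) does not send anything by itself; the calls happen only as the returned iterator is consumed downstream. The answer's version avoids this by sending first and then returning Iterator.single. A plain-Scala sketch of the difference, where `fakeSend` is a hypothetical stand-in for the HTTP call that only counts invocations.

```scala
// Plain-Scala illustration of Iterator laziness; `fakeSend` is a hypothetical
// stand-in for an HTTP call that just counts how often it was invoked.
object IteratorLaziness {
  var sent = 0 // number of "requests" performed so far

  def fakeSend(payload: String): Int = { sent += 1; 200 }

  // Lazy: returns an iterator, but no "request" has been sent yet.
  def lazyCodes(payloads: Iterator[String]): Iterator[Int] =
    payloads.map(fakeSend)

  // Eager: sends every "request" before returning, similar in spirit to
  // sending first and then returning Iterator.single(statusCode).
  def eagerCodes(payloads: Iterator[String]): Iterator[Int] =
    payloads.map(fakeSend).toList.iterator
}
```

The same applies inside foreachPartition: a body that maps over the rows but never consumes the result would send nothing, whereas the question's makeHttpCall forces the whole iterator through mkString.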

Solution 2:[2]

There is a way to do this without having to find out exactly what is not serializable. If you want to keep the structure of your code, you can make all fields @transient lazy val. Also, any statement with side effects on those fields, such as post.addHeader, should be moved into the field's initializer block. For example:

@transient lazy val client = HttpClientBuilder.create().build()

@transient lazy val post = {
  val httpPost = new HttpPost("https://my-cool-api-endpoint")
  httpPost.addHeader(HttpHeaders.CONTENT_TYPE, "application/json")
  httpPost
}

That delays the initialization of all fields until they are first used on the workers. Each worker will have its own instance of the object, and you will be able to invoke the makeHttpCall method.
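To see what @transient lazy val changes, here is a minimal plain-Scala sketch that Java-serializes an object and reads it back, which is roughly what Spark's default closure serializer does when shipping a closure to the executors. FakeClient, EagerHelper, LazyHelper and SerDe are hypothetical names; FakeClient stands in for a non-serializable HTTP client.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable resource such as an HTTP client.
class FakeClient {
  val id: Long = System.nanoTime()
}

// Plain val: the client is created eagerly and written out with the object.
class EagerHelper extends Serializable {
  val client: FakeClient = new FakeClient
}

// @transient lazy val: the client is skipped during serialization and
// re-created on first access after deserialization.
class LazyHelper(val endpoint: String) extends Serializable {
  @transient lazy val client: FakeClient = new FakeClient
}

object SerDe {
  // Java-serialize an object and read it back.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    val input = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try input.readObject().asInstanceOf[T] finally input.close()
  }
}
```

The eager version fails with NotSerializableException because its client field is written along with the object. The @transient lazy val version skips the field, and the first access after deserialization builds a fresh client, which on Spark would happen on the worker.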

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution sources:
[1] Solution 1
[2] Solution 2: Achilles