Scala GraphX NullPointerException when accessing Variables within .joinVertices

I'm implementing Node2Vec in Spark GraphX. In the function transformVertexData, I try to access two Double-valued parameters, p and q:

  def transformVertexData(vertexId: VertexId, nodeAttr: NodeAttrMultiPregel, inMsg: Array[(Long, Msg)]): NodeAttrMultiPregel = {
    //val bcP = context.broadcast(config.p)
    //val bcQ = context.broadcast(config.q)

    //val bcP = this.p.value
    //val bcQ = this.q.value

    val pq_array = pq.collect()
    val bcP = pq_array(0)
    val bcQ = pq_array(1)
    ...

This gives me:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 22) (10.28.2.44 executor 0): java.lang.NullPointerException

pq is declared as var pq: RDD[Double] = null and is initialized separately:

  def setup(context: SparkContext, param: Main.Params): this.type = {
    this.config = param
    this.pq = context.parallelize(Array(config.p, config.q))
    ...

The function transformVertexData is called as follows:

var g = graphMultiPregel.mapVertices( (vid, vdata) => transformVertexData(vid, vdata, initialMsg) ).cache()

This should simply merge the incoming messages into each node's attribute if the node has received any.

If I run this code locally, everything works fine. On a cluster, I get the NullPointerException. As the commented-out lines at the beginning show, I tried broadcasting the values and referencing them directly, and I am now collecting them, but none of these approaches worked. I assume the function transformVertexData is shipped to the partition it operates on, but I don't have a clear picture of what happens there, nor a solution. I would appreciate your help.

J.Bug.

EDIT:

I call the setup function with defaultParams, which is an object containing the parameters:

      PregelMN2V.setup(context, defaultParams, defaultParams.p, defaultParams.q)

I noticed that the following version of transformVertexData

  def transformVertexData(vertexId: VertexId, nodeAttr: NodeAttrMultiPregel, inMsg: Array[(Long, Msg)]): NodeAttrMultiPregel = {
    val bcP = this.config.p
    val bcQ = this.config.q

crashes on the cluster, where this.config is defined as

var config: Main.Params = null

Could it be that the object cannot be referenced from, or shipped to, the corresponding partitions?

PS: In all of these snippets, I used this.config as the base structure for storing p and q.
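A minimal, Spark-free sketch of what may be going on, assuming PregelMN2V is a Scala object (the call PregelMN2V.setup(...) suggests so): Spark serializes the function passed to mapVertices and deserializes it in each executor JVM, and only values the closure itself captures travel with it. A singleton object is initialized separately in every JVM, and setup() only ran on the driver, so on an executor config (and likewise pq) would still be null. Holder, Params, ClosureShippingDemo, roundTrip and simulate are hypothetical names; plain Java serialization and resetting Holder.config to null merely imitate shipping a task to a fresh executor.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for Main.Params from the question.
final case class Params(p: Double, q: Double)

// Hypothetical stand-in for a singleton like PregelMN2V that keeps its state in a var.
object Holder {
  var config: Params = null
}

object ClosureShippingDemo {
  // Serializes and deserializes a value with Java serialization,
  // roughly imitating how a task closure travels from driver to executor.
  def roundTrip[T](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T]
    finally in.close()
  }

  // Returns the result of the closure over local vals, and whether the
  // closure that reads Holder.config threw a NullPointerException.
  def simulate(): (Double, Boolean) = {
    // "Driver": setup() has populated the singleton.
    Holder.config = Params(p = 1.0, q = 0.5)

    // Copy the parameters into local vals before building the closure,
    // so their values are captured and serialized with it.
    val p = Holder.config.p
    val q = Holder.config.q
    val capturesLocals: Double => Double = x => x * p + q

    // This closure stores no Params at all; it looks up Holder.config when called.
    val readsSingleton: Double => Double = x => x * Holder.config.p + Holder.config.q

    val shippedLocals = roundTrip(capturesLocals)
    val shippedSingleton = roundTrip(readsSingleton)

    // "Executor": a fresh JVM initializes Holder on its own, and setup() never ran there.
    Holder.config = null

    val localResult = shippedLocals(2.0)
    val singletonFailed =
      try { shippedSingleton(2.0); false }
      catch { case _: NullPointerException => true }
    (localResult, singletonFailed)
  }

  def main(args: Array[String]): Unit = {
    val (localResult, singletonFailed) = simulate()
    println(s"closure over local vals: $localResult")                  // 2.5
    println(s"closure over Holder.config threw NPE: $singletonFailed") // true
  }
}
```

If this is the cause, one option would be to read config.p and config.q into local vals on the driver before calling graphMultiPregel.mapVertices and pass them to transformVertexData as extra arguments, so the closure captures the values instead of looking them up on the object.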



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
