Create a case class with a variable number of parameters of different data types in Scala/Spark code

I have a use case where I am using topByKey(k). The relevant code is shown below.

I have a case class called ranking, defined as follows:


import org.apache.spark.sql.{Encoder, Encoders}

case class ranking(Names: Any*)

case object ranking {
  def encoder: Encoder[ranking] = Encoders.product
}

Dataset<ranking> rankingDF = dfpolicy.map(
                (MapFunction<Row, ranking>)
                        row -> {
                            return new ranking(
                                    scala.collection.JavaConversions.asScalaBuffer(List.of(
                                            row.getAs("id"),
                                            row.getAs("rankingid"),
                                            row.getAs("name")
                                    )).toSeq()
                                   );
                        },
                Encoders.bean(ranking.class));

The topByKey code looks like this:

 val temp =  ranking.rdd
      .keyBy(_.Names(0))
      .topByKey(k)(orderByScoreThenId)

    temp.toDS() // return type is Dataset[(Any, Array[ranking])]

I get the following error:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for Any
- field (class: "java.lang.Object", name: "_1")
- root class: "scala.Tuple2"
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotFindEncoderForTypeError(QueryExecutionErrors.scala:1000)
    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:617)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:947)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:946)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:448)
    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$6(ScalaReflection.scala:604)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:589)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:947)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:946)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:448)
    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerForType$1(ScalaReflection.scala:437)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:947)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:946)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerForType(ScalaReflection.scala:429)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:55)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:300)
    at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder(SQLImplicits.scala:261)
    at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder$(SQLImplicits.scala:261)

How do I resolve this? Or is there a way to create a case class with dynamic parameters and types so that I don't run into the above issue?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source