Create a case class with dynamic parameters of different datatypes for Scala/Spark code
I have a use case where I am using topByKey(k). Below is my code.
I have a case class called ranking, defined as follows:
import org.apache.spark.sql.{Encoder, Encoders}

case class ranking(Names: Any*)

object ranking {
  def encoder: Encoder[ranking] = Encoders.product[ranking]
}
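For comparison, Encoders.product derives an encoder by reflecting over each field's declared type, so it only works when every field has a concrete, supported type; a field typed Any (java.lang.Object on the JVM) has no encoder. Here is a minimal sketch of a concretely typed variant, where the field names and types are guesses based on the getAs calls further down:

// Sketch only: TypedRanking is hypothetical; its field names and types
// are inferred from the getAs("id"/"rankingid"/"name") calls below.
import org.apache.spark.sql.{Encoder, Encoders}

case class TypedRanking(id: Long, rankingId: Long, name: String)

object TypedRanking {
  // Succeeds because every field has a concrete type Spark can encode.
  val encoder: Encoder[TypedRanking] = Encoders.product[TypedRanking]
}

My actual mapping code, which uses the Java Dataset API, is below: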
Dataset<ranking> rankingDF = dfpolicy.map(
    (MapFunction<Row, ranking>) row -> {
        return new ranking(
            scala.collection.JavaConversions.asScalaBuffer(List.of(
                row.getAs("id"),
                row.getAs("rankingid"),
                row.getAs("name")
            )).toSeq()
        );
    },
    Encoders.bean(ranking.class));
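For reference, the same mapping written in Scala would look like the sketch below (assuming the hypothetical TypedRanking above, a SparkSession named spark, and that id and rankingid are long columns):

// Sketch: Scala equivalent of the Java map above. TypedRanking, the
// spark session name, and the Long column types are all assumptions.
import spark.implicits._

val typedDS = dfpolicy.map { row =>
  TypedRanking(
    row.getAs[Long]("id"),
    row.getAs[Long]("rankingid"),
    row.getAs[String]("name")
  )
}

With spark.implicits._ in scope, the product encoder for TypedRanking is derived implicitly rather than being passed explicitly as in the Java version.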
The topByKey code looks like this:
val temp = rankingDF.rdd
  .keyBy(_.Names(0))
  .topByKey(k)(orderByScoreThenId)

temp.toDS() // return type is Dataset[(Any, Array[ranking])]
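Note that Names(0) has static type Any, so the pair RDD is keyed by Any; that key becomes the _1 field the encoder complains about below. Here is a sketch of the same pipeline keyed by a concrete field instead (assuming the hypothetical TypedRanking and typedDS above, the original k, and an Ordering[TypedRanking] analogous to orderByScoreThenId):

// Sketch: keying by a concrete Long field lets Spark derive
// Encoder[(Long, Array[TypedRanking])] for toDS().
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
import spark.implicits._

val topPerKey = typedDS.rdd
  .keyBy(_.id)
  .topByKey(k)(orderByScoreThenId)
  .toDS() // Dataset[(Long, Array[TypedRanking])]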
With the original Any-keyed version, I get the following error:
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for Any
- field (class: "java.lang.Object", name: "_1")
- root class: "scala.Tuple2"
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotFindEncoderForTypeError(QueryExecutionErrors.scala:1000)
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:617)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:947)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:946)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:448)
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$6(ScalaReflection.scala:604)
at scala.collection.immutable.List.map(List.scala:293)
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:589)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:947)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:946)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:448)
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerForType$1(ScalaReflection.scala:437)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:947)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:946)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerForType(ScalaReflection.scala:429)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:55)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:300)
at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder(SQLImplicits.scala:261)
at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder$(SQLImplicits.scala:261)
How do I resolve this? Or is there a way to create a case class with dynamic parameters and types so that I don't run into the above issue?
Sources
This question follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow