Spark fails to parse field from Elasticsearch

I'm trying to read data from Elasticsearch into Spark using Scala. The problematic field is mapped as an object in Elasticsearch. When I tried reading without a schema I got an empty object; with a schema I get an exception.

The objects in ES look like this:

{
 "field1" : "value1",
 "field2" : "value2",
 "field3" : "value",
 "field4" : true,
 "field5" : true,
 "field6" : true,
 "field7" : "value7",
 "field8" : "value8",
 "field9" : "value9",
 "field10" : "value10"
},
...
,
{
 "field1" : "value1",
 "field2" : "value2",
 "field3" : "value",
 "field4" : true,
 "field5" : true,
 "field6" : true,
 "field7" : "value7",
 "field8" : "value8",
 "field9" : "value9",
 "field10" : "value10"
}

I tried a lot of different options. For example:

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types._

case class Element(
  field1: String,
  field2: String,
  field3: String,
  field4: Boolean,
  field5: Boolean,
  field6: Boolean,
  field7: String,
  field8: String,
  field9: String,
  field10: String
)

case class Elements(innerElement: Array[Element])

val elementsSchema = ScalaReflection.schemaFor[Elements].dataType.asInstanceOf[StructType]

val customSchema = StructType(Array(
  StructField("docField1", StringType, true),
  StructField("docField2", StringType, true),
  ...
  StructField("docField20", elementsSchema, true),
  StructField("docField21", StringType, true)
))
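For comparison, a minimal sketch of the same nested field expressed with `ArrayType` directly instead of a reflected case class (the field and column names are the placeholders from above, and I'm assuming the ES "object" field actually holds an array of objects):

```scala
import org.apache.spark.sql.types._

// Sketch: model the nested ES field as an array of structs directly,
// without wrapping it in an extra case class / struct level.
val elementSchema = StructType(Array(
  StructField("field1", StringType, true),
  StructField("field4", BooleanType, true)
  // ... remaining field2..field10 entries omitted for brevity
))

val customSchema = StructType(Array(
  StructField("docField1", StringType, true),
  // the nested field as an array of structs, not a struct containing an array
  StructField("docField20", ArrayType(elementSchema, true), true),
  StructField("docField21", StringType, true)
))
```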

Exception:

Caused by: java.lang.RuntimeException: scala.collection.convert.Wrappers$JListWrapper is not a valid external type for schema of struct<Element:array<struct<field1:string,field2:string,field3:string,field4:boolean,field5:boolean,field6:boolean,field7:string,field8:string,field9:string,field10:string>>>

I tried adding .option("es.read.field.as.array.include", "elements").

I also tried different APIs: spark.read.format("org.elasticsearch.spark.sql") and sc.esRDD.
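For completeness, the two read paths look roughly like this. This is only a sketch: the host, index name, and the array-include field path are placeholders, and es.read.field.as.array.include must name the actual ES field path, not the Spark column:

```scala
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark._ // adds the esRDD method to SparkContext

val spark = SparkSession.builder()
  .appName("es-read")                   // placeholder app name
  .config("es.nodes", "localhost:9200") // placeholder ES host
  .getOrCreate()

// DataFrame API: the connector applies the supplied schema
val df = spark.read
  .format("org.elasticsearch.spark.sql")
  .schema(customSchema)                                    // schema from above
  .option("es.read.field.as.array.include", "docField20")  // placeholder field path
  .load("my-index")                                        // placeholder index name

// RDD API: returns (id, Map[String, Any]) pairs, no schema involved
val rdd = spark.sparkContext.esRDD("my-index")             // placeholder index name
```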

I would appreciate any advice. Thanks.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
