Selecting nested fields from a DataFrame that are not part of a certain schema

Let's say I have a DataFrame with this schema:

{
  a: IntegerType,
  b: { // nested struct
    c: StringType,
    d: IntegerType
  }
}

and I want to omit d, yielding this other schema:

{
  a: IntegerType,
  b: { // nested struct
    c: StringType
  }
}
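
For concreteness, a DataFrame with the original schema can be built like this (a minimal sketch, assuming an active SparkSession named spark):

  import org.apache.spark.sql.functions.{lit, struct}

  // One sample row: a = 1, b = {c: "x", d: 2}
  val df = spark.range(1).select(
    lit(1).as("a"),
    struct(lit("x").as("c"), lit(2).as("d")).as("b")
  )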

Is there a way to programmatically create a new DataFrame from the original that only has the columns in the second schema? If I do something like this:

val finalDf = df.select(otherSchema.fieldNames.map(col): _*)

then it would still retain d, since b exists in both schemas and this approach only operates on top-level columns.
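
For this shallow case I know I could rebuild the struct by hand, roughly like this (a sketch; the second variant uses Column.dropFields, which needs Spark 3.1+):

  import org.apache.spark.sql.functions.{col, struct}

  // Rebuild b with only c, dropping d
  val manualDf = df.select(
    col("a"),
    struct(col("b.c").as("c")).as("b")
  )

  // Or equivalently, on Spark 3.1+:
  val droppedDf = df.withColumn("b", col("b").dropFields("d"))

But that hard-codes the structure, and I want something driven by the target schema.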

And similarly, if I had a really complex schema where only the deepest level changed, how would I do that? For example, say the original DataFrame has a schema like:

Array
  (Struct
    (Array
      (Struct
        (Array
          (Struct
            (a: INT, b: String)
          )
        )
      )
    )
  )

and I want to select only:

Array
  (Struct
    (Array
      (Struct
        (Array
          (Struct
            (a: INT)
          )
        )
      )
    )
  )
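
By hand this would need Spark's higher-order transform function at every array level, roughly like the sketch below (the field names outer, middle, and inner are made up, since the schema above doesn't name them; transform needs Spark 3.0+):

  import org.apache.spark.sql.functions.{col, struct, transform}

  // One transform call per nesting level, keeping only field a at the bottom
  val prunedDf = df.select(
    transform(col("outer"), o =>
      struct(
        transform(o.getField("middle"), m =>
          struct(
            transform(m.getField("inner"), i =>
              struct(i.getField("a").as("a"))
            ).as("inner")
          )
        ).as("middle")
      )
    ).as("outer")
  )

which gets unmanageable quickly, so I'm hoping for something schema-driven.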


Solution 1:[1]

Here's a sketch of a transform that allows you to do something like that. The idea is to select data from an input struct into an output struct according to the new target schema.

  import org.apache.spark.sql.{Column, DataFrame, Dataset}
  import org.apache.spark.sql.functions.{col, struct, transform}
  import org.apache.spark.sql.types.{ArrayType, DataType, StructField, StructType}

  def transformSchema[T](target: StructType)(ds: Dataset[T]): DataFrame = {

    // Recursively rebuild `column` (currently of type `currentDt`) so that it
    // contains only what the target type `targetDt` asks for.
    def toType(column: Column, currentDt: DataType, targetDt: DataType): Column = {
      (currentDt, targetDt) match {
        // Types already match: keep the column as-is.
        case (_, `currentDt`) => column
        // Structs: keep only the fields named in the target, recursing into each.
        case (StructType(currentFields), StructType(targetFields)) =>
          val columns = targetFields.toSeq.flatMap {
            case StructField(name, targetType, _, _) =>
              currentFields.collectFirst {
                case StructField(`name`, currentType, _, _) =>
                  toType(column.getField(name), currentType, targetType).as(name)
              }
          }
          struct(columns: _*)
        // Arrays: prune each element via the higher-order transform function (Spark 3.0+).
        case (ArrayType(currentElem, _), ArrayType(targetElem, _)) =>
          transform(column, toType(_, currentElem, targetElem))

        // Note: Cases to handle incompatible target schemas are missing here
        case _ =>
          column
      }
    }

    // Pack all top-level columns into a single struct, prune it against the
    // target schema, then unpack the result back into top-level columns.
    ds
      .select(toType(struct(ds.columns.map(col): _*), ds.schema, target).as("col"))
      .select(target.fields.map(f => col("col").getField(f.name).as(f.name)): _*)
  }
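
For example, applied to the schemas at the top of the question (a sketch, assuming df has the original schema):

  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

  val targetSchema = StructType(Seq(
    StructField("a", IntegerType),
    StructField("b", StructType(Seq(
      StructField("c", StringType)
    )))
  ))

  // finalDf has the pruned schema: a: integer, b: struct<c: string>
  val finalDf = transformSchema(targetSchema)(df)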


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1: Moritz