Flattening Nested JSON in Scala with a Period "." in the Key Name

I feel like I've hit a catch-22. I'm trying to flatten REST API output with keys like "odata.id" and "odata.type". The flattening code I've used before isn't working because of the "." in the key names: I can't rename the columns without first flattening, but I can't flatten because the field-name lookups are getting thrown off.

The JSON looks like this:

{
    "odata.metadata": "https://tenant.sharepoint.com/sites/siteName/_api/$metadata#SP.ApiData.Files12",
    "value": [
        {
            "odata.type": "SP.File",
            "odata.id": "https://tenant.sharepoint.com/sites/siteName/_api/Web/GetFileByServerRelativePath(decodedurl='/sites/relativePath.xlsx')",
            "odata.editLink": "Web/GetFileByServerRelativePath(decodedurl='/sites/fullPath.xlsx')",
            "CheckInComment": "",
            "CheckOutType": 2,
            "ContentTag": "{tag},6,7",
            "CustomizedPageStatus": 0,
            "ETag": "\"{tag},6\"",
            "Exists": true,
            "IrmEnabled": false,
            "Length": "28999",
            "Level": 1,
            "LinkingUri": "https://tenant.sharepoint.com/sites/filePath?d=id",
            "LinkingUrl": "https://tenant.sharepoint.com/sites/filePath?d=id",
            "MajorVersion": 6,
            "MinorVersion": 0,
            "Name": "fileName.xlsx",
            "ServerRelativeUrl": "/sites/relativePath.xlsx",
            "TimeCreated": "2022-03-23T11:30:28Z",
            "TimeLastModified": "2022-03-23T12:11:34Z",
            "Title": "",
            "UIVersion": 3072,
            "UIVersionLabel": "6.0",
            "UniqueId": "id"
        }
    ]
}

I'm using the following code, which I've inherited. (I also tried the code from Flatten nested json in Scala Spark Dataframe, but I got the same error.)

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

def flattenDataframe(
    df: DataFrame,
    keepParentFieldName: Boolean = false,
    excludeArrayFlattenCols: String = ""
): DataFrame = {

  val fields = df.schema.fields
  val fieldNames = fields.map(_.name)
  val dontFlattenList: List[String] = excludeArrayFlattenCols.split(";").map(_.trim).toList

  for (i <- fields.indices) {
    val field = fields(i)
    val fieldName = field.name
    field.dataType match {
      case _: ArrayType =>
        println("ArrayType")
        if (!dontFlattenList.contains(fieldName)) {
          val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
          val fieldNamesAndExplode = fieldNamesExcludingArray ++
            Array(s"explode_outer($fieldName) as $fieldName")
          println("array => " + fieldNamesAndExplode.mkString("Array(", ", ", ")"))
          val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
          return flattenDataframe(explodedDf, keepParentFieldName, excludeArrayFlattenCols)
        }
      case structType: StructType =>
        println("StructType")
        val childFieldnames = structType.fieldNames.map(childname => fieldName + "." + childname)
        val newfieldNames = fieldNames.filter(_ != fieldName) ++ childFieldnames
        val renamedcols = newfieldNames.map(x => col(x).as(x.replace(".", "_")))
        println("struct => " + renamedcols.mkString("Array(", ", ", ")"))
        val explodedDf = df.select(renamedcols: _*)
        return flattenDataframe(explodedDf, keepParentFieldName, excludeArrayFlattenCols)
      case _ =>
    }
  }

  val cols =
    if (keepParentFieldName)
      fieldNames.map(c => col(c).as(c))
    else
      fieldNames.map(c => col(c).as(c.substring(c.indexOf("_") + 1)))

  df.select(cols: _*)
}
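In the StructType branch above, child references are built as `fieldName + "." + childname`, so a key literally named `odata.id` inside `value` becomes `value.odata.id`, and Spark parses every dot as a level of nesting. A minimal sketch of backtick-quoting each path segment (my assumption about a possible fix, not part of the inherited code):

```scala
// Hypothetical helper: backtick-quote each path segment so a dot that is
// part of a field name is read literally instead of as struct access.
def quotedChildRef(parent: String, child: String): String =
  s"`$parent`.`$child`"
```

For example, `quotedChildRef("value", "odata.id")` produces `` `value`.`odata.id` ``, which `col(...)` should resolve as the single field `odata.id` inside the struct `value`.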

I get an error saying that the column odata doesn't exist. It's not reading the full field name with the "." in it. I'm wondering if, at some step, I need to add a replace(".", "") on the column name, but I'm not sure at which point to do this.

AnalysisException: No such struct field odata in CheckInComment, CheckOutType, ContentTag, CustomizedPageStatus, ETag, Exists, IrmEnabled, Length, Level, LinkingUri, LinkingUrl, MajorVersion, MinorVersion, Name, ServerRelativeUrl, TimeCreated, TimeLastModified, Title, UIVersion, UIVersionLabel, UniqueId, odata.editLink, odata.id, odata.type
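For context, one place such a replace could slot in is right after `value` is exploded, when the dotted keys surface as top-level columns: a rename pass there would sanitize them before any `select`/`selectExpr` tries to parse the dots. A sketch of just the name mapping (illustrative column names taken from the response above; in Spark the rename itself would go through `withColumnRenamed`, which matches the exact column name and is not confused by dots):

```scala
// Illustrative column names from the API response above.
val rawCols = Seq("odata.type", "odata.id", "odata.editLink", "Name")

// Replace dots so later select()/selectExpr() calls don't treat them
// as struct-field access.
val safeCols = rawCols.map(_.replace(".", "_"))
// safeCols: Seq("odata_type", "odata_id", "odata_editLink", "Name")
```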



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
