Spark Structured Streaming: StructField(..., ..., False) always returns `nullable=true` instead of `nullable=false`

I'm using Spark Structured Streaming (3.2.1) with Kafka.

I'm trying to simply read JSON from Kafka using a defined schema.

My problem is that the defined schema contains a non-nullable field, which is ignored when I read messages from Kafka. I use the from_json function, which seems to ignore that some fields can't be null.

Here is my code example:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val schemaTest = new StructType()
  .add("firstName", StringType)
  .add("lastName", StringType)
  .add("birthDate", LongType, nullable = false)

val loader = spark
  .readStream
  .format("kafka")
  .option("startingOffsets", "earliest")
  .option("kafka.bootstrap.servers", "BROKER:PORT")
  .option("subscribe", "TOPIC")
  .load()

val df = loader
  .selectExpr("CAST(value AS STRING)")
  .withColumn("value", from_json(col("value"), schemaTest))
  .select(col("value.*"))

df.printSchema()

df.writeStream
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination()

This is what I get when printing the schema of df, which is different from my schemaTest:

root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- birthDate: long (nullable = true)

And the received data look like this:

+---------+--------+----------+
|firstName|lastName|birthDate |
+---------+--------+----------+
|Toto     |Titi    |1643799912|
|Tutu     |Tata    |null      |
+---------+--------+----------+

We also tried changing the mode option of the from_json function from the default (PERMISSIVE) to the others (DROPMALFORMED, FAILFAST), but the second record, which doesn't respect the defined schema, is simply not considered corrupted, because the field birthDate ends up nullable anyway.
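A minimal sketch of how the mode can be passed as an option to from_json (the option map here is illustrative, shown with FAILFAST; the exact call we used may differ):

val dfFailFast = loader
  .selectExpr("CAST(value AS STRING)")
  // "mode" is a JSON data source option; FAILFAST should fail on malformed records
  .withColumn("value", from_json(col("value"), schemaTest, Map("mode" -> "FAILFAST")))
  .select(col("value.*"))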

Maybe I missed something, but if not, I have the following questions.

Do you know why the printSchema of df does not match my schemaTest (with the non-nullable field)?

Also, how can I manage non-nullable values in my case? I know that I can filter, but I would like to know if there is an alternative that relies on the schema, the way it is supposed to work. It's also not that simple to filter when the schema has a lot of non-nullable fields.



Solution 1:[1]

This is actually the intended behavior of the from_json function. You can read the following in the source code:

// The JSON input data might be missing certain fields. We force the nullability
// of the user-provided schema to avoid data corruptions. In particular, the parquet-mr encoder
// can generate incorrect files if values are missing in columns declared as non-nullable.
val nullableSchema = schema.asNullable

override def nullable: Boolean = true
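
Because the user-provided schema is forced to be nullable, even a minimal batch reproduction shows the same result. A sketch, assuming a local SparkSession with spark.implicits._ in scope:

import org.apache.spark.sql.functions.{col, from_json}

// No Kafka needed: from_json reports every field as nullable,
// regardless of the nullability declared in schemaTest.
val sample = Seq("""{"firstName":"Toto","lastName":"Titi","birthDate":1643799912}""").toDF("value")
sample
  .select(from_json(col("value"), schemaTest).alias("value"))
  .select(col("value.*"))
  .printSchema()
// root
//  |-- firstName: string (nullable = true)
//  |-- lastName: string (nullable = true)
//  |-- birthDate: long (nullable = true)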

If you have multiple mandatory fields, you can construct the filter expression from your schemaTest (or a list of columns) and use it like this:

val filterExpr = schemaTest.fields
  .filter(!_.nullable)
  .map(f => col(f.name).isNotNull)
  .reduce(_ and _)

val df = loader
  .selectExpr("CAST(value AS STRING)")
  .withColumn("value", from_json(col("value"), schemaTest))
  .select(col("value.*"))
  .filter(filterExpr) 
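
With the schema above, only birthDate is non-nullable, so filterExpr reduces to col("birthDate").isNotNull. Note that reduce throws on an empty collection, so this assumes the schema has at least one non-nullable field.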

Solution 2:[2]

I would like to propose a different way of doing it:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, when}

def isCorrupted(df: DataFrame): DataFrame = {
  val nonNullableColumns = schemaTest
    .filter(e => !e.nullable)
    .map(_.name)

  // Mark a row as corrupted as soon as any non-nullable column is null,
  // then keep only the clean rows and drop the helper column.
  nonNullableColumns
    .foldLeft(df.withColumn("isCorrupted", lit(0))) { (accumulator, columnName) =>
      accumulator.withColumn(
        "isCorrupted",
        when(col(columnName).isNull, 1).otherwise(col("isCorrupted")))
    }
    .filter(col("isCorrupted") === lit(0))
    .drop("isCorrupted")
}

val df = loader
  .selectExpr("CAST(value as STRING)")
  .withColumn("value", from_json(col("value"), schemaTest))
  .select(col("value.*"))
  .transform(isCorrupted)
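
Using .transform keeps the null check as a reusable, named step in the method chain; the resulting rows are the same as with the filter expression from Solution 1.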

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution     Source
Solution 1   blackbishop
Solution 2