Validate CSV read columns against custom schema with PySpark

I have a custom schema for each CSV file I want to read. I want to be able to detect any additional or missing columns.

I wrote a script to compare the columns in a dataframe against a list of columns. However, I don't have a required-columns list per se; I have a custom schema:

from pyspark.sql.types import StructType, StructField, StringType

customSchema = StructType([
    StructField('foo', StringType(), False),
    StructField('bar', StringType(), True),
])

How can I use this custom schema to get the list of required columns? Or is there a better way to do this, such as forcing the CSV read to raise an error if the schema doesn't match?

Right now I have something like this:

df = spark.read.csv(path, header=True, schema=customSchema)

# print with a fake detect_mismatch function for the purpose of this example
print(detect_mismatch(df.columns, hardcoded_list_of_required_columns))

I'd like to avoid having this hardcoded_list_of_required_columns, or at least be able to generate it from the custom schema I have already defined.



Solution 1:[1]

Maybe there is a better way, but this should be sufficient:

import json

# Non-nullable fields in the schema are the required columns
required_columns = [c['name'] for c in json.loads(df.schema.json())['fields'] if not c['nullable']]

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Benny Elgazar