PySpark: flattening dataframe while appending supercolumn names
Say I have a PySpark dataframe df:
>>> df.printSchema()
root
 |-- a: struct
 |    |-- alpha: integer
 |    |-- beta: string
 |    |-- gamma: boolean
 |-- b: string
 |-- c: struct
 |    |-- delta: string
 |    |-- epsilon: struct
 |    |    |-- omega: string
 |    |    |-- psi: boolean
I know I can flatten the dataframe:
select_col_list = [col.replace("a", "a.*").replace("c", "c.*") for col in df.columns]
flat_df = df.select(*select_col_list)
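As a side note, the substring `replace` trick above is fragile: it rewrites every occurrence of `"a"` or `"c"` inside a column name, not just whole names. A plain-Python illustration (no Spark session needed, column names assumed from the example schema):

```python
# What the string-replace trick produces for the example's columns.
columns = ["a", "b", "c"]  # df.columns for the example dataframe

select_col_list = [c.replace("a", "a.*").replace("c", "c.*") for c in columns]
print(select_col_list)  # ['a.*', 'b', 'c.*']

# The replace is substring-based, so a hypothetical column named "cat"
# would be mangled rather than left alone:
print("cat".replace("a", "a.*").replace("c", "c.*"))  # 'c.*a.*t'
```

This only works when no other column name contains the letters being replaced; the answers below avoid the problem.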
This results in a schema like this:
root
 |-- alpha: integer
 |-- beta: string
 |-- gamma: boolean
 |-- b: string
 |-- delta: string
 |-- epsilon: struct
 |    |-- omega: string
 |    |-- psi: boolean
But I want to append the supercolumn's name to subcolumns when I flatten too, so I want the resulting schema to be like this:
root
 |-- a_alpha: integer
 |-- a_beta: string
 |-- a_gamma: boolean
 |-- b: string
 |-- c_delta: string
 |-- c_epsilon: struct
 |    |-- omega: string
 |    |-- psi: boolean
How do I do this?
Solution 1:[1]
I don't think there's a straightforward way to do it, but here's a hacky solution that I came up with.
- Define a list of the columns to be expanded, and create a temporary id column using pyspark.sql.functions.monotonically_increasing_id().
- Loop over all the columns in the dataframe and create a temporary dataframe for each one:
  - If the column is in cols_to_expand: use .* to expand the column, then rename all fields (except id) in the resulting temporary dataframe with the corresponding prefix using alias().
  - If the column is not in cols_to_expand: select that column along with id and store it in a temporary dataframe.
- Store each temp_df in a list.
- Join all the dataframes in the list on id, then drop the id column.
Code:
from functools import reduce
import pyspark.sql.functions as f

df = df.withColumn('id', f.monotonically_increasing_id())
cols_to_expand = ['a', 'c']
flat_dfs = []
for col in df.columns:
    if col in cols_to_expand:
        # Expand the struct, then prefix each subcolumn with the parent name
        temp_df = df.select('id', col + ".*")
        temp_df = temp_df.select(
            [f.col(c).alias(col + "_" + c if c != 'id' else c) for c in temp_df.columns]
        )
    else:
        temp_df = df.select('id', col)
    flat_dfs.append(temp_df)

flat_df = reduce(lambda x, y: x.join(y, on='id'), flat_dfs)
flat_df = flat_df.drop('id')
The resulting schema:
flat_df.printSchema()
#root
# |-- a_alpha: integer (nullable = true)
# |-- a_beta: string (nullable = true)
# |-- a_gamma: boolean (nullable = true)
# |-- b: string (nullable = true)
# |-- c_delta: string (nullable = true)
# |-- c_epsilon: struct (nullable = true)
# | |-- omega: string (nullable = true)
# | |-- psi: boolean (nullable = true)
Solution 2:[2]
I actually found a way to do this today. Start with the beautiful auto-flattening PySpark function by Evan V, and combine it with the brilliant mass-renaming solution from proggeo: you can build up a list of names down the full tree of columns and alias them all in a single select.
In my case I took the result of the flatten function and replaced all the "." characters with "_" when renaming. The result is as follows:
from pyspark.sql.types import StructType, ArrayType
from pyspark.sql.functions import col

def flatten(schema, prefix=None):
    """Recursively collect dotted field names for all leaf columns."""
    fields = []
    for field in schema.fields:
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType
        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)
    return fields

# Get actual field names, with nested '.' structure, and create equivalents with '_'
fields = flatten(df.schema)
fields_renamed = [field.replace(".", "_") for field in fields]

# Select while aliasing all fields
df = df.select(*[col(field).alias(new_field) for field, new_field in zip(fields, fields_renamed)])
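Note that flatten() recurses into every nested struct, so c.epsilon is expanded as well, one level deeper than the schema the question asked for. A plain-Python sketch of the renaming step (no Spark needed), using the dotted field names flatten() would return for the example schema:

```python
# Dotted names flatten() would produce for the example schema (assumed),
# and the underscore-renamed equivalents built the same way as above.
fields = ["a.alpha", "a.beta", "a.gamma", "b",
          "c.delta", "c.epsilon.omega", "c.epsilon.psi"]
fields_renamed = [field.replace(".", "_") for field in fields]
print(fields_renamed)
# ['a_alpha', 'a_beta', 'a_gamma', 'b',
#  'c_delta', 'c_epsilon_omega', 'c_epsilon_psi']
```

If you want to keep c_epsilon as an unexpanded struct, as in the question, Solution 1 is the closer fit.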
Solution 3:[3]
From what I see, a solution would be to do something like this
const courses = await Course
.find({ published: true })
.populate({
path: 'lessons',
match: { free_preview: true },
select: 'name -_id'
})
.exec();
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | pault |
| Solution 2 | T. Shaffner |
| Solution 3 | Adrien De Peretti |
