pyspark pivot function unable to create column: Cannot resolve column name "In" among (A, Out)

I have an input dataframe df with the following columns: A, B, C.

| A | B  | C  |
|---|----|----|
| 1 | a1 | x1 |
| 1 | a1 | x2 |
| 1 | a2 | x3 |
| 1 | a3 | -  |
| 2 | a4 | x4 |
| 2 | a5 | x5 |
| 2 | a6 | x6 |
| 2 | a6 | x7 |
| 2 | a6 | x8 |
| 2 | a7 | -  |

I am creating a pyspark dataframe as below:

from pyspark.sql import functions as F
df1 = df.groupBy("A", "B")\
        .agg(F.countDistinct("C").alias("UniqueCount"))\
        .withColumn("InOrOut", F.when(F.col("UniqueCount") == 0, F.lit("Out"))\
                               .otherwise(F.lit("In")))

df1 comes out to be:

| A  | B  | UniqueCount | InOrOut |
|----|----|-------------|---------|
| 1  | a1 | 2           | In      |
| 1  | a2 | 1           | In      |
| 1  | a3 | 0           | Out     |
| 2  | a4 | 1           | In      |
| 2  | a5 | 1           | In      |
| 2  | a6 | 3           | In      |
| 2  | a7 | 0           | Out     |

Then I am using pivot on the above as below:

    df2 = df1.groupBy("A")\
             .pivot("InOrOut")\
             .agg(*[F.countDistinct(F.col(x)).alias(x) for x in ["B"]])\
             .na.fill(value=0,subset=["In", "Out"])

I was expecting df2 to be:

| A  | In | Out |
|----|----|-----|
| 1  | 2  | 1   |
| 2  | 3  | 1   |

Instead, I am getting the error `Cannot resolve column name "In" among (A, Out)`. I have verified that df1 is created and contains rows both where UniqueCount == 0 and where UniqueCount <> 0.



Solution 1:[1]

It's working as expected -

Spark - v3.1.2

Data Preparation

from io import StringIO

import pandas as pd
from pyspark.sql import SparkSession, functions as F

# `sql` is the SparkSession used by createDataFrame below
sql = SparkSession.builder.getOrCreate()

s = StringIO("""
A,B,C
1,a1,x1 
1,a1,x2 
1,a2,x3 
1,a3,None
2,a4,x4 
2,a5,x5 
2,a6,x6 
2,a6,x7 
2,a6,x8 
2,a7,None
""")

df = pd.read_csv(s, delimiter=',')

sparkDF = sql.createDataFrame(df)\
             .withColumn('C', F.when(F.col('C') == 'None', F.lit(None)).otherwise(F.col('C')))

sparkDF.show()

+---+---+----+
|  A|  B|   C|
+---+---+----+
|  1| a1| x1 |
|  1| a1| x2 |
|  1| a2| x3 |
|  1| a3|null|
|  2| a4| x4 |
|  2| a5| x5 |
|  2| a6| x6 |
|  2| a6| x7 |
|  2| a6| x8 |
|  2| a7|null|
+---+---+----+

Aggregate

sparkDF_agg = sparkDF.groupBy("A", "B")\
                     .agg(F.countDistinct("C").alias("UniqueCount"))\
                     .withColumn("InOrOut", F.when(F.col("UniqueCount") == 0, F.lit("Out"))\
                                            .otherwise(F.lit("In")))


sparkDF_agg.show()

+---+---+-----------+-------+
|  A|  B|UniqueCount|InOrOut|
+---+---+-----------+-------+
|  1| a3|          0|    Out|
|  1| a1|          2|     In|
|  2| a5|          1|     In|
|  2| a6|          3|     In|
|  2| a4|          1|     In|
|  2| a7|          0|    Out|
|  1| a2|          1|     In|
+---+---+-----------+-------+

Pivot

sparkDF_agg.groupBy("A")\
             .pivot("InOrOut")\
             .agg(*[F.countDistinct(F.col(x)).alias(x) for x in ["B"]])\
             .na.fill(value=0,subset=["In", "Out"])\
             .show()

+---+---+---+
|  A| In|Out|
+---+---+---+
|  1|  2|  1|
|  2|  3|  1|
+---+---+---+

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source

Solution 1: Vaebhav