Spark version bump causes error with `exceptAll`
I've recently bumped an old PySpark app to Python 3 and Spark 3.1.2, and I'm trying to learn how to do the old Spark 2 workflow with the newest version we have available.
Previously I was able to generate two tables, join them (in PySpark and in SQL against an external DB), and compare the columns' values:
```python
from datetime import datetime as dt
from random import randint, random, randrange
from string import ascii_lowercase

# `spark` is the active SparkSession (shell/notebook context)

def genString():
    # Random lowercase string, 50-200 characters; note that str() of the
    # joined bytes keeps the b'...' wrapper in the result
    size = randint(50, 200)
    bal = [c.encode('ascii') for c in ascii_lowercase]
    return str(b''.join([bal[int(random() * 26)] for _ in range(size)]))

def generate_data(num_to_insert):
    data_a = []
    data_b = []
    for i in range(num_to_insert):
        data_a.append((i + 1, randrange(1, 40000), dt.now()))
        data_b.append((i + 1, genString(), genString()))
    raw_df_a = spark.createDataFrame(data_a).collect()
    df_a = spark.createDataFrame(raw_df_a, ["id", "random_no", "created_at"])
    raw_df_b = spark.createDataFrame(data_b).collect()
    df_b = spark.createDataFrame(raw_df_b, ["id", "email_address", "campaign_name"])
    return df_a, df_b

df_a, df_b = generate_data(2000)

# TODO - Fix join???
df_result = df_a.join(df_b, on=['id'], how='inner')
```
I then read `tgt_df` from external storage and compare the two DataFrames using the `exceptAll` method.
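The read itself isn't shown above; a minimal sketch of it, assuming the target table lives in a JDBC-accessible database (the URL, table name, and credentials below are placeholders, not the real values):

```python
# Hypothetical read of the target table; connection details are placeholders
tgt_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://db-host:5432/mydb")
    .option("dbtable", "target_table")
    .option("user", "user")
    .option("password", "password")
    .load()
)
```

With `tgt_df` loaded, the comparison is: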
```python
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType

tgt_df.orderBy("id").select(
    col("id").cast(IntegerType()).alias("id"),
    col("random_no").cast(IntegerType()).alias("random_no"),
    col("campaign_name").cast(StringType()).alias("campaign_name"),
    col("email_address").cast(StringType()).alias("email_address")
).exceptAll(
    df_result.orderBy("id").select(
        col("id").cast(IntegerType()).alias("id"),
        col("random_no").cast(IntegerType()).alias("random_no"),
        col("campaign_name").cast(StringType()).alias("campaign_name"),
        col("email_address").cast(StringType()).alias("email_address")
    )
).show()
```
I get this error:

```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 47.0 failed 4 times, most recent failure: Lost task 0.3 in stage 47.0 (TID 447) (10.239.251.181 executor 1): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: id#310
```
I'm not sure why this worked with Spark 2, and even less sure why it doesn't work with 3.1.2.
Note - I know I swapped `campaign_name` and `email_address` between the two DataFrames. That mismatch should make the comparison fail on its own, but it's the `exceptAll` call itself that is failing, not my test.
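For reference, here is a minimal sketch of what I'd expect `exceptAll` to return on toy data (this example is standalone, not from the failing app):

```python
# Toy example: exceptAll returns rows of the first DataFrame that have no
# matching row in the second, preserving duplicates (row order may vary)
left = spark.createDataFrame([(1, "a"), (2, "b"), (2, "b")], ["id", "val"])
right = spark.createDataFrame([(2, "b")], ["id", "val"])
left.exceptAll(right).show()
# +---+---+
# | id|val|
# +---+---+
# |  1|  a|
# |  2|  b|
# +---+---+
```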
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
