Dataframe column values not working in where clause
val creation_timestamp = df.groupBy().agg(min($"userCreation_timestamp").alias("ts")).col("ts")
df.filter(col("userCreation_timestamp").cast("timestamp") >= creation_timestamp).show()
or
df.where(col("userCreation_timestamp").cast("timestamp") >= creation_timestamp).show()
When running the code above to show the data, I obtain the following exception:
org.apache.spark.sql.AnalysisException: Resolved attribute(s) ts#1658 missing from id#2,userCreation_timestamp#8,firstname#31 in operator !Filter (cast(userCreation_timestamp#8 as timestamp) >= ts#1658).;;
!Filter (cast(userCreation_timestamp#8 as timestamp) >= ts#1658)
+- Relation[id#02,userCreation_timestamp#8, 26 more fields] parquet
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:92)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:293)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:84)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:84)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:172)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:178)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:65)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:3306)
at org.apache.spark.sql.Dataset.filter(Dataset.scala:1463)
... 49 elided
df.where(col("userCreation_timestamp").cast("timestamp") >= "2022-03-11 18:36:48").show()
With a literal value in the where clause, as in the line above, the code works fine, but it fails when the value comes from a dataframe.
Solution 1:[1]
You can first select the minimum timestamp as a value and then use that value in the where/filter function. A working sample is below:
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.{SparkSession, functions}

object QuestionStackOverflow extends App {
  val spark = SparkSession.builder
    .master("local[*]")
    .appName("Sample App")
    .config("spark.sql.shuffle.partitions", "1")
    .getOrCreate()
  import spark.sqlContext.implicits._

  // Sample data; "ts" holds the string column parsed as a timestamp
  val df = Seq(
    (1, "2022-03-11 18:36:48"),
    (2, "2022-03-11 19:00:00"),
    (3, "2022-03-11 20:00:00")
  ).toDF("id", "userCreation_timestamp")
    .withColumn("ts", col("userCreation_timestamp").cast(TimestampType))
  df.printSchema()

  // Run the aggregation and bring the minimum back to the driver as a plain value
  val creation_timestamp = df
    .select(functions.min("ts"))
    .head().get(0)

  // Strictly greater than the minimum, so the earliest row is excluded
  df.where(col("ts") > lit(creation_timestamp).cast(TimestampType))
    .show()
}
Schema is:
root
|-- id: integer (nullable = false)
|-- userCreation_timestamp: string (nullable = true)
|-- ts: timestamp (nullable = true)
Output:
+---+----------------------+-------------------+
| id|userCreation_timestamp|                 ts|
+---+----------------------+-------------------+
|  2|   2022-03-11 19:00:00|2022-03-11 19:00:00|
|  3|   2022-03-11 20:00:00|2022-03-11 20:00:00|
+---+----------------------+-------------------+
Solution 2:[2]
You're getting this error because your filter condition is passed the ts column itself, not its value. As the ts column does not exist in the df dataframe, you get an AnalysisException: Resolved attribute(s) ts#1658 missing.
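To make that concrete, here is a minimal sketch that rebuilds the failing filter on a made-up three-row dataframe and captures the outcome in a `Try` instead of letting it throw. The sample data and the names `ColumnVersusValue`, `filterWithForeignColumn` and `reference` are assumptions for illustration only; they are not from the question.

```scala
import scala.util.Try
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, min}

object ColumnVersusValue {
  // Rebuilds the filter from the question and captures the outcome instead of throwing
  def filterWithForeignColumn(df: DataFrame): Try[DataFrame] = {
    val agg = df.agg(min(col("userCreation_timestamp")).alias("ts"))
    // A Column is only a reference to an attribute of `agg`; nothing is computed here
    val reference: Column = agg.col("ts")
    // The plan is analyzed when the filter is built, so a failure shows up before show()
    Try(df.filter(col("userCreation_timestamp").cast("timestamp") >= reference))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("column-versus-value")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data standing in for the asker's parquet table
    val df = Seq(
      (1, "2022-03-11 18:36:48"),
      (2, "2022-03-11 19:00:00"),
      (3, "2022-03-11 20:00:00")
    ).toDF("id", "userCreation_timestamp")

    // Prints whether building the filter failed, and with which exception if so
    val result = filterWithForeignColumn(df)
    println(result.isFailure)
    result.failed.foreach(e => println(e.getClass.getName))

    spark.stop()
  }
}
```

The point of the sketch is that `agg.col("ts")` never touches the data: it names an attribute that belongs to `agg`, and `df` has no such attribute to resolve it against.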
If you want to pass the value of the column, you need to retrieve the first row of your aggregated dataframe, then retrieve the timestamp value in this row, and finally use lit to pass it to your condition:
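import org.apache.spark.sql.functions.{min, lit, col}
// Cast before aggregating so that getTimestamp also works when the column is stored as a string
val creation_timestamp = df.agg(min(col("userCreation_timestamp").cast("timestamp"))).head().getTimestamp(0)
df.filter(col("userCreation_timestamp").cast("timestamp") >= lit(creation_timestamp)).show()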
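If you would rather not bring the value back to the driver, one alternative is to keep the minimum as a one-row dataframe and `crossJoin` it onto `df`, so that the minimum really is an attribute of the dataframe being filtered. This technique is not taken from either answer; it is only a sketch. The names `CrossJoinMin`, `filterFromMin` and `min_ts` are hypothetical, and the sample data is made up.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, min}

object CrossJoinMin {
  // Keeps rows whose timestamp is >= the minimum, without collecting anything to the driver
  def filterFromMin(df: DataFrame): DataFrame = {
    val ts = col("userCreation_timestamp").cast("timestamp")
    // A global aggregation without groupBy yields exactly one row
    val minDf = df.agg(min(ts).alias("min_ts"))
    df.crossJoin(minDf)          // every row of df now carries min_ts
      .where(ts >= col("min_ts"))
      .drop("min_ts")            // restore the original set of columns
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("cross-join-min")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data
    val df = Seq(
      (1, "2022-03-11 18:36:48"),
      (2, "2022-03-11 19:00:00"),
      (3, "2022-03-11 20:00:00")
    ).toDF("id", "userCreation_timestamp")

    filterFromMin(df).show()
    spark.stop()
  }
}
```

Because the joined side has a single row, the cross join does not multiply the data. Collecting the value with `head()` as shown above is simpler, though, and is usually fine for a single scalar.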
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Paweł Cieśla |
| Solution 2 | Vincent Doba |
