Filter rows of a Snowflake table while reading into a PySpark DataFrame
I have a huge Snowflake table and want to run some transformations on it in PySpark. The table has a column called 'snapshot'. I only want to read the current snapshot's data into a PySpark DataFrame and transform that filtered data.
Is there a way to filter the rows while reading the Snowflake table into a Spark DataFrame (I don't want to read the entire table into memory because that is inefficient), or do I need to read the entire table into a Spark DataFrame and then apply a filter to get the latest snapshot, something like the code below?
from pyspark.sql.functions import current_timestamp  # needed by the filter on the last line

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
snowflake_database="********"
snowflake_schema="********"
source_table_name="********"
snowflake_options = {
    "sfUrl": "********",
    "sfUser": "********",
    "sfPassword": "********",
    "sfDatabase": snowflake_database,
    "sfSchema": snowflake_schema,
    "sfWarehouse": "COMPUTE_WH"
}
df = spark.read \
    .format(SNOWFLAKE_SOURCE_NAME) \
    .options(**snowflake_options) \
    .option("dbtable", snowflake_database + "." + snowflake_schema + "." + source_table_name) \
    .load()
df = df.where(df.snapshot == current_timestamp()).collect()
Solution 1:[1]
Some forms of filters (the filter or where functionality of the Spark DataFrame) are not passed by Spark to the Spark Snowflake connector. That means that, in some situations, more records may be fetched from Snowflake than you expect.
The safest way is to pass a SQL query directly, so that Snowflake itself applies the filter:
df = spark.read \
    .format(SNOWFLAKE_SOURCE_NAME) \
    .options(**snowflake_options) \
    .option("query", "SELECT X,Y,Z FROM TABLE1 WHERE SNAPSHOT = CURRENT_TIMESTAMP()") \
    .load()
Of course, if you want to use the filter/where functionality of the Spark DataFrame, check the Query History in the Snowflake UI to confirm that the generated query has the right filter applied.
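The query-based read can be wrapped in a small helper. The sketch below is only an illustration: it assumes that "current snapshot" means the largest value in the snapshot column (the question does not say so explicitly), and the names build_snapshot_query and read_latest_snapshot are hypothetical, not part of the connector. Only the "query" option and the format name come from the answer above.

```python
def build_snapshot_query(table, columns, snapshot_column="SNAPSHOT"):
    """Build a SELECT that keeps only rows of the most recent snapshot.

    Assumption: the latest snapshot is MAX(snapshot_column).
    Identifiers are interpolated as-is, so pass only trusted names.
    """
    column_list = ", ".join(columns)
    return (
        f"SELECT {column_list} FROM {table} "
        f"WHERE {snapshot_column} = (SELECT MAX({snapshot_column}) FROM {table})"
    )


def read_latest_snapshot(spark, options, table, columns):
    """Load the filtered rows; the WHERE clause is evaluated by Snowflake."""
    return (
        spark.read
        .format("net.snowflake.spark.snowflake")
        .options(**options)
        .option("query", build_snapshot_query(table, columns))
        .load()
    )


# Hypothetical usage, given an existing SparkSession and the options dict
# from the question:
# df = read_latest_snapshot(spark, snowflake_options, "TABLE1", ["X", "Y", "Z"])
```

Because the whole statement is sent through the "query" option, the filter does not depend on Spark's pushdown, and only the matching rows should leave Snowflake.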
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Sergiu |
