Spark caching - when to cache inside foreachBatch (Spark Streaming)
I'm currently reading from a Kafka topic with Spark Structured Streaming. Then, in foreachBatch(df), I do some transformations: I first filter the batch df by an id (df_filtered - this filter can run n times, once per id), then create a new DataFrame based on that filtered df (new_df_filtered - the data arrives as a JSON message and I want to convert it into a normal column structure by providing the schema), and finally write to 2 sinks. Here's a sample of the code:
```python
def sink_process(self, df: DataFrame, current_ids: list):
    df = df.repartition(int(os.environ.get("SPARK_REPARTITION_NUMBER")))
    df.cache()
    for id in current_ids:
        # returns a new DataFrame with the proper schema; uses a .where and then a .createDataFrame
        df_filtered = self.df_filter_by_id(df, id)
        first_row = df_filtered.take(1)  # making sure the filter actually returned any data
        if first_row:
            df_filtered.cache()
            self.sink_process(df_filtered, id)  # writes the filtered df to the 2 sinks
            df_filtered.unpersist()
    df.unpersist()
```
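For context, the outer wiring looks roughly like this. It's a simplified sketch: the bootstrap servers, topic name, checkpoint path, self.spark and self.current_ids are placeholders rather than my real setup.

```python
def start_stream(self):
    # placeholder Kafka source; the real options and topic differ
    kafka_df = (
        self.spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"))
        .option("subscribe", "my_topic")
        .option("startingOffsets", "latest")
        .load()
    )

    query = (
        kafka_df.writeStream
        # each micro-batch goes through the sink_process method shown above
        .foreachBatch(lambda batch_df, batch_id: self.sink_process(batch_df, self.current_ids))
        .option("checkpointLocation", "/tmp/checkpoints/my_job")  # placeholder path
        .start()
    )
    query.awaitTermination()
```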
My question is: where should I cache this data for optimal performance? Right now I cache the batch before applying any transformations, but I've come to realise that at that point it's not really doing anything, since the DataFrame only gets materialised into the cache when the first action runs. So following this logic, I'm only really caching this df when I reach that .take, right? But at that point I'm also caching the filtered df. The idea behind caching the batch data before the filter was that, if I had a lot of different ids, I wouldn't be re-reading the source data every time I applied the filter, but I might have gotten this all wrong.
Can anyone please help clarify what the best approach would be? Maybe only caching df_filtered, which is the one that's reused for the different sinks?
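To make the alternative concrete, this is roughly the variant I'm considering: caching the micro-batch once and forcing it to materialise with a count() before the id loop. The count() is just my assumption of how to populate the cache eagerly, and write_to_sinks is a hypothetical name standing in for whatever call actually writes df_filtered to the two sinks - I haven't benchmarked any of this.

```python
def sink_process(self, df: DataFrame, current_ids: list):
    df = df.repartition(int(os.environ.get("SPARK_REPARTITION_NUMBER")))
    df.cache()
    df.count()  # first action: materialises the cache once, so every .where below reuses it (assumption)
    for id in current_ids:
        df_filtered = self.df_filter_by_id(df, id)
        if df_filtered.take(1):
            df_filtered.cache()  # reused by both sink writes
            self.write_to_sinks(df_filtered, id)  # hypothetical stand-in for the actual sink writes
            df_filtered.unpersist()
    df.unpersist()
```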
Thanks
Sources
Source: Stack Overflow, licensed under CC BY-SA 3.0.