Databricks/Python - what is a best-practice approach to creating a robust long-running job?

I couldn't find a good overview of how to build a job that has a moderate probability of failing partway through.

I am an experienced developer, but I am relatively new to Databricks/Spark. While I can program my way out of my problem, I'm looking for a best-practice solution.

My scenario is reading a large number of rows out of a web API. The job takes about 36 hours to run, and during that time there is a high probability that I will hit a fatal error while interacting with the API (timeouts, disconnects mid-read, invalid/unexpected return values, etc.). While I can progressively harden the job against these errors, ideally I should not have to re-run the entire job to recover; I only want to re-run the failed cases.

My basic flow is like this:

  • Read in a curated set of IDs (hundreds of thousands)
  • For each ID, call the web API to get details
  • Write the resulting output into a new table (ID + Details)

Approaches I have evaluated:

  1. Attempt to capture all errors in a blanket fashion and write failures into the resulting table. Recovery is then to read the failed rows back as the source of IDs after patching whatever caused the failure.
  2. Partition the initial dataset into multiple files and cobble together something that schedules work on individual partitions. Then re-run a single partition if one of the items in it fails. After all succeed, aggregate the results. I think this is doable but with my limited understanding of databricks it looks pretty messy. I'd do my own partitioning and task scheduling. I'm hoping there is a better way.
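Approach 1 above can be sketched without any Spark machinery: wrap the per-ID API call so that an exception becomes a row with an error status instead of a job failure. The wrapper below is a minimal, hedged sketch; `fetch_with_capture` is my own name, and `fetch` stands in for whatever real API client is used. Inside Databricks, this row-level logic would typically run inside a pandas UDF or `mapInPandas`, and recovery is simply selecting the rows where `status = 'error'` as the next run's input.

```python
import time

def fetch_with_capture(record_id, fetch, retries=3, backoff=2.0):
    """Call fetch(record_id); capture any exception instead of failing the job.

    Returns a row-like dict: on success, status='ok' and details filled in;
    on failure, status='error' with the exception text preserved so failed
    IDs can be re-selected and re-run later. `fetch` is a placeholder for
    the real API client.
    """
    last_err = None
    for attempt in range(retries):
        try:
            return {"id": record_id, "details": fetch(record_id),
                    "status": "ok", "error": None}
        except Exception as exc:  # blanket capture, per approach 1
            last_err = exc
            time.sleep(backoff * (attempt + 1))  # simple linear backoff
    return {"id": record_id, "details": None,
            "status": "error", "error": repr(last_err)}
```

With results written incrementally to a table, the recovery run reads only the rows whose `status` is `'error'` (e.g. `spark.table("details").filter("status = 'error'")`) instead of the full ID set.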

The solution I imagine in my head is something like:

# Split the source table into 100 equal buckets
# Run only buckets 10,20,21 (presumably, those are the failed buckets)
# For each bucket, run the udf get_details
# If the bucket succeeds, put its rows into aggregate_df.  Otherwise, into error_df
aggregate_df, error_df = df.split_table_evenly(bucket_count=100)
  .options(continue_on_task_failure=true)
  .filter(bucket=[10,20,21])
  .run_task_on_bucket(udf=get_details)
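Spark has no `split_table_evenly` or `run_task_on_bucket`, but the bucket-and-retry idea above can be approximated with a deterministic hash bucket (in recent PySpark, something like `pmod(xxhash64(col("id")), lit(100))`) plus a driver-side loop over buckets. Below is a minimal pure-Python sketch of that control flow; `assign_bucket`, `run_buckets`, and `process_one` are my own names, not a Spark API, and a whole bucket is re-queued if any of its members fails, mirroring the imagined `run_task_on_bucket`.

```python
import zlib

def assign_bucket(record_id, bucket_count=100):
    """Deterministic bucket for an ID; same idea as Spark's
    pmod(xxhash64(col("id")), lit(bucket_count))."""
    return zlib.crc32(str(record_id).encode("utf-8")) % bucket_count

def run_buckets(ids, process_one, bucket_count=100, only_buckets=None):
    """Process IDs bucket by bucket; a failed call poisons only its bucket.

    Returns (results, failed_buckets). To recover, re-run with
    only_buckets=failed_buckets after fixing whatever caused the failure.
    `process_one` stands in for the get_details call.
    """
    buckets = {}
    for i in ids:
        buckets.setdefault(assign_bucket(i, bucket_count), []).append(i)
    results, failed = [], []
    for b, members in sorted(buckets.items()):
        if only_buckets is not None and b not in only_buckets:
            continue  # re-run mode: skip buckets that already succeeded
        try:
            done = [(i, process_one(i)) for i in members]
        except Exception:
            failed.append(b)  # whole bucket re-queued, per approach 2
        else:
            results.extend(done)
    return results, failed
```

On Databricks, each "bucket" would more naturally be a filtered write into a Delta table keyed by bucket id, with the driver loop (or a Jobs task per bucket) retrying only the bucket ids that failed.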


Solution 1:

  • How many Categories can a video be associated with? How many Tags? If both are more than 1, there is very little excuse to keep them separate.

  • Using a many-to-many table to match Tags with Videos is costly. (I assume the tags include actor(s), director(s), genre(s), location(s), etc.; that could easily lead to 100M rows in that table.)

  • You may want to match 2 tags at once. Performance takes another hit.

  • If there is only one Category, put it in the video table -- no Join needed.

  • I have found that things like "Tag" should have the string in the many-to-many table, thereby merging tags_videos and tags tables:

    CREATE TABLE `tags` (
        video_id INT UNSIGNED NOT NULL,
        tag VARCHAR(100) NOT NULL,
        PRIMARY KEY(video_id, tag),  -- useful for getting tags for a video
        INDEX(tag, video_id)         -- useful for getting videos for a tag
    ) ENGINE=InnoDB;
    -- No AUTO_INCREMENT; it would just be clutter and slow things down
    

When searching for either of 2 tags:

  FROM videos AS v
  JOIN tags AS t  ON t.video_id = v.id  -- assuming videos' PK is `id`
  WHERE t.tag IN ('comedy', 'John Wayne')

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1: Rick James