Is it possible to do dbutils IO asynchronously?

I've written some code (based on https://stackoverflow.com/a/40199652/529618) that writes partitioned data to blob storage, and for the most part it's quite quick. The slowest part is that the one CSV file Spark generates per partition is named in a user-unfriendly way, so I do a simple rename operation to clean the names up (and delete some excess files). This takes much longer than writing the data in the first place.
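To illustrate, with partitions ['Granularity', 'PORTINFOID'] the rename collapses each leaf partition folder into a single tidy file, going from something like this (hypothetical values, with a part-file name representative of Spark's default output naming):

/mnt/.../all_data_by_rank/Granularity=Daily/PORTINFOID=123/part-00000-tid-...-c000.csv.gz

to this:

/mnt/.../all_data_by_rank/Granularity=Daily/123.csv.gz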

# Organize the data into folders matching the specified partitions, with a single CSV per partition
import os
from datetime import datetime

def one_file_per_partition(df, path, partitions, sort_within_partitions, VERBOSE = False):
  extension = ".csv.gz" # TODO: Support multiple extensions
  start = datetime.now()
  df.repartition(*partitions).sortWithinPartitions(*sort_within_partitions) \
    .write.partitionBy(*partitions).option("header", "true").option("compression", "gzip").mode("overwrite").csv(path)
  log(f"Wrote {get_df_name(df)} data partitioned by {partitions} and sorted by {sort_within_partitions} to:" +
      f"\n  {path}\n  Time taken: {(datetime.now() - start).total_seconds():,.2f} seconds")

  # Recursively traverse all partition subdirectories and rename + move the CSVs to their root
  # TODO: This is very slow, it should be parallelizable
  def traverse(root, remaining_partitions):
    if VERBOSE: log(f"Traversing partitions by {remaining_partitions[0]} within folder: {root}")
    for folder in list_subfolders(root):
      subdirectory = os.path.join(root, folder)
      if len(remaining_partitions) > 1:
        traverse(subdirectory, remaining_partitions[1:])
      else:
        destination = os.path.join(root, folder[len(f"{remaining_partitions[0]}="):]) + extension
        if VERBOSE: log(f"Moving file\nFrom:{subdirectory}\n  To:{destination}")
        spark_output_to_single_file(subdirectory, destination, VERBOSE)

  log(f"Cleaning up spark output directories...")
  start = datetime.now()
  traverse(path, partitions)
  log(f"Moving output files to their destination took {(datetime.now() - start).total_seconds():,.2f} seconds")

# Convert a single-file spark output folder into a single file at the specified location, and clean up superfluous artifacts
def spark_output_to_single_file(output_folder, destination_path, VERBOSE = False):
  output_files = [x for x in dbutils.fs.ls(output_folder) if x.name.startswith("part-")]
  if len(output_files) == 0:
    raise FileNotFoundError(f"Could not find any output files (prefixed with 'part-') in the specified spark output folder: {output_folder}")
  if len(output_files) > 1:
    raise ValueError(f"Found more than one output file in the specified spark output folder: {output_folder}\n" +
                     f"We found {len(output_files)}: {[x.name for x in output_files]}\n" +
                     f"This function should only be used for single-file spark outputs.")
  dbutils.fs.mv(output_files[0].path, destination_path)
  # Clean up all the other spark output generated to our temp folder
  dbutils.fs.rm(output_folder, recurse=True)
  if VERBOSE: log(f"Successfully wrote {destination_path}")

Here is a sample output:

2022-04-22 20:36:45.313963 Wrote df_test data partitioned by ['Granularity', 'PORTINFOID'] and sorted by ['Rank'] to:
  /mnt/.../all_data_by_rank
  Time taken: 19.31 seconds
2022-04-22 20:36:45.314020 Cleaning up spark output directories...
2022-04-22 20:37:42.583850 Moving output files to their destination took 57.27 seconds

I believe the reason is that I'm processing the folders sequentially, and if I could simply do it in parallel, it would go much quicker.

The problem is that all IO on Databricks is done with "dbutils", which abstracts away the mounted blob container and makes this sort of thing very easy. I just can't find any information about doing async IO with this utility, though.

Does anyone know how I could attempt to parallelize this activity?



Solution 1:[1]

The solution wound up being to abandon dbutils, which does not support parallelism in any way, and instead use os operations, which do:

import os
from datetime import datetime
from pyspark.sql.types import StringType

# Recursively traverse all partition subdirectories and rename + move the outputs to their root
# NOTE: The code to do this sequentially is much simpler, but very slow. The complexity arises from parallelizing the file operations
def spark_output_to_single_file_per_partition(root, partitions, output_extension, VERBOSE = False):
  if VERBOSE: log(f"Cleaning up spark output directories...")
  start = datetime.now()
  # Helper to recursively collect information from all partitions and flatten it into a single list
  def traverse_partitions(root, partitions, fn_collect_info, currentPartition = None):
    results = [fn_collect_info(root, currentPartition)]
    return results if len(partitions) == 0 else results + \
      [result for subdir in [traverse_partitions(os.path.join(root, folder), partitions[1:], fn_collect_info, partitions[0]) for folder in list_subfolders(root)] for result in subdir]
  # Get the paths of files to rename or delete. Note: we must convert to OS paths because we cannot parallelize use of dbutils
  def find_files_to_rename_and_delete(folder, partition):
    files = [x.name for x in dbutils.fs.ls(folder)]
    renames = [x for x in files if x[0:5] == "part-"]
    deletes = [f"/dbfs{folder}/{x}" for x in files if x[0:1] == "_"]
    if len(renames) > 0 and partition is None: raise Exception(f"Found {len(renames)} partition file(s) in the root location: {folder}. Have files already been moved?")
    elif len(renames) > 1: raise Exception(f"Expected at most one partition file, but found {len(renames)} in location: {folder}")
    elif len(renames) == 1: deletes.append(f"/dbfs{folder}/") # The leaf-folders (containing partitions) should be deleted after the file is moved
    return (deletes, None if len(renames) == 0 else (f"/dbfs{folder}/{renames[0]}", f"/dbfs{folder.replace(partition + '=', '')}{output_extension}"))
  # Scan the file system to find all files and folders that need to be moved and deleted
  if VERBOSE: log(f"Collecting a list of files that need to be renamed and deleted...")
  actions = traverse_partitions(root, partitions, find_files_to_rename_and_delete)
  # Rename all files in parallel using spark executors
  renames = [rename for (deletes, rename) in actions if rename is not None]
  if VERBOSE: log(f"Renaming {len(renames)} partition files...")
  spark.createDataFrame(renames, ['from', 'to']).foreach(lambda r: os.rename(r[0], r[1]))
  # Delete unwanted spark temp files and empty folders
  deletes = [path for (deletes, rename) in actions for path in deletes]
  delete_files = [d for d in deletes if d[-1] != "/"]
  delete_folders = [d for d in deletes if d[-1] == "/"]
  if VERBOSE: log(f"Deleting {len(delete_files)} spark outputs...")
  spark.createDataFrame(delete_files, StringType()).foreach(lambda r: os.remove(r[0]))
  if VERBOSE: log(f"Deleting {len(delete_folders)} empty folders...")
  spark.createDataFrame(delete_folders, StringType()).foreach(lambda r: os.rmdir(r[0]))
  log(f"Moving output files to their destination and cleaning spark artifacts took {(datetime.now() - start).total_seconds():,.2f} seconds")

This lets you generate partitioned data, with user-friendly names, and clean up all the spark temp files (_started..., _committed..., _SUCCESS) generated in the process.
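
For context, this works because DBFS (including mounted blob storage) is also exposed through the local /dbfs FUSE mount on the driver and, on standard clusters, the executors, which is why plain os calls can reach the same files. The mapping is just a prefix (path elided as in the sample output):

import os

spark_path = "/mnt/.../all_data_by_rank"  # the path as dbutils and spark see it
local_path = f"/dbfs{spark_path}"         # the same location via the FUSE mount
# Ordinary filesystem calls now apply, e.g. os.listdir(local_path),
# os.rename(src, dst), os.remove(path), os.rmdir(path)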

Usage:

# Organize the data into folders matching the specified partitions, with a single CSV per partition
def dataframe_to_csv_gz_per_partition(df, path, partitions, sort_within_partitions, rename_spark_outputs = True, VERBOSE = False):
  start = datetime.now()
  # Write the actual data to disk using spark
  df.repartition(*partitions).sortWithinPartitions(*sort_within_partitions) \
    .write.partitionBy(*partitions).option("header", "true").option("compression", "gzip").mode("overwrite").csv(path)
  log(f"Wrote {get_df_name(df)} data partitioned by {partitions} and sorted by {sort_within_partitions} to:" +
      f"\n  {path}\n  Time taken: {(datetime.now() - start).total_seconds():,.2f} seconds")
  # Rename outputs and clean up 
  spark_output_to_single_file_per_partition(path, partitions, ".csv.gz", VERBOSE)
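
A call matching the sample output in the question would look something like this (dataframe name and path taken from that output):

dataframe_to_csv_gz_per_partition(df_test, "/mnt/.../all_data_by_rank",
                                  partitions = ["Granularity", "PORTINFOID"],
                                  sort_within_partitions = ["Rank"])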

For what it's worth, I also tried parallelizing with Pool, but the results were not as good. I haven't attempted importing and using any libraries that can do async IO; I imagine that would perform best.
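
For comparison, a driver-side variant using only the standard library might look like the sketch below. This is not the approach benchmarked above, just one way to overlap the blocking file operations with threads (renames are IO-bound, so threads help despite the GIL):

# Sketch: a driver-side thread pool over the same (from, to) /dbfs pairs built above
from concurrent.futures import ThreadPoolExecutor
import os

def rename_all_threaded(renames, max_workers = 32):
  with ThreadPoolExecutor(max_workers = max_workers) as pool:
    # Forcing the iterator surfaces any exceptions raised by os.rename
    list(pool.map(lambda pair: os.rename(pair[0], pair[1]), renames))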

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

[1] Source: Stack Overflow
