Azure Databricks Spark - write to blob storage

I have a DataFrame with two columns - filepath (a wasbs file path for a blob) and string - and I want to write each string to a separate blob with that file name. How can I do this?



Solution 1:[1]

You can only write to one wasb container at a time - not sure if this is part of your question, but I want to clarify either way. In addition, Spark writes files to directories, not to single files. If you want to accomplish exactly what you're asking for, you'll have to repartition to 1 partition and partition by filepath.

After that step you'll need to use the Azure SDK (or dbutils.fs) to rename the files and move them up to the parent directory.
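As a hedged sketch of that post-processing step (shown with plain Python `os`/`shutil` against a local temp directory rather than the Azure SDK; the `promote_part_file` helper and all file names are made up for illustration), moving the `part-0000` file up and renaming it might look like:

```python
import os
import shutil
import tempfile

def promote_part_file(output_dir: str, target_name: str) -> str:
    """Move the single part-0000* file out of a Spark output directory,
    rename it, and delete the leftover directory."""
    part_files = [f for f in os.listdir(output_dir) if f.startswith("part-0000")]
    if len(part_files) != 1:
        raise ValueError(f"expected exactly one part file, found {part_files}")
    parent = os.path.dirname(output_dir.rstrip("/"))
    target_path = os.path.join(parent, target_name)
    shutil.move(os.path.join(output_dir, part_files[0]), target_path)
    shutil.rmtree(output_dir)  # clean up the now-empty Spark output directory
    return target_path

# Simulate a single-partition Spark output directory with one part file.
base = tempfile.mkdtemp()
out_dir = os.path.join(base, "demo1")
os.makedirs(out_dir)
with open(os.path.join(out_dir, "part-00000-abc.txt"), "w") as f:
    f.write("demo string 1")
open(os.path.join(out_dir, "_SUCCESS"), "w").close()  # Spark marker file

final_path = promote_part_file(out_dir, "demo1.txt")
```

Against real blob storage you would do the equivalent move with the Azure SDK or dbutils.fs rather than local file operations.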

Solution 2:[2]

UPDATED ANSWER:

I found a much simpler way of accomplishing this using dbutils.fs.put. You would need to loop through each row of your DataFrame, calling dbutils.fs.put() for each row.

Assuming your input file (assumed CSV) with two columns looks something like:

filepath,stringValue
wasbs://[email protected]/demo1.txt,"demo string 1"
wasbs://[email protected]/demo2.txt,"demo string 2"
wasbs://[email protected]/demo3.txt,"demo string 3"
wasbs://[email protected]/demo4.txt,"demo string 4"
wasbs://[email protected]/demo5.txt,"demo string 5"

You can use the following to loop through each row in your input DataFrame:

df = spark.read.option("header", True).csv("wasbs://[email protected]/demo-data.csv")

# Collect the (filepath, stringValue) rows to the driver, then write
# each string to its own file, overwriting if it already exists.
rowList = df.rdd.collect()
for row in rowList:
    dbutils.fs.put(str(row[0]), str(row[1]), True)

The put method writes a given string out to a file, encoded in UTF-8, so you can loop through each record in your DataFrame, passing the first column in as the file path and the second as the string contents to write to the file.

This also has the benefit of writing the string to a single file, so you don't need to go through the process of renaming and moving files.
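Outside Databricks, where dbutils is not available, the same per-row write pattern can be sketched with plain Python file I/O (local paths under a temp directory stand in for the wasbs:// URLs; the row data here is invented for illustration):

```python
import os
import tempfile

# Rows as (filepath, stringValue) pairs, mirroring df.rdd.collect().
out_base = tempfile.mkdtemp()
rows = [
    (os.path.join(out_base, "demo1.txt"), "demo string 1"),
    (os.path.join(out_base, "demo2.txt"), "demo string 2"),
]

# Analogue of dbutils.fs.put(path, contents, True): write each
# string to its own UTF-8 file, one file per row, overwriting.
for path, contents in rows:
    with open(path, "w", encoding="utf-8") as f:
        f.write(contents)
```

Each row produces exactly one file, which is the behavior the dbutils.fs.put loop gives you on Databricks.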

OLD ANSWER:

Due to the distributed nature of Spark, writing a DataFrame to files results in a directory being created which will contain multiple files. You can use coalesce to force the processing to a single worker and file, whose name will start with part-0000.

DISCLAIMER: This is recommended only for small files, as larger data files can lead to out of memory exceptions.

To accomplish what you are attempting, you would need to loop through each row of your DataFrame, creating a new DataFrame for each row which contains only the string value you want written to the file.

Assuming your input file (assumed CSV) with two columns looks something like:

filepath,stringValue
wasbs://[email protected]/demo1,"demo string 1"
wasbs://[email protected]/demo2,"demo string 2"
wasbs://[email protected]/demo3,"demo string 3"
wasbs://[email protected]/demo4,"demo string 4"
wasbs://[email protected]/demo5,"demo string 5"

You can use the following to loop through each row in your input DataFrame:

from pyspark.sql.types import StringType

df = spark.read.option("header", True).csv("wasbs://[email protected]/demo-data.csv")

# Collect the rows to the driver, then write each string value out as a
# single-column DataFrame coalesced to one partition (one part file).
rowList = df.rdd.collect()
for row in rowList:
    dfRow = spark.createDataFrame([str(row[1])], StringType())
    dfRow.coalesce(1).write.mode("overwrite").text(row[0])

This will result in directories being created in your Blob Storage account container named demo1, demo2, demo3, demo4, and demo5. Each of those will contain multiple files. The file within each directory whose name begins with part-0000 is the file that will contain your string value.

If you need those files to have different names, and be in a different location, you can then use dbutils.fs methods to handle moving the files and doing the renames. You can also use this to do any cleanup of the directories that were created, if desired.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Joe Widen
Solution 2: