Scala Spark partitionBy and get current partition name

I'm using scala spark and have a DataFrame:

Source | Column1 | Column2
A      | ...     | ...
B      | ...     | ...
B      | ...     | ...
C      | ...     | ...
B      | ...     | ...
C      | ...     | ...
A      | ...     | ...

I was looking into partitionBy (https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/DataFrameWriter.html) but I have a specific requirement where I have to save each partition to a separate directory. Ideally, it would look like this:

df.write.partitionBy("source").saveAsTable($"{CURRENT_SOURCE_VALUE}")

Is it possible to accomplish this using partitionBy, or should I try something else, like looping over each row with an RDD, or possibly groupBy? Any pointers would be helpful; I'm fairly new to Apache Spark. I want something like this (https://stackoverflow.com/a/43998102), but I don't think it's possible in Apache Spark Scala.

EDIT

The location (path) for each source will come from a separate map, like so: var sourceLocation: Map[String, String] // where the key is the source name (A, B, C) and the value is the path (/MyCustomPathForA/.../, /MyCustomPathForB/.../, etc.), and each base path (root) could be different.
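
For illustration, that map might look something like this (just a sketch; the real paths are elided as above):

// Sketch of the lookup described above: keys are source names, values are base paths.
var sourceLocation: Map[String, String] = Map(
  "A" -> "/MyCustomPathForA/.../",
  "B" -> "/MyCustomPathForB/.../"
)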



Solution 1:[1]

You were really close to getting this on your own.

As you pointed out:

public DataFrameWriter partitionBy(String... colNames)

Partitions the output by the given columns on the file system. If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a dataset by year and then month, the directory layout would look like:

  • year=2016/month=01/
  • year=2016/month=02/

Partitioning is one of the most widely used techniques to optimize physical data layout. It provides a coarse-grained index for skipping unnecessary data reads when queries have predicates on the partitioned columns. In order for partitioning to work well, the number of distinct values in each column should typically be less than tens of thousands.

This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark 2.1.0.
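
To make that quoted layout concrete, here's a minimal sketch (the sample data, column names, and output path are made up for illustration):

import spark.implicits._

// Tiny hypothetical DataFrame with year/month columns; writing it with partitionBy
// produces the year=2016/month=01/, year=2016/month=02/ layout described above.
val events = Seq((2016, 1, "a"), (2016, 2, "b")).toDF("year", "month", "value")
events.write.partitionBy("year", "month").parquet("/tmp/events_partitioned")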

So you selected the correct tool; I might just change your code to:

df.write.partitionBy("source").save($"{CURRENT_SOURCE_VALUE}")

and you would see:

ls $"{CURRENT_SOURCE_VALUE}"
Source=A    Source=B     Source=C
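
A handy side effect of that layout: a filter on the partition column lets Spark read back only the matching directory (partition pruning). A small sketch, assuming the default Parquet output format:

import org.apache.spark.sql.functions.col

// Only the Source=A directory is scanned, thanks to partition pruning.
val aOnly = spark.read.parquet(s"$CURRENT_SOURCE_VALUE").where(col("Source") === "A")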

EDIT: If you want to filter the data and write it out per path, here's your adjustment.

import org.apache.spark.sql.functions.lit

val locationMap = Map("A" -> "/MyCustomPathForA/.../", "B" -> "/MyCustomPathForB/.../")
df.cache() // important: the filtered writes below reuse what's in memory instead of re-reading from disk
locationMap.foreach { case (source, path) =>
  df.where(df("source") === lit(source)).write.save(path)
}

Things that should be discussed: this has the advantage that you're unlikely to have to re-read the entire set from disk, and can hopefully just use what's in memory. This is super fast and not usually a performance concern.

Moving data in HDFS is not a concern. Moves are very fast, so you shouldn't worry if you have to move data around after the fact. The data is not actually moved; only the path in the NameNode's RAM is changed. (The path is just an in-memory construct in HDFS.)

Moving data in Amazon S3 is a concern. Listing files in S3 is slow if you use wildcards and takes longer as the number of files grows. It also takes time to physically move files, in proportion to their size.
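
If you do go with the single partitionBy write and then want each source's directory at its own custom location afterwards, the move itself is cheap on HDFS. A hedged sketch, reusing the locationMap and output path from above:

import org.apache.hadoop.fs.{FileSystem, Path}

// On HDFS this is only a NameNode metadata change; on S3 a rename is a real copy+delete.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.rename(new Path(s"$CURRENT_SOURCE_VALUE/Source=A"), new Path(locationMap("A")))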

If you really wanted to write the data only once and use distribute by to help enable your writes, you could write your own data writer. Here's a tutorial on how to do that. If you're using S3, maybe consider this; if you're on HDFS, I wouldn't bother.
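
For what it's worth, here's a minimal sketch of that distribute by idea expressed with the DataFrame API (repartition is its counterpart there); this is only the redistribution step, not a custom writer:

// Colocating each source's rows before the write means each Source=X directory
// is produced by a single task, which keeps the number of output files small.
df.repartition(df("Source"))
  .write
  .partitionBy("Source")
  .save(s"$CURRENT_SOURCE_VALUE")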

Sources

[1] Stack Overflow, licensed under CC BY-SA 3.0.