Is it possible to write a DataFrame into two files of different types?
We can use the following API to write a DataFrame to local files.
df.write.parquet(path)
df.write.json(path)
However, can I write both a Parquet and a JSON output in one pass, without computing the DataFrame twice? By the way, I don't want to cache the data in memory, because it's too big.
Solution 1:[1]
If you don't cache/persist the DataFrame, it will need to be recomputed for each output format.
Solution 2:[2]
We can implement an org.apache.spark.sql.execution.datasources.FileFormat to do this.
DuplicateOutFormat demo
/**
 * Very Dangerous Toy Code. DO NOT USE IN PRODUCTION.
 *
 * A write-only [[FileFormat]] exposing the short name "dup". It forwards every row
 * to two delegate file formats, so a DataFrame is written in two formats during a
 * single pass over the data. The delegates are selected with the write options
 * "format1" and "format2".
 */
class DuplicateOutFormat
  extends FileFormat
    with DataSourceRegister
    with Serializable {

  /** Reading is not supported; this format can only be used for writing. */
  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = {
    throw new UnsupportedOperationException()
  }

  /**
   * Builds a writer factory that fans each row out to the two delegate formats.
   *
   * @param options must contain "format1" and "format2"; the full map is also passed
   *                unchanged to both delegates' `prepareWrite`
   * @throws NoSuchElementException   if "format1" or "format2" is missing
   * @throws IllegalArgumentException if a named source is not a [[FileFormat]]
   */
  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    // Resolve both delegates before touching the job, so a bad option fails early.
    val format1Instance = loadFormat(sparkSession, options, "format1")
    val format2Instance = loadFormat(sparkSession, options, "format2")
    val writerFactory1 = format1Instance.prepareWrite(sparkSession, job, options, dataSchema)
    val writerFactory2 = format2Instance.prepareWrite(sparkSession, job, options, dataSchema)

    new OutputWriterFactory {
      // Placeholder extension reported by getFileExtension; it is swapped for each
      // delegate's real extension when the writers are created.
      private val placeholderExtension = ".dup"

      override def getFileExtension(context: TaskAttemptContext): String = placeholderExtension

      override def newInstance(
          path: String,
          dataSchema: StructType,
          context: TaskAttemptContext): OutputWriter = {
        val path1 = withExtension(path, writerFactory1.getFileExtension(context))
        val path2 = withExtension(path, writerFactory2.getFileExtension(context))
        val writer1 = writerFactory1.newInstance(path1, dataSchema, context)
        val writer2 =
          try writerFactory2.newInstance(path2, dataSchema, context)
          catch {
            case scala.util.control.NonFatal(e) =>
              // Do not leak the first writer when the second one cannot be created.
              try writer1.close()
              catch { case scala.util.control.NonFatal(s) => e.addSuppressed(s) }
              throw e
          }

        new OutputWriter {
          override def write(row: InternalRow): Unit = {
            writer1.write(row)
            writer2.write(row)
          }

          override def close(): Unit = {
            // Always close the second writer, even if closing the first one fails.
            try writer1.close()
            finally writer2.close()
          }
        }
      }

      /**
       * Replaces only the trailing placeholder extension, so a ".dup" elsewhere in
       * the path (for example in a directory name) is left untouched.
       */
      private def withExtension(path: String, extension: String): String =
        if (path.endsWith(placeholderExtension)) path.stripSuffix(placeholderExtension) + extension
        else path + extension
    }
  }

  override def shortName(): String = "dup"

  /** Instantiates the file format named by the write option `key`. */
  private def loadFormat(
      sparkSession: SparkSession,
      options: Map[String, String],
      key: String): FileFormat = {
    val formatName = options.getOrElse(
      key,
      throw new NoSuchElementException(s"Missing required write option '$key' for the dup format"))
    DataSource.lookupDataSource(formatName, sparkSession.sessionState.conf).newInstance() match {
      case format: FileFormat => format
      case other =>
        throw new IllegalArgumentException(
          s"Option '$key' = '$formatName' resolved to ${other.getClass.getName}, " +
            "which is not a FileFormat")
    }
  }
}
SPI
We need to create an SPI file at /META-INF/services/org.apache.spark.sql.sources.DataSourceRegister whose content is: com.github.sparkdemo.DuplicateOutFormat.
demo usage
/**
 * Smoke test for the "dup" format: writes a small DataFrame once with "csv" and
 * "orc" as the two delegate formats and checks that both kinds of file appear in
 * the output directory.
 */
class DuplicateOutFormatTest extends FunSuite {
  val spark = SparkSession.builder()
    .master("local")
    .getOrCreate()
  val sc = spark.sparkContext

  import spark.implicits._

  test("testDuplicateWrite") {
    // The null entries are deliberate: they exercise nullable string columns.
    val data = Array(
      ("k1", "fa", "20210901", 16),
      ("k2", null, "20210902", 15),
      ("k3", "df", "20210903", 14),
      ("k4", null, "20210904", 13)
    )
    // java.io.tmpdir often has no trailing separator (e.g. "/tmp"), so build the
    // path with the File API instead of string concatenation.
    val tempDir = new java.io.File(
      System.getProperty("java.io.tmpdir"),
      "spark-dup-test" + System.nanoTime()).getPath
    val df = sc.parallelize(data).toDF("k", "col2", "day", "col4")
    df.write
      .option("format1", "csv")
      .option("format2", "orc")
      .format("dup").save(tempDir)
    df.show(1000, false)

    // list() returns null when the directory is missing; treat that as "no files".
    val written = Option(new java.io.File(tempDir).list()).map(_.toSeq).getOrElse(Seq.empty[String])
    assert(written.exists(_.endsWith(".csv")), s"no .csv file found in $tempDir: $written")
    assert(written.exists(_.endsWith(".orc")), s"no .orc file found in $tempDir: $written")
  }
}
WARNING
Spark SQL couples some logic in DataFrameWriter#saveToV1Source and other internal source code that we can't change. This custom DuplicateOutFormat is just a demo and lacks tests. The full demo is on GitHub.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | OneCricketeer |
| Solution 2 | tianzhipeng |
