Is it possible to write a dataframe into two files of different types?

We can use the following APIs to write a dataframe to local files.

df.write.parquet(path)
df.write.json(path)

However, can I write to both Parquet and JSON in one pass, without computing the dataframe twice? By the way, I don't want to cache the data in memory, because it's too big.



Solution 1:[1]

If you don't cache/persist the dataframe, it will need to be re-computed for each output format.
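If keeping the data in memory is the concern, note that persisting does not have to mean in-memory caching: a disk-only storage level materializes the dataframe once and lets both writes re-read the persisted blocks instead of re-running the whole plan. A minimal sketch, assuming DISK_ONLY persistence is acceptable, with placeholder data and output paths:

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object WriteTwiceDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    // Placeholder dataframe; substitute your real one.
    val df = Seq(("k1", 1), ("k2", 2)).toDF("k", "v")

    // Materialize once to local disk (not memory); the second write
    // re-reads the persisted blocks instead of recomputing the plan.
    df.persist(StorageLevel.DISK_ONLY)

    df.write.parquet("/tmp/out-parquet")
    df.write.json("/tmp/out-json")

    df.unpersist()
  }
}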

Solution 2:[2]

We can implement a custom org.apache.spark.sql.execution.datasources.FileFormat that writes every row to two underlying formats in a single pass.

DuplicateOutFormat demo

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType

/**
 * Very Dangerous Toy Code. DO NOT USE IN PRODUCTION.
 */
class DuplicateOutFormat
  extends FileFormat
    with DataSourceRegister
    with Serializable {

  // Write-only format: reading and schema inference are not supported.
  override def inferSchema(sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = {
    throw new UnsupportedOperationException()
  }

  override def prepareWrite(sparkSession: SparkSession,
                            job: Job,
                            options: Map[String, String],
                            dataSchema: StructType): OutputWriterFactory = {
    // Look up the two delegate formats (e.g. "csv" and "orc") from the writer
    // options and prepare a writer factory for each of them.
    val format1 = options("format1")
    val format2 = options("format2")
    val format1Instance = DataSource.lookupDataSource(format1, sparkSession.sessionState.conf)
      .newInstance().asInstanceOf[FileFormat]
    val format2Instance = DataSource.lookupDataSource(format2, sparkSession.sessionState.conf)
      .newInstance().asInstanceOf[FileFormat]

    val writerFactory1 = format1Instance.prepareWrite(sparkSession, job, options, dataSchema)
    val writerFactory2 = format2Instance.prepareWrite(sparkSession, job, options, dataSchema)

    new OutputWriterFactory {
      override def getFileExtension(context: TaskAttemptContext): String = ".dup"

      override def newInstance(path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = {
        // Replace the placeholder ".dup" extension with each delegate format's
        // real extension and open one underlying writer per format.
        val path1 = path.replace(".dup", writerFactory1.getFileExtension(context))
        val path2 = path.replace(".dup", writerFactory2.getFileExtension(context))
        val writer1 = writerFactory1.newInstance(path1, dataSchema, context)
        val writer2 = writerFactory2.newInstance(path2, dataSchema, context)
        new OutputWriter {
          // Every row is written to both delegate writers in the same pass.
          override def write(row: InternalRow): Unit = {
            writer1.write(row)
            writer2.write(row)
          }

          override def close(): Unit = {
            writer1.close()
            writer2.close()
          }
        }
      }
    }
  }

  override def shortName(): String = "dup"
}

SPI

We also need to register the format via SPI: create a file /META-INF/services/org.apache.spark.sql.sources.DataSourceRegister whose content is com.github.sparkdemo.DuplicateOutFormat.
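In a typical Maven/SBT layout the registration file would sit under src/main/resources (the exact path below is an assumption; adjust it to your build):

# src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
com.github.sparkdemo.DuplicateOutFormat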

Demo usage

import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite

class DuplicateOutFormatTest extends FunSuite {
  val spark = SparkSession.builder()
    .master("local")
    .getOrCreate()
  val sc = spark.sparkContext

  import spark.implicits._

  test("testDuplicateWrite") {
    val data = Array(
      ("k1", "fa", "20210901", 16),
      ("k2", null, "20210902", 15),
      ("k3", "df", "20210903", 14),
      ("k4", null, "20210904", 13)
    )
    val tempDir = System.getProperty("java.io.tmpdir") + "/spark-dup-test" + System.nanoTime()
    val df = sc.parallelize(data).toDF("k", "col2", "day", "col4")
    // One pass over the dataframe writes both a CSV and an ORC copy into tempDir.
    df.write
      .option("format1", "csv")
      .option("format2", "orc")
      .format("dup").save(tempDir)
    df.show(1000, false)
  }

}
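Since both formats land in the same output directory, a quick sanity check can read each one back by file extension. This is only a sketch and assumes Spark 3.x, where the generic pathGlobFilter read option is available:

// Read back only the CSV part files, then only the ORC part files.
spark.read.option("pathGlobFilter", "*.csv").csv(tempDir).show()
spark.read.option("pathGlobFilter", "*.orc").orc(tempDir).show()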

WARNING

Spark SQL couples some behavior in DataFrameWriter#saveToV1Source and other internal code that we can't change, so this approach has limits. This custom DuplicateOutFormat is only a demo and lacks proper tests. A full demo is available on GitHub.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: OneCricketeer
Solution 2: tianzhipeng