Add a new column while converting JSON to Parquet
The JSON to Parquet conversion works fine, but I need to add a new column containing a timestamp to the Parquet table so that I can query records from latest to oldest.
Currently I am using the following script:
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.transforms import Map, Relationalize
    from pyspark.sql.functions import col

    def toString(rec):
        # Cast every field of the record to a string so the relationalized
        # schema stays uniform across files.
        for key in rec:
            rec[key] = str(rec[key])
        return rec

    def convertIntoParquet(sourceTableName):
        # Relationalize the JSON schema into flat tables.
        datasource = glueContext.create_dynamic_frame.from_catalog(database = glueSourceDatabase, table_name = sourceTableName, transformation_ctx = "datasource")
        relationalizedDatasource = Relationalize.apply(frame = datasource, staging_path = glueTempStorage, name = jsonRootTableName, transformation_ctx = "relationalizedDatasource")
        transactionData = relationalizedDatasource.select(jsonRootTableName)
        modifiedTransactionData = Map.apply(frame = transactionData, f = toString)

        # Repartition on the partition keys before writing.
        modifiedTransactionData_df = modifiedTransactionData.toDF()
        transactionCount = modifiedTransactionData_df.count()
        print("Total transactions in dataframe : ", transactionCount)
        if transactionCount == 0:
            return
        modifiedTransactionData_df = modifiedTransactionData_df.repartition(col("panHash"), col("financialYear"), col("accountId"))
        modifiedTransactionData = DynamicFrame.fromDF(modifiedTransactionData_df, glueContext, "modifiedTransactionData")

        # Write the frame to S3 in Parquet format, partitioned by the same keys.
        sink = glueContext.getSink(
            connection_type = "s3",
            path = glueRelationalizeOutputS3Path,
            partitionKeys = ["panHash", "financialYear", "accountId"],
            transformation_ctx = "sink")
        sink.setFormat("glueparquet")
        sink.writeFrame(modifiedTransactionData)
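For the timestamp column itself, one option (a minimal sketch, not a confirmed solution for this exact job) is to add it on the Spark DataFrame with `withColumn` before converting back to a DynamicFrame, so it keeps a proper timestamp type instead of being stringified by the `toString` mapping. The column name `ingestTimestamp` below is just an assumed name:

    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import current_timestamp

    # Sketch: add a write-time timestamp column after the repartition step and
    # before DynamicFrame.fromDF(). "ingestTimestamp" is a hypothetical name.
    modifiedTransactionData_df = modifiedTransactionData_df.withColumn(
        "ingestTimestamp",        # pick any column name; stored as a Spark timestamp
        current_timestamp()       # job run time; swap in an event-time field if the JSON carries one
    )

    modifiedTransactionData = DynamicFrame.fromDF(
        modifiedTransactionData_df, glueContext, "modifiedTransactionData"
    )

Once the column is present in the Parquet output (and the table schema reflects it in the Glue catalog), latest-to-oldest queries reduce to an `ORDER BY ingestTimestamp DESC` on that column.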
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow