Read and group JSON files by date element using PySpark

I have multiple JSON files (~10 TB in total) in an S3 bucket, and I need to organize these files by a date element present in every JSON document.

What I think my code needs to do:

  • Read all JSON files in the S3 bucket.
  • Keep all documents whose "creation_date" element is between 2022-01-01 and 2022-04-01.
  • Save them in another bucket in Parquet format.

I'm not sure that's the right approach, considering the size of the data I'm dealing with.

Here's an example of a JSON document. Each file contains multiple such documents.

  {
    "id": 123456,
    "creation_date": "2022-01-01T23:35:16",
    "params": {
      "doc_info": "AXBD",
      "return_date": "20/05/2021",
      "user_name": "XXXXXXXX",
      "value": "40,00"
    },
    "user_id": "1234567",
    "type": "TEST"
  }

Here's what I already tried in a Databricks notebook. However, I can't use the code directly from the notebook: I need to write a Spark job and run it from an Airflow DAG, because I don't have write access to the destination bucket from the notebook.

# Imports and session needed when running as a standalone Spark job instead of a notebook
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("json-to-parquet").getOrCreate()

# Trying to read all the JSON files
df_test = spark.read.json("s3://my-bucket/**/**/*.json")

# Filtering all documents whose creation_date is in the period that I want
df_test_filter = df_test.filter(F.col("creation_date").between('2022-01-01', '2022-04-01'))

# Write Parquet to another bucket
# In this test, I'm saving to a local bucket that I have write access to.
df_test_filter.write.mode('overwrite').parquet("s3://my-local-test-bucket/")

That seems to work fine on the single JSON file I used for testing, but my questions are:

  • How can I do this without a Databricks notebook, using an Airflow DAG with PySpark?
  • Thinking about performance, is there a better way to do this?


Solution 1:[1]

Do you want to run the job only once or do you want to do it periodically?

One run

What you have should work well:

# Trying to read all the json files
sdf = spark.read.json("s3://my-bucket/**/**/*.json")

The only thing I'd add is to partition the output by the date to speed up queries:

(
    # Filtering all documents whose creation_date is in the period that I want
    sdf.filter(F.col("creation_date").between('2022-01-01', '2022-04-01'))
    # Export the data, partitioned by creation date so that it's easier to query
    # (partitionBy is a DataFrameWriter method, so it comes after .write)
    .write
    .partitionBy("creation_date")
    .mode('append')
    .parquet("s3://my-local-test-bucket/")
)

Running it periodically

Here I wonder what the file structure is. It is a good idea to have the data partitioned by some date, and in this case it looks like you might have the input data partitioned by another date (maybe insert_date?).

Assuming that's the case, I suggest that each day you read that day's data and then write it as Parquet partitioned by the date you want.

This would be done by:

from datetime import date
today = date.today()  # or the run date provided by your scheduler

# Trying to read all the JSON files for that day's input partition
sdf = spark.read.json(f"s3://my-bucket/insert_date={today:%Y-%m-%d}/*/")

# partitionBy is a DataFrameWriter method, so it comes after .write
sdf.write.partitionBy("creation_date").mode('append').parquet("s3://my-local-test-bucket/")

And later on you can simply retrieve the data you need with:

sdf = (
    # Read the partitioned Parquet output; the filter on creation_date prunes partitions
    spark.read.parquet("s3://my-local-test-bucket/")
    .where(F.col("creation_date").between('2022-01-01', '2022-04-01'))
)
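
Since the question asks how to run this from an Airflow DAG rather than a Databricks notebook, here is a minimal sketch (not part of the original answer) of submitting the daily job above with Airflow's SparkSubmitOperator; the DAG id, script location, connection id, and the --run-date argument are illustrative assumptions:

# Hypothetical DAG file -- ids, paths and the connection name are placeholders
import pendulum
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="json_to_parquet_daily",
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    convert = SparkSubmitOperator(
        task_id="convert_json_to_parquet",
        application="s3://my-scripts-bucket/json_to_parquet.py",  # PySpark script with the logic above
        conn_id="spark_default",
        # Airflow templates {{ ds }} to the run date, so the script can use it instead of date.today()
        application_args=["--run-date", "{{ ds }}"],
    )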

Solution 2:[2]

To run PySpark jobs on AWS, I recommend using either AWS Glue or EMR.

EMR is cheaper to run, but AWS Glue is easier to configure.

Here is one example of what a Glue job might look like.

Airflow has a Glue job operator that can trigger a Glue job from an Airflow DAG.
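
As a rough sketch of that wiring (the job name, script location, IAM role, and bucket are assumptions for illustration, and the Glue script itself would hold the PySpark filtering code from the question), an Airflow task using the Amazon provider's GlueJobOperator might look like this:

# Hypothetical DAG snippet -- job name, script path, role and bucket are placeholders
import pendulum
from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

with DAG(
    dag_id="glue_json_to_parquet",
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    run_glue_job = GlueJobOperator(
        task_id="run_glue_job",
        job_name="json-to-parquet",                        # name of the Glue job to create/run
        script_location="s3://my-scripts-bucket/job.py",   # Glue script containing the PySpark code
        s3_bucket="my-scripts-bucket",                     # bucket Glue uses for logs/artifacts
        iam_role_name="my-glue-job-role",                  # role Glue assumes to read/write S3
        create_job_kwargs={"GlueVersion": "3.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )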

Regarding performance optimization, your code looks reasonably optimal, and it is unlikely that you will get it working significantly faster.

One way to make date range selection faster is to store JSONs in different folders according to their creation_date.

You can store your data in the following folders:

s3://my-bucket/creation-date=2022-01-01/

s3://my-bucket/creation-date=2022-01-02/

If you do this, you won't need to read all the JSONs when you filter by a date range.
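
For illustration, here is a minimal sketch of reading that layout, assuming the hypothetical creation-date=... folder names above; with key=value folder names, Spark's partition discovery exposes creation-date as a column and skips folders outside the filter:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("partition-pruning-example").getOrCreate()

sdf = (
    # Partition discovery turns the creation-date=... folders into a column
    spark.read.json("s3://my-bucket/")
    # Only folders inside the date range are listed and read (partition pruning)
    .where(F.col("creation-date").between("2022-01-01", "2022-04-01"))
)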

Solution 3:[3]

OK @fahabashev did good but missed some critical points.

  To run PySpark jobs on AWS, I recommend using either AWS Glue or EMR.

  EMR is cheaper to run, but AWS Glue is easier to configure.

  Here is one example of what a Glue job might look like.

  Airflow has a Glue job operator that can trigger a Glue job from an Airflow DAG.

This all sounds good. You don't want to write files into lots of directories in S3: file lookup is expensive in S3, so avoid the use of '*' for best performance. Write large Parquet files and let the CPUs do the filtering. Great choice using a compressible, splittable format (Parquet). Don't just move the JSON files; pull them into Parquet as soon as you can. JSON parsing isn't expensive; reading all the characters required to represent the JSON is expensive. I have seen 10,000% improvements in performance from moving off JSON to Parquet/ORC. I think you should start the migration as soon as you can, as multiple file lookups are where you are going to lose a lot of time.
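
As a rough sketch of that migration (the source path and creation_date column come from the question; the output bucket and the repartitioning step are assumptions), a one-off job could read the JSON once, compact it, and write partitioned Parquet:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("json-to-parquet-migration").getOrCreate()

(
    spark.read.json("s3://my-bucket/**/**/*.json")              # one-off read of the raw JSON
    .withColumn("creation_date", F.to_date("creation_date"))    # keep only the date part for partitioning
    .repartition("creation_date")                               # fewer, larger files per partition
    .write.mode("overwrite")
    .partitionBy("creation_date")
    .parquet("s3://my-parquet-bucket/")                         # hypothetical destination bucket
)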

Solution 4:[4]

Hey, this is the answer:

# Trying to read all the files
sdf = spark.read.json("s3://my-bucket/**/**/*.json")

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: villoro
Solution 2: fahabashev
Solution 3: Matt Andruff
Solution 4: raid