Fastest way to chunk and calculate sum, min, max in a 70 GB CSV file

I have a 70 GB CSV file (data.csv) with the following structure: Timestamp,Price,Volume.

1649289600174,100,100
1649289600176,105,100
1649289600178,109,100
1649289600179,104,100
1649289600181,101,100
1649289600182,100,100
1649289600188,90,100

I can read the file line by line using Python like this:

with open("data.csv") as infile:
    for line in infile:
        row = line.split(",")
        current_price = row[1]

If the price moves +10 or -10, then I would like to take a snapshot of that chunk and calculate open, close, high, low values for those prices, as well as volume_sum for the volume. In the above sample, the price moves from 100 to 90.

  • open is the first price found in the chunk. In the above sample it is 100
  • close is the last price found in the chunk. In the above sample it is 90
  • high is the max(price) found in the chunk. In the above sample it is 109
  • low is the min(price) found in the chunk. In the above sample it is 90
  • volume_sum is the sum(volume) found in the chunk. In the above sample it is 700

These open, close, high, low, volume_sum values become a new record. The next chunk is measured from the close price, which is 90; i.e. if the price goes up to 100 or down to 80, that triggers the next snapshot and starts another chunk.

I'm on Ubuntu. Since I have a very large file, I'm looking for a more efficient way to do this. [I don't want to use pandas for this; it is slow for my use case.]

Can someone guide me?

Thanks



Solution 1:[1]

Assumptions:

  • first chunk starts with the price found in the first row
  • when switching chunks the volume of the corresponding 'switch' row is not counted in the new chunk
  • though not mentioned in the question/description, we'll also provide the open/close times for each chunk
  • output format: openTime,closeTime,openPrc,closePrc,highPrc,lowPrc,volume
  • when we reach the end of the file, go ahead and print out the last chunk regardless of whether or not we crossed the +/- 10 threshold (assuming there are at least 2 data rows in the last chunk)

Adding a few more lines to the input:

$ cat data.csv
1649289600174,100,100
1649289600176,105,100
1649289600178,109,100
1649289600179,104,100
1649289600181,101,100
1649289600182,100,100
1649289600188,90,100
1649289600190,83,90
1649289600288,95,60
1649289600388,79,35
1649289600488,83,100

One awk idea:

awk -v mvDN=-10 -v mvUP=10 '

# print the finished chunk (if any) and open a new chunk at the current row
function print_stats() {
    if (openTime && openTime != closeTime)
       print openTime,closeTime,openPrc,closePrc,highPrc,lowPrc,volume

    # the new chunk opens at the close of the previous one
    openTime=closeTime
    openPrc=highPrc=lowPrc=closePrc
    # the volume of the switch row stays with the chunk just closed
    # (except on the very first row, which seeds the first chunk)
    volume=( NR==1 ? currVol : 0 )
}

BEGIN { FS=OFS="," }
      { closeTime=$1
        currPrc=$2
        currVol=$3

        # update the running close/high/low/volume for the current chunk
        closePrc=currPrc
        volume+=currVol
        highPrc=(currPrc > highPrc ? currPrc : highPrc)
        lowPrc= (currPrc < lowPrc  ? currPrc : lowPrc)

        # close the chunk once the price has moved +/- 10 from its open
        if ( (currPrc - openPrc <= mvDN) || (currPrc - openPrc >= mvUP) )
           print_stats()
      }
END   { print_stats() }    # flush the final chunk
' data.csv

This generates:

1649289600174,1649289600188,100,90,109,90,700
1649289600188,1649289600388,90,79,95,79,185
1649289600388,1649289600488,79,83,83,79,100

Solution 2:[2]

As others have said, use a database or another tool like Spark if you want to do this; 70 GB is a lot. That being said, you can just iterate over the lines, keep track of the relevant values in a dictionary buffer, and then write out and empty the buffer whenever you find the relevant price difference. You should pack all of this into a class, but the gist of it is the following:

def convert(tup):
    tup = tup.split(",")
    return tup[0], int(tup[1]), int(tup[2])


def write_buffer(buffer, close_price, min_price, max_price, close_stamp, volume_sum):
    buffer["close_price"] = close_price
    buffer["min_price"] = min_price
    buffer["max_price"] = max_price
    buffer["close_stamp"] = close_stamp
    buffer["volume_sum"] = volume_sum


def buffer_to_file(buffer, path_to_file):
    # Ensure insertion order of keys
    keys = [
        "open_stamp",
        "open",
        "close_price",
        "min_price",
        "max_price",
        "volume_sum",
        "close_stamp",
    ]
    with open(path_to_file, "a") as f:  # append; "w" would overwrite previously written chunks
        for k in keys[:-1]:
            f.write(f"{buffer[k]}, ")
        f.write(f"{buffer[keys[-1]]}")  # do not write the last val with a comma
        f.write("\n")  # new line for each buffer write


in_path = "read-large-file.txt"
out_path = "out.txt"


def compute_stats(in_path, out_path):
    with open(in_path) as f:
        time, open_p, vol = convert(f.readline())
        min_p, max_p, volume_sum = open_p, open_p, vol
        buffer = {"open_stamp": time, "open": open_p}
        for line in f:
            t, p, v = convert(line)
            volume_sum += v
            if p > max_p:
                max_p = p
            if p < min_p:
                min_p = p
            if abs(p - open_p) >= 10:
                write_buffer(buffer, p, min_p, max_p, t, volume_sum)
                buffer_to_file(buffer, out_path)
                # Reset the open price and running values; the new chunk opens at this price
                open_p = p
                min_p, max_p, volume_sum = p, p, v
                buffer = {"open_stamp": t, "open": p}


compute_stats(in_path, out_path)

Solution 3:[3]

There is not much you can do, since you need to read all the lines. Python I/O is better than it used to be, but it does not match lower-level tools like C, Rust, or AWK.

One thing you can do is read the file in bigger chunks; you may also want to work with bytes instead of strings. You could also try to split the job into smaller chunks and use async I/O or multiprocessing.

In any case, a 70 GB file is huge.
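
To make the "read bigger chunks, work with bytes" idea concrete, here is a minimal sketch. The function name, buffer size and output handling are illustrative (they are not from the original answer); the chunking rule is the same +/- 10 threshold from the question, and the trailing chunk is flushed at end of file as in Solution 1:

def ohlc_chunks(path, threshold=10, bufsize=8 * 1024 * 1024):
    # Read the file in large binary blocks and split rows manually,
    # avoiding per-line text decoding.
    # Yields (open_time, close_time, open, close, high, low, volume_sum).
    open_t = open_p = high = low = None
    close_t = close_p = volume = 0
    leftover = b""
    with open(path, "rb") as f:
        while True:
            block = f.read(bufsize)
            if not block:
                break
            lines = (leftover + block).split(b"\n")
            leftover = lines.pop()              # possibly incomplete last row
            for row in lines:
                if not row:
                    continue
                t, p, v = row.split(b",")
                p, v = int(p), int(v)
                if open_p is None:              # very first row opens the first chunk
                    open_t, open_p, high, low, volume = t, p, p, p, 0
                close_t, close_p = t, p
                volume += v
                if p > high:
                    high = p
                if p < low:
                    low = p
                if abs(p - open_p) >= threshold:
                    yield open_t, close_t, open_p, close_p, high, low, volume
                    # the next chunk is measured from this close price;
                    # the switch row's volume stays with the chunk just closed
                    open_t, open_p, high, low, volume = t, p, p, p, 0
    # flush the trailing chunk, as Solution 1 does
    # (assumes the file ends with a newline, so "leftover" is empty here)
    if open_p is not None and open_t != close_t:
        yield open_t, close_t, open_p, close_p, high, low, volume


if __name__ == "__main__":
    for rec in ohlc_chunks("data.csv"):
        print(",".join(x.decode() if isinstance(x, bytes) else str(x) for x in rec))

On the 11-row sample above, this prints the same three records as the awk script. The potential speed win comes from reading megabytes at a time and never decoding rows to str; whether that actually beats a plain line-by-line loop on your machine is worth benchmarking on a slice of the real file.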

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 Nathan Furnal
Solution 3 RomainL.