Python - multiprocessing multiple large size files using pandas

I have a y.csv file. The file size is 10 MB and it contains data from Jan 2020 to May 2020.

I also have a separate file for each month, e.g. data-2020-01.csv, which contains detailed data. Each month's file is around 1 GB.

I'm splitting y.csv by month and then processing the data by loading the relevant month file. This takes too long when I have a large number of months, e.g. 24.

I would like to process the data faster. I have access to an AWS m6i.8xlarge instance, which has 32 vCPUs and 128 GB of memory.

I'm new to multiprocessing. So can someone guide me here?

This is my current code.

import pandas as pd

periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]

y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB


def process(_month_df, _index):
    idx = _month_df.index[_month_df.index.get_loc(_index, method='nearest')]
    for _, value in _month_df.loc[idx:].itertuples():

        up_delta = 200
        down_delta = 200

        up_value = value + up_delta
        down_value = value - down_delta

        if value > up_value:
            y.loc[_index, "result"] = 1
            return

        if value < down_value:
            y.loc[_index, "result"] = 0
            return


for x in periods:
    filename = "data-" + str(x[0]) + "-" + str(x[1]).zfill(2)  # data-2020-01
    filtered_y = y[(y.index.month == x[1]) & (y.index.year == x[0])]  # Only get the current month records
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)

    for index, row in filtered_y.iterrows():
        process(month_df, index)


Solution 1:

As noted in several pandas/threading questions, reading CSV files is IO-bound, so you can get some benefit from using a ThreadPoolExecutor.
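For instance, a minimal sketch of overlapping the monthly CSV reads with threads (read_month is a hypothetical helper; the file-name pattern follows the question's naming, and the stand-in return value replaces the real pd.read_csv call):

```python
from concurrent.futures import ThreadPoolExecutor

def read_month(period):
    year, month = period
    filename = f"data-{year}-{month:02d}.csv"
    # In the real code this would be:
    #   return pd.read_csv(filename, index_col=0, parse_dates=True)
    # The stand-in below just shows the period -> file mapping.
    return filename

periods = [(2020, 1), (2020, 2), (2020, 3)]

# Threads spend most of their time waiting on disk, so the reads overlap.
with ThreadPoolExecutor(max_workers=4) as pool:
    frames = list(pool.map(read_month, periods))
```

pool.map preserves the input order, so frames lines up with periods even if the reads finish out of order.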

At the same time, if you are going to perform CPU-heavy aggregating operations, consider performing the read_csv inside your worker as well, and use a ProcessPoolExecutor instead.

If you are going to pass a lot of data between your processes, you will also need a proper memory-sharing method.
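The simplest way to sidestep shared memory is to have each worker return its (small) results and merge them in the parent. A sketch under that assumption, where handle_month is a hypothetical wrapper around the per-month logic:

```python
from concurrent.futures import ProcessPoolExecutor

def handle_month(period):
    year, month = period
    # Read and process this month's ~1 GB file inside the child process,
    # then return small (index, result) pairs instead of mutating the
    # parent's y DataFrame (child processes cannot see the parent's globals).
    return (f"data-{year}-{month:02d}.csv", [])

if __name__ == "__main__":
    periods = [(2020, 1), (2020, 2)]
    with ProcessPoolExecutor(max_workers=2) as pool:
        for filename, pairs in pool.map(handle_month, periods):
            # Merge `pairs` back into y here, e.g. y.loc[idx, "result"] = res
            print(filename, len(pairs))
```

Note the `if __name__ == "__main__":` guard, which is required on platforms that spawn workers by re-importing the main module.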

However, I see the use of iterrows and itertuples. In general those two instructions make my eyes bleed. Are you sure you cannot process the data in a vectorized way?

This particular section, I am not sure what it is supposed to do, and on millions of rows it will be very slow.

def process(_month_df, _index):
    idx = _month_df.index[_month_df.index.get_loc(_index, method='nearest')]
    for _, value in _month_df.loc[idx:].itertuples():

        up_delta = 200
        down_delta = 200

        up_value = value + up_delta
        down_value = value - down_delta

        if value > up_value:
            y.loc[_index, "result"] = 1
            return

        if value < down_value:
            y.loc[_index, "result"] = 0
            return

Below is vectorized code to find whether it goes up or down, and in which row:

import numpy as np
import pandas as pd

df = pd.DataFrame({'vals': np.random.random(10) * 1000 + 5000}).astype('int64')
print(df.vals.values)

up_value = 6000
down_value = 3000
valsup = df.vals.values + 200*np.arange(df.shape[0])+200
valsdown = df.vals.values - 200*np.arange(df.shape[0])-200

#! argmax returns 0 if all false
# idx_up = np.argmax(valsup > up_value)
# idx_dwn= np.argmax(valsdown < down_value)

idx_up = np.argwhere(valsup > up_value)
idx_dwn= np.argwhere(valsdown < down_value)
idx_up = idx_up[0][0] if len(idx_up) else -1
idx_dwn = idx_dwn[0][0] if len(idx_dwn) else -1


if idx_up < 0 and idx_dwn < 0:
    print("Not up nor down")
elif idx_dwn < 0 or (idx_up >= 0 and idx_up < idx_dwn):
    print(f"Result is positive, in position {idx_up}")
else:
    print(f"Result is negative, in position {idx_dwn}")

For the sake of completeness, benchmarking .itertuples() against the arange + argwhere approach on 1000 elements:

  • .itertuples(): 757 µs
  • arange + argwhere: 60 µs
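To apply this per row of y, the argwhere logic can be packaged as a helper. This is a sketch, not the asker's exact rule: the function name, the None return for "no crossing", and the delta default are assumptions layered on the snippet above.

```python
import numpy as np

def first_crossing(vals, up_value, down_value, delta=200):
    """Return 1 if the up threshold is crossed first, 0 if the down
    threshold is crossed first, None if neither is ever crossed."""
    steps = delta * np.arange(len(vals)) + delta
    up_hits = np.argwhere(vals + steps > up_value)
    down_hits = np.argwhere(vals - steps < down_value)
    idx_up = up_hits[0][0] if len(up_hits) else -1
    idx_dwn = down_hits[0][0] if len(down_hits) else -1
    if idx_up < 0 and idx_dwn < 0:
        return None
    if idx_dwn < 0 or (idx_up >= 0 and idx_up < idx_dwn):
        return 1
    return 0
```

Each call replaces one invocation of the original process() loop, so the per-row Python iteration disappears entirely.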

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
