How can I improve the performance of checking a condition for each row in a 10M-row dataframe?

I am working on a Python Flask web service. The app receives one file, either CSV or XLSX, which I read and convert to a Pandas dataframe.

Now I need to iterate through each row of the dataframe and check a specific condition. If the condition is met, I need to update a few columns in the same dataframe.

I have done this using the code below, but I am not happy with the performance.

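import threading

import pandas as pd
from werkzeug.exceptions import BadRequest  # assumed source of BadRequest in a Flask app

def ExecuteInParallel(convertContract,ratesDf,inputDf):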
    for index, row in inputDf.iterrows():
        currencyFound = ratesDf.query(
            'CCY1 =="{0}" and CCY2 == "{1}"'
            .format(
                row[convertContract.INPUT_CURRENCY],
                row[convertContract.RETURN_CURRENCY]
            )
        )
        if len(currencyFound.index) == 0:
            raise BadRequest(
                "Given Currency combination not found with provided date.")

        currentrate = currencyFound.Rate.values[0]
        if convertContract.ROUNDING is not None and convertContract.ROUNDING != "":
            rounding = int(convertContract.ROUNDING)
            if rounding > 0:
                convertedamount = round(
                    float(row[convertContract.INPUT_AMOUNT]) * currentrate,
                    int(convertContract.ROUNDING)
                )
                inputDf.at[index,convertContract.RETURN_VALUE] = convertedamount
        else:
            convertedamount = float(row[convertContract.INPUT_AMOUNT]) * currentrate
            inputDf.at[index,convertContract.RETURN_VALUE] = convertedamount
        if convertContract.RETURN_RATE == "True":
            inputDf.at[index,convertContract.RETURN_VALUE + "_FX Rate"] = currentrate

I did some performance analysis and found that it takes about 470 seconds to iterate through 10k rows.

I want to run this on 10M rows, so I tried threading in Python: the same function call as above, but on smaller dataframes. I split the dataframe into chunks of 500 rows and passed each chunk to the method above in its own thread, but the total time did not change by even a second.

How can I improve this?

def ConvertdataFramesValues(self,contract,ratesDf,inputDf):
    try:
        threadList = []
        size = 500
        list_of_dfs = list(inputDf.loc[i:i + size - 1,:] for i in range(0, len(inputDf),size))
        for frame in list_of_dfs:
            # pass the `contract` argument through; `convertContract` is not defined in this method
            t1 = threading.Thread(target=ExecuteInParallel,args=(contract,ratesDf,frame))
            threadList.append(t1)
            t1.start()

        for t in threadList:
            t.join()

        inputDf = pd.concat(list_of_dfs)
        print(list_of_dfs[0].head())
        return inputDf

    except Exception as e:
        msg = "unable to convert data frame values. " + str(e)
        print(msg)
        raise BadRequest(msg)
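
For reference, the per-row lookup above can be expressed as a single join, so that the rate for every row is found in one pass instead of one `ratesDf.query(...)` call per row. The following is only a minimal sketch under stated assumptions, not a drop-in replacement: `convert_amounts` and its parameter names are hypothetical, the rates frame is assumed to have the `CCY1`, `CCY2` and `Rate` columns used in the question, and a plain `ValueError` stands in for `BadRequest` so the example stays self-contained. Rounding is applied only when a positive number of decimals is given; otherwise the unrounded value is written. `Series.round` may differ from Python's built-in `round` in rare edge cases.

```python
import pandas as pd


def convert_amounts(input_df, rates_df, in_ccy, out_ccy, amount_col,
                    result_col, rounding=None, return_rate=False):
    """Hypothetical vectorized version of the per-row rate lookup."""
    # Keep the first rate per currency pair (the loop also used the first
    # match) and rename the columns so they cannot clash with input columns.
    rates = (
        rates_df[["CCY1", "CCY2", "Rate"]]
        .drop_duplicates(["CCY1", "CCY2"])
        .rename(columns={"CCY1": "_ccy1", "CCY2": "_ccy2", "Rate": "_rate"})
    )

    # One left join attaches a rate to every input row, in input order.
    merged = input_df.merge(
        rates,
        how="left",
        left_on=[in_ccy, out_ccy],
        right_on=["_ccy1", "_ccy2"],
        validate="m:1",
    )

    # Rows whose currency pair is missing from the rates get a NaN rate.
    if merged["_rate"].isna().any():
        raise ValueError(
            "Given Currency combination not found with provided date.")

    converted = merged[amount_col].astype(float) * merged["_rate"]
    if rounding not in (None, "") and int(rounding) > 0:
        converted = converted.round(int(rounding))

    # Assign by position so a non-default index on input_df is preserved.
    out = input_df.copy()
    out[result_col] = converted.to_numpy()
    if return_rate:
        out[result_col + "_FX Rate"] = merged["_rate"].to_numpy()
    return out
```

With the names from the question, a call might look like `convert_amounts(inputDf, ratesDf, contract.INPUT_CURRENCY, contract.RETURN_CURRENCY, contract.INPUT_AMOUNT, contract.RETURN_VALUE, contract.ROUNDING, contract.RETURN_RATE == "True")`. The sketch returns a new dataframe rather than updating `inputDf` in place.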


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
