How to improve the performance of checking a condition for each row of a 10M-row dataframe?
I am working on a Python Flask web service. The app receives one file, either CSV or XLSX, which I read and convert into a pandas DataFrame.
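The file is loaded roughly like this (a sketch; the helper name and the extension check are illustrative, not the actual service code):

import pandas as pd

def load_input(file_storage):
    # file_storage is Flask's uploaded-file object; both readers accept file-like input
    if file_storage.filename.lower().endswith(".csv"):
        return pd.read_csv(file_storage)
    return pd.read_excel(file_storage)  # .xlsx needs the openpyxl engine installed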
I then need to iterate through each row of the DataFrame and check a specific condition; if the condition is met, I update a few columns in the same DataFrame.
I have implemented this with the code below, but I am not happy with the performance.
from werkzeug.exceptions import BadRequest  # Flask's HTTP 400 exception
import threading

import pandas as pd


def ExecuteInParallel(convertContract, ratesDf, inputDf):
    for index, row in inputDf.iterrows():
        # Look up the FX rate for this row's currency pair.
        currencyFound = ratesDf.query(
            'CCY1 == "{0}" and CCY2 == "{1}"'.format(
                row[convertContract.INPUT_CURRENCY],
                row[convertContract.RETURN_CURRENCY]
            )
        )
        if len(currencyFound.index) == 0:
            raise BadRequest(
                "Given Currency combination not found with provided date.")
        currentrate = currencyFound.Rate.values[0]
        if convertContract.ROUNDING is not None and convertContract.ROUNDING != "":
            rounding = int(convertContract.ROUNDING)
            if rounding > 0:
                convertedamount = round(
                    float(row[convertContract.INPUT_AMOUNT]) * currentrate,
                    rounding
                )
                inputDf.at[index, convertContract.RETURN_VALUE] = convertedamount
        else:
            convertedamount = float(row[convertContract.INPUT_AMOUNT]) * currentrate
            inputDf.at[index, convertContract.RETURN_VALUE] = convertedamount
        if convertContract.RETURN_RATE == "True":
            inputDf.at[index, convertContract.RETURN_VALUE + "_FX Rate"] = currentrate
I have done some performance analysis and found that it takes about 470 seconds to iterate through 10k rows (roughly 47 ms per row), which extrapolates to about 130 hours for 10M rows.
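For reference, the timing can be reproduced along these lines (a sketch; the 10k-row slice and the direct call are illustrative):

import time

start = time.perf_counter()
ExecuteInParallel(convertContract, ratesDf, inputDf.head(10000))  # time a 10k-row slice
print("{:.1f}s for 10k rows".format(time.perf_counter() - start))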
I want to run this on 10M rows, so I tried threading in Python: I kept the same function call but split the input into 500-row chunks and passed each chunk to its own thread (code below). It made no measurable difference, not even a single second.
How can I improve this?
def ConvertdataFramesValues(self, contract, ratesDf, inputDf):
    try:
        threadList = []
        size = 500
        # Slice the input into 500-row chunks (assumes the default RangeIndex,
        # since .loc slicing is label-based).
        list_of_dfs = [inputDf.loc[i:i + size - 1, :]
                       for i in range(0, len(inputDf), size)]
        for frame in list_of_dfs:
            # Each thread runs the row-by-row conversion on its own chunk.
            t1 = threading.Thread(target=ExecuteInParallel,
                                  args=(contract, ratesDf, frame))
            threadList.append(t1)
            t1.start()
        for t in threadList:
            t.join()
        inputDf = pd.concat(list_of_dfs)
        print(list_of_dfs[0].head())
        return inputDf
    except Exception as e:
        msg = "unable to convert data frame values. " + str(e)
        print(msg)
        raise BadRequest(msg)
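For comparison, a fully vectorized version of the same logic would look roughly like the sketch below. It assumes ratesDf holds at most one row per CCY1/CCY2 pair, that the input's currency column names do not collide with CCY1/CCY2, and it reuses the contract fields and BadRequest from above; ConvertVectorized is an illustrative name:

from werkzeug.exceptions import BadRequest  # same exception as above

def ConvertVectorized(convertContract, ratesDf, inputDf):
    # One left merge replaces the per-row ratesDf.query lookups.
    merged = inputDf.merge(
        ratesDf[["CCY1", "CCY2", "Rate"]],
        how="left",
        left_on=[convertContract.INPUT_CURRENCY, convertContract.RETURN_CURRENCY],
        right_on=["CCY1", "CCY2"],
    )
    # Rows with no matching rate come back as NaN.
    if merged["Rate"].isna().any():
        raise BadRequest("Given Currency combination not found with provided date.")
    amounts = merged[convertContract.INPUT_AMOUNT].astype(float) * merged["Rate"]
    # Mirror the rounding rule above: round only when ROUNDING is a positive integer.
    if convertContract.ROUNDING not in (None, "") and int(convertContract.ROUNDING) > 0:
        amounts = amounts.round(int(convertContract.ROUNDING))
    merged[convertContract.RETURN_VALUE] = amounts
    if convertContract.RETURN_RATE == "True":
        merged[convertContract.RETURN_VALUE + "_FX Rate"] = merged["Rate"]
    return merged.drop(columns=["CCY1", "CCY2"])

The design point of the sketch is that a single merge performs all 10M lookups in one vectorized pass instead of 10M individual query calls.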
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow