'python multiprocessing getting empty result
This is the regular function without using multiprocessing
def runMonte(loanpool, structured_securities, tolerance, NSIM):
structured_securities.addTranche(0.8, 0.05, 0)
structured_securities.addTranche(0.2, 0.08, 1)
tranche_percent = [0.8, 0.2]
coeff = [1.2, 0.8] # Tranche A has coeff of 1.2, Tranche B has coeff of 0.8
rates = [0.05, 0.08] # Tranche A has rate of 5%, Tranche B has rate of 8%
yields = []
while True:
new_rates = []
for index, tranche in enumerate(structured_securities.trancheList):
tranche.rate = rates[index] # give each tranche a new rate based on the original or modified rate
average_DIRR_AL = simulateWaterfall(loanpool, structured_securities, NSIM)
yields.append(Tranche.calculateYield(average_DIRR_AL[0][0], average_DIRR_AL[0][1]))
yields.append(Tranche.calculateYield(average_DIRR_AL[1][0], average_DIRR_AL[1][1]))
for index, tranches in enumerate(structured_securities.trancheList):
new_rates.append(Tranche.newTrancheRate(tranches.rate, coeff[index], yields[index]))
diffs = Tranche.diff(tranche_percent, rates, new_rates)
if diffs < tolerance:
break
rates = new_rates # modify the tranche rate to reflect the yields, return to the while loop
for index, tranche in enumerate(structured_securities.trancheList):
average_DIRR_AL[index].append(Tranche.DIRR_Rating(average_DIRR_AL[index][0]))
average_DIRR_AL[index].append(tranche.rate)
return average_DIRR_AL
And I am getting the result that I wanted, as following:
[[0.0025041053262339907, 28.31594686263471, 'Baa1', 0.06685305303811526], [0.005659780599164244, 11.074511263338902, 'Baa3', 0.06579310384106068]]
When I use the multiprocessing as followed, I just got empty result I use the below function to call the multiprocessing functions (just modify parts of my runMonte)
def runMonteParallel(loanpool, structured_securities, tolerance, NSIM, num_processes):
structured_securities.addTranche(0.8, 0.05, 0)
structured_securities.addTranche(0.2, 0.08, 1)
tranche_percent = [0.8, 0.2]
coeff = [1.2, 0.8] # Tranche A has coeff of 1.2, Tranche B has coeff of 0.8
rates = [0.05, 0.08] # Tranche A has rate of 5%, Tranche B has rate of 8%
yields = []
while True:
new_rates = []
for index, tranche in enumerate(structured_securities.trancheList):
tranche.rate = rates[index] # give each tranche a new rate based on the original or modified rate
average_DIRR_AL = runSimulationParallel(loanpool, structured_securities, NSIM, num_processes)
yields.append(Tranche.calculateYield(average_DIRR_AL[0][0], average_DIRR_AL[0][1]))
yields.append(Tranche.calculateYield(average_DIRR_AL[1][0], average_DIRR_AL[1][1]))
for index, tranches in enumerate(structured_securities.trancheList):
new_rates.append(Tranche.newTrancheRate(tranches.rate, coeff[index], yields[index]))
diffs = Tranche.diff(tranche_percent, rates, new_rates)
if diffs < tolerance:
break
rates = new_rates # modify the tranche rate to reflect the yields, return to the while loop
for index, tranche in enumerate(structured_securities.trancheList):
average_DIRR_AL[index].append(Tranche.DIRR_Rating(average_DIRR_AL[index][0]))
average_DIRR_AL[index].append(tranche.rate)
return average_DIRR_AL
Below is the multiprocessing function:
# doWork function can be any function with any argument
def doWork(input, output):
while True:
try:
f, args = input.get(timeout=1)
res = f(*args)
output.put(res)
except:
output.put('Done')
break
def runSimulationParallel(loan_pool, structured_securities, NSIM, num_processes):
input_queue = multiprocessing.Queue()
output_queue = multiprocessing.Queue()
# add 20 runMC function items into input_queue
for i in range(num_processes):
input_queue.put((simulateWaterfall, (loan_pool, structured_securities, NSIM / num_processes)))
# create 5 child processes
processes = [] # initialize an empty list of process
for i in range(num_processes):
p = multiprocessing.Process(target=doWork, args=(input_queue, output_queue))
p.start()
processes.append(p) # append all the processes
res = [] # result
# return the result list
while True:
r = output_queue.get()
if r != 'Done':
# r = np.array(r)
res.append(r)
else:
break
for p in processes:
p.terminate() # stop the process after done
for p in processes:
p.join() # calling main processes to wait until all the processes are finished
print(res) # print the result to test
sum_DIRR_AL_results = np.zeros((len(structured_securities.trancheList), 2))
for simulations in res:
for i, tranche in enumerate(simulations):
sum_DIRR_AL_results[i] += tranche[i]
DIRR_AL = sum_DIRR_AL_results / num_processes
DIRR_AL = DIRR_AL.tolist()
return DIRR_AL
Here are the results returned:
[]
[]
[]
[]
# I stepped into it to print the res, but only empty [] are returned
Seconds taken: 1.8176448345184326
[[0.0, 0.0, 'Aaa', 0.06493333333333333], [0.0, 0.0, 'Aaa', 0.0649362962962963]]
Below is the simulateWaterfall function that got called in both regular and multiprocessing functions
def simulateWaterfall(loanpool, structured_securities, NSIM):
if not isinstance(loanpool, LoanPool) or not isinstance(structured_securities, StructuredSecurities):
logging.error('Please enter the correct class type')
rating_metrics = [] # intialize an empty list to get the doWaterfall results
# create an array filled with zeros, could be used to filled in later
sum_DIRR_AL = np.zeros((len(structured_securities.trancheList), 2))
for i in range(NSIM):
# remember to reset each time of the simulation
loanpool.reset()
structured_securities.reset()
# obtain the results of rating metrics from the doWaterfall
rating_metrics.append(doWaterfall(loanpool, structured_securities)[3])
for simulations in rating_metrics:
for i, tranche_metric in enumerate(simulations):
if tranche_metric[2] != math.inf: # if AL not infinite
sum_DIRR_AL[i] += [tranche_metric[1], tranche_metric[2]]
else:
# if AL is infinite, get rid of the AL, only add up the DIRR to get the average
sum_DIRR_AL[i] += [tranche_metric[1], 0]
# return the average DIRR and WAL values for each tranche
# use ndarray here because it allows to divide directly instead of doing a list comprehension
DIRR_AL = sum_DIRR_AL / NSIM
DIRR_AL = DIRR_AL.tolist() # convert the array to list
return DIRR_AL
Updated: I found out the issue with the empty result. input_queue could not accept a float, but NSIM / num_processes is a float, so I did int(NSIM / num_processes) and the result list is no longer empty. However, I think my logic of getting the result from multi-processing is wrong (for loop is wrong, did not get me the result that I wanted), could anyone take a look at that? The result from multi-processing is now:
[[0.00243029267885915, 0.00243029267885915, 'Baa1', 0.06493480609938325], [11.079752982920553, 11.079752982920553, 'Ca', 0.07170816092109777]]
What I would expect is somehting like this:
[[0.0025041053262339907, 28.31594686263471, 'Baa1', 0.06685305303811526], [0.005659780599164244, 11.074511263338902, 'Baa3', 0.06579310384106068]]
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
