How should I pass a function that runs an external file so that I can use multiprocessing?
I recently saw a module that allows me to run my code simultaneously, which happens to be what I need. However, as I was testing it with my function, I ran into some errors and need help.
Basically, I need to run code from an external Python script named genODE. genODE is my simulation file: it finds the model spec file, processes it, and generates an output file.
Currently this is my code (I'm running this in Spyder (Anaconda3) on Windows, if that helps):
import multiprocessing
import time

from astools import generateODEScript as genODE

if __name__ == '__main__':
    start = time.perf_counter()
    processes = []
    for niteration in range(26, 31):
        kwargs = {
            'modelfile': f'./models/organism/organism_{niteration}.modelspec',
            'mtype': 'ASM',
            'solver': 'RK4',
            'timestep': '1',
            'endtime': '21600',
            'lowerbound': '0;0',
            'upperbound': '1e-3;1e-3',
            'odefile': f'organism_{niteration}.py'
        }
        processes.append(multiprocessing.Process(target=genODE, kwargs=kwargs))
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    end = time.perf_counter()
    print('5 simulations completed in seconds: ', (end-start))
Update: Whenever I try to run the script below, it says that generateODEScript() takes 1 to 8 positional arguments but 9 were given:
import time
import astools
import multiprocessing

if __name__ == '__main__':
    processes = []
    for niteration in range(1, 11):
        kwargs = {
            'modelfile': f'./models/organism/organism_{niteration}.modelspec',
            'mtype': 'ASM',
            'solver': 'RK4',
            'timestep': '1',
            'endtime': '21600',
            'lowerbound': '0;0',
            'upperbound': '1e-3;1e-3',
            'odefile': f'organism_{niteration}.py'
        }
        start = time.perf_counter()
        with multiprocessing.Pool(processes=5) as pool:
            processes.append(pool.starmap(astools.generateODEScript, kwargs))
        end = time.perf_counter()
        print(f'Processes completed in {end-start: .2f}s')
Solution 1:[1]
Here's a framework suggestion for introducing some concurrency to this problem:
from concurrent.futures import ProcessPoolExecutor

def genODE():
    pass  # generate the ODE script for one model (to be filled in)

def runODE():
    pass  # run the generated script (to be filled in)

def sim(n):
    print(n)
    genODE()
    runODE()

def main():
    with ProcessPoolExecutor() as executor:
        executor.map(sim, range(1, 31))

if __name__ == '__main__':
    main()
Leaving it up to OP to "fill in the gaps".
Depending on the OS and/or potential memory constraints, consideration should be given to specifying max_workers=N in the ProcessPoolExecutor constructor.
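For instance, a minimal sketch of bounding the worker count (the "leave one core free" choice is illustrative; tune it to your core count and the memory footprint of one simulation):

import os
from concurrent.futures import ProcessPoolExecutor

def sim(n):
    print(n)  # stand-in for the genODE/runODE pipeline above

def main():
    # Illustrative cap: use all cores but one; os.cpu_count() can
    # return None, hence the fallback.
    workers = max(1, (os.cpu_count() or 2) - 1)
    with ProcessPoolExecutor(max_workers=workers) as executor:
        executor.map(sim, range(1, 31))

if __name__ == '__main__':
    main()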
Solution 2:[2]
This is a full working example of a producer/consumer solution using a queue and multiple threads.
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from queue import Queue

import numpy as np
import pandas as pd

def genODE(itmun):
    dates = pd.date_range('20130101', periods=6)
    pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
    pdf['itmun'] = itmun
    return pdf

def runODE(pdf):
    pdf['itmun'] = pdf['itmun'] + 1
    return pdf

def producer(queue, itmun):
    # apply your function to create the data
    data = genODE(itmun)
    # put the data in the queue
    queue.put(data)

def consumer(queue):
    result_data = None  # stays None if the queue is already empty
    while not queue.empty():
        data = queue.get()
        # do some work on the data by running your function
        result_data = runODE(data)
        itnum = result_data['itmun'][0]
        result_data.to_csv(f'{itnum}test.csv')
        queue.task_done()
    return result_data

def main():
    q = Queue()
    futures = []
    with ThreadPoolExecutor(max_workers=15) as executor:
        for p in range(30):
            producer_future = executor.submit(producer, q, p)
            futures.append(producer_future)
        for c in range(30):
            consumer_future = executor.submit(consumer, q)
            futures.append(consumer_future)
        for future in as_completed(futures):
            # do something with the result as soon as it is available,
            # such as saving or printing, or nothing if you already did
            # what you need in runODE()
            print(future.result())

if __name__ == "__main__":
    main()
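A hedged sketch of how the pandas stand-ins above might be swapped for the question's real calls; the keyword arguments come from the question, but the idea of forwarding the generated file name and how to execute it afterwards are assumptions:

import astools  # the OP's simulation package

def producer(queue, niteration):
    # Assumption: generateODEScript writes organism_{niteration}.py;
    # forward the generated file name for a consumer to pick up.
    astools.generateODEScript(
        modelfile=f'./models/organism/organism_{niteration}.modelspec',
        mtype='ASM', solver='RK4', timestep='1', endtime='21600',
        lowerbound='0;0', upperbound='1e-3;1e-3',
        odefile=f'organism_{niteration}.py')
    queue.put(f'organism_{niteration}.py')

def consumer(queue):
    while not queue.empty():
        odefile = queue.get()
        # run the generated ODE script here, e.g. with
        # subprocess.run(['python', odefile]); how the generated
        # script should be executed is up to the OP
        queue.task_done()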
Solution 3:[3]
Since genODE takes keyword arguments, you need to pass to multiprocessing.Process the kwargs argument specifying a dictionary of keyword/value pairs. So, to start the two processes that you wanted, with niteration taking on the values 1 and 2:
if __name__ == '__main__':
    import time
    import multiprocessing
    from astools import generateODEScript as genODE

    start = time.perf_counter()
    processes = []
    for niteration in (1, 2):
        kwargs = {
            'modelfile': f'./models/organism/Organism_{niteration}.modelspec',
            'mtype': 'ASM',
            'solver': 'RK4',
            'timestep': '1',
            'endtime': '21600',
            'lowerbound': '0;0',
            'upperbound': '1e-3;1e-3',
            'odefile': f'organism_{niteration}.py'
        }
        processes.append(multiprocessing.Process(target=genODE, kwargs=kwargs))
    for process in processes:
        process.start()
    for process in processes:
        process.join()
        print('process completed')
    end = time.perf_counter()
    print(end-start)
Update

You cannot use method starmap if you want to pass keyword arguments that vary; you need to use method apply_async.
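A minimal illustration of the failure mode, assuming the dict from the question's update is passed to starmap directly: iterating a dict yields its keys, and starmap unpacks each item as positional arguments, so each key string is split character by character. The nine characters of 'modelfile' become nine arguments, which matches the reported error. The names kwargs and func here are hypothetical:

# Illustrative only: what starmap(func, a_dict) actually does.
kwargs = {'modelfile': 'x.modelspec', 'mtype': 'ASM'}
list(kwargs)  # -> ['modelfile', 'mtype']: keys, not the values
# starmap then effectively calls func(*'modelfile'), i.e.
# func('m', 'o', 'd', 'e', 'l', 'f', 'i', 'l', 'e')  -> 9 positional args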
If your worker function returns values you want to keep, then you need to save and process the AsyncResult objects returned by apply_async:
import time
import astools
import multiprocessing

if __name__ == '__main__':
    start = time.perf_counter()
    with multiprocessing.Pool(processes=5) as pool:
        async_results = []
        for niteration in range(1, 11):
            kwargs = {
                'modelfile': f'./models/organism/organism_{niteration}.modelspec',
                'mtype': 'ASM',
                'solver': 'RK4',
                'timestep': '1',
                'endtime': '21600',
                'lowerbound': '0;0',
                'upperbound': '1e-3;1e-3',
                'odefile': f'organism_{niteration}.py'
            }
            # Note: apply_async's keyword-arguments parameter is named kwds
            async_results.append(pool.apply_async(astools.generateODEScript, kwds=kwargs))
        results = [async_result.get() for async_result in async_results]
        pool.close()
        pool.join()
    end = time.perf_counter()
    print(f'Processes completed in {end-start: .2f}s')
    print(results)
If you don't care about the return value from your worker function and just want to wait for all tasks to complete, then the sequence pool.close(); pool.join() is sufficient. (Note that exiting the with block calls pool.terminate(), so the explicit close/join inside the block is what actually waits for the submitted tasks.)
import time
import astools
import multiprocessing

if __name__ == '__main__':
    start = time.perf_counter()
    with multiprocessing.Pool(processes=5) as pool:
        for niteration in range(1, 11):
            kwargs = {
                'modelfile': f'./models/organism/organism_{niteration}.modelspec',
                'mtype': 'ASM',
                'solver': 'RK4',
                'timestep': '1',
                'endtime': '21600',
                'lowerbound': '0;0',
                'upperbound': '1e-3;1e-3',
                'odefile': f'organism_{niteration}.py'
            }
            pool.apply_async(astools.generateODEScript, kwds=kwargs)
        # Wait for all submitted tasks to complete:
        pool.close()
        pool.join()
    end = time.perf_counter()
    print(f'Processes completed in {end-start: .2f}s')
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | |
| Solution 3 | |