How should I pass a function that runs an external file so that I can use multiprocessing?

I recently came across a module that lets me run my code concurrently, which happens to be exactly what I need. However, as I was testing it with my function, I ran into some errors and need help.

Basically, I need to run code from an external Python script named genODE. genODE is my simulation file: it finds the model spec file, processes it, and generates an output file.

Currently this is my code (I'm running this in Spyder (Anaconda3) on Windows, if that helps):

import multiprocessing
import time
from astools import generateODEScript as genODE


if __name__ == '__main__':
    
    start = time.perf_counter()

    processes = []
    for niteration in range(26, 31):
        kwargs = {
            'modelfile': f'./models/organism/organism_{niteration}.modelspec',
            'mtype': 'ASM',
            'solver': 'RK4',
            'timestep': '1',
            'endtime': '21600',
            'lowerbound': '0;0',
            'upperbound': '1e-3;1e-3',
            'odefile': f'organism_{niteration}.py'
        }
        processes.append(multiprocessing.Process(target=genODE, kwargs=kwargs))

    for process in processes:
        process.start()
    for process in processes:
        process.join()
    end = time.perf_counter()
    print('5 simulations completed in seconds: ', (end-start))

Update: Whenever I try to run the script below, I get the error that generateODEScript() takes from 1 to 8 positional arguments but 9 were given:

import time
import astools
import multiprocessing

if __name__ == '__main__':
    
    processes = []
    for niteration in range(1, 11):
        kwargs = {
            'modelfile': f'./models/organism/organism_{niteration}.modelspec',
            'mtype': 'ASM',
            'solver': 'RK4',
            'timestep': '1',
            'endtime': '21600',
            'lowerbound': '0;0',
            'upperbound': '1e-3;1e-3',
            'odefile': f'organism_{niteration}.py'
        }
    
        start = time.perf_counter()
        with multiprocessing.Pool(processes=5) as pool:
            processes.append(pool.starmap(astools.generateODEScript, kwargs))

    
        end = time.perf_counter()
        print(f'Processes completed in {end-start: .2f}s')


Solution 1:[1]

Here's a framework suggestion for introducing some concurrency to this problem:

from concurrent.futures import ProcessPoolExecutor

def genODE():
    # generate the ODE script for one model (left for OP to fill in)
    pass

def runODE():
    # run the generated ODE file (left for OP to fill in)
    pass

def sim(n):
    print(n)
    genODE()
    runODE()

def main():
    with ProcessPoolExecutor() as executor:
        executor.map(sim, range(1, 31))

if __name__ == '__main__':
    main()

Leaving it up to OP to "fill in the gaps".

Depending on the OS and/or potential memory constraints, consideration should be given to specifying max_workers=N in the ProcessPoolExecutor constructor, as in the sketch below.
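
For concreteness, here is one way the gaps might be filled in. This is only a sketch: the astools import and the generateODEScript keyword arguments are copied from the question and are not verified here, and max_workers=5 is an arbitrary cap.

from concurrent.futures import ProcessPoolExecutor
from astools import generateODEScript as genODE

def sim(n):
    # per-iteration keyword arguments, taken from the question
    genODE(
        modelfile=f'./models/organism/organism_{n}.modelspec',
        mtype='ASM',
        solver='RK4',
        timestep='1',
        endtime='21600',
        lowerbound='0;0',
        upperbound='1e-3;1e-3',
        odefile=f'organism_{n}.py',
    )

def main():
    # cap the number of worker processes to limit memory use
    with ProcessPoolExecutor(max_workers=5) as executor:
        # list() forces the lazy map iterator, so any worker exception
        # is re-raised here instead of passing silently
        list(executor.map(sim, range(1, 31)))

if __name__ == '__main__':
    main()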

Solution 2:[2]

This is a full working example of a solution using the producer-consumer pattern with a queue and multiple threads.

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from queue import Queue
import numpy as np
import pandas as pd


def genODE(itmun):
    dates = pd.date_range('20130101', periods=6)
    pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
    pdf['itmun'] = itmun
    return pdf

def runODE(pdf):
    # simulate doing work on the data by incrementing the iteration column
    pdf['itmun'] = pdf['itmun'] + 1
    return pdf

def producer(queue, itmun):
    # apply your function to create the data
    data = genODE(itmun)
    # put the data in the queue
    queue.put(data) 

def consumer(queue):
    # each consumer handles at most one item: the body returns after the
    # first get(), so the while effectively guards against an empty queue
    while not queue.empty():
        data = queue.get()
        # do some work on the data by running your function
        result_data = runODE(data)
        itnum = result_data['itmun'][0]
        result_data.to_csv(f'{itnum}test.csv')
        queue.task_done()
        return result_data


def main():
    q = Queue() 
    futures = []
    with ThreadPoolExecutor(max_workers=15) as executor:
        for p in range(30):
            producer_future = executor.submit(producer, q, p)
            futures.append(producer_future) 
        for c in range(30):
            consumer_future = executor.submit(consumer, q)
            futures.append(consumer_future)

        for future in as_completed(futures):
            # do something with the result as soon as it is available, such as saving or printing, or nothing if you have already done what you need in runODE()
            print(future.result()) 

if __name__ == "__main__":
    main()
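
Note that consumer() polls queue.empty() and returns after a single item, so a consumer thread scheduled before any producer has run will find the queue empty and return None. A common hardening, sketched below under the assumption that runODE from the example above is in scope, is to block on queue.get() and shut down on a sentinel value:

SENTINEL = object()  # unique marker; put one per consumer when producers finish

def consumer(queue):
    while True:
        data = queue.get()  # blocks until an item (or the sentinel) arrives
        if data is SENTINEL:
            queue.task_done()
            return
        result_data = runODE(data)  # runODE as defined in the example above
        result_data.to_csv(f"{result_data['itmun'][0]}test.csv")
        queue.task_done()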

Solution 3:[3]

Since genODE takes keyword arguments, you need to pass multiprocessing.Process the kwargs argument, a dictionary of keyword/value pairs. So, to start the two processes that you wanted with niteration taking on the values 1 and 2:

if __name__ == '__main__':
    import time
    import multiprocessing
    from astools import generateODEScript as genODE

    start = time.perf_counter()

    processes = []
    for niteration in (1, 2):
        kwargs = {
            'modelfile': f'./models/organism/organism_{niteration}.modelspec',
            'mtype': 'ASM',
            'solver': 'RK4',
            'timestep': '1',
            'endtime': '21600',
            'lowerbound': '0;0',
            'upperbound': '1e-3;1e-3',
            'odefile': f'organism_{niteration}.py'
        }
        processes.append(multiprocessing.Process(target=genODE, kwargs=kwargs))

    for process in processes:
        process.start()
    for process in processes:
        process.join()
    print('process completed')
    end = time.perf_counter()
    print(end-start)

Update

You cannot use the starmap method if you want to pass keyword arguments that vary; you need to use the apply_async method. (This is also where the error above comes from: starmap iterates over the kwargs dict, which yields its keys, and unpacks each key string with *, so the very first task effectively calls generateODEScript('m', 'o', 'd', 'e', 'l', 'f', 'i', 'l', 'e'), i.e. 9 single-character positional arguments.)
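
A minimal reproduction of that error, using a hypothetical three-parameter function in place of generateODEScript:

import multiprocessing

def demo(a, b=2, c=3):
    pass

if __name__ == '__main__':
    kwargs = {'alpha': 1, 'beta': 2}
    with multiprocessing.Pool(2) as pool:
        # iterating the dict yields its keys; starmap unpacks each key
        # string, so the first call is demo('a', 'l', 'p', 'h', 'a')
        # -> TypeError: demo() takes from 1 to 3 positional arguments
        #    but 5 were given
        pool.starmap(demo, kwargs)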

If your worker function returns values you want to keep, then you need to save and process the AsyncResult objects returned by apply_async:

import time
import astools
import multiprocessing

if __name__ == '__main__':
    
    start = time.perf_counter()
    with multiprocessing.Pool(processes=5) as pool:
        async_results = []
        for niteration in range(1, 11):
            kwargs = {
                'modelfile': f'./models/organism/organism_{niteration}.modelspec',
                'mtype': 'ASM',
                'solver': 'RK4',
                'timestep': '1',
                'endtime': '21600',
                'lowerbound': '0;0',
                'upperbound': '1e-3;1e-3',
                'odefile': f'organism_{niteration}.py'
            }
            # note: Pool.apply_async takes kwds=, unlike Process's kwargs=
            async_results.append(pool.apply_async(astools.generateODEScript, kwds=kwargs))
        results = [async_result.get() for async_result in async_results]
        pool.close()
        pool.join()
    end = time.perf_counter()
    print(f'Processes completed in {end-start: .2f}s')
    print(results)

If you don't care about the return value from your worker function and just want to wait for all tasks to complete, then the sequence pool.close(); pool.join() is sufficient:

import time
import astools
import multiprocessing

if __name__ == '__main__':
    
    start = time.perf_counter()
    with multiprocessing.Pool(processes=5) as pool:
        for niteration in range(1, 11):
            kwargs = {
                'modelfile': f'./models/organism/organism_{niteration}.modelspec',
                'mtype': 'ASM',
                'solver': 'RK4',
                'timestep': '1',
                'endtime': '21600',
                'lowerbound': '0;0',
                'upperbound': '1e-3;1e-3',
                'odefile': f'organism_{niteration}.py'
            }
            pool.apply_async(astools.generateODEScript, kwds=kwargs)
        # Wait for all submitted tasks to complete:
        pool.close()
        pool.join()
    end = time.perf_counter()
    print(f'Processes completed in {end-start: .2f}s')
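
One caveat with this fire-and-forget variant: since nothing ever calls get() on the AsyncResult objects, any exception raised inside the worker function is silently discarded. apply_async accepts an error_callback that can at least surface such failures; a minimal sketch:

pool.apply_async(astools.generateODEScript, kwds=kwargs,
                 error_callback=lambda exc: print(f'task failed: {exc!r}'))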

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1
[2] Solution 2
[3] Solution 3