Parallelizing different functions at the same time in Python

I want to execute f1 and f2 at the same time, but the following code doesn't work!

from multiprocessing import Pool

def f1(x):
    return x * x

def f2(x):
    return x ^ 2

if __name__ == '__main__':
    x1 = 10
    x2 = 20
    p = Pool(2)
    out = (p.map([f1, f2], [x1, x2]))

    y1 = out[0]
    y2 = out[1]


Solution 1:[1]

I believe you'd like to use threading.Thread and a shared queue in your code.

from queue import Queue
from threading import Thread
import time

def f1(q, x):
    # Sleep function added to compare execution times.
    time.sleep(5)
    # Instead of returning the result we put it in shared queue.
    q.put(x * 2)

def f2(q, x):
    time.sleep(5)
    q.put(x ^ 2)

if __name__ == '__main__':
    x1 = 10
    x2 = 20
    result_queue = Queue()

    # We create two threads and pass shared queue to both of them.
    t1 = Thread(target=f1, args=(result_queue, x1))
    t2 = Thread(target=f2, args=(result_queue, x2))

    # Starting threads...
    print("Start: %s" % time.ctime())
    t1.start()
    t2.start()

    # Waiting for threads to finish execution...
    t1.join()
    t2.join()
    print("End:   %s" % time.ctime())

    # After threads are done, we can read results from the queue.
    while not result_queue.empty():
        result = result_queue.get()
        print(result)

The code above should print output similar to:

Start: Sat Jul  2 20:50:50 2016
End:   Sat Jul  2 20:50:55 2016
20
22

As you can see, even though both functions wait 5 seconds to yield their results, they do it in parallel, so the overall execution time is 5 seconds.

If you care about which function put which result in your queue, I can see two solutions that will let you determine that. You can either create multiple queues (sketched after the next snippet) or wrap your results in a tuple:

def f1(q, x):
    time.sleep(5)
    # Tuple containing function information.
    q.put((f1, x * 2))
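The other option, one queue per function, could look roughly like this (a minimal sketch; queue_f1 and queue_f2 are just illustrative names):

from queue import Queue
from threading import Thread

def f1(q, x):
    q.put(x * 2)

def f2(q, x):
    q.put(x ^ 2)

if __name__ == '__main__':
    # One dedicated queue per function, so the origin of each result is unambiguous.
    queue_f1 = Queue()
    queue_f2 = Queue()

    t1 = Thread(target=f1, args=(queue_f1, 10))
    t2 = Thread(target=f2, args=(queue_f2, 20))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    print(queue_f1.get())  # 20, definitely from f1
    print(queue_f2.get())  # 22, definitely from f2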

And for further simplification (especially when you have many functions to deal with), you can decorate your functions to avoid repeated code and to allow function calls without a queue:

def wrap_result(func):
    def wrapper(*args):
        # Assuming that the shared queue is always the last argument.
        q = args[-1]
        # We use it to store the result only if it was actually provided.
        if isinstance(q, Queue):
            function_result = func(*args[:-1])
            q.put((func, function_result))
        else:
            function_result = func(*args)
        return function_result

    return wrapper

@wrap_result
def f1(x):
    time.sleep(5)
    return x * 2

Note that my decorator was written in a rush and its implementation might need improvements (if your functions accept kwargs, for instance). If you decide to use it, you'll have to pass your arguments with the queue last: t1 = Thread(target=f1, args=(x1, result_queue)).
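For completeness, a sketch of how the decorated functions might be used end to end (assuming the wrap_result decorator above; each queue entry is a (function, result) tuple):

from queue import Queue
from threading import Thread
import time

@wrap_result
def f1(x):
    time.sleep(5)
    return x * 2

@wrap_result
def f2(x):
    time.sleep(5)
    return x ^ 2

if __name__ == '__main__':
    result_queue = Queue()
    # The shared queue is now the last positional argument.
    t1 = Thread(target=f1, args=(10, result_queue))
    t2 = Thread(target=f2, args=(20, result_queue))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    while not result_queue.empty():
        func, result = result_queue.get()
        print(func.__name__, result)  # e.g. "f1 20" and "f2 22"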

A little friendly advice.

"Following code doesn't work" says nothing about the problem. Is it raising an exception? Is it giving unexpected results?

It's important to read error messages, and even more important to study their meaning. The code you provided raises a TypeError with a pretty obvious message:

File ".../stack.py", line 16, in <module> out = (p.map([f1, f2], [x1, x2]))

TypeError: 'list' object is not callable

That means the first argument of Pool().map() has to be a callable object, a function for instance. Let's see the docs for that method:

Apply func to each element in iterable, collecting the results in a list that is returned.

It clearly doesn't allow a list of functions to be passed as its argument.

You can read more about the Pool().map() method in the multiprocessing documentation.
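In other words, Pool().map() fans a single callable out over many inputs. If the goal were to apply one function to both values, a working call would look like this (a sketch using the question's f1):

from multiprocessing import Pool

def f1(x):
    return x * x

if __name__ == '__main__':
    with Pool(2) as p:
        # One callable, many inputs: f1(10) and f1(20) run in parallel workers.
        out = p.map(f1, [10, 20])
    print(out)  # [100, 400]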

Solution 2:[2]

I want to execute f1 and f2 at the same time, but the following code doesn't work! ...

out=(p.map([f1, f2], [x1, x2]))

The minimal change to your code is to replace the p.map() call with:

r1 = p.apply_async(f1, [x1])  # schedule f1 in a pool worker without blocking
out2 = f2(x2)                 # run f2 in the current process in the meantime
out1 = r1.get()               # block until f1's result is ready
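Spelled out with the question's definitions, the whole thing might look like this (a sketch; f2 runs in the parent process while f1 runs in a pool worker):

from multiprocessing import Pool

def f1(x):
    return x * x

def f2(x):
    return x ^ 2  # note: ^ is XOR, not exponentiation

if __name__ == '__main__':
    x1 = 10
    x2 = 20
    with Pool(2) as p:
        r1 = p.apply_async(f1, [x1])  # schedule f1 without blocking
        out2 = f2(x2)                 # meanwhile, run f2 here
        out1 = r1.get()               # wait for f1's result
    print(out1, out2)  # 100 22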

Though if all you want is to run two function calls concurrently, you don't need the Pool() here; you can just start a Thread/Process manually and use a Pipe/Queue to get the result:

#!/usr/bin/env python
from multiprocessing import Process, Pipe

def f1(x):
    return x * x

def f2(x):
    return x ^ 2

def another_process(f, args, conn):
    # Run f in the child process and send its result back over the pipe.
    conn.send(f(*args))
    conn.close()

if __name__ == '__main__':
    x1 = 10
    x2 = 20
    parent_conn, child_conn = Pipe(duplex=False)
    p = Process(target=another_process, args=(f1, [x1], child_conn))
    p.start()
    out2 = f2(x2)              # runs in the parent while f1 runs in the child
    out1 = parent_conn.recv()  # blocks until the child sends f1's result
    p.join()
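The multiprocessing.Queue variant mentioned above is nearly identical (a sketch; run_in_child is an illustrative helper name, not part of the original answer):

from multiprocessing import Process, Queue

def f1(x):
    return x * x

def f2(x):
    return x ^ 2

def run_in_child(f, args, q):
    # Run f in the child process and put its result on the shared queue.
    q.put(f(*args))

if __name__ == '__main__':
    q = Queue()
    p = Process(target=run_in_child, args=(f1, [10], q))
    p.start()
    out2 = f2(20)   # runs in the parent in the meantime
    out1 = q.get()  # blocks until the child produces f1's result
    p.join()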

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Community
Solution 2: