Multiprocessing Pool implementation

I'm doing some intense computation and I'd like to speed up the process using all the computational power available (8 cores on my PC).

I'm calculating the propagation of an electric field using a few functions. For each value of z, I compute an entire XY slice (propagation is along Z) and extract the X and Y bands from it; I have to compute the whole slice because of the fft2 implementation. I then add those bands to arrays to build complete XZ and YZ slices.

I have a propagation function that I want to execute in parallel, and I thought about using a callback function that takes care of adding each band to the XZ and YZ arrays.

Since I don't know how costly it is to pass an instance reference to a function that executes in parallel, I preferred to keep its parameters as simple as possible, so I used pool.apply_async and iterated over all the values of z that I need to propagate to.

My class looks like this:

class Calculator:
    def __init__(self, s:Simulation):
        # self.q = Queue()
        # self.manager = Manager()
        self.sim = s
    
    def start(self, U:Field, z_min, z_max, nbr_pnt):
        l = np.linspace(z_min, z_max, nbr_pnt)
        self.tot = nbr_pnt
        self.state = 0
        self.XZ_slice = np.zeros((self.sim.Res(), nbr_pnt))
        self.YZ_slice = np.zeros((self.sim.Res(), nbr_pnt))
        field = U.field()
        xx, yy = self.sim.xx_yy()
        k = self.sim.k()
        wavelength = self.sim.Wavelength()
        rho_sqr = xx*xx+yy*yy
        with Pool() as pool:
            for i, z in enumerate(l):
                pool.apply_async(self.propa_A, (i, z, field, rho_sqr, k, wavelength), callback=self.add_slice)


    def propa_A(self, i, z, F, rho_sqr, k, wavelength):
        prop_term = np.exp(1j*k/(2*z)*(rho_sqr))
        return (i,np.abs(1/(wavelength**2*z**2)*fft.fft2(F*prop_term))**2)

    def add_slice(self, result):
        i, F = result
        shape = F.shape
        self.XZ_slice[:, i] = F[:, shape//2]
        self.YZ_slice[:, i] = F[shape//2, :]
        self.state += 1
     
    def error(self, e):
        print(e)   

    def get_XZslice(self):
        return self.XZ_slice

    def get_YZslice(self):
        return self.YZ_slice
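
One detail in add_slice is worth checking: F.shape is a tuple, so shape//2 raises a TypeError instead of producing an index. Because the callback runs in the parent process on the pool's result-handling thread, an exception raised there is not passed to the error callback and may stop further results from being handled. Below is a minimal sketch of computing the centre indices, where centre_indices is a hypothetical helper name and not part of the original class:

```python
def centre_indices(shape):
    """Return the (row, column) indices of the centre of a 2-D array shape."""
    n_rows, n_cols = shape
    return n_rows // 2, n_cols // 2


# A shape is a tuple, so floor-dividing it directly fails:
try:
    (6, 4) // 2
except TypeError as exc:
    print("TypeError:", exc)

row, col = centre_indices((6, 4))
print(row, col)  # 3 2

# Inside add_slice this could read, for example:
#     row, col = centre_indices(F.shape)
#     self.XZ_slice[:, i] = F[:, col]
#     self.YZ_slice[:, i] = F[row, :]
```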

In my main function, I made a simple progress bar in the terminal to monitor the progress:

plt.figure()

print("[                    ]0%", end='\r')

c = Calculator(w)
z_min = 0.2
z_max = 1
nbr_pnt = 10000
c.start(U_0, z_min, z_max, nbr_pnt)
counter = 0
while(counter < nbr_pnt):
    counter = c.state
    progression = counter/nbr_pnt

    s = "[" + "="*int(progression*20) +">"+" "*int((1-progression)*20) + "]" + str(int(progression*100)) + "%"
    print(s, end="\r")
    sleep(1)
print("")

plt.subplot(211)
plt.imshow(c.get_XZslice())
plt.subplot(212)
plt.imshow(c.get_YZslice())

plt.show()
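
As a side note on the progress bar: the string built in the loop truncates the filled and empty parts independently, so its width can vary by one character between updates. Below is a sketch of a fixed-width version, where render_bar and its width parameter are hypothetical names introduced only for illustration:

```python
def render_bar(done, total, width=20):
    """Return a fixed-width text progress bar such as '[=====>    ] 50%'."""
    fraction = min(max(done / total, 0.0), 1.0)
    filled = int(fraction * width)
    # The arrow head takes one cell until the bar is completely full.
    body = "=" * filled + (">" if filled < width else "")
    return "[" + body.ljust(width) + "] " + str(int(fraction * 100)) + "%"


print(render_bar(0, 10))
print(render_bar(5, 10))
print(render_bar(10, 10))
```

In the polling loop this would be used as print(render_bar(c.state, nbr_pnt), end="\r").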

Assume U_0 is an N*N (Res * Res) complex array.

When I execute this script, nothing happens: no progress, no load on the CPU. Maybe apply_async is not the right choice. Do you have any idea how to implement this better? This is the first time I've used multiprocessing, so please excuse my mistakes.

Thanks in advance.

EDIT

As suggested, I added an error callback (error_callback), but it is never called.
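
A likely cause, offered as a sketch and not as a definitive diagnosis: leaving a with Pool() block calls terminate() on the pool, so in start the workers are stopped right after the tasks are queued, and the error callback only fires when the worker function itself raises. The example below keeps the pool alive with close() and join() before the block exits. The names work, run_all, and pool_factory are hypothetical, and work is a stand-in for the real FFT step. It is a module-level function, so submitting it does not pickle a whole instance the way a bound method such as self.propa_A does.

```python
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool  # same API as Pool, backed by threads


def work(i, z):
    """Stand-in for the propagation step (hypothetical).

    Defined at module level so a process pool can pickle it by name.
    """
    if z == 0:
        raise ZeroDivisionError("z must be non-zero")
    return i, 1.0 / (z * z)


def run_all(zs, pool_factory=Pool):
    """Submit one task per z value and collect the results via callbacks."""
    results = {}
    errors = []

    def on_result(res):
        # Runs in the parent, on the pool's result-handling thread.
        i, value = res
        results[i] = value

    def on_error(exc):
        # Called instead of on_result when work() raised an exception.
        errors.append(exc)

    with pool_factory() as pool:
        for i, z in enumerate(zs):
            pool.apply_async(work, (i, z), callback=on_result,
                             error_callback=on_error)
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for queued tasks before __exit__ terminates the pool
    return results, errors
```

Called as run_all(z_values) under an if __name__ == "__main__": guard, this uses worker processes; passing pool_factory=ThreadPool runs the same logic on threads, which can be handy for quick experiments. Since join() blocks until every task is done, progress would have to be reported from the callback instead of from a loop that starts after the call returns.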



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
