Is there a way to have a process async await the release of a multiprocess lock without blocking the event loop?

The program launches a separate process that operates a web socket. The main process collects the processed messages through a shared Queue. My idea was to share a Condition between the processes so that the web socket process can notify the main process when there is data to collect. The problem is that calling .wait() blocks the event loop in the main process, which has to be avoided in this program. Here's the code I have tried so far, based on what I have learned from other similar threads.


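import asyncio
import json
from concurrent.futures import ProcessPoolExecutor

import aioprocessing


def blocks(mutex):
    # Blocking wait on the condition; meant to run outside the event loop.
    mutex.wait()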


class EodStream(object):
    def __init__(self, mode, callback):
        self.mode = mode
        self.callback = callback
        self.q = aioprocessing.AioQueue()
        self.mutex = aioprocessing.AioCondition()
        self.thread = None
        self.symbols = []
        self.keep_running = False

    def add_symbols(self, symbols):
        for symbol in symbols:
            if symbol not in self.symbols:
                self.symbols.append(symbol)

    async def start_stream(self):
        self.thread = aioprocessing.AioProcess(target=create_socket, args=(self.mode, self.symbols, self.q, self.mutex, ))
        self.thread.start()

        async def message_read():
            while self.keep_running:
                self.mutex.acquire()

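                # Try to run the blocking wait in a worker process.
                # Passing self.mutex here requires pickling it, which is
                # where the error reported below comes from.
                with ProcessPoolExecutor() as executor:
                    loop = asyncio.get_event_loop()
                    futures = [loop.run_in_executor(executor, blocks, self.mutex)]
                    await asyncio.gather(*futures)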

                while not self.q.empty():
                    data = json.loads(self.q.get())
                    asyncio.create_task(self.callback(data))

                self.mutex.release()

        self.keep_running = True
        asyncio.create_task(message_read())

Unfortunately, this fails with the error "Condition objects should only be shared between processes through inheritance".
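
As far as I can tell, the error comes from ProcessPoolExecutor pickling the arguments it sends to its workers, and a Condition cannot be pickled that way. One possible workaround, sketched below with only the standard library, is to drop the Condition and let the blocking Queue.get() run in a worker thread through the loop's default executor, since threads share memory and nothing needs pickling. The names produce, read_messages and the None sentinel are my own placeholders, not part of the original program, and produce merely stands in for the web socket process.

```python
import asyncio
import json
import multiprocessing


def produce(q):
    """Stand-in for the web socket process: emit a few JSON messages."""
    for i in range(3):
        q.put(json.dumps({"seq": i}))
    q.put(None)  # hypothetical sentinel meaning "stream closed"


async def read_messages(q, callback):
    """Await each message; the blocking q.get() runs in a worker thread."""
    loop = asyncio.get_running_loop()
    while True:
        # None selects the loop's default thread pool executor.
        raw = await loop.run_in_executor(None, q.get)
        if raw is None:
            break
        await callback(json.loads(raw))


async def main():
    q = multiprocessing.Queue()
    proc = multiprocessing.Process(target=produce, args=(q,))
    proc.start()

    received = []

    async def callback(data):
        received.append(data)

    await read_messages(q, callback)
    # join() also blocks, so it is pushed to a worker thread as well.
    await asyncio.get_running_loop().run_in_executor(None, proc.join)
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The event loop stays free while the worker thread sits in q.get(), so other tasks keep running. One caveat of this sketch: if the producer never sends the sentinel, the worker thread stays blocked in q.get() and can delay interpreter shutdown.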

A rather obvious solution is to poll the queue and call asyncio.sleep() between checks instead of using condition.wait(), but polling seems rather wasteful.
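
For comparison, the polling approach could look roughly like the sketch below. The function name poll_messages, the interval default and the None sentinel are placeholders I made up for illustration. It works with any queue whose get_nowait() raises queue.Empty, which includes multiprocessing.Queue.

```python
import asyncio
import queue


async def poll_messages(q, callback, interval=0.05, sentinel=None):
    """Poll q without blocking the loop; sleep between empty checks."""
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            # Nothing available yet: yield to the event loop for a while.
            await asyncio.sleep(interval)
            continue
        if item is sentinel:
            break
        await callback(item)
```

The cost is the trade-off between latency and wake-ups: a short interval wakes the loop often while idle, and a long one delays delivery of each message by up to that interval.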



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
