Message passing between blocking Process and Asyncio gRPC server

I started playing with Python asyncio and coroutines a week ago, and I'm trying to experiment with CPU-bound tasks.

Basically, I have async bidirectional communication between a client and a server. The client sends data to the server, and the server is supposed to send a response back when a certain condition is met. So far so good: if I mock the data sent back to the client and use await asyncio.sleep(), both appear to work properly and asynchronously. I also tried running blocking synchronous code via run_in_executor, and even there everything works nicely.
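For reference, the kind of thing that worked for me looks roughly like this minimal, self-contained sketch (all names here are made up for illustration, not my real code): a blocking call offloaded with run_in_executor while another coroutine keeps ticking on the event loop.

```python
import asyncio
import time


def blocking_task() -> str:
    # Stand-in for blocking/CPU-bound work; calling this directly in a
    # coroutine would stall the event loop.
    time.sleep(0.2)
    return "done"


async def ticker(ticks: list) -> None:
    # Keeps appending timestamps while the blocking task runs, showing
    # the loop is still free to schedule other coroutines.
    for _ in range(4):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def main() -> list:
    ticks: list = []
    loop = asyncio.get_running_loop()
    # Offload the blocking call to the default ThreadPoolExecutor.
    blocking = loop.run_in_executor(None, blocking_task)
    await asyncio.gather(blocking, ticker(ticks))
    return ticks


if __name__ == "__main__":
    print(len(asyncio.run(main())))  # 4: the ticker ran alongside the blocking call
```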

Now I want the async server to communicate with a different script that does the CPU-bound work (so it runs in a different process) and relies on an entirely synchronous framework. The script needs to periodically get data from the server and respond with a result, which the server then sends back to the client.

As I have currently designed it, the sync script starts first and initializes; when the time comes, it spawns a separate process responsible for running the async server, passing it a pair of synchronous multiprocessing.Queue objects.

The code looks like the following:

# CPU-bound script
from multiprocessing import Process, Queue

import server  # the module containing the gRPC server code below

# ----
# A lot of things happen
# ----

first_queue = Queue(maxsize=1)
second_queue = Queue(maxsize=1)
server_process = Process(
    target=server.run,
    args=(first_queue, second_queue))
server_process.daemon = True
server_process.start()

while True:
    # Something happens
    received_value = first_queue.get()
    print("Received value: ", received_value)
    # Something else
    second_queue.put(value)
...
...

On the other side the async server is currently implemented as follows:

# Async gRPC server script (server.py)
import asyncio
import logging
from multiprocessing import Queue
from typing import AsyncIterable

import grpc

import something_pb2
import something_pb2_grpc


def compute_statistics(parameter) -> None:
    print(f"Received {parameter}")


class SomethingService(
        something_pb2_grpc.SomethingServicer):

    def __init__(self, first_queue, second_queue):
        self._first_queue = first_queue
        self._second_queue = second_queue

    def make_action(self,
                    parameter: int) -> something_pb2.Action:
        # Runs in a worker thread: blocks on the sync queues.
        self._second_queue.put(parameter)
        action = self._first_queue.get()
        print("Received Action: ", action)
        return something_pb2.Action(action=action)


    async def SomethingControl(
            self,
            request_iterator: AsyncIterable[something_pb2.TransmissionStatus],
            unused_context) -> AsyncIterable[something_pb2.Action]:
        async for status in request_iterator:
            compute_statistics(status.parameter)

            if status.parameter == something:
                result = await asyncio.get_event_loop().run_in_executor(
                    None, self.make_action, status.parameter)
                yield result


async def serve(first_queue: Queue, second_queue: Queue) -> None:
    server = grpc.aio.server()
    something_pb2_grpc.add_SomethingServicer_to_server(
        SomethingService(first_queue, second_queue),
        server
    )
    server.add_insecure_port('[::]:50051')
    print('Listening...')
    await server.start()
    await server.wait_for_termination()


def run(first_queue: Queue,
        second_queue: Queue) -> None:
    logging.basicConfig()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(
        serve(first_queue, second_queue))

Everything actually looks fine if I run it as is, but if I add a simple time.sleep(1) right after self._second_queue.put(parameter) to simulate some weird behavior in the CPU-bound script, then the server behaves as if it were blocking: it prints "Received {parameter}" every second (in order) while the client keeps sending. I would expect the server to keep printing even while the thread running make_action() is sleeping.
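To take gRPC out of the picture, here is a minimal repro of the serialization I'm seeing (a plain async generator stands in for the request stream, and the sleep stands in for the queue round-trip; all names are made up):

```python
import asyncio
import time


def slow_sync_step(item: int) -> int:
    # Stand-in for the blocking queue round-trip to the sync process.
    time.sleep(0.1)
    return item


async def stream():
    # Stand-in for the gRPC request iterator.
    for i in range(3):
        yield i


async def main() -> list:
    loop = asyncio.get_running_loop()
    stamps = []
    async for item in stream():
        stamps.append(time.monotonic())
        # Awaiting here suspends THIS coroutine until the executor call
        # finishes, so the next item is not consumed until then, even
        # though the event loop itself stays responsive.
        await loop.run_in_executor(None, slow_sync_step, item)
    # Gaps between consecutive items mirror the sleep duration.
    return [b - a for a, b in zip(stamps, stamps[1:])]


if __name__ == "__main__":
    gaps = asyncio.run(main())
    print("serialized:", all(g > 0.09 for g in gaps))  # serialized: True
```

The items arrive roughly one sleep apart, which matches the one-message-per-second behavior I observe on the real server.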

I have been reading that communication between sync and async processes can get kind of obscure, especially when dealing with queues. I also know that one possibility might be to approach it from the opposite side, by spawning the subprocess from the async server instead, but that did not sound right for the case I'm dealing with. What am I missing? Any suggestion is highly appreciated.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
