Message passing between blocking Process and Asyncio gRPC server
I started playing with Python asyncio and coroutines a week ago, and I'm trying to experiment with CPU-bound tasks.
Basically, I have asynchronous bidirectional communication between a client and a server. The client sends data to the server, and the server is supposed to send a response back when a certain condition is met.
So far so good: if I mock the data sent back to the client and use await asyncio.sleep(), both sides appear to work properly and asynchronously. I also tried running blocking synchronous code using run_in_executor, and that works nicely too.
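For reference, here is a minimal sketch of the run_in_executor pattern mentioned above. It is not the actual server code: blocking_work and ticker are hypothetical names made up for illustration, and the sleep durations are arbitrary. It only shows a blocking function running in the loop's default thread pool while another coroutine keeps making progress.

```python
import asyncio
import time


def blocking_work(x: int) -> int:
    """Hypothetical stand-in for synchronous code; it blocks its own thread."""
    time.sleep(0.2)
    return x * 2


async def ticker(log: list) -> None:
    """Keeps appending to the log while the blocking call runs elsewhere."""
    for i in range(3):
        log.append(f"tick {i}")
        await asyncio.sleep(0.05)


async def main() -> tuple:
    log = []
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    future = loop.run_in_executor(None, blocking_work, 21)
    # The event loop stays free, so the ticker runs during the blocking call.
    await ticker(log)
    result = await future
    return result, log


result, log = asyncio.run(main())
print(result, log)
```

The key point in this sketch is that the future is created first and awaited later, so the loop is free to run other coroutines in between.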
Now I want the async server to communicate with a separate script that does CPU-bound work (so it runs in a different process) and relies on an entirely synchronous framework. The script needs to periodically get data from the server and respond with a result, which the server then sends back to the client.
As I have currently designed it, the sync script starts first, initializes, and, when the time comes, starts a separate process that runs the async server, passing it two synchronous multiprocessing.Queue objects.
The code looks like the following:
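# CPU-bound script
from multiprocessing import Process, Queue

import server  # the async server module shown further down

# ----
# A lot of things happen
# ----

# One single-slot queue per direction.
first_queue = Queue(maxsize=1)
second_queue = Queue(maxsize=1)

# Run the async server in its own daemon process and hand it both queues.
server_process = Process(
    target=server.run,
    args=(first_queue, second_queue))
server_process.daemon = True
server_process.start()

while True:
    # Something happens
    received_value = first_queue.get()
    print("Received value: ", received_value)
    # Something else
    second_queue.put(value)
    ...
    ...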
On the other side the async server is currently implemented as follows:
import asyncio
import logging
from multiprocessing import Queue
from typing import AsyncIterable

import grpc

import something_pb2
import something_pb2_grpc


def compute_statistics(parameter) -> None:
    print(f"Received {parameter}")


class SomethingService(something_pb2_grpc.SomethingServicer):

    def __init__(self, first_queue, second_queue):
        self._first_queue = first_queue
        self._second_queue = second_queue

    def make_action(self, parameter: int) -> something_pb2.Action:
        # Blocking queue round trip; called from an executor thread.
        self._second_queue.put(parameter)
        action = self._first_queue.get()
        print("Received Action: ", action)
        return something_pb2.Action(action=action)

    async def SomethingControl(
        self,
        request_iterator: AsyncIterable[something_pb2.TransmissionStatus],
        unused_context,
    ) -> AsyncIterable[something_pb2.Action]:
        async for status in request_iterator:
            compute_statistics(status.parameter)
            # `something` is a placeholder for the actual condition.
            if status.parameter == something:
                result = await asyncio.get_running_loop().run_in_executor(
                    None, self.make_action, status.parameter)
                yield result


async def serve(first_queue: Queue, second_queue: Queue) -> None:
    server = grpc.aio.server()
    something_pb2_grpc.add_SomethingServicer_to_server(
        SomethingService(first_queue, second_queue),
        server
    )
    server.add_insecure_port('[::]:50051')
    print('Listening...')
    await server.start()
    await server.wait_for_termination()


def run(first_queue: Queue, second_queue: Queue) -> None:
    logging.basicConfig()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(serve(first_queue, second_queue))
Everything looks fine if I run it as is, but if I add a simple time.sleep(1) right after self._second_queue.put(parameter) to simulate a slow response from the CPU-bound script, the server behaves as if it were blocking: it prints "Received {parameter}" once per second (in order) while the client keeps sending. I expected the server to keep printing even while the thread running make_action() is sleeping.
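The observation can be reproduced without gRPC or queues. The sketch below is a reduction under stated assumptions, not the server code: slow_action and requests are hypothetical stand-ins for make_action (with the added sleep) and request_iterator, and the timings are arbitrary. It contrasts awaiting run_in_executor inside the async for body with merely scheduling the call there and collecting the results afterwards.

```python
import asyncio
import time


def slow_action(x: int) -> int:
    """Hypothetical stand-in for make_action with the added time.sleep."""
    time.sleep(0.1)
    return x


async def requests():
    """Hypothetical stand-in for request_iterator."""
    for i in range(3):
        yield i


async def awaiting_handler(events: list) -> None:
    loop = asyncio.get_running_loop()
    async for status in requests():
        events.append(f"received {status}")
        # The loop body is suspended here, so the next request is not
        # pulled from the iterator until the executor call returns.
        result = await loop.run_in_executor(None, slow_action, status)
        events.append(f"result {result}")


async def scheduling_handler(events: list) -> None:
    loop = asyncio.get_running_loop()
    pending = []
    async for status in requests():
        events.append(f"received {status}")
        # Schedule the call without awaiting it inside the loop.
        pending.append(loop.run_in_executor(None, slow_action, status))
    # gather returns the results in the order the calls were scheduled.
    for result in await asyncio.gather(*pending):
        events.append(f"result {result}")


sequential, concurrent = [], []
asyncio.run(awaiting_handler(sequential))
asyncio.run(scheduling_handler(concurrent))
print(sequential)
print(concurrent)
```

In the first handler every "received" entry waits for the previous result, which matches the once-per-second output described above. In the second, all requests are received before any result arrives. Whether scheduling the calls this way suits a streaming gRPC handler, where results still have to be yielded back to the client, is a separate design question.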
I have read that communication between sync and async processes can get tricky, especially when queues are involved. I also know that one possibility might be to approach it from the opposite side and create a subprocess from the async server instead, but that did not seem right for my case. What am I missing? Any suggestions are highly appreciated.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
