How to await on another co-routine?

Here's what I'd like to do:

req.seq = 123
result = await client.sendReq(req)
doResult(result)

Simple enough? However, in this case the client delegates to a websocket that handles multiple asynchronous requests on one socket - the messages it receives are intended for one of many co-routines that are waiting on request responses.

What does this mean in practice for the implementation of client.sendReq? Something like:

  • client.sendReq must do: socket.awaitSend(req)
  • a handler is registered somewhere: seq2handler[req.seq] = handler
  • in another coroutine, client.recvReq eventually receives a response, looks up and calls seq2handler[response.seq](response), and causes client.sendReq to return the received response.

In other words, client.sendReq must be a future that is awaiting another co-routine. How can this be done? For every answer containing a new thread, a fairy dies.

In practice, what I have now is:

async def doResult(result): ....

req.seq = 123
result = await client.sendReq(req, doResult)

which is obviously less nice, particularly for the handling of exceptions.



Solution 1:[1]

You can use a Condition, more or less like this:

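import asyncio

class Handler:
    def __init__(self):
        self.condition = asyncio.Condition()
        self.response = None

    async def dispatch(self, msg):
        # Store the response under the lock, then wake every waiter.
        async with self.condition:
            self.response = msg
            self.condition.notify_all()

    async def awaitResponse(self):
        async with self.condition:
            # wait_for re-checks the predicate, so a response that arrived
            # before we started waiting is not missed.
            await self.condition.wait_for(lambda: self.response is not None)
        return self.response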
    
async def listener():
    while True:
        msg = await readSocket()
        # Look up and remove the handler registered for this sequence number.
        handler = seq2Handler.pop(msg.seq)
        loop.create_task(handler.dispatch(msg))

async def sendReqAwaitResponse(req):
    req.seq = nextSeq()
    handler = Handler()
    seq2Handler[req.seq] = handler
    await _send(req)
    return await handler.awaitResponse()
    

async def thing():
   req.seq = 123
   result = await sendReqAwaitResponse(req)
   doResult(result)

async def main():
   # Connect to the socket and run listener().
   # thing() is called some time later.
   ...


seq2Handler = {}
loop = asyncio.get_event_loop()

if __name__=='__main__':
   loop.run_until_complete(main())

There are some bugs with Condition in Python < 3.10. For a workaround, see the question "How can I use asyncio.Condition within a Task in Python < v3.10".
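As an alternative to a Condition, the same request/response matching can be sketched with one asyncio.Future per pending request. This is only a minimal sketch, not the code from the answer above: the names Client, send_req, on_message, fake_send and demo are hypothetical, and the in-memory queue stands in for the real websocket.

```python
import asyncio


class Client:
    """Hypothetical client that matches responses to requests by sequence number."""

    def __init__(self, send):
        self._send = send      # assumed: async callable that writes one request
        self._pending = {}     # seq -> Future waiting for the matching response
        self._next_seq = 0

    async def send_req(self, payload):
        self._next_seq += 1
        seq = self._next_seq
        fut = asyncio.get_running_loop().create_future()
        self._pending[seq] = fut   # register before sending so no reply is missed
        try:
            await self._send(seq, payload)
            return await fut       # suspends until on_message resolves the future
        finally:
            self._pending.pop(seq, None)

    def on_message(self, seq, response):
        """Called by the single reader coroutine for every incoming message."""
        fut = self._pending.get(seq)
        if fut is None or fut.done():
            return                 # unknown or already-answered request
        if isinstance(response, Exception):
            fut.set_exception(response)   # re-raised inside the awaiting send_req
        else:
            fut.set_result(response)


async def demo():
    inbox = asyncio.Queue()    # stand-in for the socket

    async def fake_send(seq, payload):
        await inbox.put((seq, payload))

    client = Client(fake_send)

    async def reader():
        # Stand-in for the websocket reader: echoes each request back upper-cased.
        while True:
            seq, payload = await inbox.get()
            client.on_message(seq, payload.upper())

    reader_task = asyncio.create_task(reader())
    results = await asyncio.gather(client.send_req("a"), client.send_req("b"))
    reader_task.cancel()
    return results
```

Running asyncio.run(demo()) drives two concurrent requests over the fake socket. Because the caller simply awaits send_req, an error delivered through set_exception surfaces as an ordinary exception at the call site, which addresses the exception-handling concern raised in the question.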

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 user48956