How to use reader, writer from asyncio.open_connection in parallel?

How can I read and write in parallel using the asyncio reader, writer pair returned by asyncio.open_connection?

I tried calling asyncio.open_connection in two different event loops, like this:

async def read():
    reader, writer = await asyncio.open_connection('127.0.0.1', 5454)
    while True:
        readval = await reader.readline()
        print(f"read {readval}")

async def write():
    reader, writer = await asyncio.open_connection('127.0.0.1', 5454)
    while True:
        self.wsem.acquire()
        msg = self.wq.popleft()
        print("Writing " + msg)
        writer.write(msg.encode())
        await writer.drain()


threading.Thread(target=lambda: asyncio.run(write())).start()
threading.Thread(target=lambda: asyncio.run(read())).start()

But it seems that the write thread sometimes drains the content meant for the read thread, so it doesn't work reliably.

Then I tried sharing the reader and writer between the two loops, but that throws an exception:

Exception in thread Thread-8:
Traceback (most recent call last):
  File "C:\Users\Lenovo\AppData\Local\Programs\Python\Python39\lib\threading.py", line 954, in _bootstrap_inner
    self.run()
  File "C:\Users\Lenovo\AppData\Local\Programs\Python\Python39\lib\threading.py", line 892, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Lenovo\PycharmProjects\testape-adb-adapter\adapter\device_socket.py", line 89, in <lambda>
    threading.Thread(target=lambda: asyncio.run(read())).start()
  File "C:\Users\Lenovo\AppData\Local\Programs\Python\Python39\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\Lenovo\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 642, in run_until_complete
    return future.result()
  File "C:\Users\Lenovo\PycharmProjects\testape-adb-adapter\adapter\device_socket.py", line 72, in read
    readval = await self.reader.readline()
  File "C:\Users\Lenovo\AppData\Local\Programs\Python\Python39\lib\asyncio\streams.py", line 540, in readline
    line = await self.readuntil(sep)
  File "C:\Users\Lenovo\AppData\Local\Programs\Python\Python39\lib\asyncio\streams.py", line 632, in readuntil
    await self._wait_for_data('readuntil')
  File "C:\Users\Lenovo\AppData\Local\Programs\Python\Python39\lib\asyncio\streams.py", line 517, in _wait_for_data
    await self._waiter
RuntimeError: Task <Task pending name='Task-2' coro=<DeviceSocket.connect.<locals>.read() running at C:\Users\Lenovo\PycharmProjects\testape-adb-adapter\adapter\device_socket.py:72> cb=[_run_until_complete_cb() at C:\Users\Lenovo\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py:184]> got Future <Future pending> attached to a different loop

This is the code that raises it:

async def read():
    connection_opened.wait()
    while True:
        print(f"reading")
        if self.reader.at_eof():
            continue
        readval = await self.reader.readline()
        print(f"read {readval}")
        self.rq.append(readval.decode())
        self.rsem.release(1)

async def write():
    self.reader, self.writer = await asyncio.open_connection('127.0.0.1', 5454)
    connection_opened.set()
    while True:
        self.wsem.acquire()
        msg = self.wq.popleft()
        print("Writing " + msg)
        self.writer.write(msg.encode())
        await self.writer.drain()

connection_opened = threading.Event()
threading.Thread(target=lambda: asyncio.run(write())).start()
threading.Thread(target=lambda: asyncio.run(read())).start()

I thought that should be a simple and rather common use case. What is the proper way to do that?



Solution 1:[1]

I suggest running both coroutines as tasks on one event loop instead of giving each its own thread and loop. Change the thread functions into something like this:

t = self.loop.create_task(self.write())

and end with:

self.loop.run_until_complete(t)

Because the definitions of self.wq and self.wsem are missing and I'm not sure what they mean, I couldn't reproduce the error message. I hope this answers your question.
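
To make the single-loop idea concrete, here is a minimal sketch, not the asker's actual class. It assumes the outgoing messages can be handed over through an asyncio.Queue (standing in for self.wq and self.wsem, whose definitions are not shown) and that None can serve as a stop marker. The names read_loop, write_loop and run_client are hypothetical. Both coroutines share one connection and run concurrently under asyncio.gather on the same loop, so the reader and writer are never touched from a different loop.

```python
import asyncio


async def read_loop(reader, received):
    """Collect incoming lines until the peer closes the connection."""
    while True:
        line = await reader.readline()
        if not line:  # readline() returns b"" at EOF
            break
        received.append(line.decode())


async def write_loop(writer, outgoing):
    """Send queued messages; None is an assumed stop marker."""
    while True:
        msg = await outgoing.get()
        if msg is None:
            break
        writer.write(msg.encode())
        await writer.drain()
    # Half-close: tell the peer we are done sending, but keep reading.
    if writer.can_write_eof():
        writer.write_eof()


async def run_client(host, port, messages):
    """Open ONE connection and run reader and writer on the same loop."""
    reader, writer = await asyncio.open_connection(host, port)
    outgoing = asyncio.Queue()
    received = []
    for msg in messages:
        outgoing.put_nowait(msg)
    outgoing.put_nowait(None)  # stop after the last message
    try:
        # Both coroutines are scheduled on the current loop and interleave
        # at each await, so no threads or extra loops are needed.
        await asyncio.gather(
            read_loop(reader, received),
            write_loop(writer, outgoing),
        )
    finally:
        writer.close()
        await writer.wait_closed()
    return received
```

It could be started with asyncio.run(run_client('127.0.0.1', 5454, ['hello\n'])). If the messages really are produced by other threads, one option is to keep a reference to the loop and hand each message over with loop.call_soon_threadsafe(outgoing.put_nowait, msg) rather than sharing the reader or writer across threads.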

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 General Grievance