How to process streaming results from a Ray asyncio actor
Please consider this toy example:
import ray
import asyncio


@ray.remote
class Streamer:
    async def stream(self):
        # vars for our fake streaming library
        STREAM_MSG = 'stream_msg_'
        cnt = -1

        while True:
            # Fake representation of a streaming library that employs asyncio
            # (IRL: streams JSON messages from a 3rd party API)
            await asyncio.sleep(1)
            cnt += 1
            stream_msg = f'{STREAM_MSG}{cnt}'
            print(stream_msg)

            # TODO: How can stream_msg be passed back to the main process
            # (or another process) asap without ending this loop?


if __name__ == '__main__':
    streamer = Streamer.remote()
    ray.get(streamer.stream.remote())

    def process_stream_msgs():
        # TODO How can this function (or another Ray actor) immediately
        # pick up and process each stream_msg as soon as they become available?
        pass

    process_stream_msgs()
The streaming functionality (in the 3rd party library) needs to run in its own process and get each result out of its handler as soon as possible so the loop can continue. A backlog of unprocessed messages will cause the 3rd party API to close the connection without warning.
I need a function or actor (outside of the streaming process) to pick up each new message as soon as it arrives and do some transformational processing on it (FYI, this will be done in parallel).
FYI (not implemented in the example above): the results of the transformational processing are finally written to stdout (in the main process) so that they can be picked up by a calling GUI program (PyQt QProcess).
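To make the intended data flow concrete, here is a minimal sketch of the handoff described above: a streaming loop that passes each message on immediately, several workers that transform messages as they arrive, and results printed to stdout at the end. It is an illustration only. It runs in a single process with plain `asyncio` and no Ray, and the names `fake_stream`, `worker`, and `run_pipeline`, as well as the upper-casing "transformation", are hypothetical placeholders rather than anything from the real streaming library.

```python
import asyncio


async def fake_stream(queue: asyncio.Queue, n_msgs: int) -> None:
    """Stand-in for the streaming loop: hand each message off immediately."""
    for cnt in range(n_msgs):
        await asyncio.sleep(0.01)               # pretend to wait on the 3rd party API
        queue.put_nowait(f"stream_msg_{cnt}")   # unbounded queue, so this never waits


async def worker(queue: asyncio.Queue, results: list) -> None:
    """Pick up each message as soon as it is available and transform it."""
    while True:
        msg = await queue.get()
        results.append(msg.upper())             # placeholder for the real transformation
        queue.task_done()


async def run_pipeline(n_msgs: int = 5, n_workers: int = 3) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    workers = [
        asyncio.create_task(worker(queue, results)) for _ in range(n_workers)
    ]
    await fake_stream(queue, n_msgs)
    await queue.join()                          # wait until every message is processed
    for task in workers:
        task.cancel()                           # workers loop forever, so stop them
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    # The main process writes the transformed results to stdout.
    for line in asyncio.run(run_pipeline()):
        print(line, flush=True)
```

The point of the sketch is the shape of the handoff: the producer never waits on the consumers, and the consumers never poll. Whether the same shape carries over to separate Ray processes is exactly what the question is asking.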
I have attempted using the Ray Queue implementation, but that presented a new set of problems. The example below is a bit hacky but demonstrates the intention:
import ray
import asyncio
from ray.util.queue import Queue


@ray.remote
class Streamer:
    async def stream(self, queue):
        # vars for the fake streaming library
        STREAM_MSG = 'stream_msg_'
        cnt = -1

        while True:
            # Fake representation of a streaming library that employs asyncio
            # (IRL: streams JSON messages from a 3rd party API)
            await asyncio.sleep(1)
            cnt += 1
            stream_msg = f'{STREAM_MSG}{cnt}'
            print(stream_msg)

            # TODO: Presents new problems
            queue.put(stream_msg)


if __name__ == '__main__':
    queue = Queue()
    streamer = Streamer.remote()
    ray.get(streamer.stream.remote(queue))

    def process_stream_msgs():
        # TODO If queue is viable, it will be processed here (or another actor)
        pass

    process_stream_msgs()
When I run this, I get:
WARNING worker.py:1710 -- Using blocking ray.get inside async actor. This blocks the event loop. Please use `await` on object ref with asyncio.gather if you want to yield execution to the event loop instead.
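The warning most likely comes from the synchronous `queue.put(...)` call, which waits on a result while running on the actor's event loop thread. The sketch below reproduces that effect without Ray, using only the standard library: a hypothetical `blocking_put` stands in for any synchronous call, and a `heartbeat` task counts how often the event loop gets to run while messages are handed off. As far as I know, `ray.util.queue.Queue` also provides awaitable `put_async` and `get_async` methods, which would be the natural thing to try in place of `put` inside an async actor; they are not exercised here.

```python
import asyncio
import time


def blocking_put(item: str) -> None:
    """Hypothetical stand-in for any synchronous call that waits on a result."""
    time.sleep(0.05)


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    """Record a tick each time the event loop gets a chance to run this task."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.005)


async def stream(offload: bool, n_msgs: int = 3) -> int:
    """Return how many heartbeat ticks occurred while n_msgs were handed off."""
    ticks: list = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat task start
    loop = asyncio.get_running_loop()
    for cnt in range(n_msgs):
        msg = f"stream_msg_{cnt}"
        if offload:
            # The blocking call runs in a worker thread; the loop stays free.
            await loop.run_in_executor(None, blocking_put, msg)
        else:
            # The blocking call runs on the loop's thread; nothing else can run.
            blocking_put(msg)
    stop.set()
    await hb
    return len(ticks)


if __name__ == "__main__":
    print("ticks while blocking: ", asyncio.run(stream(offload=False)))
    print("ticks while offloaded:", asyncio.run(stream(offload=True)))
```

The blocking variant should report far fewer ticks than the offloaded one, which is the stall the warning is pointing at: while the synchronous call waits, no other coroutine on that loop makes progress.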
I would be very grateful if anyone can tell me the best way of going about what I am trying to achieve.
Firstly, is there a preferable way of doing this without the need for a queue? (I have a feeling there will be performance implications with a queue.)
Secondly, if a queue is a viable option, what is the proper way to implement it?
Thanks
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
