Inter-process communication between async and sync tasks using PyZMQ
In a single process, I have a task running on a thread that produces values and broadcasts them, and several consumer async tasks that run concurrently in an asyncio loop.
I found this issue on PyZMQ's GitHub asking about async <-> sync communication
with inproc sockets, which is what I also wanted. The answer was to use .shadow(ctx.underlying) when
creating the async ZMQ Context.
I prepared this example, and it seems to be working fine:

    import signal
    import asyncio
    import zmq
    import threading
    import zmq.asyncio
    import sys
    import time
    import json


    def producer(ctrl):
        # delay first push to give asyncio loop time
        # to start
        time.sleep(1)

        ctx = ctrl["ctx"]

        s = ctx.socket(zmq.PUB)
        s.bind(ctrl["endpoint"])

        v = 0
        while ctrl["run"]:
            payload = {"value": v, "timestamp": time.time()}
            msg = json.dumps(payload).encode("utf-8")
            s.send(msg)
            v += 1
            time.sleep(5)

        print("Bye")


    def main():
        endpoint = "inproc://testendpoint"
        ctx = zmq.Context()
        actx = zmq.asyncio.Context.shadow(ctx.underlying)

        ctrl = {"run": True, "ctx": ctx, "endpoint": endpoint, }

        th = threading.Thread(target=producer, args=(ctrl,))
        th.start()

        try:
            asyncio.run(amain(actx, endpoint))
        except KeyboardInterrupt:
            pass

        print("Stopping thread")
        ctrl["run"] = False
        th.join()


    async def amain(ctx, endpoint):
        s = ctx.socket(zmq.SUB)
        s.subscribe("")
        s.connect(endpoint)

        loop = asyncio.get_running_loop()

        def stop():
            try:
                print("Closing zmq async socket")
                s.close()
            except:
                pass

            raise KeyboardInterrupt

        loop.add_signal_handler(signal.SIGINT, stop)

        while True:
            event = await s.poll(1000)
            if event & zmq.POLLIN:
                msg = await s.recv()
                payload = json.loads(msg.decode("utf-8"))
                print("%f: %d" % (payload["timestamp"], payload["value"]))


    if __name__ == "__main__":
        sys.exit(main())

Is it safe to use inproc://* between a thread and an asyncio task in this way? The 0MQ
context is thread-safe and I'm not sharing sockets between the thread and the
asyncio task, so I would say that in general this is thread-safe, right? Or am I
missing something that I should consider?
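
The premise of the question (one 0MQ context seen through two wrappers, with each socket used from only one side) can be checked in isolation. The following is a minimal sketch, not a definitive answer: the function name `shared_context_roundtrip` and the endpoint `inproc://shadow-check` are hypothetical, and it deliberately uses PUSH/PULL instead of PUB/SUB so that a single message is not dropped before the subscriber has joined.

```python
import asyncio
import threading

import zmq
import zmq.asyncio


def shared_context_roundtrip(endpoint="inproc://shadow-check"):
    """Send one message from a sync socket in a thread to an async socket.

    Hypothetical helper for illustration only; returns a tuple
    (contexts_share_underlying, received_message).
    """
    ctx = zmq.Context()
    # shadow() wraps the same underlying libzmq context instead of creating
    # a new one, which inproc:// needs in order to find the bound endpoint.
    actx = zmq.asyncio.Context.shadow(ctx.underlying)
    same = actx.underlying == ctx.underlying

    def sender():
        # This socket is created, used and closed in this thread only.
        s = ctx.socket(zmq.PUSH)
        s.connect(endpoint)
        s.send(b"hello")
        s.close()

    async def receiver():
        # This socket is created, used and closed in the event loop thread only.
        r = actx.socket(zmq.PULL)
        r.bind(endpoint)  # bind before the sender thread connects
        th = threading.Thread(target=sender)
        th.start()
        try:
            return await asyncio.wait_for(r.recv(), timeout=5)
        finally:
            th.join()
            r.close()

    msg = asyncio.run(receiver())
    ctx.term()  # all sockets are closed, so this should return promptly
    return same, msg


if __name__ == "__main__":
    print(shared_context_roundtrip())
```

If the two wrappers did not share the underlying context, the `connect` to the inproc endpoint would not reach the bound socket, so a successful round trip is a reasonable sanity check of the setup, though it does not by itself prove thread safety.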
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
