Inter-process communication between async and sync tasks using PyZMQ

In a single process, I have a task running on a thread that produces values and broadcasts them, and several async consumer tasks that run concurrently in an asyncio loop.

I found an issue on PyZMQ's GitHub asking about async <-> sync communication over inproc sockets, which is what I also want. The answer there was to use .shadow(ctx.underlying) when creating the async ZMQ Context.
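
As a minimal sketch of what that suggestion amounts to (my reading of it, not something the issue spells out in these terms): the shadow wraps the same libzmq context in the asyncio-aware class, so an inproc endpoint bound through one object should be reachable through the other. The variable name same_context is only for illustration.

```python
import zmq
import zmq.asyncio

# Ordinary (blocking) context, as used by the producer thread.
ctx = zmq.Context()

# Asyncio-aware wrapper around the *same* libzmq context.
actx = zmq.asyncio.Context.shadow(ctx.underlying)

# Both wrappers report the same underlying context address;
# inproc:// endpoints are scoped to that one context.
same_context = actx.underlying == ctx.underlying
is_async_ctx = isinstance(actx, zmq.asyncio.Context)

print(same_context, is_async_ctx)

# No sockets were opened, so terminating the owning context returns at once.
ctx.term()
```

Sockets created from ctx are blocking and sockets created from actx are awaitable, but both attach to one context.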

I prepared this example and it seems to work fine:

import signal
import asyncio
import zmq
import threading
import zmq.asyncio
import sys
import time
import json


def producer(ctrl):
    # delay first push to give asyncio loop time
    # to start
    time.sleep(1)

    ctx = ctrl["ctx"]

    s = ctx.socket(zmq.PUB)

    s.bind(ctrl["endpoint"])

    v = 0
    while ctrl["run"]:
        payload = {"value": v, "timestamp": time.time()}

        msg = json.dumps(payload).encode("utf-8")

        s.send(msg)
        v += 1
        time.sleep(5)

    print("Bye")


def main():
    endpoint = "inproc://testendpoint"
    ctx = zmq.Context()
    actx = zmq.asyncio.Context.shadow(ctx.underlying)

    ctrl = {"run": True, "ctx": ctx, "endpoint": endpoint, }

    th = threading.Thread(target=producer, args=(ctrl,))
    th.start()

    try:
        asyncio.run(amain(actx, endpoint))
    except KeyboardInterrupt:
        pass

    print("Stopping thread")
    ctrl["run"] = False
    th.join()


async def amain(ctx, endpoint):
    s = ctx.socket(zmq.SUB)
    s.subscribe("")
    s.connect(endpoint)

    loop = asyncio.get_running_loop()

    def stop():
        try:
            print("Closing zmq async socket")
            s.close()
        except Exception:
            pass

        raise KeyboardInterrupt

    loop.add_signal_handler(signal.SIGINT, stop)

    while True:
        event = await s.poll(1000)
        if event & zmq.POLLIN:
            msg = await s.recv()
            payload = json.loads(msg.decode("utf-8"))

            print("%f: %d" % (payload["timestamp"], payload["value"]))


if __name__ == "__main__":
    sys.exit(main())

Is it safe to use inproc://* between a thread and an asyncio task in this way? The 0MQ context is thread-safe and I'm not sharing sockets between the thread and the asyncio task, so I would say that in general this is thread-safe, right? Or am I missing something that I should consider?
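
To make the question concrete, here is a reduced, bounded sketch of the pattern I am asking about. It is only an illustration under my own assumptions, not a claim that this is the recommended way. Each socket is created, used and closed in exactly one thread, and only the context is shared. The endpoint name, the threading.Event objects and the helper names (producer, consume, run_sketch) are hypothetical and do not appear in the example above.

```python
import asyncio
import json
import threading
import time

import zmq
import zmq.asyncio

ENDPOINT = "inproc://sketch-endpoint"  # hypothetical endpoint name


def producer(ctx, ready, stop):
    # The PUB socket is created, used and closed only in this thread.
    sock = ctx.socket(zmq.PUB)
    sock.bind(ENDPOINT)
    ready.set()  # tell the main thread the endpoint is bound
    value = 0
    while not stop.is_set():
        sock.send(json.dumps({"value": value}).encode("utf-8"))
        value += 1
        time.sleep(0.01)
    sock.close(linger=0)


async def consume(actx, count):
    # The SUB socket lives only in the event loop's thread.
    sock = actx.socket(zmq.SUB)
    sock.subscribe(b"")
    sock.connect(ENDPOINT)
    values = []
    try:
        while len(values) < count:
            msg = await sock.recv()
            values.append(json.loads(msg.decode("utf-8"))["value"])
    finally:
        sock.close(linger=0)
    return values


def run_sketch(count=3):
    ctx = zmq.Context()
    actx = zmq.asyncio.Context.shadow(ctx.underlying)
    ready = threading.Event()
    stop = threading.Event()
    th = threading.Thread(target=producer, args=(ctx, ready, stop))
    th.start()
    try:
        ready.wait(timeout=5)
        # Bound the wait so a missed connection cannot hang forever.
        return asyncio.run(asyncio.wait_for(consume(actx, count), timeout=10))
    finally:
        stop.set()
        th.join()
        ctx.term()  # all sockets are closed by now, so this returns


if __name__ == "__main__":
    print(run_sketch())
```

Since PUB/SUB drops messages sent before the subscription is in place, the first value received is not necessarily 0. The sketch only relies on the values arriving in increasing order. The threading.Event replaces the ctrl["run"] flag purely to make the stop signal explicit.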



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
