'How to resolve asyncio.Future from a separate C++ thread with Pybind11

I created a simple Python C++ extension, supposed to do the following:

  1. Run a thread (from C++), producing ticker incremental updates
  2. Python should call await ticker.next_ticker(), which is implemented in C++, and returns an asyncio.Future object for awaiting.
  3. The pointer to this Future is saved inside the C++ Ticker instance.
  4. The C++ thread increments the ticker every 500ms, and resolves the current awaited Future. The idea is to notify Python about new events asynchronously, without callbacks.
  5. The C++ thread doesn't produce new updates, until Python consumed the previous one.

Python part:

import asyncio
from example import Ticker
from time import sleep

async def main():
    tk = Ticker()
    tk.subscribe()
    while True:
        print("[py] Awaiting to the ticker")
        ticker = await tk.next_ticker()
        print(f"[py] Ticker received: {ticker}")

asyncio.run(main())

C++ part:

#include <chrono>
#include <thread>
#include <iostream>

#include <pybind11/pybind11.h>
using namespace std;
namespace py = pybind11;

class Ticker {
    public:
        Ticker() {
            python_awaiting = false;
        }

        py::object next_ticker_async() {
            py::object loop = py::module_::import("asyncio.events").attr("get_event_loop")();
            fut = loop.attr("create_future")();
            python_awaiting = true;
            cout << "[cpp] Python awaiting for the next ticker" << endl;
            return fut;
        }

        void subscribe() {
            sub = std::thread([this] {
                cout << "[cpp] Ticker subscribtion started" << endl;
                int ticker = 1;
                while (true) {
                    std::this_thread::sleep_for(std::chrono::milliseconds(500));
                    if (python_awaiting) {
                        cout << "[cpp] Updating ticker" << endl;
                        py::gil_scoped_acquire acquire;
                        fut.attr("set_result")(ticker++);
                        cout << "[cpp] Ticker updated" << endl;
                        python_awaiting = false;
                    }
                }
            });
        }

    private:
        py::object fut;
        std::thread sub;
        bool python_awaiting;
};

PYBIND11_MODULE(example, m) {
    py::class_<Ticker>(m, "Ticker")
        .def(py::init<>())
        .def("subscribe", &Ticker::subscribe)
        .def("next_ticker", &Ticker::next_ticker_async);
}

When I compile the extension and run the python part, it writes the following logs:

[py] Awaiting to the ticker
[cpp] Python awaiting for the next ticker
[cpp] Ticker subscribtion started
[cpp] Updating ticker
[cpp] Ticker updated

stucks forever.

I.e. the C++ thread successfully resolves the Future, but Python doesn't receive it. I tested using the same approach in the same thread, and it actually worked.

I suspect that the problem is that when I return asyncio.Future as py::object from next_ticker_async to Python, it actually copied, and that's why Python doesn't receive the update.

I tried to return a pointer to the object, or casting it to py::handle, instead of py::object, but then got SIGSEGV (Address boundary error) when calling fut->attr("set_result")(ticker++);.

A also tried wrapping this object into shared_ptr, but it didn't compile with an error like Only custom holder objects supported for shared_ptr.

The question is - how to properly share the Python asyncio.Future between C++ threads, to make this work?

Full extension code on Github, with easy setup instructions.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source