'asyncio - How do I add a new async task to my "main" loop from inside sync code executed with run_in_executor?

Apologies for any incorrect terminology or poor description - I'm new to asyncio. Please feel free to edit/correct/improve my question.

I have a scenario where my asyncio application needs to use run_in_executor to run a sync function from a third-party library. This in turn calls a sync function in my code at certain times. I want this function to be able to then add tasks to my main loop.

I tried creating a new shorter-lived mini-loop within the sync function/thread, but it turns out another (tortoise-orm with sqlite) requires the task to be in the main loop (it uses an asyncio lock).

I'm wondering if there is a best-practice way of achieving this, my attempts so far are messy and have mixed results. I'm not sure if I'm thinking correctly that this is a valid use of contextvar / asyncio.run_coroutine_threadsafe.

Any tips would be appreciated.

A simplified example:


def add_item_to_database(item: dict) -> None:
    # Is there a way to obtain the "main" loop here? contextvar?
    # Do I use this with asyncio.run_coroutine_threadsafe?
    # TODO
    raise NotImplementedError(
        "How do I now schedule/submit an async function back "
        "on my main loop?"
    )

import asyncio


async def my_coro() -> None:
    loop = asyncio.get_running_loop()
    # third party sync function will be long-running,
    # and occasionally call my sync "callback"
    # add_item_to_database function from another module.
    await loop.run_in_executor(
        None,
        third_party_sync_function_that_will_call_add_item_to_database
    )


async def main() -> None:
    # other 'proper' asyncio coroutine tasks also exist - omitted here.
    tasks = [asyncio.create_task(my_coro)]
    await asyncio.gather(*tasks)


asyncio.run(main())



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source