Asyncio Tasks running sequentially

I've recently started working with Python and its concurrency features, and I'm banging my head against asyncio.

Structure of Data: a list of companies, each with its own users-list.

Goal: I want to execute the gRPC calls in parallel, with one task always handling a particular company. Also, the API call operates on a users-list and is a batch call [not a single call per company].
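For concreteness, the input mapping (called cname_vs_user_ids in the code below) presumably looks something like this; the company and user names here are purely illustrative, not from the original post:

cname_vs_user_ids = {
    "company_a": ["user_1", "user_2", "user_3"],  # updated in batches (chunks of 20)
    "company_b": ["user_4", "user_5"],
}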

Ref I followed: https://docs.python.org/3/library/asyncio-queue.html [modified slightly for my use case]

What I've done: the three small functions below, where process_cname_vs_users takes a mapping of company_id to users-list as input.

import asyncio


async def update_data(user_ids, company_id):  # <-- THIS IS THE ASYNC CALL ON A CHUNK OF USERS
    # A gRPC call to the server goes here.
    ...


async def worker(worker_id, queue, cname_vs_user_ids):
    while True:
        company_id = await queue.get()
        user_ids = cname_vs_user_ids.get(company_id)

        user_ids_chunks = get_data_in_chunks(user_ids, 20)
        for user_id_chunk in user_ids_chunks:
            try:
                await update_data(user_id_chunk, company_id)
            except Exception as e:
                print("error: {}".format(e))

        # Notify the queue that the "work item" has been processed.
        queue.task_done()


async def process_cname_vs_users(cname_vs_user_ids):
    queue = asyncio.Queue()

    for company_id in cname_vs_user_ids:
        queue.put_nowait(company_id)

    tasks = []
    for i in range(5):  # <- number of workers
        task = asyncio.create_task(
            worker(i, queue, cname_vs_user_ids))
        tasks.append(task)

    # Wait until the queue is fully processed.
    await queue.join()

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()

    try:
        # Wait until all worker tasks are cancelled.
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        print('Results:', responses)

    except Exception as e:
        print('Got an exception:', e)
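
Note: the helper get_data_in_chunks is not shown in the post; a minimal sketch of what it presumably does, namely splitting a list into fixed-size chunks:

def get_data_in_chunks(items, chunk_size):
    # Yield successive chunk_size-sized slices of items.
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]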

Expectation: the tasks should run concurrently, processing 5 companies (the number of workers) at a time.

Reality: only the first worker does any work, and it processes all the companies sequentially.

Any help/suggestions would be appreciated. Thanks in advance :)
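
Note: a common cause of this exact symptom, although not confirmed in the post, is that the gRPC call inside update_data uses a synchronous (blocking) stub. A blocking call inside a coroutine never yields to the event loop, so the other workers are never scheduled. A minimal, self-contained demo of that effect, with time.sleep standing in for the blocking call:

import asyncio
import time


async def blocking_worker(task_id):
    # time.sleep blocks the whole event loop: no other coroutine can run
    # until it returns, so the five "workers" execute one after another.
    time.sleep(0.2)
    print("worker", task_id, "finished")


async def main():
    start = time.perf_counter()
    await asyncio.gather(*(blocking_worker(i) for i in range(5)))
    print("took", round(time.perf_counter() - start, 1), "s")  # ~1.0 s: sequential


asyncio.run(main())

Replacing time.sleep(0.2) with await asyncio.sleep(0.2) brings the total down to roughly 0.2 s, because the workers then actually run concurrently.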



Solution 1:[1]

So, finally, I figured it out after reading more about concurrency in Python. For now I have used concurrent.futures.ThreadPoolExecutor to achieve the desired output.

Solution:

import concurrent.futures


def update_data(user_ids, company_id):  # <-- now a plain (blocking) call on a chunk of users:
    # ThreadPoolExecutor runs regular callables, not coroutines.
    # A blocking gRPC call to the server goes here.
    ...


def worker(cname_vs_user_ids):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Map each future back to its company_id so failures can be reported.
        future_summary = {
            executor.submit(update_data, user_items, company_id): company_id
            for company_id, user_items in cname_vs_user_ids.items()
        }
        for future in concurrent.futures.as_completed(future_summary):
            try:
                response = future.result()
            except Exception as error:
                print("Company: {}, Error occurred: {}".format(
                    future_summary[future], str(error)))
                # The with-block shuts the executor down on exit.
                raise error


def process_cname_vs_users(cname_vs_user_ids):
    # Nothing is awaited any more, so no event loop is needed.
    worker(cname_vs_user_ids)

The above solution worked wonders for me.
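
For completeness, a hypothetical invocation of the version above; the data is illustrative, not from the original post:

if __name__ == "__main__":
    cname_vs_user_ids = {
        "company_a": ["u1", "u2", "u3"],
        "company_b": ["u4", "u5"],
    }
    process_cname_vs_users(cname_vs_user_ids)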

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: user7630232