Asyncio tasks running sequentially
I've recently started working with Python and its concurrency features, and I'm stuck on asyncio.
Data structure: a list of companies, each containing a list of users.
Goal: I want to make gRPC calls in parallel, with each task always working on one particular company. The API call takes a list of users and is a batch call (not a single call per company).
Reference I followed: https://docs.python.org/3/library/asyncio-queue.html (modified slightly for my use case).
What I've done: the three small functions below, where process_cname_vs_users takes a mapping of company_id to users-list.
```python
import asyncio


async def update_data(req_id, user_ids, company_id):  # <-- THIS IS THE ASYNC CALL ON A CHUNK OF USERS
    # A gRPC call to the server goes here.
    ...


async def worker(worker_id, queue, cname_vs_user_ids):
    while True:
        company_id = await queue.get()
        user_ids = cname_vs_user_ids.get(company_id)
        user_ids_chunks = get_data_in_chunks(user_ids, 20)  # helper not shown in the question
        for user_id_chunk in user_ids_chunks:
            try:
                # worker_id is passed as req_id here (assumption: the original call omitted req_id)
                await update_data(worker_id, user_id_chunk, company_id)
            except Exception as e:
                print("error: {}".format(e))
        # Notify the queue that the "work item" has been processed.
        queue.task_done()


async def process_cname_vs_users(cname_vs_user_ids):
    queue = asyncio.Queue()
    for company_id in cname_vs_user_ids:
        queue.put_nowait(company_id)

    tasks = []
    for i in range(5):  # <- number of workers
        task = asyncio.create_task(worker(i, queue, cname_vs_user_ids))
        tasks.append(task)

    # Wait until the queue is fully processed.
    await queue.join()

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    try:
        # Wait until all worker tasks are cancelled.
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        print('Results:', responses)
    except Exception as e:
        print('Got an exception:', e)
```
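The code above calls get_data_in_chunks, which the question doesn't show. Here is a minimal hypothetical version, assuming it should split a user list into consecutive batches of at most `size` items (the last batch may be shorter). It uses the walrus operator, so it needs Python 3.8+; the company names and user ids are made-up sample data.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def get_data_in_chunks(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Hypothetical helper: yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(items)
    # islice takes up to `size` items per pass; an empty chunk means the input is exhausted.
    while chunk := list(islice(it, size)):
        yield chunk


# Made-up sample data: company_id -> list of user ids.
cname_vs_user_ids = {"acme": list(range(45)), "globex": [101, 102]}
for company_id, user_ids in cname_vs_user_ids.items():
    print(company_id, [len(chunk) for chunk in get_data_in_chunks(user_ids, 20)])
# acme [20, 20, 5]
# globex [2]
```

Because it is a generator, each batch is built only when the caller asks for it, so large user lists are never copied all at once.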
Expectation: tasks for 5 companies (one per worker) should run concurrently.
Reality: only the first worker does any work; it processes all companies sequentially.
Any help or suggestions would be appreciated. Thanks in advance :)
Solution 1:
So, after reading more about concurrency in Python, I finally figured it out. For now, I'm using concurrent.futures.ThreadPoolExecutor to get the desired result.
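A likely reason the asyncio version ran sequentially (an inference, since the body of update_data isn't shown): if update_data calls a synchronous gRPC stub, that call blocks the event loop. A coroutine only gives up control at an `await` that actually suspends, and `await queue.get()` does not suspend while the queue still has items, so the first worker keeps the loop until the queue is empty. The sketch below reproduces this, with `time.sleep` standing in for a blocking call and `asyncio.sleep` for a non-blocking one; the 0.2-second duration and all function names are assumptions for illustration.

```python
import asyncio
import time

CALL_SECONDS = 0.2  # assumed duration of one gRPC round trip


async def blocking_update(company_id):
    # Stands in for a synchronous gRPC stub: time.sleep never yields to the event loop.
    time.sleep(CALL_SECONDS)


async def nonblocking_update(company_id):
    # Stands in for an awaitable (async) call: the loop runs other workers while it waits.
    await asyncio.sleep(CALL_SECONDS)


async def run(update, companies, num_workers=5):
    queue = asyncio.Queue()
    for company_id in companies:
        queue.put_nowait(company_id)
    handled_by = {}  # company_id -> id of the worker that processed it

    async def worker(worker_id):
        while True:
            company_id = await queue.get()  # does not suspend while items remain
            try:
                await update(company_id)
                handled_by[company_id] = worker_id
            finally:
                queue.task_done()

    start = time.perf_counter()
    tasks = [asyncio.create_task(worker(i)) for i in range(num_workers)]
    await queue.join()
    elapsed = time.perf_counter() - start
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return elapsed, set(handled_by.values())


companies = ["company-{}".format(i) for i in range(5)]
blocking_time, blocking_workers = asyncio.run(run(blocking_update, companies))
nonblocking_time, nonblocking_workers = asyncio.run(run(nonblocking_update, companies))
print("blocking: {:.2f}s, workers {}".format(blocking_time, sorted(blocking_workers)))
print("non-blocking: {:.2f}s, workers {}".format(nonblocking_time, sorted(nonblocking_workers)))
```

The blocking run should take about 1 second with every company handled by worker 0, matching the behavior described in the question, while the non-blocking run should take about 0.2 seconds spread over workers 0 to 4. A thread pool sidesteps the problem because each blocking call gets its own thread.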
Solution:
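```python
import concurrent.futures


# A plain (non-async) function: ThreadPoolExecutor.submit() on an `async def` would only
# create a coroutine object in the thread, and the gRPC call would never actually run.
def update_data(req_id, user_ids, company_id):  # <-- blocking gRPC call on a chunk of users
    # A synchronous gRPC call to the server goes here.
    ...


# req_id: request id used for logging, supplied by the caller (left undefined in the original snippet).
def worker(req_id, cname_vs_user_ids):
    # One call per company, with at most 5 calls running at the same time.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_summary = {
            executor.submit(update_data, req_id, user_items, company_id)
            for company_id, user_items in cname_vs_user_ids.items()
        }
        for future in concurrent.futures.as_completed(future_summary):
            try:
                response = future.result()
            except Exception as error:
                print("ReqId: {}, Error occurred: {}".format(req_id, error))
                executor.shutdown()
                raise


# No event loop is needed when the work runs in threads, so this is a plain function too.
def process_cname_vs_users(req_id, cname_vs_user_ids):
    worker(req_id, cname_vs_user_ids)
```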
This solution worked well for me.
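If you'd rather keep the asyncio queue-and-workers structure from the question, one alternative (swapped in here; it is not what this answer used) is `asyncio.to_thread` (Python 3.9+), which runs a blocking function in the event loop's default thread pool and lets the other workers proceed while it waits. In this sketch `update_data_blocking` and `get_data_in_chunks` are hypothetical stand-ins, with `time.sleep` simulating a synchronous gRPC call. If your gRPC client supports it, grpcio's asyncio API (`grpc.aio`) would let `update_data` await the call directly instead.

```python
import asyncio
import time


def update_data_blocking(user_ids, company_id):
    # Hypothetical stand-in for a synchronous gRPC call on one chunk of users.
    time.sleep(0.1)
    return company_id, len(user_ids)


def get_data_in_chunks(items, size):
    # Hypothetical helper: consecutive slices of at most `size` items.
    return [items[i:i + size] for i in range(0, len(items), size)]


async def worker(worker_id, queue, cname_vs_user_ids, results):
    while True:
        company_id = await queue.get()
        try:
            for user_id_chunk in get_data_in_chunks(cname_vs_user_ids[company_id], 20):
                # The blocking call runs in a thread; this coroutine suspends until it finishes,
                # so the event loop keeps the other workers running in the meantime.
                response = await asyncio.to_thread(update_data_blocking, user_id_chunk, company_id)
                results.append(response)
        except Exception as e:
            print("error: {}".format(e))
        finally:
            # Mark the company as done even if a call failed, so queue.join() can finish.
            queue.task_done()


async def process_cname_vs_users(cname_vs_user_ids, num_workers=5):
    queue = asyncio.Queue()
    for company_id in cname_vs_user_ids:
        queue.put_nowait(company_id)
    results = []
    tasks = [asyncio.create_task(worker(i, queue, cname_vs_user_ids, results))
             for i in range(num_workers)]
    await queue.join()  # wait until every company has been processed
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return results


data = {"company-{}".format(i): list(range(30)) for i in range(5)}
start = time.perf_counter()
results = asyncio.run(process_cname_vs_users(data))
elapsed = time.perf_counter() - start
print(len(results), "calls in {:.2f}s".format(elapsed))
```

With five companies of 30 users each (two chunks apiece), the ten calls should finish in roughly 0.2 seconds rather than the roughly 1 second a fully sequential run would take, because each worker's chunks run one after another while the five workers overlap.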
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | user7630232 |
