PostgreSQL database update used with asyncio and Semaphore causes a deadlock error when Semaphore > 1
What I need: to download data from an API asynchronously and save/update it in a PostgreSQL database.
Problem: I get a "deadlock detected" error when the BoundedSemaphore argument in main() is greater than 1.
Here is my async main function:
async def main(self, parameters, class_sql_alchemy, class_pydantic, url):
    """Read the page count from ``url`` and download every page.

    Posts once with ``per_page=100`` to read ``pagination.total_pages`` from
    the JSON response, then schedules one ``safe_download`` task per page
    (zero-based page index) and waits until all of them have finished.

    ``parameters`` is accepted but not used by this method. ``headers`` and
    ``data`` are not parameters; they must be resolvable from the enclosing
    scope.
    """
    # The client is only needed for this initial request, so it is released
    # before the per-page tasks are started.
    async with httpx.AsyncClient() as client:
        resp = await client.post(url, headers=headers, json=data,
                                 params={'per_page': 100})
    total_pages = resp.json()["pagination"]["total_pages"]

    # NOTE: a BoundedSemaphore value greater than 1 was reported to trigger
    # PostgreSQL deadlock errors in the concurrent upserts, so the downloads
    # are serialized here.
    sem = asyncio.BoundedSemaphore(1)
    tasks = [
        # Wrapping the coroutine in a task schedules it immediately.
        asyncio.ensure_future(
            self.safe_download(i, headers, data, class_sql_alchemy, class_pydantic, url, sem))
        for i in range(total_pages)
    ]
    # Block until every page has been downloaded and stored.
    await asyncio.gather(*tasks)
Here is my async safe_download function:
async def safe_download(self, i, headers, data, class_sql_alchemy, class_pydantic, url, sem):
    """Run ``self.download`` for page index ``i`` while holding ``sem``.

    ``sem`` caps how many downloads run at the same time; it is released
    when the download finishes, whether it succeeds or raises.
    """
    async with sem:
        await self.download(i, headers, data, class_sql_alchemy, class_pydantic, url)
Here is my async download function:
async def download(self, num_page, headers, data, class_sql_alchemy, class_pydantic, url):
    """Download one page of results and upsert it into the database.

    ``num_page`` is zero-based; the request asks for page ``num_page + 1``
    with 100 items per page. Every entry of the response's ``people`` list is
    passed through ``class_pydantic`` and converted to a plain dict with
    ``.dict()`` before being handed to ``upsert``.
    """
    async with httpx.AsyncClient() as client:
        resp = await client.post(url, headers=headers, json=data,
                                 params={'page': num_page + 1, 'per_page': 100})

    rows = [class_pydantic(**item).dict() for item in resp.json()['people']]
    if not rows:
        # An empty page has nothing to store; skip opening a transaction.
        return

    # NOTE(review): the target model is hard-coded to People while the
    # class_sql_alchemy argument goes unused -- confirm which is intended.
    async with settings.async_orm_session.begin() as session:
        await upsert(session, People, rows)
Here is my upsert function:
async def upsert(session, model, row):
    """Insert ``row`` into ``model``'s table, updating rows that already exist.

    Builds a PostgreSQL ``INSERT ... ON CONFLICT (<primary key>) DO UPDATE``
    statement that overwrites every non-primary-key column with the incoming
    value, and executes it on ``session`` with ``row`` as the parameters.

    When ``row`` is a list of several parameter sets, it is sorted by primary
    key first so that concurrent transactions touch overlapping rows in the
    same order. This mitigates (it cannot rule out) lock-order deadlocks
    between parallel upserts. The sort is stable, so duplicate keys keep
    their relative order and the last occurrence still wins.

    Raises:
        ValueError: if the table has no non-primary-key columns to update.
    """
    table = model.__table__
    stmt = postgresql.insert(table)
    primary_keys = [key.name for key in inspect(table).primary_key]
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")

    stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
                                      set_=update_dict)

    if isinstance(row, list) and len(row) > 1:
        try:
            row = sorted(
                row,
                key=lambda values: tuple(values[name] for name in primary_keys),
            )
        except (KeyError, TypeError):
            # Primary-key values are absent or not mutually comparable:
            # deliberately keep the caller's original order.
            pass

    await session.execute(stmt, row)
On startup:
- db connection
# Async engine for the database at db_async_url; echo=True logs emitted SQL.
engine = create_async_engine(db_async_url, echo=True, future=True)

# Session factory bound to the engine; sessions it creates are AsyncSession
# instances and keep loaded attributes usable after commit.
async_orm_session = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
