'PostgresSql update db used with Ayncio and Semaphore: couse deadlock error when Semaphore > 1

What I need: downloading data from API in async way and save/update it to pg database

Problem: geting an error - deadlock detaction: when BoundedSemaphore arg in main() > 1

Here is my async main function:

async def main(self, parameters, class_sql_alchemy, class_pydantic, url):

    async with httpx.AsyncClient() as client:
        resp = await client.post(url, headers=headers, json=data, 
        params={'per_page':100})
    
        total_pages = resp.json()["pagination"]["total_pages"]

        # BoundedSemaphore(1) > 1 cause deadlock error
        sem = asyncio.BoundedSemaphore(1)
        tasks = [
                asyncio.ensure_future(
                self.safe_download(i, headers, data, class_sql_alchemy, class_pydantic, url, sem))
                # creating task starts coroutine
                for i
                in range(total_pages)  
               ]
        await asyncio.gather(*tasks)  # await moment all downloads done

Here is my async safe_download function:

  async def safe_download(self, i, headers, data, class_sql_alchemy, class_pydantic, url, sem):
        async with sem:  # semaphore limits num of simultaneous downloads
            await self.download(i, headers, data, class_sql_alchemy, class_pydantic, url)

Here is my async download function:

  async def download(self, num_page, headers, data, class_sql_alchemy, class_pydantic, url):
        async with httpx.AsyncClient() as client:
            resp = await client.post(url, headers=headers, json=data, params={'page': num_page + 1,'per_page': 100})

            items: List[class_sql_alchemy] = [class_pydantic(**item).dict() for item in
                                              resp.json()['people']]
            list_of_item = []

            for item in items:
                list_of_item.append(item)
            async with settings.async_orm_session.begin() as session:
                await upsert(session, People, list_of_item)

Here my upsert function:

async def upsert(session, model, row):
    table = model.__table__
    stmt = postgresql.insert(table)
    primary_keys = [key.name for key in inspect(table).primary_key]
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}

    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")

    stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
                                      set_=update_dict)

    await session.execute(stmt, row)

On startup:
- db connection

engine = create_async_engine(db_async_url, future=True, echo=True)
async_orm_session = sessionmaker(
    engine, expire_on_commit=False, class_= AsyncSession
)


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source