Async Multiprocessing

Hi, I am trying to send this processing to different cores, since the tasks are all independent of one another. However, none of them is ever awaited, so the tasks never run. I thought that was what futures were for?


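import concurrent.futures
import multiprocessing


async def process_object(filename):
    # await 1 - download file from S3

    # await 2 - parse XML file
    ...


if "__main__" == __name__:
    # get_objects, bucket_name, prefix and top_n are defined elsewhere (not shown)
    objects = get_objects(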
        bucket_name=bucket_name, prefix=prefix, file_extension=".xml", top_n=top_n
    )

    futures = []
    with concurrent.futures.ProcessPoolExecutor(
        multiprocessing.cpu_count()
    ) as executor:
        futures = [executor.submit(process_object, filename) for filename in objects]
    concurrent.futures.wait(futures)



Solution 1:[1]

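You don't need asyncio when you submit tasks to a ProcessPoolExecutor. Each task runs in a separate worker process, so the tasks already run in parallel without asyncio. With your current code, process_object never actually runs: calling an async function only creates a coroutine object, and a coroutine's body does not execute until it is awaited (or otherwise driven by an event loop). Each worker just returns that un-run coroutine, which cannot be pickled back to the parent process, and because concurrent.futures.wait() does not re-raise worker exceptions, the failure goes unnoticed.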
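A minimal, self-contained sketch of why the async version does nothing (the process_object body here is a hypothetical stand-in that just records that it ran): calling the async function returns a coroutine without executing the body, that coroutine cannot be pickled (which is what a ProcessPoolExecutor worker must do with its return value), and the body only runs once an event loop drives it, here via asyncio.run().

```python
import asyncio
import inspect
import pickle

ran = []  # records which calls actually executed the coroutine body


async def process_object(filename):
    # Hypothetical stand-in body: note that we ran, then return something.
    ran.append(filename)
    return filename.upper()


# Calling an async function only builds a coroutine object; the body does not run.
coro = process_object("a.xml")
print(inspect.iscoroutine(coro), ran)  # True []

# A ProcessPoolExecutor worker has to pickle its return value to send it back,
# and coroutine objects are not picklable.
try:
    pickle.dumps(coro)
    picklable = True
except (TypeError, pickle.PicklingError):
    picklable = False
print("picklable:", picklable)  # picklable: False

# Close the un-run coroutine explicitly (avoids a "never awaited" RuntimeWarning).
coro.close()

# The body only executes when an event loop drives the coroutine.
result = asyncio.run(process_object("b.xml"))
print(result, ran)  # B.XML ['b.xml']
```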

Instead, make process_object a plain (non-async) function:

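import concurrent.futures
import multiprocessing


def process_object(filename):
    # download file
    # parse file
    ...


if __name__ == "__main__":
    # get_objects, bucket_name, prefix and top_n are defined elsewhere (not shown)
    objects = get_objects(
        bucket_name=bucket_name, prefix=prefix, file_extension=".xml", top_n=top_n
    )

    with concurrent.futures.ProcessPoolExecutor(
        multiprocessing.cpu_count()
    ) as executor:
        futures = [executor.submit(process_object, filename) for filename in objects]
    # Leaving the with-block already waits for every task; calling result()
    # re-raises any exception a worker hit instead of silently ignoring it.
    for future in futures:
        future.result()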
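To see the plain-function approach run end to end, here is a self-contained sketch. The S3 download is replaced by a hypothetical in-memory FAKE_BUCKET dict and the parsing by xml.etree.ElementTree, so none of these names come from the original code. It also uses concurrent.futures.as_completed() so that each result, or each exception raised in a worker, is handled as soon as that task finishes.

```python
import concurrent.futures
import xml.etree.ElementTree as ET

# Hypothetical stand-in for the S3 bucket: object key -> XML payload.
# Real code would download each object here instead (e.g. with boto3).
FAKE_BUCKET = {
    "a.xml": "<order id='1'><item/><item/></order>",
    "b.xml": "<order id='2'><item/></order>",
    "c.xml": "<order id='3'>",  # deliberately malformed XML
}


def process_object(filename):
    """Plain (synchronous) worker: 'download' one file, then parse it."""
    payload = FAKE_BUCKET[filename]  # step 1: fetch (hypothetical)
    root = ET.fromstring(payload)    # step 2: parse XML
    return root.get("id"), len(root.findall("item"))


if __name__ == "__main__":
    results, errors = {}, {}
    # With no max_workers argument, the pool sizes itself to the CPU count.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = {executor.submit(process_object, name): name for name in FAKE_BUCKET}
        for future in concurrent.futures.as_completed(futures):
            name = futures[future]
            try:
                # result() re-raises any exception raised inside the worker.
                results[name] = future.result()
            except Exception as exc:  # here: ET.ParseError for c.xml
                errors[name] = exc
    print(sorted(results.items()))  # [('a.xml', ('1', 2)), ('b.xml', ('2', 1))]
    print(sorted(errors))  # ['c.xml']
```

process_object has to be defined at module level so the executor can pickle a reference to it, and the `if __name__ == "__main__":` guard is required on platforms that start workers with the spawn method (the default on Windows and macOS).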

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 larsks