How to terminate Python's `ProcessPoolExecutor` when parent process dies?

Is there a way to make the processes in `concurrent.futures.ProcessPoolExecutor` terminate if the parent process terminates for any reason?

Some details: I'm using `ProcessPoolExecutor` in a job that processes a lot of data. Sometimes I need to terminate the parent process with a `kill` command, but when I do, the processes from `ProcessPoolExecutor` keep running and I have to kill them manually too. My primary work loop looks like this:

with concurrent.futures.ProcessPoolExecutor(n_workers) as executor:
    result_list = [executor.submit(_do_work, data) for data in data_list]
    for id, future in enumerate(
            concurrent.futures.as_completed(result_list)):
        print(f'{id}: {future.result()}')

Is there anything I can add here or do differently to make the child processes in executor terminate if the parent dies?



Solution 1:[1]

You can start a daemon thread in each worker process that terminates the worker when the parent process dies:

import os
import signal
import threading
import time


def start_thread_to_terminate_when_parent_process_dies(ppid):
    pid = os.getpid()

    def f():
        while True:
            try:
                # Signal 0 sends no signal; it only checks whether a
                # process with the parent's PID still exists.
                os.kill(ppid, 0)
            except OSError:
                # The parent is gone, so terminate this worker.
                os.kill(pid, signal.SIGTERM)
            time.sleep(1)

    thread = threading.Thread(target=f, daemon=True)
    thread.start()

Usage: pass `initializer` and `initargs` to `ProcessPoolExecutor`; the initializer runs once in each worker process as it starts:

with concurrent.futures.ProcessPoolExecutor(
        n_workers,
        initializer=start_thread_to_terminate_when_parent_process_dies,  # +
        initargs=(os.getpid(),),                                         # +
) as executor:

This works even if the parent process is killed with SIGKILL (`kill -9`): the watchdog thread does not rely on any signal reaching the workers, only on the parent's PID disappearing.
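For reference, here is a minimal self-contained sketch of the whole pattern; `_do_work`, the data, and the worker count are hypothetical stand-ins for the question's elided definitions:

import concurrent.futures
import os
import signal
import threading
import time


def start_thread_to_terminate_when_parent_process_dies(ppid):
    pid = os.getpid()

    def f():
        while True:
            try:
                os.kill(ppid, 0)  # existence check only, sends no signal
            except OSError:
                os.kill(pid, signal.SIGTERM)
            time.sleep(1)

    threading.Thread(target=f, daemon=True).start()


def _do_work(data):  # hypothetical stand-in for the real task
    time.sleep(60)  # long enough to kill the parent mid-run
    return data * 2


if __name__ == '__main__':
    print(f'parent pid: {os.getpid()}')  # kill -9 this PID to test
    with concurrent.futures.ProcessPoolExecutor(
            4,
            initializer=start_thread_to_terminate_when_parent_process_dies,
            initargs=(os.getpid(),),
    ) as executor:
        futures = [executor.submit(_do_work, data) for data in range(8)]
        for id, future in enumerate(concurrent.futures.as_completed(futures)):
            print(f'{id}: {future.result()}')

Killing the parent with `kill -9` while the tasks are sleeping should leave no worker alive more than about a second later.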

Solution 2:[2]

I would suggest two changes:

  1. Use a `kill -15` command, which sends a SIGTERM signal that the Python program can catch and handle, rather than a `kill -9` command, which cannot be caught.
  2. Use a pool created with the `multiprocessing.pool.Pool` class, whose `terminate` method works quite differently from that of the `concurrent.futures.ProcessPoolExecutor` class: it kills every process in the pool immediately, so any tasks that have been submitted and are running are terminated at once.

Your equivalent program using this pool and handling SIGTERM would be:

from multiprocessing import Pool
import signal
import sys
import os

...  # _do_work, n_workers and data_list defined as before


def handle_sigterm(*args):
    # print('Terminating...', file=sys.stderr, flush=True)
    pool.terminate()  # immediately kills every process in the pool
    sys.exit(1)


# The process to be "killed", if necessary:
print(os.getpid(), file=sys.stderr)
pool = Pool(n_workers)
# Register the handler after the pool exists, so it can reference `pool`:
signal.signal(signal.SIGTERM, handle_sigterm)
results = pool.imap_unordered(_do_work, data_list)
for id, result in enumerate(results):
    print(f'{id}: {result}')
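With this in place, you can stop the whole job from another shell with `kill <pid>` (`kill` sends SIGTERM, i.e. `-15`, by default); the handler then calls `pool.terminate()`, which kills every worker before the parent exits.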

Solution 3:[3]

You could run the script in a cgroup that can be killed as a unit. When you need to kill the whole thing, you can do so through the cgroup's kill switch (the `cgroup.kill` file in cgroup v2), which kills every process in the group, workers included. Even a CPU cgroup will do the trick, since you can read the group's PIDs from it and kill them yourself.

See the `cgexec` documentation for how to launch a command inside an existing cgroup.
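A minimal sketch of the idea, assuming a cgroup v2 hierarchy mounted at `/sys/fs/cgroup`, a kernel new enough to provide the `cgroup.kill` file (5.14+), and sufficient privileges; the group name `my_job` is hypothetical:

import os
from pathlib import Path

# Hypothetical group; creating it requires root or a delegated subtree.
cg = Path('/sys/fs/cgroup/my_job')
cg.mkdir(exist_ok=True)

# Move the current process into the group. Children started afterwards,
# including ProcessPoolExecutor workers, inherit the membership.
(cg / 'cgroup.procs').write_text(str(os.getpid()))

# ... run the ProcessPoolExecutor job here ...

# From another shell, kill the parent and all workers in one shot:
#   echo 1 > /sys/fs/cgroup/my_job/cgroup.kill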

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1: aaron
[2] Solution 2: Booboo
[3] Solution 3: ThePsyjo