Is there a better/more succinct way to kill a Python process and its children?

My goal is to create a very simple web server with a single path that aborts the web server and its child process. I don't care how brutally they are killed - no cleanup is needed - however I would like to do it "properly" - which is to say in an orderly manner.

The idea is that I am running a process on a server, and I want to be able to kill it remotely by sending a web request.

I have found it very hard to come up with a way to reliably kill both the web server and the child processes.

After many hours of effort and reading, I have cobbled together a working solution from Stack Overflow and blog posts.

This code works, and kills the web server and the child. However, it is voodoo coding - which is to say I don't fully understand it - maybe it could be done better, maybe some of this code is not necessary.

The question: is all of the code below necessary to reach my goal stated above? It seems like a very verbose way to get the job done.

Child process: test.py

import time

def main():
    while True:
        print('foo')
        time.sleep(5)

Web server:

import concurrent.futures
import os
from test import main
import asyncio
from uvicorn import Config, Server
import signal
import psutil

def kill_child_processes(parent_pid, sig=signal.SIGTERM):
    try:
        parent = psutil.Process(parent_pid)
    except psutil.NoSuchProcess:
        return
    children = parent.children(recursive=True)
    for process in children:
        process.send_signal(sig)

from fastapi import FastAPI

app = FastAPI()

@app.get("/abort")
def root():
    kill_child_processes(os.getpid())
    return {"message": "Hello World!"}

async def exit():
    loop = asyncio.get_event_loop()
    print("Stop")
    loop.stop()

async def renderworkersynchronous():
    def handler(signum, frame):
        # Runs in the worker for SIGHUP, SIGTERM and SIGINT alike
        print('Signal', signum, 'for PID=', os.getpid())
        os.kill(os.getpid(), signal.SIGKILL)

    def init():
        signal.signal(signal.SIGHUP, handler)
        signal.signal(signal.SIGTERM, handler)
        signal.signal(signal.SIGINT, handler)

    with concurrent.futures.ProcessPoolExecutor(max_workers=1, initializer=init) as executor:
        await asyncio.get_event_loop().run_in_executor(executor, main)

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    try:
        config = Config(app=app, loop=loop)
        server = Server(config)
        task1 = loop.create_task(server.serve())
        task2 = loop.create_task(renderworkersynchronous())
        loop.run_until_complete(asyncio.gather(task1, task2, return_exceptions=True))
    finally:
        print("Close loop")
        loop.close()
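
For comparison, here is a minimal sketch of one possibly shorter route. It is not a verified replacement for the code above. It assumes a POSIX system and that the worker can be started as a separate command rather than through ProcessPoolExecutor. The idea is to start the worker in its own session with `subprocess.Popen(..., start_new_session=True)`, so that it and anything it spawns share one process group, and then signal the whole group with `os.killpg`. The helper names `start_worker` and `kill_worker_group` and the stand-in shell command are hypothetical, and none of this has been tested together with uvicorn.

```python
import os
import signal
import subprocess


def start_worker(cmd):
    """Start cmd as the leader of a new session, and so of a new process group (POSIX only)."""
    return subprocess.Popen(cmd, start_new_session=True)


def kill_worker_group(proc, sig=signal.SIGKILL):
    """Send sig to every process in the worker's group, then reap the worker."""
    try:
        # The worker leads its own group, so its PID is also the group ID.
        os.killpg(proc.pid, sig)
    except ProcessLookupError:
        pass  # the whole group has already exited
    return proc.wait()


if __name__ == "__main__":
    # Stand-in for the real worker: a shell that spawns a grandchild and waits.
    worker = start_worker(["sh", "-c", "sleep 60 & wait"])
    print("exit status:", kill_worker_group(worker))
```

In the setup above, the /abort handler would presumably call something like `kill_worker_group(worker)` and then stop the server itself, for instance by sending SIGTERM to `os.getpid()`. Whether that shuts uvicorn down cleanly depends on how its signal handlers are installed, so treat that step as an assumption to check.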


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
