How to pass futures around to different endpoints in dask

I have a dask cluster (not asynchronous) set up in FastAPI, and I have created two endpoints. One submits the function to the dask cluster, and the other gets polled by vuejs every so often; once the status is "finished" it stops polling. However, dask forgets about futures if it thinks you no longer need them, so I keep a dictionary to hold onto them (similar to here). I used that in my code, and in my local container it worked. However, when deployed in pods on kubernetes, I am able to print the future key and future status to the logs, but the future is not there. The code for this is as follows:

# In-process registry holding a strong reference to each submitted future so
# dask does not release the task before its result has been polled.
# NOTE(review): this only works while a single process serves every request —
# on kubernetes each pod gets its own independent copy of this dict.
futures = {}


@router.post('/simulate')
async def simulate(request: Request, background_tasks: BackgroundTasks) -> object:
    """Submit a simulation job to the dask cluster.

    Returns the cached result when one exists for this payload; otherwise
    submits ``run_simulation`` and returns the future's key so the caller
    can poll ``/get_status/{key}``.

    Raises:
        HTTPException: 422 for an empty body, 500 for any internal failure.
    """
    data = await request.json()
    # `not data` is already True for None, so the extra `is None` test
    # in the original was redundant.
    if not data:
        raise HTTPException(status_code=422, detail={
            "message": "Json body can't be empty"})

    try:
        cache_key = json.dumps(data, cls=CustomJSONEncoder)
        cached = await get_cache(key=cache_key)
        if cached:
            return {
                "is_cached": True,
                "result": cached
            }
        # logic to create inputs_dict using data here
        future = client.submit(run_simulation, inputs_dict)
        # Keep a strong reference: dask releases futures that get garbage
        # collected, which would lose the result before it is polled.
        futures[future.key] = [future, data]
        return {
            "is_cached": False,
            "key": future.key
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail={
            "message": str(e)})


@router.get('/get_status/{key_id}')
async def check_status(key_id: str, request: Request, background_tasks: BackgroundTasks) -> object:
    """Poll the status of a previously submitted simulation.

    Returns the result (and schedules caching) once the future has finished,
    a "pending" marker while it is running, and an "error" marker for any
    other terminal state (e.g. "error", "cancelled", "lost").

    Raises:
        HTTPException: 404 when the key is unknown to this process,
            500 for any internal failure.
    """
    entry = futures.get(key_id)
    if entry is None:
        # An unknown key is a client/routing problem (e.g. another pod
        # handled the submit) — report 404 instead of masking the KeyError
        # as a 500 server error like the original did.
        raise HTTPException(status_code=404, detail={
            "message": f"No pending simulation for key {key_id}"})

    future, data = entry
    try:
        # Exact comparison; dask statuses are plain strings, so the original
        # substring tests ('finished' in ...) were needlessly loose.
        if future.status == 'finished':
            result = future.result()
            background_tasks.add_task(
                set_cache, key=json.dumps(data, cls=CustomJSONEncoder), data=result)
            # Drop the strong reference now that the result is retrieved —
            # the original never removed entries, so the dict grew forever.
            futures.pop(key_id, None)
            return {
                "statusFinished": True,
                "result": result
            }
        if future.status == 'pending':
            return {
                "statusFinished": False,
                "result": "pending"
            }
        return {
            "statusFinished": False,
            "result": "error"
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail={
            "message": str(e)})

I then looked at dask datasets, since a dataset can be shared across dask workers. But this always leaves the state in "pending" unless I explicitly call future.result(), which defeats the purpose of what I am trying to do. This was in my container on localhost.

Here is the code for this:

@router.post('/simulate')
async def simulate(request: Request, background_tasks: BackgroundTasks) -> object:
    """Submit a simulation, publishing the future as a shared dask dataset.

    Serves a cached result when one exists for this payload; otherwise
    submits ``run_simulation`` and publishes both the future and the raw
    request payload on the scheduler so other processes can look them up.
    """
    data = await request.json()
    if data is None or not data:
        raise HTTPException(status_code=422, detail={
            "message": "Json body can't be empty"})

    try:
        cache_key = json.dumps(data, cls=CustomJSONEncoder)
        cached_result = await get_cache(key=cache_key)
        if cached_result:
            return {"is_cached": True, "result": cached_result}

        # logic to create inputs_dict using data here
        future = client.submit(run_simulation, inputs_dict)
        # Publish on the scheduler so any connected client can retrieve
        # the future and its originating payload by key.
        client.datasets[future.key] = future
        client.datasets[f"{future.key}-data"] = data
        return {"is_cached": False, "key": future.key}
    except Exception as exc:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail={
            "message": str(exc)})


@router.get('/get_status/{key_id}')
async def check_status(key_id: str, request: Request, background_tasks: BackgroundTasks) -> object:
    """Poll a simulation that /simulate published as dask datasets.

    Returns the result (and schedules caching) once the future has finished,
    a "pending" marker while it is running, and an "error" marker for any
    other terminal state.

    Raises:
        HTTPException: 404 when no dataset is published under this key,
            500 for any internal failure.
    """
    try:
        # get_dataset raises KeyError for an unknown name; surface that as
        # a 404 instead of masking it as a 500 like the original did.
        future = client.get_dataset(key_id)
        data = client.get_dataset(f"{key_id}-data")
    except KeyError:
        raise HTTPException(status_code=404, detail={
            "message": f"No published simulation for key {key_id}"})

    try:
        # NOTE(review): a future deserialized from a published dataset is
        # tracked by this client only once retrieved, so its status may lag
        # the scheduler's — confirm against the distributed docs.
        if future.status == 'finished':
            result = future.result()
            background_tasks.add_task(
                set_cache, key=json.dumps(data, cls=CustomJSONEncoder), data=result)
            # Unpublish both datasets once the result is consumed; the
            # original never did, so the scheduler accumulated finished
            # entries forever.
            client.unpublish_dataset(key_id)
            client.unpublish_dataset(f"{key_id}-data")
            return {
                "statusFinished": True,
                "result": result
            }
        if future.status == 'pending':
            return {
                "statusFinished": False,
                "result": "pending"
            }
        return {
            "statusFinished": False,
            "result": "error"
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail={
            "message": str(e)})

I have configured the dask-worker with 1 worker and 1 thread. Is there a way to hold on to these futures properly?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source