Running dask map_partitions functions in multiple workers
I have a dask architecture implemented with five docker containers: a client, a scheduler, and three workers. I also have a large dask dataframe stored in parquet format in a docker volume. The dataframe was created with 3 partitions, so there are 3 files (one file per partition).
I need to run a function on the dataframe with map_partitions, where each worker will take one partition to process.
My attempt:
def my_function(dfx):
    return dfx['abc'] = dfx['def'] + 1

df = dd.read_parquet(... path to parquet file)
client = Client('127.0.0.1:8786')
with joblib.parallel_backend('dask'):
    df = df.map_partitions(my_function)
Is this the correct approach? How do I tell dask to use the client variable inside the with statement so the functions run on the workers? Do I need df.compute() to start the execution?
Note: without the 'with' statement, this works fine when the dask client runs in Jupyter. The problem appears when the Dask client runs in Docker: Dask creates the workers inside the web application instead of in the worker containers.
UPDATE
docker compose file:
version: '3'
services:
  web:
    image: img-python-01
    container_name: cont_flask
    volumes:
      - c:/visualcode-py:/code
      - c:/conf:/conf
      - vol_dask_data:/data
      - vol_dask_model:/model
    ports:
      - "5000:5000"
    working_dir: /code
    environment:
      - app.config=/conf/py.app.json
      - common.config=/conf/py.common.json
      - CUDA_VISIBLE_DEVICES=''
    entrypoint:
      - gunicorn
    command:
      - -t 7200
      - -b 0.0.0.0:5000
      - --reload
      - app.frontend.app:app
  scheduler:
    image: img-python-01
    container_name: cont_scheduler
    ports:
      - "8787:8787"
      - "8786:8786"
    entrypoint:
      - dask-scheduler
  worker:
    image: img-python-01
    depends_on:
      - scheduler
    environment:
      - app.config=/conf/py.app.json
      - common.config=/conf/py.common.json
      - PYTHONPATH=/code
      - MODEL_PATH=/model/rfc_model.pkl
      - PREPROCESSING_PATH=/model/data_columns.pkl
      - SCHEDULER_ADDRESS=scheduler
      - SCHEDULER_PORT=8786
      - CUDA_VISIBLE_DEVICES=''
    working_dir: /code
    volumes:
      - c:/visualcode-py:/code
      - c:/conf:/conf
      - c:/winfiles:/winfiles
      - vol_dask_data:/data
      - vol_dask_model:/model
    entrypoint:
      - dask-worker
    command:
      - scheduler:8786
volumes:
  vol_dask_data:
    name: vol_dask_data
  vol_dask_model:
    name: vol_dask_model
Started with docker-compose up -d --scale worker=4; the flask/gunicorn application runs in the web container.
Note: this configuration works fine when I run a client.submit(), workers run on containers.
UPDATE 2
This is the code that works with current docker compose file:
futures1 = client.submit(process_loans, exec_id, 1, dataset, w1)
futures2 = client.submit(process_loans, exec_id, 2, dataset, w2)
worker_responses = client.gather([futures1, futures2])
I can see in the Dask dashboard that the function process_loans runs on the worker containers.
Solution 1:[1]
The python snippet does not appear to use the dask API efficiently. It might be that your actual function is a bit more complex, so map_partitions cannot be avoided, but let's take a look at the simple case first:
def my_function(dfx):
    # return dfx['abc'] = dfx['def'] + 1
    # the line above is a syntax error: an assignment cannot be returned,
    # so the assignment and the return statement must be separated
    dfx['abc'] = dfx['def'] + 1
    return dfx

df = dd.read_parquet(... path to parquet file)
client = Client('127.0.0.1:8786')
with joblib.parallel_backend('dask'):
    df = df.map_partitions(my_function)
Another way to re-write the above (for this basic case) is to explicitly assign new column values:
df = dd.read_parquet(... path to parquet file)
df['abc'] = df['def'] + 1
Or to use the .assign method:
df = (
    dd.read_parquet(path_to_parquet_file)
    .assign(abc=lambda df: df['def'] + 1)
)
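Since map_partitions calls the function once per pandas partition, the corrected function can be sanity-checked with plain pandas before it ever touches the cluster (a minimal sketch; the column names are taken from the question, the sample values are invented):

```python
import pandas as pd

def my_function(dfx):
    dfx['abc'] = dfx['def'] + 1
    return dfx

# each dask partition is an ordinary pandas DataFrame,
# so the per-partition logic can be tested locally
part = pd.DataFrame({'def': [1, 2, 3]})
out = my_function(part)
print(out['abc'].tolist())  # [2, 3, 4]
```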
In terms of the other questions:
- If a client is created outside the context, joblib will use the existing client.
- For restricting computations to one worker per partition, the easiest way is to assign each worker a specific unit of a resource foo and require that each computation uses up one foo of the available resources; see the docs on resources.
- Whether .compute is necessary depends on what is happening downstream. If the data fits into available memory and it is efficient to have the data in memory, then .compute should be executed. Otherwise, it is likely more efficient to delay any actual computation until the final step. For example, if the end result of this code is saving the updated data to another set of parquet files, then there is no need to issue .compute, since dask will trigger the computation when .to_parquet is executed.
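The resource mechanism could be wired into the compose file from the question by extending the worker's command line, then requesting one unit per computation on the client side. A sketch only; the resource name foo and its count are arbitrary placeholders:

```
worker:
  entrypoint:
    - dask-worker
  command:
    - scheduler:8786
    - --resources
    - foo=1
```

On the client, a computation can then be pinned one-per-foo with, for example, df.map_partitions(my_function).compute(resources={'foo': 1}), so no worker runs more than one such task at a time.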
Solution 2:[2]
It seems to me that there is some confusion here between the various pieces of the system.
First let me point out that the function as given produces a syntax error. Maybe you meant
def my_function(dfx):
    dfx['abc'] = dfx['def'] + 1
    return dfx
(this was mentioned in another answer)
Secondly, why is joblib involved at all here? You do not seem to be submitting work to joblib anywhere, it is not being used at all. All you need are dask API calls and your client.
df2 = df.map_partitions(my_function)
and then do whatever it is that you wanted to do with df2. This does not yet begin any execution; it builds a graph of operations to be carried out.
If you wanted to resolve the whole resultant dataset into client memory (this is probably not what you wanted!), you could do
out = df2.compute()
and this will use your distributed scheduler automatically. You could also be more explicit
f = client.compute(df2)
which returns a future that you can either wait on (f.result() and other functions in distributed) or allow to proceed in the background.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | mdurant |
