How to inject the information about the load version into a Kedro node?

I need to run a Kedro (v0.17.4) pipeline with a node that is supposed to process data with a different logic depending on the load version of the input.

As a simple and crude example, assume there is a catalog.yml file with this entry:

test_data_set:
  type: pandas.CSVDataSet
  filepath: data/01_raw/test.csv
  versioned: true

Suppose there are multiple versions of test.csv (say '1' and '2'), and I want to use the catalog from the config file to run the following node/pipeline:

from kedro.config import ConfigLoader
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline, node
from kedro.runner import SequentialRunner

# Build the catalog from the project configuration
conf_loader = ConfigLoader(['conf/base'])
conf_catalog = conf_loader.get('catalog*', 'catalog/**')
io = DataCatalog.from_config(conf_catalog)

def my_node(my_data_set):

    #if version_of_my_data_set == '1':          # how to do this?
    #    print("do something with version 1")

    # ... do something else

    return

my_pipeline = Pipeline([node(func=my_node, inputs="test_data_set", outputs=None, name="process_versioned_data")])

SequentialRunner().run(my_pipeline, catalog=io)

I understand that runtime parameters or the load version are supposed to be separated from the logic in a node by design, but in my specific case it would still be useful to find a way to do this.

In general, the pipeline will be executed via the API, but also via the command line with the --load-version flag.
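To make the two entry points concrete, here is a minimal sketch of how both could share one representation of the requested versions. It assumes the command line takes the version as a `dataset:version` string and that the API route passes a dict through the `load_versions` argument of `DataCatalog.from_config`. The helper `parse_load_versions` is hypothetical and not part of Kedro.

```python
from typing import Dict, Iterable


def parse_load_versions(specs: Iterable[str]) -> Dict[str, str]:
    """Turn 'dataset:version' strings into a {dataset: version} mapping.

    Hypothetical helper for illustration only; not a Kedro function.
    """
    versions: Dict[str, str] = {}
    for spec in specs:
        # Split on the first colon only, in case the version itself has one
        name, sep, version = spec.partition(":")
        if not sep or not name or not version:
            raise ValueError(f"Expected 'dataset:version', got {spec!r}")
        versions[name] = version
    return versions


load_versions = parse_load_versions(["test_data_set:1"])

# Assumed wiring on the API side (not executed here):
# io = DataCatalog.from_config(conf_catalog, load_versions=load_versions)
```

With a dict like this available in the calling code, the same value that selects the file to load could also be handed on to whatever needs to know about it.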

Solutions that I have considered but discarded:

  • store the load version somehow in the Kedro session and access it within the node via "get_current_session" (how?)
  • add load_version as a required input parameter for the node (would probably break compatibility with some upstream pipeline)
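One variation on the second option that might avoid the compatibility problem is to make the version an optional keyword argument and bind it with `functools.partial` when the pipeline is built, so the node's declared inputs stay unchanged. The sketch below is only an illustration under that assumption: `bind_load_version` is a hypothetical helper, the return values are placeholders, and the Kedro wiring is shown as a comment rather than executed.

```python
from functools import partial, update_wrapper
from typing import Any, Callable, Optional


def my_node(my_data_set: Any, load_version: Optional[str] = None) -> str:
    """Node logic that branches on an optional, pre-bound load version."""
    if load_version == "1":
        return "version 1 logic"
    # Fallback when no version (or any other version) was bound
    return "default logic"


def bind_load_version(func: Callable[..., Any], load_version: Optional[str]) -> Callable[..., Any]:
    """Hypothetical helper: fix load_version while keeping the function's metadata."""
    bound = partial(func, load_version=load_version)
    # Copy __name__ and __doc__ so the wrapped function stays identifiable
    update_wrapper(bound, func)
    return bound


versioned_func = bind_load_version(my_node, "1")

# Assumed wiring (not executed here):
# node(func=versioned_func, inputs="test_data_set", outputs=None,
#      name="process_versioned_data")
```

Because `load_version` has a default, upstream pipelines that call `my_node` with the dataset alone would keep working. The version still has to be known where the pipeline is assembled, so this moves the problem out of the node rather than solving it inside the node.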

In short: is there a good way to pass the user-specified load version of a dataset to a Kedro node?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow