Luigi task methods execution order
What is the order in which Luigi executes the methods (run, output, requires)? I understand that requires is called first to check the validity of the task DAG, but shouldn't output be called after run()?
I'm actually trying to wait for a Kafka message in run, trigger a bunch of other tasks based on it, and return a LocalTarget. Like this:
def run(self):
    for message in self.consumer:
        self.metadata_key = str(message.value, 'utf-8')
        self.path = os.path.join(settings.LUIGI_OUTPUT_PATH, self.metadata_key, self.batch_id)
        if not os.path.exists(self.path):
            os.mkdir(self.path)
        with self.conn.cursor() as cursor:
            all_accounts = cursor.execute('select domainname from tblaccountinfo;')
            for each in all_accounts:
                open(os.path.join(self.path,each)).close()

def output(self):
    return LocalTarget(self.path)
However, I get an error saying:
Exception: path or is_tmp must be set
It is raised at the return LocalTarget(self.path) line. Why does Luigi try to execute the output() method before run() is done?
Solution 1:[1]
This is not a matter of execution order. Before it marks a task as pending, Luigi calls output() to check whether the file the task is supposed to create already exists. Any variables that output() uses must therefore already be resolved at that point. Here, output() uses self.path, which is only assigned inside run(). That is why you get the error.
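To make the check described above concrete, here is a toy model that uses only the standard library. It is not Luigi's real code: ToyTarget, PathSetInRun, toy_schedule and demo are hypothetical names made up for this illustration, and the only thing borrowed from the question is the error message. The sketch assumes, as in the question, that the path starts out unset and is only filled in by run().

```python
import os
import tempfile


class ToyTarget:
    """Hypothetical stand-in for a file target; not Luigi's LocalTarget."""

    def __init__(self, path=None):
        if not path:
            # Mirrors the error message quoted in the question.
            raise Exception("path or is_tmp must be set")
        self.path = path

    def exists(self):
        return os.path.exists(self.path)


class PathSetInRun:
    """Like the task in the question: the path is only known after run()."""

    path = None  # nothing to build a target from until run() has executed

    def run(self):
        self.path = os.path.join(tempfile.gettempdir(), "toy_output")

    def output(self):
        return ToyTarget(self.path)


def toy_schedule(task):
    """Toy completeness check: consult output() first, run() only if needed."""
    if not task.output().exists():  # evaluated before run() is ever called
        task.run()


def demo():
    """Return the error raised when output() is consulted before run()."""
    try:
        toy_schedule(PathSetInRun())
    except Exception as exc:
        return str(exc)
    return "no error"
```

Calling demo() returns the error text, because output() is asked for its target while path is still None and run() never gets a chance to set it.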
Either define the path at the class level and use it in the output() method, or build it inside output() itself and consume it in the run() method, as below:
self.output().open('w').close()
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Pathanjali Tallapragada |
