'python [oob] - data class with generator to get lazy evaluation
I'd like to build a class which describe a data object coming from DB, the table might be huge , so I was thinking using a generator and "release" row by row/chunks, I want to add to the class several function which allow etl-like, First, what's wrong with iter function - I fetch the first row only,
class Data(object):
DB_LOCATION = "./data.db"
def __init__(self, db_location=None):
"""Initialize db class variables"""
if db_location is not None:
self.connection = sqlite3.connect(db_location)
else:
self.connection = sqlite3.connect(self.DB_LOCATION)
self.cur = self.connection.cursor()
self.loop_ok = True
def __iter__(self):
while self.loop_ok:
row = self.cur.execute("select * from customers").fetchone()
if row:
yield row
else:
self.loop_ok = False
def transform1(self):
pass
def transform2(self):
pass
def load(self):
pass
test = iter(Data())
print(next(text))
# getting first row
print(next(text))
# getting first row again
second, How could i pipeline the data through the class transformation functions? I think below approach might work for single row but not sure how implement it so it will process the entire rows in the table... (pseudo code )
test = iter(Data())
etl_steps = [
transform_1,
transform_2,
load
]
for step in etl_steps:
test = step(test)
for i in test:
print(i)
###Update 24/05
I change the code to gain lazy evaluation for fetching data from the table ,
class Data(object):
DB_LOCATION = "./data.db"
def __init__(self, db_location=None):
"""Initialize db class variables"""
if db_location is not None:
self.connection = sqlite3.connect(db_location)
else:
self.connection = sqlite3.connect(self.DB_LOCATION)
self.cur = self.connection.cursor()
self.cur.execute("select * from customers")
def __iter__(self):
for row in self.cur.fetchall():
yield row
def transform1(tuple) -> list:
return list(tuple)
def transform2(my_list):
return list(map(lambda x: x.upper() if isinstance(x,str) else x, my_list))
def load():
pass
test = iter(Data())
#Working as expected
print(transform2(transform1(next(test))))
#now getting second row
print(transform2(transform1(next(test))))
The thing I'm not sure is how the transformation functions should implemented to get a real "streaming" nature (row by row) , When I use yield and not return - it didn't work for (even when i wrap each one with iterator) . Second , this quite ugly solution to wrap each function with other .
I tried below but it didnt work: Getting too many result and doesnt look the transomrations applied as expected .
etl_steps = [
transform1,
transform2
]
for step in etl_steps:
test = step(test)
print(all(test))
for a in test:
print(a)
Solution 1:[1]
Ok, I think I got it straight now. You want to use __init__ to execute the query. In my example here I just use a list instead of setting up the db file, etc. Then you use __iter__ to return self, and the __next__ method to fetch one row and return it.
Regarding the second question, you just make each "step" function a generator that loops over the input and yields the transformed results. Then you can nest them as you wish:
class Data(object):
def __init__(self):
pass # set up db connection
self.cur = [ # execute cursor
{'id': 1, 'firstName': 'Bob', 'lastName': 'Jones'},
{'id': 2, 'firstName': 'Mary', 'lastName': 'Jane'},
{'id': 3, 'firstName': 'Joe', 'lastName': 'Smith'},
{'id': 4, 'firstName': 'Jane', 'lastName': 'Doe'}
]
def __iter__(self):
return self
def __next__(self):
try:
row = self.cur.pop(0) # fetch next row from cursor
return row
except IndexError:
raise StopIteration
def transform1(data):
for row in data:
row['firstName'] = row['firstName'].upper()
yield row
def transform2(data):
for row in data:
row['midddleInitial'] = 'X'
yield row
def load(data):
for row in data:
print(row)
yield row
etl_steps = [transform1, transform2, load]
data = iter(Data())
for step in etl_steps:
data = step(data)
all(data)
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 |
