Python - data class with generator to get lazy evaluation

I'd like to build a class that describes a data object coming from a DB. The table might be huge, so I was thinking of using a generator and "releasing" the data row by row or in chunks. I also want to add several ETL-like functions to the class. First, what's wrong with my __iter__ function? I only ever fetch the first row:



import sqlite3


class Data(object):
    DB_LOCATION = "./data.db"

    def __init__(self, db_location=None):
        """Initialize db class variables"""
        if db_location is not None:
            self.connection = sqlite3.connect(db_location)
        else:
            self.connection = sqlite3.connect(self.DB_LOCATION)
        self.cur = self.connection.cursor()
        self.loop_ok = True

    def __iter__(self):
        while self.loop_ok:
            row = self.cur.execute("select * from customers").fetchone()
            if row:
                yield row
            else:
                self.loop_ok = False

    def transform1(self):
        pass

    def transform2(self):
        pass

    def load(self):
        pass

test = iter(Data())
print(next(test))
# getting first row
print(next(test))
# getting first row again
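
For reference, the repeated first row is consistent with the execute() call sitting inside the while loop: every execute() re-runs the query and resets the cursor, so fetchone() starts from the top each time. A minimal sketch of an __iter__ that runs the query once and then walks the cursor lazily (using a hypothetical LazyData subclass purely for illustration):

class LazyData(Data):
    def __iter__(self):
        self.cur.execute("select * from customers")  # run the query once
        for row in self.cur:  # the sqlite3 cursor yields rows on demand
            yield row

# rows = iter(LazyData())
# print(next(rows))  # first row
# print(next(rows))  # second row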

Second, how could I pipeline the data through the class's transformation functions? I think the approach below might work for a single row, but I'm not sure how to implement it so that it processes all of the rows in the table (pseudo code):

test = iter(Data())
etl_steps = [
    transform_1,
    transform_2,
    load
]

for step in etl_steps:
    test = step(test)

for i in test:
    print(i)
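
If the chaining loop above feels clumsy, the same composition can also be written as a fold; a minimal sketch, assuming each step takes an iterable and returns an iterable (the step names are the placeholders from the pseudo code):

from functools import reduce

def pipeline(source, steps):
    # feed the output of each step into the next one, left to right
    return reduce(lambda data, step: step(data), steps, source)

# e.g. result = pipeline(iter(Data()), [transform_1, transform_2, load])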
Update 24/05

I changed the code to gain lazy evaluation when fetching data from the table:



import sqlite3


class Data(object):
    DB_LOCATION = "./data.db"

    def __init__(self, db_location=None):
        """Initialize db class variables"""
        if db_location is not None:
            self.connection = sqlite3.connect(db_location)
        else:
            self.connection = sqlite3.connect(self.DB_LOCATION)
        self.cur = self.connection.cursor()
        self.cur.execute("select * from customers")

    def __iter__(self):
        for row in self.cur.fetchall():
            yield row


def transform1(row) -> list:
    return list(row)

def transform2(my_list):
    return list(map(lambda x: x.upper() if isinstance(x, str) else x, my_list))

def load():
    pass

test = iter(Data())

# working as expected
print(transform2(transform1(next(test))))
# now getting the second row
print(transform2(transform1(next(test))))
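
Note that fetchall() in __iter__ still pulls the entire result set into memory before any row is yielded, so the laziness is only on the Python side. A minimal sketch of a genuinely lazy version, using a hypothetical ChunkedData subclass and an arbitrary chunk size, would iterate the cursor directly or pull fixed-size chunks with fetchmany():

class ChunkedData(Data):
    CHUNK_SIZE = 500  # arbitrary; tune to the workload

    def __iter__(self):
        # fetchmany() keeps at most CHUNK_SIZE rows in memory at a time;
        # iterating self.cur directly would also work and is even simpler
        while True:
            chunk = self.cur.fetchmany(self.CHUNK_SIZE)
            if not chunk:
                break
            for row in chunk:
                yield row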

What I'm not sure about is how the transformation functions should be implemented to get a real "streaming" nature (row by row). When I used yield instead of return it didn't work (even when I wrapped each function with iter()). Second, wrapping each function inside the other is quite an ugly solution.

I tried the code below but it didn't work: I get too many results and it doesn't look like the transformations were applied as expected.


etl_steps = [
    transform1,
    transform2
]

for step in etl_steps:
    test = step(test)

print(all(test))
for a in test:
    print(a)



Solution 1:[1]

Ok, I think I got it straight now. You want to use __init__ to execute the query. In my example here I just use a list instead of setting up the db file, etc. Then you use __iter__ to return self, and the __next__ method to fetch one row and return it.

Regarding the second question, you just make each "step" function a generator that loops over the input and yields the transformed results. Then you can nest them as you wish:

class Data(object):

    def __init__(self):
        pass # set up db connection
        self.cur = [ # execute cursor
            {'id': 1, 'firstName': 'Bob', 'lastName': 'Jones'},
            {'id': 2, 'firstName': 'Mary', 'lastName': 'Jane'},
            {'id': 3, 'firstName': 'Joe', 'lastName': 'Smith'},
            {'id': 4, 'firstName': 'Jane', 'lastName': 'Doe'}
        ]

    def __iter__(self):
        return self

    def __next__(self):
        try:
            row = self.cur.pop(0) # fetch next row from cursor
            return row
        except IndexError:
            raise StopIteration

def transform1(data):
    for row in data:
        row['firstName'] = row['firstName'].upper()
        yield row

def transform2(data):
    for row in data:
        row['middleInitial'] = 'X'
        yield row

def load(data):
    for row in data:
        print(row)
        yield row


etl_steps = [transform1, transform2, load]

data = iter(Data())

for step in etl_steps:
    data = step(data)

all(data)
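
To connect this back to the sqlite3 version from the question, __next__ would simply wrap fetchone() and turn the end of the result set into StopIteration. A minimal sketch, assuming the same data.db and customers table as in the question:

import sqlite3

class Data(object):
    DB_LOCATION = "./data.db"

    def __init__(self, db_location=None):
        self.connection = sqlite3.connect(db_location or self.DB_LOCATION)
        self.connection.row_factory = sqlite3.Row  # rows addressable by column name
        self.cur = self.connection.cursor()
        self.cur.execute("select * from customers")  # run the query once

    def __iter__(self):
        return self

    def __next__(self):
        row = self.cur.fetchone()  # one row at a time, lazily
        if row is None:
            raise StopIteration
        return dict(row)  # plain dict so the transform steps above can mutate it

The etl_steps loop and all(data) then work unchanged on top of this iterator.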

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1: Stack Overflow