'StreamLogWriter' object has no attribute 'close'

I have to run Airflow with the files below. Once I trigger the DAG, I get the error "'StreamLogWriter' object has no attribute 'close'".

I have used two functions there, and I am assuming the problem lies in the fit_model function. Can you let me know what exactly the problem is here?

initial_model_function.py

import os
import pickle
import logging

import pandas as pd
import yaml
from sklearn import svm
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, accuracy_score, confusion_matrix


def load_preprocess(**kwargs):

    # load and preprocess creditcard data for initial model fit

    train_df_path = "Data/creditcard.csv"
    data_fraud = pd.read_csv(train_df_path)
    params = yaml.safe_load(open("params.yaml"))["params"]
    C=params["C"]
    kernel=params["kernel"]
    degree=params["degree"]
    gamma=params["gamma"]

    #divide dependent and independent variables
    y=data_fraud['Class']
    x=data_fraud.drop('Class',axis=1)

    # split data between train and test sets
    x_train,x_test,y_train,y_test=train_test_split(x,y,test_size=0.2)
    x_test.to_csv("data/x_test.csv")
    y_test.to_csv("data/y_test.csv")


    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')

    # normalize data

    x_train /= 255
    x_test /= 255

    # Set train samples apart that will serve as streaming data later on

    x_stream = x_train[:20000]
    y_stream = y_train[:20000]

    x_train = x_train[20000:]
    y_train = y_train[20000:]

    stream_sample = [x_stream, y_stream]

    pickle.dump(stream_sample, open(os.getcwd() + kwargs['path_stream_sample'], "wb"))

    # Store test set

    test_set = [x_test, y_test]

    pickle.dump(test_set, open(os.getcwd() + kwargs['path_test_set'], "wb"))

    return x_train, y_train, x_test, y_test

def fit_model(**kwargs):

    # fit model along preprocessed data and constructed model framework

    ti = kwargs['ti']
    loaded = ti.xcom_pull(task_ids='load_preprocess')

    logging.info('variables successfully fetched from previous task')

    x_train = loaded[0]
    y_train = loaded[1]
    x_test = loaded[2]
    y_test = loaded[3]

    # convert class vectors to binary class matrices

    #y_train = keras.utils.to_categorical(y_train, num_classes)
    #y_test = keras.utils.to_categorical(y_test, num_classes)

    # construct & fit

    SVM = svm.SVC(C=C, kernel=kernel, degree=degree, gamma=gamma)
    SVM.fit(x_train,y_train)
    Pkl_Filename = "models/Fraud_SVM.pkl"  

    with open(Pkl_Filename, 'wb') as file:  
        pickle.dump(SVM, file)


    # predictions and true/false positive rates on the training set
    y_train_pred = SVM.predict(x_train)
    train_fpr, train_tpr, tr_thresholds = roc_curve(y_train, y_train_pred)
    # reading test data
    x_test = pd.read_csv('data/x_test.csv')
    x_test = x_test.iloc[1: , :]
    y_test = pd.read_csv('data/y_test.csv')
    x_test = x_test.iloc[: , 1:]
    y_test = y_test.iloc[1: , :]
    y_test=y_test.iloc[: , 1:]
    Pkl_Filename = "models/Fraud_SVM.pkl"  
    #loading the model package
    with open(Pkl_Filename, 'rb') as file:  
        SVM = pickle.load(file)

    # evaluate the model
    predictions_SVM = SVM.predict(x_test)

    test_fpr, test_tpr, te_thresholds = roc_curve(y_test, predictions_SVM)
    print(test_fpr)
    print(test_tpr)

    accuracy=accuracy_score(predictions_SVM, y_test)*100
    print(accuracy)
    conf_mat = confusion_matrix(y_test,predictions_SVM)
    print(conf_mat)
    

    model.save(os.getcwd() + kwargs['initial_model_path'])
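
For context, this is how I understand the hand-off between the two tasks: whatever load_preprocess returns is pushed to XCom under the key 'return_value', and fit_model pulls it back with ti.xcom_pull. A minimal standalone sketch of that assumption (the task and variable names here are only illustrative, not my actual code):

def producer(**kwargs):
    # returning a value from a PythonOperator callable pushes it to XCom
    # under the key 'return_value'
    return [1, 2, 3], [0, 1, 0]

def consumer(**kwargs):
    ti = kwargs['ti']
    # pulling by task_id retrieves whatever the producer task returned
    x, y = ti.xcom_pull(task_ids='producer')
    print(len(x), len(y))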

initial_model_DAG.py

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

from src.models.initial_model_functions import load_preprocess, fit_model

PATH_STREAM_SAMPLE = "/data/stream_sample.p"
PATH_TEST_SET = "/data/test_set.p"
INITIAL_MODEL_PATH = "/models/current_model/initial_model.H5"


args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),      # together with catchup=False, this ensures the DAG is triggered from the current date onwards at the set interval
    'provide_context': True,                            # set to True because we want to pass variables from one task to another
}

dag = DAG(
    dag_id='initial_model_DAG',
    default_args=args,
    schedule_interval= '@once',             # set interval
    catchup=False,                          # whether Airflow should backfill runs for intervals between start_date and the current date that have not been run yet
)


task1 = PythonOperator(
    task_id='load_preprocess',
    python_callable=load_preprocess,        # function to be executed
    op_kwargs={'path_stream_sample': PATH_STREAM_SAMPLE,        # input arguments
               'path_test_set': PATH_TEST_SET},
    dag=dag,
)

task2 = PythonOperator(
    task_id='fit_model',
    python_callable=fit_model,
    op_kwargs={'initial_model_path': INITIAL_MODEL_PATH},
    dag=dag,
)

task1 >> task2                  # set task order: task1 runs before task2
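
For completeness, this is how I understand op_kwargs and provide_context=True to work together: the entries from op_kwargs arrive as keyword arguments, and the task context (including ti) is merged into the same kwargs. A minimal sketch of that assumption (the callable name is only illustrative):

def example_callable(**kwargs):
    # with provide_context=True, op_kwargs entries and the task context
    # arrive through the same **kwargs dictionary
    path = kwargs['initial_model_path']    # from op_kwargs
    ti = kwargs['ti']                      # from the Airflow task context
    print(path, ti.task_id)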


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
