'StreamLogWriter' object has no attribute 'close'
I have to run airflow for below file. Once I run it I get "StreamLogWriter' object has no attribute 'close'" error following.
I have used two functions over there and I am assuming the problem lies in the fit_model function. Can you let me know the exact problem here?
initial_model_function.py
def load_preprocess(**kwargs):
# load and preprocess creditcard data for initial model fit
train_df_path = "Data/creditcard.csv"
data_fraud = pd.read_csv(train_df_path)
params = yaml.safe_load(open("params.yaml"))["params"]
C=params["C"]
kernel=params["kernel"]
degree=params["degree"]
gamma=params["gamma"]
#divide dependent and independent variables
y=data_fraud['Class']
x=data_fraud.drop('Class',axis=1)
# split data between train and test sets
x_train,x_test,y_train,y_test=train_test_split(x,y,test_size=0.2)
x_test.to_csv("data/x_test.csv")
y_test.to_csv("data/y_test.csv")
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
# normalize data
x_train /= 255
x_test /= 255
# Set train samples apart that will serve as streaming data later on
x_stream = x_train[:20000]
y_stream = y_train[:20000]
x_train = x_train[20000:]
y_train = y_train[20000:]
stream_sample = [x_stream, y_stream]
pickle.dump(stream_sample, open(os.getcwd() + kwargs['path_stream_sample'], "wb"))
# Store test set
test_set = [x_test, y_test]
pickle.dump(test_set, open(os.getcwd() + kwargs['path_test_set'], "wb"))
return x_train, y_train, x_test, y_test
def fit_model(**kwargs):
# fit model along preprocessed data and constructed model framework
ti = kwargs['ti']
loaded = ti.xcom_pull(task_ids='load_preprocess')
logging.info('variables successfully fetched from previous task')
x_train = loaded[0]
y_train = loaded[1]
x_test = loaded[2]
y_test = loaded[3]
# convert class vectors to binary class matrices
#y_train = keras.utils.to_categorical(y_train, num_classes)
#y_test = keras.utils.to_categorical(y_test, num_classes)
# construct & fit
SVM = svm.SVC(C=C, kernel=kernel, degree=degree, gamma=gamma)
SVM.fit(x_train,y_train)
Pkl_Filename = "models/Fraud_SVM.pkl"
with open(Pkl_Filename, 'wb') as file:
pickle.dump(SVM, file)
# prediction true positive false positive rates
y_train_pred = SVM.predict(x_train)
train_fpr, train_tpr, tr_thresholds = roc_curve(y_train, y_train_pred)
# reading test data
x_test = pd.read_csv('data/x_test.csv')
x_test = x_test.iloc[1: , :]
y_test = pd.read_csv('data/y_test.csv')
x_test = x_test.iloc[: , 1:]
y_test = y_test.iloc[1: , :]
y_test=y_test.iloc[: , 1:]
Pkl_Filename = "models/Fraud_SVM.pkl"
#loading the model package
with open(Pkl_Filename, 'rb') as file:
SVM = pickle.load(file)
#eavaluate model value
predictions_SVM = SVM.predict(x_test)
test_fpr, test_tpr, te_thresholds = roc_curve(y_test, predictions_SVM)
print(test_fpr)
print(test_tpr)
accuracy=accuracy_score(predictions_SVM, y_test)*100
print(accuracy)
conf_mat = confusion_matrix(y_test,predictions_SVM)
print(conf_mat)
model.save(os.getcwd() + kwargs['initial_model_path'])
initial_model_DAG.py
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from src.models.initial_model_functions import load_preprocess, fit_model
PATH_STREAM_SAMPLE = "/data/stream_sample.p"
PATH_TEST_SET = "/data/test_set.p"
INITIAL_MODEL_PATH = "/models/current_model/initial_model.H5"
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(1), # this in combination with catchup=False ensures the DAG being triggered from the current date onwards along the set interval
'provide_context': True, # this is set to True as we want to pass variables on from one task to another
}
dag = DAG(
dag_id='initial_model_DAG',
default_args=args,
schedule_interval= '@once', # set interval
catchup=False, # indicate whether or not Airflow should do any runs for intervals between the start_date and the current date that haven't been run thus far
)
task1 = PythonOperator(
task_id='load_preprocess',
python_callable=load_preprocess, # function to be executed
op_kwargs={'path_stream_sample': PATH_STREAM_SAMPLE, # input arguments
'path_test_set': PATH_TEST_SET},
dag=dag,
)
task2 = PythonOperator(
task_id='fit_model',
python_callable=fit_model,
op_kwargs={'initial_model_path': INITIAL_MODEL_PATH},
dag=dag,
)
task1 >> task2 # set task priority
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
