from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import confusion_matrix
import pandas as pd
import numpy as np
import random
import re
import os
from .model_error_metrics import *
from ..preprocessing.general import *
from ..preprocessing.plausibility_classes import *
from ..detection.baseline import *
from ..detection.basics import *
from ..detection.statistical import *
from ..detection.ml import *
from ..correction.baseline import *
from ..correction.statistical import *
from ..correction.ml import *
def sample_weights_denseweight(df_fea_list, df_tar_list, detMLConfig):
    # DenseWeight-based sample weights, computed per training partition
    sample_weight_train_list = []
    for train_part in df_tar_list:
        dw = DenseWeight(alpha=1.0)
        data_losses_train = dw.fit(train_part["value_plaus"].to_numpy(dtype="float"))
        sample_weight_train_list.append(data_losses_train)
    return sample_weight_train_list
def sample_weights_root(df_fea_list, df_tar_list, detMLConfig):
    # root-relevance-based sample weights, computed per training partition
    sample_weight_train_list = []
    for train_part in df_tar_list:
        sample_weight_train_list.append(relevance_fct_root(train_part["value_plaus"]))
    return sample_weight_train_list
def is_none_or_nan(tp_ind):
# Check if tp_ind is None
if tp_ind is None:
return True
# Check if tp_ind is NaN
#if np.issubdtype(tp_ind, np.number) and np.isnan(tp_ind):
if isinstance(tp_ind, np.number) and np.isnan(tp_ind):
return True
return False
def add_eval_of_model(id, model_name, data_set_name, cv, conf_matrx, cm_ind = [None, None, None, None]):
tn, fp, fn, tp = conf_matrx
tn_ind, fp_ind, fn_ind, tp_ind = cm_ind
new_row = {"ID": id,
"model": model_name,
"data_set": data_set_name,
"cv": cv,
"tn": tn, "fp": fp, "fn": fn, "tp": tp,
"tpr": (np.float64(tp) / (tp + fn)) if (tp + fn) > 0 else np.nan,
"tnr": (np.float64(tn) / (tn + fp)) if (tp + fn) > 0 else np.nan,
"fpr": (np.float64(fp) / (tn + fp)) if (tp + fn) > 0 else np.nan,
"fnr": (np.float64(fn) / (tp + fn)) if (tp + fn) > 0 else np.nan,
"recall": (np.float64(tp) / (tp + fn)) if (tp + fn) > 0 else np.nan,
"precision": (np.float64(tp) / (tp + fp)) if (tp + fn) > 0 else np.nan,
"f-score": (
2 * (np.float64(tp) / (tp + fp)) * (np.float64(tp) / (tp + fn)) /
(
(np.float64(tp) / (tp + fp)) + (np.float64(tp) / (tp + fn))
)
               ) if (tp + fp) > 0 and (tp + fn) > 0 else np.nan,
"n_obs": tn+fp+fn+tp
}
    # the caller appends this row to the detection evaluation DataFrame (det_eval) via the EvaluationHandler
return new_row
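# Illustration with hypothetical numbers (not taken from the library): for conf_matrx = (90, 5, 3, 2),
# i.e. tn=90, fp=5, fn=3, tp=2, the returned row contains
#   tpr = recall = 2 / (2 + 3) = 0.4,   precision = 2 / (2 + 5) ≈ 0.286,
#   tnr = 90 / 95 ≈ 0.947,   fpr = 5 / 95 ≈ 0.053,   fnr = 3 / 5 = 0.6,
#   f-score = 2 * 0.286 * 0.4 / (0.286 + 0.4) ≈ 0.333,   n_obs = 100.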
def add_errclass_eval_of_model(df, target_pred, models_name, data_set_name, detMLConfig, evaluationHandler):
df["TN"] = (df[detMLConfig.colname_target_det] == False) & (df[target_pred] == False)
df["FP"] = (df[detMLConfig.colname_target_det] == False) & (df[target_pred] == True)
df["FN"] = (df[detMLConfig.colname_target_det] == True) & (df[target_pred] == False)
df["TP"] = (df[detMLConfig.colname_target_det] == True) & (df[target_pred] == True)
df_errorclass_gr = df[['error_class', 'TN', 'FP', 'FN', 'TP']].groupby("error_class").sum().reset_index()
for cur_err_class in list(df["error_class"].unique()):
new_row = {"ID": detMLConfig.colname_raw,
"model": models_name,
"data_set": data_set_name,
"errorclass": cur_err_class,
"tn": df_errorclass_gr[df_errorclass_gr["error_class"]==cur_err_class]["TN"].sum(),
"fp": df_errorclass_gr[df_errorclass_gr["error_class"]==cur_err_class]["FP"].sum(),
"fn": df_errorclass_gr[df_errorclass_gr["error_class"]==cur_err_class]["FN"].sum(),
"tp": df_errorclass_gr[df_errorclass_gr["error_class"]==cur_err_class]["TP"].sum(),
}
# add row to model_errorclass_eval of detMLConfig class instance
evaluationHandler.append_row_to_det_errorclass_eval(new_row)
def get_conf_matrix(y_true, y_pred):
tn = sum((y_true == False) &
(y_pred == False))
fp = sum((y_true == False) &
(y_pred == True))
fn = sum((y_true == True) &
(y_pred == False))
tp = sum((y_true == True) &
(y_pred == True))
conf_matrx = tn, fp, fn, tp
return conf_matrx
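# Minimal usage sketch (hypothetical data; boolean Series as produced elsewhere in this module):
#   y_true = pd.Series([True, True, False, False])
#   y_pred = pd.Series([True, False, False, True])
#   get_conf_matrix(y_true, y_pred)  # -> (1, 1, 1, 1), i.e. tn=1, fp=1, fn=1, tp=1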
def getEvaluation(df, target_pred, model_name, data_set_name, Config, evaluationHandler, cv ="_"):
# compute confusion matrix values
#try:
# tn, fp, fn, tp = confusion_matrix(
# df[[detMLConfig.colname_target_det]],
# df[target_pred]).ravel()
#except:
if Config.target_sensor_uncertainty:
target_det = (abs(df[Config.colname_raw] - df[Config.colname_target_corr]) < Config.target_sensor_uncertainty)
else:
target_det = df[Config.colname_target_det]
conf_matrx = get_conf_matrix(target_det, df[target_pred])
# compute confusion matrix values for indicator function, value-based
if np.issubdtype(df[Config.colname_target_corr].dtype, np.number):
if Config.target_sensor_uncertainty:
target_det = (abs(df[Config.colname_raw] - df[Config.colname_target_corr]) < Config.target_sensor_uncertainty)
else:
target_det = df[Config.colname_target_det]
relevance_series = relevance_fct_indicator(df[Config.colname_target_corr].reset_index(drop=True), (1.25, float("inf")))
tn = sum(((target_det == False) &
(df[target_pred] == False)).astype(int).reset_index(drop=True)*relevance_series)
fp = sum(((target_det == False) &
(df[target_pred] == True)).astype(int).reset_index(drop=True)*relevance_series)
fn = sum(((target_det == True) &
(df[target_pred] == False)).astype(int).reset_index(drop=True)*relevance_series)
tp = sum(((target_det == True) &
(df[target_pred] == True)).astype(int).reset_index(drop=True)*relevance_series)
conf_matrx_ind = [tn, fp, fn, tp]
else:
conf_matrx_ind = [None, None, None, None]
# compute confusion matrix values event-based
if Config.colname_isEvent:
if Config.target_sensor_uncertainty:
target_det = (abs(df[Config.colname_raw] - df[Config.colname_target_corr]) < Config.target_sensor_uncertainty)
else:
target_det = df[Config.colname_target_det]
relevance_series = (df[Config.colname_isEvent] == True).astype(int).reset_index(drop=True)
tn = sum(((target_det == False) &
(df[target_pred] == False)).astype(int).reset_index(drop=True)*relevance_series)
fp = sum(((target_det == False) &
(df[target_pred] == True)).astype(int).reset_index(drop=True)*relevance_series)
fn = sum(((target_det == True) &
(df[target_pred] == False)).astype(int).reset_index(drop=True)*relevance_series)
tp = sum(((target_det == True) &
(df[target_pred] == True)).astype(int).reset_index(drop=True)*relevance_series)
conf_matrx_ev = [tn, fp, fn, tp]
'''
if Config.target_sensor_uncertainty:
gt_unc = (abs(df[Config.colname_raw] - df[Config.colname_target_corr]) < Config.target_sensor_uncertainty)
tn = sum(((gt_unc == False) &
(df[target_pred] == False)).astype(int).reset_index(drop=True))
fp = sum(((gt_unc == False) &
(df[target_pred] == True)).astype(int).reset_index(drop=True))
fn = sum(((gt_unc == True) &
(df[target_pred] == False)).astype(int).reset_index(drop=True))
tp = sum(((gt_unc == True) &
(df[target_pred] == True)).astype(int).reset_index(drop=True))
conf_matrx_unc = [tn, fp, fn, tp]
'''
# add evaluation line to df
new_row = add_eval_of_model(Config.colname_raw, model_name, data_set_name, cv, conf_matrx, conf_matrx_ind)
evaluationHandler.append_row_to_det_eval(new_row)
add_errclass_eval_of_model(df, target_pred, model_name, data_set_name, Config, evaluationHandler)
# compute confusion matrix values for indicator function, value-based
if np.issubdtype(df[Config.colname_target_corr].dtype, np.number):
new_row = add_eval_of_model(Config.colname_raw, model_name, f"r-1.25 {data_set_name}", cv, conf_matrx_ind)
evaluationHandler.append_row_to_det_eval(new_row)
# compute confusion matrix values event-based
if Config.colname_isEvent:
new_row = add_eval_of_model(Config.colname_raw, model_name, f"isEvent {data_set_name}", cv, conf_matrx_ev)
evaluationHandler.append_row_to_det_eval(new_row)
'''
if Config.target_sensor_uncertainty:
new_row = add_eval_of_model(Config.colname_raw, model_name, f"uncertainty {data_set_name}", cv, conf_matrx_unc)
evaluationHandler.append_row_to_det_eval(new_row)
'''
class EvaluationHandler:
r"""
The EvaluationHandler class manages the evaluation of machine learning models, specifically handling metrics for
both classification and regression tasks. It includes functionality to append new rows of evaluation data,
and retrieve stored evaluations. It also supports cross-validation and train-test evaluations.
Parameters
----------
Config : object
Configuration object that defines model settings for evaluation.
dataSetHandler : object
Object responsible for managing the dataset during evaluation.
Attributes
----------
COLUMN_LIST_EVAL_CAT : list
List of columns for storing categorical evaluation metrics (e.g., tn, fp, fn, tp, recall, precision).
COLUMN_LIST_ERRCLASS_EVAL_CAT : list
List of columns for storing evaluation results based on error classes (e.g., tnr).
COLUMN_LIST_EVAL_REG : list
List of columns for storing regression evaluation metrics (e.g., mse, rmse, mae, nse).
det_eval : pandas DataFrame
DataFrame that stores categorical evaluation results.
det_errorclass_eval : pandas DataFrame
DataFrame that stores evaluation results based on error classes.
corr_eval : pandas DataFrame
DataFrame that stores regression evaluation results.
Methods
-------
get_det_eval()
Retrieves the current categorical evaluation DataFrame (det_eval).
get_det_errorclass_eval()
Retrieves the current error class evaluation DataFrame (det_errorclass_eval).
append_row_to_det_eval(new_row)
Appends a new row of categorical evaluation data to the det_eval DataFrame.
append_row_to_det_errorclass_eval(new_row)
Appends a new row of error class evaluation data to the det_errorclass_eval DataFrame.
append_row_to_corr_eval(new_row)
Appends a new row of regression evaluation data to the corr_eval DataFrame.
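    Examples
    --------
    Minimal sketch with hypothetical values, assuming ``config`` and ``dataSetHandler`` instances
    already exist::

        evaluationHandler = EvaluationHandler(config, dataSetHandler)
        new_row = {"ID": "sensor_1", "model": "example_model", "data_set": "train", "cv": 0,
                   "tn": 90, "fp": 5, "fn": 3, "tp": 2}
        evaluationHandler.append_row_to_det_eval(new_row)
        evaluationHandler.get_det_eval()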
"""
# Class constant
COLUMN_LIST_EVAL_CAT = [
"ID", "model", "data_set", "cv",
"tn", "fp", "fn", "tp",
"tpr", "tnr", "fpr", "fnr",
# "tpr_ind1.25",
# "fpr-r-1.25", "fnr-r-1.25",
"recall", "precision", "f-score",
"n_obs"]
COLUMN_LIST_ERRCLASS_EVAL_CAT = ["ID", "model", "data_set", "errorclass", "tnr"]
COLUMN_LIST_EVAL_REG = ["ID", "model", "data_set", "cv", "mse", "mse-rel-root", "mse-rel-ind 1.25",
"ind1-rel-ind 1.25",
"rate-rel-ind 1.25", "rmse", "mae", "nse", "n_obs",
"sera"]
    def __init__(self, Config, dataSetHandler):
self.COLUMN_LIST_EVAL_CAT = EvaluationHandler.COLUMN_LIST_EVAL_CAT
self.COLUMN_LIST_EVAL_REG = EvaluationHandler.COLUMN_LIST_EVAL_REG
self.det_eval = pd.DataFrame(columns=EvaluationHandler.COLUMN_LIST_EVAL_CAT)
self.det_errorclass_eval = pd.DataFrame(columns=EvaluationHandler.COLUMN_LIST_ERRCLASS_EVAL_CAT)
self.corr_eval = pd.DataFrame(columns=EvaluationHandler.COLUMN_LIST_EVAL_REG)
@staticmethod
def __drop_empty_or_all_na_columns(df):
# Drop columns where all values are NaN
df_cleaned = df.dropna(axis=1, how='all')
# Optionally, drop columns where there are no non-NA values
df_cleaned = df_cleaned.loc[:, df_cleaned.notna().any(axis=0)]
return df_cleaned
    def get_det_eval(self):
"""
        Return the current categorical evaluation DataFrame (det_eval).
"""
return self.det_eval
    def get_det_errorclass_eval(self):
"""
        Return the current error-class evaluation DataFrame (det_errorclass_eval).
"""
return self.det_errorclass_eval
    def append_row_to_det_eval(self, new_row):
"""
Append a new row to the DataFrame.
The new row should be in a format that can be converted to a DataFrame.
"""
new_row_df = pd.DataFrame([new_row])
new_row_df = EvaluationHandler.__drop_empty_or_all_na_columns(new_row_df)
if not self.det_eval.empty:
self.det_eval = pd.concat([self.det_eval, new_row_df], ignore_index=True)
else:
self.det_eval = new_row_df
    def append_row_to_det_errorclass_eval(self, new_row):
"""
Append a new row to the DataFrame.
The new row should be in a format that can be converted to a DataFrame.
"""
new_row_df = pd.DataFrame([new_row])
new_row_df = EvaluationHandler.__drop_empty_or_all_na_columns(new_row_df)
if not self.det_errorclass_eval.empty:
self.det_errorclass_eval = pd.concat([self.det_errorclass_eval, new_row_df], ignore_index=True)
else:
self.det_errorclass_eval = new_row_df
    def append_row_to_corr_eval(self, new_row):
"""
Append a new row to the DataFrame.
The new row should be in a format that can be converted to a DataFrame.
"""
new_row_df = pd.DataFrame([new_row])
#new_row_df = EvaluationHandler.__drop_empty_or_all_na_columns(new_row_df)
if not self.corr_eval.empty:
self.corr_eval = pd.concat([self.corr_eval, new_row_df], ignore_index=True)
else:
self.corr_eval = new_row_df
def det_evaluate_method(func, dataSetHandler, config, *args):
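    """
    Apply a single detection method to every train/test partition managed by dataSetHandler.

    Dispatch is based on the string representation of ``func``: statistical and basic detectors
    (repr containing "STAT" or "BASIC") are called on the raw value column, ML detectors
    (repr containing "ML") are class-based and are fitted before predicting on train and test
    features (optionally saving the model), and baseline detectors (repr containing "BASE")
    receive features and targets directly. Returns a dict of the form
    ``{cv_index: {"train": predictions, "test": predictions}}``.
    """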
d_part = {}
for i in range(dataSetHandler.list_len):
d_part[i] = {}
for i in range(dataSetHandler.list_len):
if "STAT" in f"{func}" or "BASIC" in f"{func}":
d_part[i]["train"] = func(
dataSetHandler.get_train_features()[i][config.colname_raw].copy(deep=True),
#None,
#detMLConfig,
*args)
d_part[i]["test"] = func(
dataSetHandler.get_test_features()[i][config.colname_raw].copy(deep=True),
#None,
#detMLConfig,
*args)
elif "ML" in f"{func}":
class_instance = func()
class_instance.fit(
dataSetHandler.get_train_features(exclude_columns=config.exclude_cols)[i].copy(deep=True),
dataSetHandler.get_train_targets(extract_single_target=config.colname_target_det)[i].copy(
deep=True),
config,
*args)
d_part[i]["train"] = class_instance.predict(
dataSetHandler.get_train_features(exclude_columns=config.exclude_cols)[i].copy(deep=True))
d_part[i]["test"] = class_instance.predict(
dataSetHandler.get_test_features(exclude_columns=config.exclude_cols)[i].copy(deep=True))
if config.det_ML_model_savefolder:
match = re.search(r"<class '[^']*\.(\w+)'>", f"{func}")
if match:
short_funcname = match.group(1)
else:
short_funcname = "model"
if not os.path.exists(f"{config.det_ML_model_savefolder}\\{config.colname_raw}"):
os.makedirs(f"{config.det_ML_model_savefolder}\\{config.colname_raw}")
class_instance.save_model(f"{config.det_ML_model_savefolder}\\{config.colname_raw}\\{short_funcname}_cv{i}.pkl")
data = {"train_event_absnr": [dataSetHandler.train_event_absnr[i]],
"train_event_rel": [dataSetHandler.train_eventrates[i]],
"train_timerange": [f"{dataSetHandler.get_train_features()[i].index.min(), dataSetHandler.get_train_features()[i].index.max()}"]
}
pd.DataFrame(data).to_csv(f"{config.det_ML_model_savefolder}\\{config.colname_raw}\\additional_info_cv{i}.csv", index=False)
elif "BASE" in f"{func}":
d_part[i]["train"] = func(
dataSetHandler.get_train_features()[i].copy(deep=True),
dataSetHandler.get_train_targets()[i].copy(deep=True),
config,
*args)
d_part[i]["test"] = func(
dataSetHandler.get_test_features()[i].copy(deep=True),
dataSetHandler.get_test_targets()[i].copy(deep=True),
config,
*args)
else:
print(f"Error in data and function handler regarding {func}.")
pass
return d_part
def extract_functions_and_parameters(methods):
functions = []
parameters = []
for cur_meth in methods:
        # if multiple functions and/or additional parameters are given
if isinstance(cur_meth, tuple):
# multiple functions are given
if isinstance(cur_meth[0], tuple):
# Handle nested tuple
functions_chain = []
parameters_chain = []
for nested_meth in cur_meth:
functions_chain.append(nested_meth[0])
parameters_chain.append(nested_meth[1:])
functions.append(functions_chain)
parameters.append(parameters_chain)
# additional parameters are given
else:
functions.append(cur_meth[0])
parameters.append(cur_meth[1:])
else:
            functions.append(cur_meth)
parameters.append(())
return functions, parameters
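# Illustration of the accepted method specifications (hypothetical function names):
#   [detect_stat]                  -> functions=[detect_stat],           parameters=[()]
#   [(detect_stat, 3.0)]           -> functions=[detect_stat],           parameters=[(3.0,)]
#   [((detect_a, 1), (detect_b,))] -> functions=[[detect_a, detect_b]],  parameters=[[(1,), ()]]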
def detection_evaluation_wrapper(config,
dataSetHandler,
det_ML_methods=[],
det_stat_methods=[],
):
r"""
Wraps error detection modeling and evaluation, applying both machine learning and statistical methods
for anomaly detection. This function manages the execution of models, performs cross-validation if configured,
    and stores the results in a nested dictionary structure and an EvaluationHandler class instance.
Parameters
----------
config : class instance of Config
A mutable class instance containing the configuration settings for model evaluation, such as
column names, cross-validation settings, and other options.
dataSetHandler : class instance of DataSetHandler
An instance responsible for storing and handling the dataset. It manages both the features
and targets for training and testing data.
det_ML_methods : list, optional
A list of machine learning methods for detection (default is an empty list). These methods
will be applied for error detection modeling.
det_stat_methods : list, optional
A list of statistical methods for detection (default is an empty list). These methods will
be applied for error detection modeling.
Returns
-------
d : dict
A nested dictionary, where each key corresponds to a machine learning or statistical method.
The value is a sub-dictionary that contains the train and test predictions (as pandas Series)
and the generated machine learning models.
    evaluationHandler : EvaluationHandler
        An instance of `EvaluationHandler` that contains the evaluation results for the applied detection models.
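    Examples
    --------
    Minimal sketch, assuming ``config`` and ``dataSetHandler`` are set up and ``DET_STAT_example``
    is a hypothetical detection method accepted by this wrapper::

        d, evaluationHandler = detection_evaluation_wrapper(
            config,
            dataSetHandler,
            det_stat_methods=[DET_STAT_example])
        det_eval_df = evaluationHandler.get_det_eval()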
"""
# initialization of returned parameters
d = {}
evaluationHandler = EvaluationHandler(config, dataSetHandler)
meth_functions, meth_parameters = extract_functions_and_parameters(det_stat_methods + det_ML_methods)
for i, cur_method in enumerate(meth_functions):
run_len = len(cur_method) if isinstance(cur_method, list) else 1
for j in range(run_len):
# if ground truth is numerical
cur_method_unchained = cur_method[j] if isinstance(cur_method, list) else cur_method
cur_meth_param_unchained = meth_parameters[i][j] if isinstance(meth_parameters[i], list) else meth_parameters[i]
if np.issubdtype(dataSetHandler.get_train_targets()[0][config.colname_target_corr].dtype, np.number):
if len(meth_parameters[i])> 0:
d[str(cur_method)] = det_evaluate_method(
cur_method_unchained,
dataSetHandler,
config,
*cur_meth_param_unchained)
else:
d[str(cur_method)] = det_evaluate_method(
cur_method_unchained,
dataSetHandler,
config)
# if more than one run is executed
if run_len > 1:
if j > 0:
for cur_cv_key in d[str(cur_method)].keys():
d[str(cur_method)][cur_cv_key]["train"] = np.maximum(d[str(cur_method)][cur_cv_key]["train"], int_results[cur_cv_key]["train"])
int_results[cur_cv_key]["train"] = np.maximum(d[str(cur_method)][cur_cv_key]["train"], int_results[cur_cv_key]["train"])
d[str(cur_method)][cur_cv_key]["test"] = np.maximum(d[str(cur_method)][cur_cv_key]["test"], int_results[cur_cv_key]["test"])
int_results[cur_cv_key]["test"] = np.maximum(d[str(cur_method)][cur_cv_key]["test"], int_results[cur_cv_key]["test"])
if j == 0:
int_results = d[str(cur_method)]
# evaluate models
for cur_ml_fct in d.keys():
for cur_cv_set in d[cur_ml_fct].keys():
if cur_cv_set == 'ml_model':
continue
train_data = {
f"{config.colname_raw}": dataSetHandler.get_train_features()[int(cur_cv_set)][config.colname_raw],
f"{config.colname_target_det}": dataSetHandler.get_train_targets()[int(cur_cv_set)][config.colname_target_det],
#f"hasGoodDQ_{cur_ml_fct}": dataSetHandler.get_train_targets()[int(cur_cv_set)][detMLConfig.colname_target_det],
f"hasGoodDQ_{cur_ml_fct}": pd.Series(d[cur_ml_fct][int(cur_cv_set)]["train"]),
"error_class": dataSetHandler.get_train_features()[int(cur_cv_set)]["error_class"],
f"{config.colname_isEvent}": dataSetHandler.get_train_features()[int(cur_cv_set)][f"{config.colname_isEvent}"],
f"{config.colname_target_corr}": dataSetHandler.get_train_targets()[int(cur_cv_set)][f"{config.colname_target_corr}"]
}
getEvaluation(
df=pd.DataFrame(train_data),
target_pred=f"hasGoodDQ_{cur_ml_fct}",
model_name = f"{cur_ml_fct}",
data_set_name = "train",
Config= config,
evaluationHandler= evaluationHandler,
cv = cur_cv_set)
test_data = {
f"{config.colname_raw}": dataSetHandler.get_test_features()[int(cur_cv_set)][config.colname_raw],
f"{config.colname_target_det}": dataSetHandler.get_test_targets()[int(cur_cv_set)][config.colname_target_det],
#f"hasGoodDQ_{cur_ml_fct}": dataSetHandler.get_test_targets()[int(cur_cv_set)][detMLConfig.colname_target_det],
f"hasGoodDQ_{cur_ml_fct}": pd.Series(d[cur_ml_fct][int(cur_cv_set)]["test"]),
"error_class": dataSetHandler.get_test_features()[int(cur_cv_set)]["error_class"],
f"{config.colname_isEvent}": dataSetHandler.get_test_features()[int(cur_cv_set)][f"{config.colname_isEvent}"],
f"{config.colname_target_corr}": dataSetHandler.get_test_targets()[int(cur_cv_set)][f"{config.colname_target_corr}"]
}
getEvaluation(
df=pd.DataFrame(test_data),
target_pred=f"hasGoodDQ_{cur_ml_fct}",
model_name = f"{cur_ml_fct}",
data_set_name = "test",
Config= config,
evaluationHandler= evaluationHandler,
cv = cur_cv_set)
d.update({'isCV': config.cross_validation})
d.update({'train_test_IDs': config.train_test_IDs})
return d, evaluationHandler
def feature_generation(df):
    # Wrapper for feature generation.
    # feature-generation function as a parameter (to allow using different feature sets)?
# df_fea = feature_generation(df)
return df
def wrapper_model_application(ml_fct_list, df):
    # here: apply previously saved models
#
return pd.Series()
def add_eval_of_model_reg(config, model_name, data_set_name, cv, y_gt, y_pred):
# drop na's
df = pd.DataFrame([y_gt, y_pred], index=["y_gt", "y_pred"]).transpose().dropna()
y_gt = df["y_gt"]
y_pred = df["y_pred"]
new_row = {
"ID": config.colname_raw,
"model": model_name,
"data_set": data_set_name,
"cv": cv,
"mse": mean_squared_error(y_gt, y_pred, squared=True) if not y_gt.empty else np.nan,
"mse-rel-root": mean_squared_error(y_gt, y_pred, sample_weight = relevance_fct_root(y_gt)) if sum(relevance_fct_root(y_gt))> 0 else np.nan,
"mse-rel-ind 1.25": mean_squared_error(y_gt, y_pred, sample_weight = relevance_fct_indicator(y_gt, (1.25, float("inf")))) if sum(relevance_fct_indicator(y_gt, (1.25, float("inf"))))> 0 else np.nan,
"ind1-rel-ind 1.25": indicator_error_relevance(y_gt, y_pred, (0, 1), relevance_fct_indicator, (1.25,float("inf"))),
"rate-rel-ind 1.25": rate_error_relevance(y_gt, y_pred, relevance_fct_indicator, (1.25,float("inf"))),
"rmse": mean_squared_error(y_gt, y_pred, squared=False) if not y_gt.empty else np.nan,
"mae": mean_absolute_error(y_gt, y_pred) if not y_gt.empty else np.nan,
"nse": NSE(y_pred, y_gt),
"n_obs": len(y_gt),
#"abs_vol_error": abs_vol_error(y_pred, y_gt),
#"rel_vol_error": rel_vol_error(y_pred, y_gt),
"sera": squared_error_relevance_area(y_gt, y_pred, relevance_fct_root(y_gt))[0]
}
    # the caller appends this row to the regression evaluation DataFrame (corr_eval) via the EvaluationHandler
return new_row
def getEvaluation_reg(df, colname_pred, model_name, data_set_name, config, evaluationHandler, cv ="-"):
#all data
new_row = add_eval_of_model_reg(config, model_name, data_set_name, cv, df[config.colname_target_corr], df[colname_pred])
evaluationHandler.append_row_to_corr_eval(new_row)
# compute confusion matrix values for indicator function, value-based
if np.issubdtype(df[config.colname_target_corr].dtype, np.number):
relevance_series = relevance_fct_indicator(df[config.colname_target_corr], (1.25, float("inf")))
new_row = add_eval_of_model_reg(config, model_name, f"r-1.25 {data_set_name}", cv,
df[config.colname_target_corr][relevance_series > 0.0], df[colname_pred][relevance_series > 0.0])
evaluationHandler.append_row_to_corr_eval(new_row)
# compute event-based
if config.colname_isEvent:
relevance_series = df[config.colname_isEvent].astype(int)#.reset_index(drop=True)
new_row = add_eval_of_model_reg(config, model_name, f"isEvent {data_set_name}", cv,
df[config.colname_target_corr][relevance_series > 0.0], df[colname_pred][relevance_series > 0.0])
evaluationHandler.append_row_to_corr_eval(new_row)
def isIncorrect_evaluate(series_isCorrect, series_raw, series_target):
return (series_isCorrect == False) | (series_raw != series_target)
def corr_evaluate_method(func, dataSetHandler, config, dataSetHandlerColumn_errorDetection, *args):
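    """
    Apply a single correction method to every train/test partition managed by dataSetHandler.

    Dispatch is based on the string representation of ``func``: statistical correctors
    (repr containing "STAT") are called on the feature frames together with the error-detection
    column, ML correctors (repr containing "ML") are class-based and are fitted before predicting
    on train and test features (optionally saving the model), and baseline correctors
    (repr containing "BASE") receive features and targets directly. Returns a dict of the form
    ``{cv_index: {"train": predictions, "test": predictions}}``.
    """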
d_part = {}
for i in range(dataSetHandler.list_len):
d_part[i] = {}
for i in range(dataSetHandler.list_len):
if "STAT" in f"{func}":
d_part[i]["train"] = func(
dataSetHandler.get_train_features()[i].copy(deep=True),#[config.colname_raw]
None,
dataSetHandlerColumn_errorDetection,
config,
*args)
d_part[i]["test"] = func(
dataSetHandler.get_test_features()[i].copy(deep=True),
None,
dataSetHandlerColumn_errorDetection,
config,
*args)
elif "ML" in f"{func}":
class_instance = func()
class_instance.fit(
dataSetHandler.get_train_features(exclude_columns=config.exclude_cols)[i].copy(deep=True),
dataSetHandler.get_train_targets(extract_single_target=config.colname_target_corr)[i].copy(
deep=True),
dataSetHandlerColumn_errorDetection,
config,
*args)
d_part[i]["train"] = class_instance.predict(
dataSetHandler.get_train_features(exclude_columns=config.exclude_cols)[i].copy(deep=True),
dataSetHandlerColumn_errorDetection,
config)
d_part[i]["test"] = class_instance.predict(
dataSetHandler.get_test_features(exclude_columns=config.exclude_cols)[i].copy(deep=True),
dataSetHandlerColumn_errorDetection,
config)
if config.corr_ML_model_savefolder:
match = re.search(r"<class '[^']*\.(\w+)'>", f"{func}")
if match:
short_funcname = match.group(1)
else:
short_funcname = "model"
if not os.path.exists(f"{config.corr_ML_model_savefolder}\\{config.colname_raw}"):
os.makedirs(f"{config.corr_ML_model_savefolder}\\{config.colname_raw}")
class_instance.save_model(f"{config.corr_ML_model_savefolder}\\{config.colname_raw}\\{short_funcname}_cv{i}.pkl")
data = {"train_event_absnr": [dataSetHandler.train_event_absnr[i]],
"train_event_rel": [dataSetHandler.train_eventrates[i]],
"train_timerange": [f"{dataSetHandler.get_train_features()[i].index.min(), dataSetHandler.get_train_features()[i].index.max()}"]
}
pd.DataFrame(data).to_csv(f"{config.corr_ML_model_savefolder}\\{config.colname_raw}\\additional_info_cv{i}.csv", index=False)
elif "BASE" in f"{func}":
d_part[i]["train"] = func(
dataSetHandler.get_train_features()[i].copy(deep=True),
dataSetHandler.get_train_targets()[i].copy(deep=True),
config,
*args)
d_part[i]["test"] = func(
dataSetHandler.get_test_features()[i].copy(deep=True),
dataSetHandler.get_test_targets()[i].copy(deep=True),
config,
*args)
else:
print(f"{func} not available.")
pass
return d_part
def correction_evaluation_wrapper(config,
dataSetHandler,
dataSetHandlerColumn_errorDetection,
corr_ML_methods=[],
corr_stat_methods=[],
):
r"""
Wraps error correction modeling and evaluation, applying machine learning and statistical methods for
correcting detected errors. This function manages the application of correction models, performs cross-validation if
configured, and stores the results for further evaluation.
Parameters
----------
config : class instance of Config
A mutable class instance that contains configuration settings for correction models, such as column names,
cross-validation flags, and other relevant options.
dataSetHandler : class instance of DataSetHandler
An instance responsible for storing and handling the dataset, managing both the feature and target data
for training and testing.
dataSetHandlerColumn_errorDetection : str
The column in the dataset that marks whether an error has been detected. This column is used to identify
erroneous entries for correction.
corr_ML_methods : list, optional
A list of machine learning methods to be applied for error correction modeling (default is an empty list).
corr_stat_methods : list, optional
A list of statistical methods to be applied for error correction modeling (default is an empty list).
Returns
-------
d : dict
A nested dictionary where each key corresponds to a correction method (either machine learning or statistical).
The value is a sub-dictionary that contains the train and test predictions, the corrected target values,
and other evaluation results.
evaluationHandler : EvaluationHandler
An instance of `EvaluationHandler` that contains the evaluation results for the applied correction models.
Notes
-----
- For each correction method, the results from multiple runs (if applicable) are combined using averaging for
both train and test datasets.
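    Examples
    --------
    Minimal sketch, assuming ``config`` and ``dataSetHandler`` are set up, ``CORR_STAT_example`` is a
    hypothetical correction method accepted by this wrapper, and ``"error_detected"`` is a hypothetical
    feature column marking detected errors::

        d, evaluationHandler = correction_evaluation_wrapper(
            config,
            dataSetHandler,
            dataSetHandlerColumn_errorDetection="error_detected",
            corr_stat_methods=[CORR_STAT_example])
        corr_eval_df = evaluationHandler.corr_eval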
"""
# initialization of returned parameters
d = {}
evaluationHandler = EvaluationHandler(config, dataSetHandler)
'''
for cur_method in detMLConfig.corr_ML_methods + detMLConfig.corr_stat_methods:
cur_method = cur_method # f"{cur_method}".split(" ")[1]
d[cur_method] = {}
for i in range(dataSetHandler.list_len):
d[cur_method][i] = {}
'''
d["add_info"] = {}
pred_dict = {}
for i in range(dataSetHandler.list_len):
d["add_info"][i] = 1
#meth_functions = [cur_meth[0] if isinstance(cur_meth, tuple) else str(cur_meth) for cur_meth in
# config.corr_stat_methods + config.corr_ML_methods]
#meth_parameters = [cur_meth[1:] if isinstance(cur_meth, tuple) else () for cur_meth in
# config.corr_stat_methods + config.corr_ML_methods]
meth_functions, meth_parameters = extract_functions_and_parameters(corr_stat_methods + corr_ML_methods)
for i, cur_method in enumerate(meth_functions):
run_len = len(cur_method) if isinstance(cur_method, list) else 1
for j in range(run_len):
cur_method_unchained = cur_method[j] if isinstance(cur_method, list) else cur_method
cur_meth_param_unchained = meth_parameters[i][j] if isinstance(meth_parameters[i], list) else \
meth_parameters[i]
if len(meth_parameters[i]) > 0:
d[str(cur_method)] = corr_evaluate_method(
cur_method_unchained,
dataSetHandler,
config,
dataSetHandlerColumn_errorDetection,
*cur_meth_param_unchained)
else:
d[str(cur_method)] = corr_evaluate_method(
cur_method_unchained,
dataSetHandler,
config,
dataSetHandlerColumn_errorDetection)
# if more than one run is executed
if run_len > 1:
if j > 0:
for cur_cv_key in d[str(cur_method)].keys():
d[str(cur_method)][cur_cv_key]["train"] = pd.concat([d[str(cur_method)][cur_cv_key]["train"], int_results[cur_cv_key]["train"]], axis = 1).mean(axis = 1)
int_results[cur_cv_key]["train"] = pd.concat([d[str(cur_method)][cur_cv_key]["train"], int_results[cur_cv_key]["train"]], axis = 1).mean(axis = 1)
d[str(cur_method)][cur_cv_key]["test"] = pd.concat([d[str(cur_method)][cur_cv_key]["test"], int_results[cur_cv_key]["test"]], axis = 1).mean(axis = 1)
int_results[cur_cv_key]["test"] = pd.concat([d[str(cur_method)][cur_cv_key]["test"], int_results[cur_cv_key]["test"]], axis = 1).mean(axis = 1)
if j == 0:
int_results = d[str(cur_method)]
# evaluate models
for cur_ml_fct in d.keys():
if cur_ml_fct in ["add_info"]:
continue
for cur_cv_set in d[cur_ml_fct].keys():
if cur_cv_set in ['ml_model']:
continue
print(cur_ml_fct)
evaluate_train = isIncorrect_evaluate(series_isCorrect = ~dataSetHandler.get_train_features()[int(cur_cv_set)][dataSetHandlerColumn_errorDetection],
series_raw = dataSetHandler.get_train_features()[int(cur_cv_set)][config.colname_raw],
series_target = dataSetHandler.get_train_targets()[int(cur_cv_set)][config.colname_target_corr])
train_data = {
f"{config.colname_target_corr}": dataSetHandler.get_train_targets()[int(cur_cv_set)][evaluate_train][config.colname_target_corr],
#f"corr_{cur_ml_fct}": dataSetHandler.get_train_targets()[int(cur_cv_set)][evaluate_train][config.colname_target_corr],
f"corr_{cur_ml_fct}": pd.Series(d[cur_ml_fct][int(cur_cv_set)]["train"][evaluate_train]),
"error_class": dataSetHandler.get_train_features()[int(cur_cv_set)][evaluate_train]["error_class"],
f"{config.colname_isEvent}": dataSetHandler.get_train_features()[int(cur_cv_set)][evaluate_train][config.colname_isEvent],
}
getEvaluation_reg(
df=pd.DataFrame(train_data),
colname_pred=f"corr_{cur_ml_fct}",
model_name=f"{cur_ml_fct}",
data_set_name="train",
config=config,
evaluationHandler=evaluationHandler,
cv=cur_cv_set)
evaluate_test = isIncorrect_evaluate(
series_isCorrect=~dataSetHandler.get_test_features()[int(cur_cv_set)][dataSetHandlerColumn_errorDetection],
series_raw=dataSetHandler.get_test_features()[int(cur_cv_set)][config.colname_raw],
series_target=dataSetHandler.get_test_targets()[int(cur_cv_set)][config.colname_target_corr])
test_data = {
f"{config.colname_target_corr}": dataSetHandler.get_test_targets()[int(cur_cv_set)][evaluate_test][
config.colname_target_corr],
#f"corr_{cur_ml_fct}": dataSetHandler.get_test_targets()[int(cur_cv_set)][evaluate_test][config.colname_target_corr],
f"corr_{cur_ml_fct}": pd.Series(d[cur_ml_fct][int(cur_cv_set)]["test"][evaluate_test]),
"error_class": dataSetHandler.get_test_features()[int(cur_cv_set)][evaluate_test]["error_class"],
f"{config.colname_isEvent}": dataSetHandler.get_test_features()[int(cur_cv_set)][evaluate_test][
config.colname_isEvent],
}
getEvaluation_reg(
df=pd.DataFrame(test_data),
colname_pred=f"corr_{cur_ml_fct}",
model_name=f"{cur_ml_fct}",
data_set_name="test",
config=config,
evaluationHandler=evaluationHandler,
cv=cur_cv_set)
d.update({'isCV': config.cross_validation})
d.update({'train_test_IDs': config.train_test_IDs})
return d, evaluationHandler