+from configparser import ConfigParser
+from csv import DictReader
+from datetime import datetime, timedelta
from itertools import chain
from logging import getLogger
from logging.config import fileConfig
+from os import listdir
from pathlib import Path
+from sklearn import preprocessing
import numpy as np
import pandas as pd
logger = getLogger()
class Preprocessing:
- def __init__(self, dict_features,
- start, end, timestep,
- features = None):
+ '''
+ Generate a pandas dataframe from a dictionary of features per datetime, which
+ respects the starting and ending dates of the study, and its precision (the
+ time step) as passed to the constructor. Missing feature values are completed.
+
+ - Missing datetimes are added first with np.NaN feature values,
+ - The dataframe is then constructed based on the filled feature dictionary,
+ - NaN values are then filled with last known values.
+ '''
+
+ def __init__(self, config_file = None,
+ dict_features = None, dict_target = None):
+ '''
+ Constructor that defines all needed attributes and collects features.
+ '''
+ self._config = config_file
+
+ self._start = datetime.strptime(self._config['DATETIME']['start'],
+ '%m/%d/%Y %H:%M:%S')
+ self._end = datetime.strptime(self._config['DATETIME']['end'],
+ '%m/%d/%Y %H:%M:%S')
+ self._timestep = timedelta(hours =
+ self._config['DATETIME'].getfloat('hourStep'))
self._dict_features = dict_features
- self._start = start
- self._end = end
- self._timestep = timestep
+ self._dict_target = dict_target
+
+ self._full_dict = None
self._dataframe = None
+ self._datetimes = []
- if features != None:
- self._features = features
- else:
- self._features = set(chain.from_iterable([tuple(u.keys())
+ self._features = set(chain.from_iterable([tuple(u.keys())
for u in [*dict_features.values()]]))
+ feature_files = Path.cwd() / 'config' / 'features'
+ self._features = {feat : {'numerical': False} for feat in self._features}
+ for feature_file in listdir(feature_files):
+ if feature_file.endswith('csv'):
+ with open(feature_files / feature_file , "r") as f:
+ reader = DictReader(f, delimiter=',')
+ typed_names = {row['name']: row['type'] for row in reader}
+ for feature in self._features:
+ if feature.split('_')[0] in typed_names:
+ self._features[feature]['type'] = int(typed_names[feature.split('_')[0]])
+ elif feature_file.endswith('cfg'):
+ config = ConfigParser()
+ config.read(feature_files / feature_file)
+ for section in config:
+ if config.has_option(section, 'numerical'):
+ self._features[section]['numerical'] = config[section].getboolean('numerical')
+
+ self._numerical_columns = [k for k in self._features if self._features[k]['type'] == 1
+ or (self._features[k]['type'] == 3 and self._features[k]['numerical'])]
+
+ self._categorical_columns = [k for k in self._features if self._features[k]['type'] == 2
+ or (self._features[k]['type'] == 3 and not self._features[k]['numerical'])]
+
+
+
+ @property
+ def start(self):
+ return self._start
+
+ @start.setter
+ def start(self, x):
+ self._start = x
+
+
+ @property
+ def end(self):
+ return self._end
+
+ @end.setter
+ def end(self, x):
+ self._end = x
+
+
+ @property
+ def timestep(self):
+ return self._timestep
+
+ @timestep.setter
+ def timestep(self, x):
+ self._timestep = x
+
def _fill_dict(self):
+ '''
+ Add datetime keys in the dated feature dictionary that are missing. The
+ features are then set to np.NaN. Add missing features in existing datetimes
+ too.
+ '''
+ logger.info("Adding missing dates and filling missing features with NaN values")
current = self._start
while current <= self._end:
+ self._datetimes.append(current)
if current not in self._dict_features:
- self._dict_features[current] = {feature:np.NaN for feature in self._features}
+ self._dict_features[current] = {feature:np.NaN
+ for feature in self._features}
else:
- null_dict = {feature:np.NaN for feature in self._features}
+ null_dict = {feature:np.NaN
+ for feature in self._features}
null_dict.update(self._dict_features[current])
self._dict_features[current] = null_dict
current += self._timestep
+ for k in self._dict_features:
+ null_dict = {feature:np.NaN
+ for feature in self._features}
+ null_dict.update(self._dict_features[k])
+ self._dict_features[k] = null_dict
+
+ self._full_dict = {k: self._dict_features[k]
+ for k in sorted(self._dict_features.keys())}
+
@property
def full_dict(self):
- self._fill_dict()
- return {k: self._dict_features[k] for k in sorted(self._dict_features.keys())}
+ '''
+ Returns the fully filled dated feature dictionary, ordered by datetimes
+ '''
+ if self._full_dict is None:
+ self._fill_dict()
+ return self._full_dict
+
+
+ def _fill_nan(self):
+ '''
+ Fill NaN values, either by propagation or by interpolation (linear or splines)
+ '''
+ logger.info("Filling NaN numerical values in the feature dataframe")
+ # We interpolate (linearly or with splines) only numerical columns
+ # The interpolation
+ if self._config['PREPROCESSING']['fill_method'] == 'propagate':
+ self._dataframe[self._numerical_columns] =\
+ self._dataframe[self._numerical_columns].fillna(method='ffill')
+ elif self._config['PREPROCESSING']['fill_method'] == 'linear':
+ self._dataframe[self._numerical_columns] =\
+ self._dataframe[self._numerical_columns].interpolate()
+ elif self._config['PREPROCESSING']['fill_method'] == 'spline':
+ self._dataframe[self._numerical_columns] =\
+ self._dataframe[self._numerical_columns].interpolate(method='spline',
+ order=self._config['PREPROCESSING'].getint('order'))
+
+ # For the categorical columns, NaN values are filled by duplicating
+ # the last known value (forward fill method)
+ logger.info("Filling NaN categorical values in the feature dataframe")
+ self._dataframe[self._categorical_columns] =\
+ self._dataframe[self._categorical_columns].fillna(method='ffill')
+
+ # Uncomment this line to fill NaN values at the beginning of the
+ # dataframe. This may not be a good idea, especially for features
+ # that are available only for recent years, e.g., air quality
+ #self._dataframe = self._dataframe.fillna(method='bfill')
+
+ # Dropping rows that are not related to our datetime window (start/
+ # step / end)
+ logger.info("Dropping rows that are not related to our datetime window")
+ self._dataframe['datetime'] =\
+ self._dataframe.apply(lambda x: datetime(int(x.year), int(x.month), int(x.dayInMonth), int(x.hour)), axis=1)
+ self._dataframe['row_ok'] =\
+ self._dataframe.apply(lambda x:x.datetime in self._datetimes, axis=1)
+ self._dataframe = self._dataframe[self._dataframe['row_ok']]
+ self._dataframe = self._dataframe.drop(['datetime', 'row_ok'], axis=1)
+ logger.info("Rows dropped")
+
+
+ def _add_history(self):
+ '''
+ Integrating previous nb of interventions as features
+ '''
+ logger.info("Integrating previous nb of interventions as features")
+ nb_lines = self._config['HISTORY_KNOWLEDGE'].getint('nb_lines')
+ for k in range(1,nb_lines+1):
+ name = 'history_'+str(nb_lines-k+1)
+ self._dataframe[name] = [np.NaN]*k + list(self._dict_target.values())[:-k]
+ self._numerical_columns.append(name)
+ self._dataframe = self._dataframe[nb_lines:]
+
+
+
+ def _standardize(self):
+ '''
+ Normalizing numerical features
+ '''
+ logger.info("Standardizing numerical values in the feature dataframe")
+ # We operate only on numerical columns
+ self._dataframe[self._numerical_columns] =\
+ preprocessing.scale(self._dataframe[self._numerical_columns])
+
+
+
+ def _one_hot_encoding(self):
+ '''
+ Apply a one hot encoding for category features
+ '''
+ logger.info("One hot encoding for categorical feature")
+
+ # We store numerical columns
+ df_out = pd.DataFrame()
+ for col in self._numerical_columns:
+ df_out[col] = self._dataframe[col]
+ # The one hot encoding
+ for col in self._categorical_columns:
+ pd1 = pd.get_dummies(self._dataframe[col],prefix=col)
+ for col1 in pd1.columns:
+ df_out[col1] = pd1[col1]
+ self._dataframe = df_out
@property
def dataframe(self):
+ '''
+ Returns the feature dataframe, after creating it if needed.
+ '''
if self._dataframe is None:
- self._dataframe = pd.DataFrame.from_dict(self.full_dict, orient='index')
+ logger.info("Creating feature dataframe from feature dictionary")
+ self._dataframe = pd.DataFrame.from_dict(self.full_dict,
+ orient='index')
+ # Dealing with NaN values
+ self._fill_nan()
+ # Adding previous (historical) nb_interventions as features
+ self._add_history()
+ # Normalizing numerical values
+ self._standardize()
+ # Dealing with categorical features
+ self._one_hot_encoding()
return self._dataframe
+
@dataframe.setter
def dataframe(self, df):
self._dataframe = df
- def fill_na(self):
- self.dataframe = self.dataframe.fillna(method='ffill')
\ No newline at end of file