AND Private Git Repository - predictops.git/blobdiff - predictops/learn/preprocessing.py
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Refactoring, fin du lever/coucher de soleil, et début de sentinelles
[predictops.git] / predictops / learn / preprocessing.py
index 833e48316bffa1c51affc6885210b71f61b2c1d1..55cffbd2a0e094a610580e1e4dbcdf8adc80de5d 100644 (file)
@@ -1,7 +1,9 @@
+from configparser import ConfigParser
 from itertools import chain
 from logging import getLogger
 from logging.config import fileConfig
 from pathlib import Path
+from sklearn import preprocessing
 
 import numpy as np
 import pandas as pd
@@ -9,6 +11,7 @@ import pandas as pd
 fileConfig((Path.cwd() / 'config') / 'logging.cfg')
 logger = getLogger()
 
+
 class Preprocessing:
     '''
     Generate a pandas dataframe from a dictionary of features per datetime, which
@@ -18,30 +21,73 @@ class Preprocessing:
      - Missing datetimes are added first with np.NaN feature values,
      - The dataframe is then constructed based on the filled feature dictionary,
      - NaN values are then filled with last known values.
-
     '''
-    def __init__(self, dict_features,
-                 start, end, timestep,
-                 features = None):
+
+    def __init__(self, config_file=None,
+                 start=None, end=None, timestep=None,
+                 dict_features=None, dict_target=None):
         '''
         Constructor that defines all needed attributes and collects features.
         '''
-        logger.info("Entering  NaN values in the feature dataframe")
-        self._dict_features = dict_features
+        self._config = config_file
+
         self._start = start
         self._end = end
         self._timestep = timestep
+        self._dict_features = dict_features
+        self._dict_target = dict_target
+
         self._full_dict = None
         self._dataframe = None
         self._datetimes = []
-        # If features are not provided to the constructor, then we collect
-        # any existing feature in the dictionary
-        if features != None:
-            self._features = features
-        else:
-            self._features = set(chain.from_iterable([tuple(u.keys())
-                                                      for u in [*dict_features.values()]]))
 
+        self._features = set(chain.from_iterable([tuple(u.keys())
+                                                  for u in [*dict_features.values()]]))
+
+        # Default every type flag to False, so features without a config section stay unflagged
+        self._features = {feat: {'binary': False, 'numerical': False, 'categorical': False}
+                          for feat in self._features}
+
+        for feature in self._config['FEATURES']:
+            if self._config['FEATURES'][feature]:  # NOTE(review): any non-empty string is truthy here
+                feature_file = self._config['FEATURE_CONFIG'][feature]
+                config = ConfigParser()
+                config.read(eval(feature_file))  # NOTE(review): eval of config text; trusted configs only
+                for section in config:
+                    if config.has_option(section, 'numerical'):
+                        for name in self._features:
+                            if name.split('_')[0] == section:
+                                self._features[name]['binary'] = config[section].getboolean('binary')
+                                self._features[name]['categorical'] = config[section].getboolean('categorical')
+                                self._features[name]['numerical'] = config[section].getboolean('numerical')
+
+        self._binary_columns = [k for k in self._features if self._features[k]['binary']]
+        self._categorical_columns = [k for k in self._features if self._features[k]['categorical']]
+        self._numerical_columns = [k for k in self._features if self._features[k]['numerical']]
+
+    @property
+    def start(self):  # window start: set by the constructor, reassignable afterwards
+        return self._start
+
+    @start.setter
+    def start(self, value):
+        self._start = value
+
+    @property
+    def end(self):  # last datetime (inclusive) of the window walked by _fill_dict
+        return self._end
+
+    @end.setter
+    def end(self, value):
+        self._end = value
+
+    @property
+    def timestep(self):  # increment added to the current datetime by _fill_dict
+        return self._timestep
+
+    @timestep.setter
+    def timestep(self, value):
+        self._timestep = value
 
     def _fill_dict(self):
         '''
@@ -54,16 +100,16 @@ class Preprocessing:
         while current <= self._end:
             self._datetimes.append(current)
             if current not in self._dict_features:
-                self._dict_features[current] = {feature:np.NaN
+                self._dict_features[current] = {feature: np.NaN
                                                 for feature in self._features}
             else:
-                null_dict = {feature:np.NaN
+                null_dict = {feature: np.NaN
                              for feature in self._features}
                 null_dict.update(self._dict_features[current])
                 self._dict_features[current] = null_dict
             current += self._timestep
         for k in self._dict_features:
-            null_dict = {feature:np.NaN
+            null_dict = {feature: np.NaN
                          for feature in self._features}
             null_dict.update(self._dict_features[k])
             self._dict_features[k] = null_dict
@@ -71,8 +117,6 @@ class Preprocessing:
         self._full_dict = {k: self._dict_features[k]
                            for k in sorted(self._dict_features.keys())}
 
-
-
     @property
     def full_dict(self):
         '''
@@ -82,7 +126,85 @@ class Preprocessing:
             self._fill_dict()
         return self._full_dict
 
+    def _fill_nan(self):
+        '''
+        Fill NaN values (forward propagation, linear or spline interpolation), then keep only the rows of the datetime window
+        '''
+        logger.info("Filling NaN numerical values in the feature dataframe")
+        # Only numerical columns are interpolated; an unrecognised fill_method leaves them untouched
+        fill_method = self._config['PREPROCESSING']['fill_method']
+        if fill_method == 'propagate':
+            self._dataframe[self._numerical_columns] =\
+                self._dataframe[self._numerical_columns].ffill()
+        elif fill_method == 'linear':
+            self._dataframe[self._numerical_columns] =\
+                self._dataframe[self._numerical_columns].interpolate()
+        elif fill_method == 'spline':
+            self._dataframe[self._numerical_columns] =\
+                self._dataframe[self._numerical_columns].interpolate(method='spline',
+                                                                     order=self._config['PREPROCESSING'].getint('order'))
+
+        # For the categorical columns, NaN values are filled by duplicating
+        # the last known value (forward fill method)
+        logger.info("Filling NaN categorical values in the feature dataframe")
+        self._dataframe[self._categorical_columns] =\
+            self._dataframe[self._categorical_columns].ffill()
+
+        # NaN values at the beginning of the dataframe are deliberately left
+        # unfilled: back-filling may not be a good idea for features that
+        # are available only for recent years, e.g., air quality
+        # (a back fill would be: self._dataframe = self._dataframe.bfill())
+
+        # Dropping rows that are not related to our datetime window (start/
+        # step / end); a set gives a constant-time membership test per row
+        logger.info("Dropping rows that are not related to our datetime window")
+        dates = {(x.year, x.month, x.day, x.hour) for x in self._datetimes}
+        self._dataframe['row_ok'] =\
+            self._dataframe.apply(lambda x: (int(x.year), int(x.month), int(x.dayInMonth), int(x.hour)) in dates, axis=1)
+        self._dataframe = self._dataframe[self._dataframe['row_ok']]
+        self._dataframe = self._dataframe.drop(['row_ok'], axis=1)
+        logger.info("Rows dropped")
+
+    def _add_history(self):
+        '''
+        Add the nb_lines previous values of dict_target as 'history_*' numerical features, then drop the first nb_lines rows (incomplete history)
+        '''
+        logger.info("Integrating previous nb of interventions as features")
+        nb_lines = eval(self._config['HISTORY_KNOWLEDGE']['nb_lines'])  # NOTE(review): eval of config text; trusted configs only
+        for k in range(1, nb_lines + 1):
+            name = 'history_' + str(nb_lines - k + 1)
+            self._dataframe[name] = [np.nan] * k + list(self._dict_target.values())[:-k]  # targets shifted down by k rows
+            self._numerical_columns.append(name)
+        self._dataframe = self._dataframe[nb_lines:]
+
+    def _standardize(self):
+        '''
+        Overwrite the numerical columns of the dataframe with their preprocessing.scale output
+        '''
+        logger.info("Standardizing numerical values in the feature dataframe")
+        cols = self._numerical_columns  # categorical and binary columns are left as they are
+        scaled = preprocessing.scale(self._dataframe[cols])
+        self._dataframe[cols] = scaled
 
+    def _one_hot_encoding(self):
+        '''
+        Rebuild the dataframe: numerical and binary columns are copied as is, each categorical column is replaced by its pd.get_dummies columns, any other column is dropped
+        '''
+        logger.info("One hot encoding for categorical feature")
+        # Numerical columns are copied unchanged into the new dataframe
+
+        df_out = pd.DataFrame()
+        for col in self._numerical_columns:
+            df_out[col] = self._dataframe[col]
+        # Same for binary features
+        for col in self._binary_columns:
+            df_out[col] = self._dataframe[col]
+        # The one hot encoding: get_dummies columns are named with the source column as prefix
+        for col in self._categorical_columns:
+            pd1 = pd.get_dummies(self._dataframe[col], prefix=col)
+            for col1 in pd1.columns:
+                df_out[col1] = pd1[col1]
+        self._dataframe = df_out
 
     @property
     def dataframe(self):
@@ -93,13 +215,16 @@ class Preprocessing:
             logger.info("Creating feature dataframe from feature dictionary")
             self._dataframe = pd.DataFrame.from_dict(self.full_dict,
                                                      orient='index')
-            logger.info("Filling NaN values in the feature dataframe")
-            #TODO: add other filling methods like linear interpolation
-            self._dataframe = self._dataframe.fillna(method='ffill')
-            self._dataframe = self._dataframe.fillna(method='bfill')
-            self._dataframe = self._dataframe.drop([k.to_pydatetime()
-                                                   for k in self._dataframe.T
-                                                   if k not in self._datetimes])
+            # Dealing with NaN values
+            self._fill_nan()
+            # Adding previous (historical) nb_interventions as features
+            self._add_history()
+            # self._dataframe.to_csv('toto.csv')
+            # exit()
+            # Normalizing numerical values
+            self._standardize()
+            # Dealing with categorical features
+            self._one_hot_encoding()
         return self._dataframe
 
     @dataframe.setter