AND Private Git Repository - predictops.git/blobdiff - predictops/learn/preprocessing.py
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Reducing the computation time and adding holidays features
[predictops.git] / predictops / learn / preprocessing.py
index 5400d1d39f1135ce5e2abcfec2541201cf5d8ed6..885aad3393979b897e3e0d8c40f3378dbba08e5a 100644 (file)
@@ -1,9 +1,12 @@
 from configparser import ConfigParser
 from configparser import ConfigParser
+from csv import DictReader
 from datetime import datetime, timedelta
 from itertools import chain
 from logging import getLogger
 from logging.config import fileConfig
 from datetime import datetime, timedelta
 from itertools import chain
 from logging import getLogger
 from logging.config import fileConfig
+from os import listdir
 from pathlib import Path
 from pathlib import Path
+from sklearn import preprocessing
 
 import numpy as np
 import pandas as pd
 
 import numpy as np
 import pandas as pd
@@ -22,12 +25,12 @@ class Preprocessing:
      - NaN values are then filled with last known values.
     '''
 
      - NaN values are then filled with last known values.
     '''
 
-    def __init__(self, config_file = None, dict_features = None, features = None):
+    def __init__(self, config_file = None,
+                 dict_features = None, dict_target = None):
         '''
         Constructor that defines all needed attributes and collects features.
         '''
         '''
         Constructor that defines all needed attributes and collects features.
         '''
-        self._config = ConfigParser()
-        self._config.read(config_file)
+        self._config = config_file
 
         self._start = datetime.strptime(self._config['DATETIME']['start'],
                                         '%m/%d/%Y %H:%M:%S')
 
         self._start = datetime.strptime(self._config['DATETIME']['start'],
                                         '%m/%d/%Y %H:%M:%S')
@@ -36,17 +39,33 @@ class Preprocessing:
         self._timestep = timedelta(hours =
                                    self._config['DATETIME'].getfloat('hourStep'))
         self._dict_features = dict_features
         self._timestep = timedelta(hours =
                                    self._config['DATETIME'].getfloat('hourStep'))
         self._dict_features = dict_features
+        self._dict_target = dict_target
+
         self._full_dict = None
         self._dataframe = None
         self._datetimes = []
         self._full_dict = None
         self._dataframe = None
         self._datetimes = []
-        # If features are not provided to the constructor, then we collect
-        # any existing feature in the dictionary
-        if features != None:
-            self._features = features
-        else:
-            self._features = set(chain.from_iterable([tuple(u.keys())
+
+        self._features = set(chain.from_iterable([tuple(u.keys())
                                                       for u in [*dict_features.values()]]))
 
                                                       for u in [*dict_features.values()]]))
 
+        #feature_files = Path.cwd() / 'config' / 'features'
+        self._features = {feat : {'numerical': False, 'categorical': False}
+                          for feat in self._features}
+
+        for feature in self._config['FEATURES']:
+            if self._config['FEATURES'][feature]:
+                feature_file = self._config['FEATURE_CONFIG'][feature]
+                config = ConfigParser()
+                config.read(feature_file)
+                for section in config:
+                    if config.has_option(section, 'numerical'):
+                        self._features[section]['numerical'] = config[section].getboolean('numerical')
+                        self._features[section]['categorical'] = config[section].getboolean('categorical')
+
+        self._numerical_columns = [k for k in self._features if self._features[k]['numerical']]
+        self._categorical_columns = [k for k in self._features if self._features[k]['categorical']]
+
+
 
     @property
     def start(self):
 
     @property
     def start(self):
@@ -115,6 +134,88 @@ class Preprocessing:
         return self._full_dict
 
 
         return self._full_dict
 
 
+    def _fill_nan(self):
+        '''
+        Fill NaN values and restrict the dataframe to the requested datetimes.
+
+        Numerical columns are filled according to the ``fill_method`` option
+        of the ``PREPROCESSING`` configuration section: ``propagate``
+        (forward fill), ``linear`` (linear interpolation) or ``spline``
+        (spline interpolation, whose degree is given by the ``order``
+        option). Any other value leaves the numerical columns untouched and
+        logs a warning. Categorical columns are always forward filled.
+
+        Rows whose (year, month, dayInMonth, hour) values do not match any
+        element of ``self._datetimes`` are finally dropped.
+        '''
+        logger.info("Filling NaN numerical values in the feature dataframe")
+        # Only numerical columns are interpolated (linearly or with splines)
+        fill_method = self._config['PREPROCESSING']['fill_method']
+        num_cols = self._numerical_columns
+        if fill_method == 'propagate':
+            # ffill() replaces the deprecated fillna(method='ffill')
+            self._dataframe[num_cols] = self._dataframe[num_cols].ffill()
+        elif fill_method == 'linear':
+            self._dataframe[num_cols] = self._dataframe[num_cols].interpolate()
+        elif fill_method == 'spline':
+            self._dataframe[num_cols] = self._dataframe[num_cols].interpolate(
+                method='spline',
+                order=self._config['PREPROCESSING'].getint('order'))
+        else:
+            logger.warning("Unknown fill_method %r: numerical NaN values "
+                           "are left untouched", fill_method)
+
+        # For the categorical columns, NaN values are filled by duplicating
+        # the last known value (forward fill method)
+        logger.info("Filling NaN categorical values in the feature dataframe")
+        cat_cols = self._categorical_columns
+        self._dataframe[cat_cols] = self._dataframe[cat_cols].ffill()
+
+        # NaN values at the beginning of the dataframe are deliberately not
+        # back filled: this may not be a good idea, especially for features
+        # that are available only for recent years, e.g., air quality
+
+        # Dropping rows that are not related to our datetime window (start/
+        # step / end)
+        logger.info("Dropping rows that are not related to our datetime window")
+        # A set gives constant-time membership tests, instead of one linear
+        # scan of every datetime per row
+        wanted = {(x.year, x.month, x.day, x.hour) for x in self._datetimes}
+        row_ok = self._dataframe.apply(
+            lambda x: (int(x.year), int(x.month),
+                       int(x.dayInMonth), int(x.hour)) in wanted,
+            axis=1)
+        # Boolean mask selection: no temporary column is added to the frame
+        self._dataframe = self._dataframe[row_ok]
+        logger.info("Rows dropped")
+
+
+    def _add_history(self):
+        '''
+        Add the previous numbers of interventions as features.
+
+        For each shift k from 1 to ``nb_lines`` (option of the
+        ``HISTORY_KNOWLEDGE`` configuration section), a column named
+        ``history_<nb_lines-k+1>`` is added: it holds the values of
+        ``self._dict_target`` delayed by k rows, the k first entries being
+        NaN. Each new column is registered in ``self._numerical_columns``.
+        The ``nb_lines`` first rows of the dataframe are then dropped.
+        '''
+        logger.info("Integrating previous nb of interventions as features")
+        nb_lines = self._config['HISTORY_KNOWLEDGE'].getint('nb_lines')
+        # The target values are materialized once, and only when at least
+        # one history column has to be built
+        targets = list(self._dict_target.values()) if nb_lines >= 1 else []
+        for k in range(1, nb_lines + 1):
+            name = 'history_' + str(nb_lines - k + 1)
+            # np.nan is used because the np.NaN alias was removed in NumPy 2.0
+            self._dataframe[name] = [np.nan] * k + targets[:-k]
+            self._numerical_columns.append(name)
+        self._dataframe = self._dataframe[nb_lines:]
+
+
+
+    def _standardize(self):
+        '''
+        Standardize the numerical features.
+
+        The columns listed in ``self._numerical_columns`` are replaced by
+        the output of ``preprocessing.scale`` applied to them; the other
+        columns of the dataframe are left as they are.
+        '''
+        logger.info("Standardizing numerical values in the feature dataframe")
+        # Only the numerical columns are involved
+        columns = self._numerical_columns
+        scaled = preprocessing.scale(self._dataframe[columns])
+        self._dataframe[columns] = scaled
+
+
+
+    def _one_hot_encoding(self):
+        '''
+        Apply a one hot encoding to the categorical features.
+
+        The new dataframe holds the numerical columns followed, for each
+        categorical column, by its dummy columns (prefixed by the name of
+        the encoded column). Columns that are neither numerical nor
+        categorical are not kept.
+        '''
+        logger.info("One hot encoding for categorical feature")
+
+        # Numerical columns are kept unchanged; a name listed twice yields a
+        # single column
+        numerical = list(dict.fromkeys(self._numerical_columns))
+        parts = [self._dataframe[numerical]]
+        # The one hot encoding
+        parts.extend(pd.get_dummies(self._dataframe[col], prefix=col)
+                     for col in self._categorical_columns)
+        # A single concatenation replaces the column by column insertions,
+        # which fragment the dataframe when there are many columns
+        self._dataframe = pd.concat(parts, axis=1)
+
 
     @property
     def dataframe(self):
 
     @property
     def dataframe(self):
@@ -125,20 +226,14 @@ class Preprocessing:
             logger.info("Creating feature dataframe from feature dictionary")
             self._dataframe = pd.DataFrame.from_dict(self.full_dict,
                                                      orient='index')
             logger.info("Creating feature dataframe from feature dictionary")
             self._dataframe = pd.DataFrame.from_dict(self.full_dict,
                                                      orient='index')
-            logger.info("Filling NaN values in the feature dataframe")
-
-            if self._config['PREPROCESSING']['fill_method'] == 'propagate':
-                self._dataframe = self._dataframe.fillna(method='ffill')
-            elif self._config['PREPROCESSING']['fill_method'] == 'linear':
-                self._dataframe = self._dataframe.interpolate()
-            elif self._config['PREPROCESSING']['fill_method'] == 'spline':
-                self._dataframe = self._dataframe.interpolate(method='spline',
-                                                              order=self._config['PREPROCESSING'].getint('order'))
-            self._dataframe = self._dataframe.fillna(method='bfill')
-
-            self._dataframe = self._dataframe.drop([k.to_pydatetime()
-                                                   for k in self._dataframe.T
-                                                   if k not in self._datetimes])
+            # Dealing with NaN values
+            self._fill_nan()
+            # Adding previous (historical) nb_interventions as features
+            self._add_history()
+            # Normalizing numerical values
+            self._standardize()
+            # Dealing with categorical features
+            self._one_hot_encoding()
         return self._dataframe
 
 
         return self._dataframe