AND Private Git Repository - predictops.git/blob - predictops/learn/preprocessing.py
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
885aad3393979b897e3e0d8c40f3378dbba08e5a
[predictops.git] / predictops / learn / preprocessing.py
1 from configparser import ConfigParser
2 from csv import DictReader
3 from datetime import datetime, timedelta
4 from itertools import chain
5 from logging import getLogger
6 from logging.config import fileConfig
7 from os import listdir
8 from pathlib import Path
9 from sklearn import preprocessing
10
11 import numpy as np
12 import pandas as pd
13
# Configure logging from <current working directory>/config/logging.cfg,
# then bind the root logger used by the rest of this module.
fileConfig(Path.cwd().joinpath('config', 'logging.cfg'))
logger = getLogger()
16
class Preprocessing:
    '''
    Generate a pandas dataframe from a dictionary of features per datetime, which
    respects the starting and ending dates of the study, and its precision (the
    time step) as read from the configuration. Missing feature values are
    completed.

     - Missing datetimes are added first with NaN feature values,
     - The dataframe is then constructed based on the filled feature dictionary,
     - NaN values are then filled (propagation or interpolation, depending on
       the 'fill_method' option of the PREPROCESSING section),
     - Rows outside the start / step / end window are dropped,
     - Previous target values are appended as "history_<i>" features,
     - Numerical features are standardized and categorical ones are one hot
       encoded. Features flagged neither numerical nor categorical are not
       kept in the final dataframe.
    '''

    # Format of the 'start' and 'end' options of the DATETIME section
    _DATETIME_FORMAT = '%m/%d/%Y %H:%M:%S'

    def __init__(self, config_file = None,
                 dict_features = None, dict_target = None):
        '''
        Constructor that defines all needed attributes and collects features.

        Parameters:
         - config_file: despite its name, an already loaded configuration
           object (ConfigParser-like). The sections DATETIME, FEATURES,
           FEATURE_CONFIG, PREPROCESSING and HISTORY_KNOWLEDGE are read from it.
         - dict_features: dictionary {datetime: {feature name: value}}. It is
           completed in place when the full dictionary is built. None is
           handled as an empty dictionary.
         - dict_target: dictionary whose values, taken in iteration order, are
           used to build the history features. It is presumably keyed by the
           datetimes of the study window (to be confirmed against callers).
           None is handled as an empty dictionary.

        Raises:
         - TypeError if no configuration is provided.
        '''
        if config_file is None:
            raise TypeError("config_file must be a loaded configuration "
                            "object, not None")
        self._config = config_file

        self._start = datetime.strptime(self._config['DATETIME']['start'],
                                        self._DATETIME_FORMAT)
        self._end = datetime.strptime(self._config['DATETIME']['end'],
                                      self._DATETIME_FORMAT)
        self._timestep = timedelta(hours =
                                   self._config['DATETIME'].getfloat('hourStep'))
        self._dict_features = {} if dict_features is None else dict_features
        self._dict_target = {} if dict_target is None else dict_target

        self._full_dict = None
        self._dataframe = None
        self._datetimes = []

        # Every feature name met at least once in the dated dictionary. Names
        # are sorted so that the column order does not depend on set ordering.
        feature_names = sorted(
            set(chain.from_iterable(self._dict_features.values())), key=str)
        self._features = {feat : {'numerical': False, 'categorical': False}
                          for feat in feature_names}

        for feature in self._config['FEATURES']:
            if not self._feature_enabled(feature):
                continue
            feature_file = self._config['FEATURE_CONFIG'][feature]
            config = ConfigParser()
            if not config.read(feature_file):
                logger.warning("Feature configuration file %s could not be read",
                               feature_file)
            for section in config:
                if not config.has_option(section, 'numerical'):
                    continue
                if section not in self._features:
                    # Described in the configuration but absent from the data
                    logger.debug("Feature %s is configured but has no data",
                                 section)
                    continue
                self._features[section]['numerical'] =\
                    config[section].getboolean('numerical', fallback=False)
                self._features[section]['categorical'] =\
                    config[section].getboolean('categorical', fallback=False)

        self._numerical_columns = [k for k in self._features if self._features[k]['numerical']]
        self._categorical_columns = [k for k in self._features if self._features[k]['categorical']]


    def _feature_enabled(self, feature):
        '''
        Tells whether a feature source of the FEATURES section is switched on.

        The option is read as a boolean, so that "False", "no", "off" or "0"
        disable the source. Values that are not booleans fall back to the
        truthiness of the raw value (any non-empty string enables the source).
        '''
        section = self._config['FEATURES']
        try:
            return bool(section.getboolean(feature))
        except (ValueError, AttributeError):
            return bool(section[feature])


    @property
    def start(self):
        '''
        First datetime of the study window. Changing it does not refresh an
        already built dictionary or dataframe.
        '''
        return self._start

    @start.setter
    def start(self, x):
        self._start = x


    @property
    def end(self):
        '''
        Last datetime (included) of the study window. Changing it does not
        refresh an already built dictionary or dataframe.
        '''
        return self._end

    @end.setter
    def end(self, x):
        self._end = x


    @property
    def timestep(self):
        '''
        Time step (timedelta) between two datetimes of the study window.
        Changing it does not refresh an already built dictionary or dataframe.
        '''
        return self._timestep

    @timestep.setter
    def timestep(self, x):
        self._timestep = x


    def _fill_dict(self):
        '''
        Add datetime keys in the dated feature dictionary that are missing. The
        features are then set to NaN. Add missing features in existing datetimes
        too. The feature dictionary received by the constructor is updated in
        place, and the datetimes of the window are stored in self._datetimes.

        Raises:
         - ValueError if the time step is not strictly positive (the window
           could not be enumerated).
        '''
        logger.info("Adding missing dates and filling missing features with NaN values")
        if self._timestep <= timedelta(0):
            raise ValueError("The time step must be strictly positive")

        # Rebuilt from scratch, so that calling this method twice does not
        # duplicate the datetimes of the window
        self._datetimes = []
        current = self._start
        while current <= self._end:
            self._datetimes.append(current)
            self._dict_features.setdefault(current, {})
            current += self._timestep

        # Every entry (added or already present) gets the whole set of
        # features, known values overriding the NaN placeholders
        for k in list(self._dict_features):
            null_dict = dict.fromkeys(self._features, np.nan)
            null_dict.update(self._dict_features[k])
            self._dict_features[k] = null_dict

        self._full_dict = {k: self._dict_features[k]
                           for k in sorted(self._dict_features)}


    @property
    def full_dict(self):
        '''
        Returns the fully filled dated feature dictionary, ordered by datetimes
        '''
        if self._full_dict is None:
            self._fill_dict()
        return self._full_dict


    def _fill_nan(self):
        '''
        Fill NaN values, either by propagation or by interpolation (linear or
        splines), then drop the rows that are outside the datetime window.

        The method applied to numerical columns is given by the 'fill_method'
        option of the PREPROCESSING section ('propagate', 'linear' or 'spline',
        the latter using the 'order' option). Any other value leaves the
        numerical columns untouched.
        '''
        logger.info("Filling NaN numerical values in the feature dataframe")
        # We interpolate (linearly or with splines) only numerical columns
        fill_method = self._config['PREPROCESSING']['fill_method']
        if self._numerical_columns:
            numerical = self._dataframe[self._numerical_columns]
            if fill_method == 'propagate':
                numerical = numerical.ffill()
            elif fill_method == 'linear':
                numerical = numerical.interpolate()
            elif fill_method == 'spline':
                numerical = numerical.interpolate(method='spline',
                     order=self._config['PREPROCESSING'].getint('order'))
            else:
                logger.warning("Unknown fill method %r: numerical NaN values "
                               "are left as is", fill_method)
            self._dataframe[self._numerical_columns] = numerical

        # For the categorical columns, NaN values are filled by duplicating
        # the last known value (forward fill method)
        logger.info("Filling NaN categorical values in the feature dataframe")
        if self._categorical_columns:
            self._dataframe[self._categorical_columns] =\
                self._dataframe[self._categorical_columns].ffill()

        # NaN values at the beginning of the dataframe are deliberately not
        # back filled: this may not be a good idea, especially for features
        # that are available only for recent years, e.g., air quality

        self._drop_rows_outside_window()


    def _drop_rows_outside_window(self):
        '''
        Keep only the rows related to our datetime window (start / step / end).

        Rows are compared to the window through their index (the datetime keys
        of the feature dictionary), at the hour granularity. Feature columns
        are not used for this: they may be missing, or hold filled values.
        '''
        logger.info("Dropping rows that are not related to our datetime window")
        dates = {(x.year, x.month, x.day, x.hour) for x in self._datetimes}
        index = self._dataframe.index
        row_ok = np.fromiter(((x.year, x.month, x.day, x.hour) in dates
                              for x in index),
                             dtype=bool, count=len(index))
        # Explicit copy: columns are added to this dataframe afterwards
        self._dataframe = self._dataframe.loc[row_ok].copy()
        logger.info("Rows dropped")


    def _add_history(self):
        '''
        Integrating previous nb of interventions as features

        For nb_lines = N (HISTORY_KNOWLEDGE section), columns history_N down to
        history_1 are added: history_N holds the target value of the previous
        row, history_1 the one found N rows before. The N first rows, whose
        history is incomplete, are dropped. The number of target values must
        match the number of rows of the dataframe.
        '''
        logger.info("Integrating previous nb of interventions as features")
        nb_lines = self._config['HISTORY_KNOWLEDGE'].getint('nb_lines')
        if nb_lines > 0:
            targets = list(self._dict_target.values())
        for k in range(1,nb_lines+1):
            name = 'history_'+str(nb_lines-k+1)
            self._dataframe[name] = [np.nan]*k + targets[:-k]
            # No duplicated column name if the history is computed again
            if name not in self._numerical_columns:
                self._numerical_columns.append(name)
        self._dataframe = self._dataframe.iloc[nb_lines:].copy()


    def _standardize(self):
        '''
        Normalizing numerical features (through sklearn preprocessing.scale).
        Nothing is done when there is no numerical column or no row.
        '''
        logger.info("Standardizing numerical values in the feature dataframe")
        if not self._numerical_columns or self._dataframe.empty:
            return
        # We operate only on numerical columns
        self._dataframe[self._numerical_columns] =\
            preprocessing.scale(self._dataframe[self._numerical_columns])


    def _one_hot_encoding(self):
        '''
        Apply a one hot encoding for category features

        The resulting dataframe holds the numerical columns first, then one
        group of "<feature>_<value>" columns per categorical feature. Other
        columns are not kept.
        '''
        logger.info("One hot encoding for categorical feature")

        # We store numerical columns
        parts = [self._dataframe[self._numerical_columns]]
        # The one hot encoding
        parts.extend(pd.get_dummies(self._dataframe[col], prefix=col)
                     for col in self._categorical_columns)
        # A single concatenation, instead of inserting columns one by one
        self._dataframe = pd.concat(parts, axis=1)


    @property
    def dataframe(self):
        '''
        Returns the feature dataframe, after creating it if needed.
        '''
        if self._dataframe is None:
            logger.info("Creating feature dataframe from feature dictionary")
            self._dataframe = pd.DataFrame.from_dict(self.full_dict,
                                                     orient='index')
            # Dealing with NaN values
            self._fill_nan()
            # Adding previous (historical) nb_interventions as features
            self._add_history()
            # Normalizing numerical values
            self._standardize()
            # Dealing with categorical features
            self._one_hot_encoding()
        return self._dataframe


    @dataframe.setter
    def dataframe(self, df):
        self._dataframe = df
243
244