predictops/learn/preprocessing.py
from configparser import ConfigParser
from csv import DictReader
from datetime import datetime, timedelta
from itertools import chain
from logging import getLogger
from logging.config import fileConfig
from os import listdir
from pathlib import Path
from sklearn import preprocessing

import numpy as np
import pandas as pd

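# NOTE: the logging configuration is looked up relative to the current working
# directory, so the module is presumably imported from the project root.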
fileConfig(Path.cwd() / 'config' / 'logging.cfg')
logger = getLogger()


class Preprocessing:
    '''
    Generate a pandas dataframe from a dictionary of features per datetime. The
    dataframe respects the starting and ending dates of the study and its
    precision (the time step), as passed to the constructor. Missing feature
    values are completed:

     - missing datetimes are first added, with NaN feature values,
     - the dataframe is then built from the filled feature dictionary,
     - NaN values are finally filled, either with the last known values or by
       interpolation (see _fill_nan).
    '''

    def __init__(self, config_file=None,
                 dict_features=None, dict_target=None):
        '''
        Constructor that defines all needed attributes and collects features.
        '''
        self._config = config_file

        self._start = datetime.strptime(self._config['DATETIME']['start'],
                                        '%m/%d/%Y %H:%M:%S')
        self._end = datetime.strptime(self._config['DATETIME']['end'],
                                      '%m/%d/%Y %H:%M:%S')
        self._timestep = timedelta(
            hours=self._config['DATETIME'].getfloat('hourStep'))
        self._dict_features = dict_features
        self._dict_target = dict_target

        self._full_dict = None
        self._dataframe = None
        self._datetimes = []

        self._features = set(chain.from_iterable(
            tuple(u.keys()) for u in dict_features.values()))

        feature_files = Path.cwd() / 'config' / 'features'
        self._features = {feat: {'numerical': False, 'type': 0}
                          for feat in self._features}
        for feature_file in listdir(feature_files):
            if feature_file.endswith('csv'):
                with open(feature_files / feature_file, "r") as f:
                    reader = DictReader(f, delimiter=',')
                    typed_names = {row['name']: row['type'] for row in reader}
                for feature in self._features:
                    if feature.split('_')[0] in typed_names:
                        self._features[feature]['type'] = \
                            int(typed_names[feature.split('_')[0]])
            elif feature_file.endswith('cfg'):
                config = ConfigParser()
                config.read(feature_files / feature_file)
                for section in config:
                    # Ignore sections (e.g. DEFAULT) that do not match a feature
                    if section in self._features \
                            and config.has_option(section, 'numerical'):
                        self._features[section]['numerical'] = \
                            config[section].getboolean('numerical')

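        # Feature type codes as read from the CSV descriptions: 1 = numerical,
        # 2 = categorical, 3 = either, depending on the boolean 'numerical'
        # flag read from the .cfg files above (0 = untyped, ignored).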
        self._numerical_columns = [k for k in self._features
                                   if self._features[k]['type'] == 1
                                   or (self._features[k]['type'] == 3
                                       and self._features[k]['numerical'])]

        self._categorical_columns = [k for k in self._features
                                     if self._features[k]['type'] == 2
                                     or (self._features[k]['type'] == 3
                                         and not self._features[k]['numerical'])]


    @property
    def start(self):
        return self._start

    @start.setter
    def start(self, x):
        self._start = x


    @property
    def end(self):
        return self._end

    @end.setter
    def end(self, x):
        self._end = x


    @property
    def timestep(self):
        return self._timestep

    @timestep.setter
    def timestep(self, x):
        self._timestep = x


    def _fill_dict(self):
        '''
        Add the datetime keys that are missing in the dated feature dictionary,
        with their features set to np.nan. Also add missing features to the
        datetimes that already exist.
        '''
        logger.info("Adding missing dates and filling missing features with NaN values")
        current = self._start
        while current <= self._end:
            self._datetimes.append(current)
            if current not in self._dict_features:
                self._dict_features[current] = {feature: np.nan
                                                for feature in self._features}
            else:
                null_dict = {feature: np.nan
                             for feature in self._features}
                null_dict.update(self._dict_features[current])
                self._dict_features[current] = null_dict
            current += self._timestep
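        # Datetimes that were already present in the dictionary but do not fall
        # on the regular start/end/timestep grid also get their missing
        # features padded here; those rows are dropped later in _fill_nan().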
        for k in self._dict_features:
            null_dict = {feature: np.nan
                         for feature in self._features}
            null_dict.update(self._dict_features[k])
            self._dict_features[k] = null_dict

        self._full_dict = {k: self._dict_features[k]
                           for k in sorted(self._dict_features.keys())}


    @property
    def full_dict(self):
        '''
        Return the fully filled dated feature dictionary, ordered by datetime.
        '''
        if self._full_dict is None:
            self._fill_dict()
        return self._full_dict


    def _fill_nan(self):
        '''
        Fill NaN values, either by propagation or by interpolation (linear or
        spline).
        '''
        logger.info("Filling NaN numerical values in the feature dataframe")
        # Only numerical columns are interpolated; the method ('propagate',
        # 'linear' or 'spline', together with its 'order') comes from the
        # configuration file.
        if self._config['PREPROCESSING']['fill_method'] == 'propagate':
            self._dataframe[self._numerical_columns] =\
                self._dataframe[self._numerical_columns].ffill()
        elif self._config['PREPROCESSING']['fill_method'] == 'linear':
            self._dataframe[self._numerical_columns] =\
                self._dataframe[self._numerical_columns].interpolate()
        elif self._config['PREPROCESSING']['fill_method'] == 'spline':
            self._dataframe[self._numerical_columns] =\
                self._dataframe[self._numerical_columns].interpolate(
                    method='spline',
                    order=self._config['PREPROCESSING'].getint('order'))

        # For the categorical columns, NaN values are filled by duplicating
        # the last known value (forward fill)
        logger.info("Filling NaN categorical values in the feature dataframe")
        self._dataframe[self._categorical_columns] =\
            self._dataframe[self._categorical_columns].ffill()

        # Uncomment this line to fill NaN values at the beginning of the
        # dataframe. This may not be a good idea, especially for features
        # that are available only for recent years, e.g., air quality
        #self._dataframe = self._dataframe.bfill()

        # Dropping rows that are outside our datetime window (start / step /
        # end). The year, month, dayInMonth and hour features are assumed to
        # be present in the dataframe.
        logger.info("Dropping rows that are not related to our datetime window")
        self._dataframe['datetime'] =\
            self._dataframe.apply(lambda x: datetime(int(x.year), int(x.month),
                                                     int(x.dayInMonth),
                                                     int(x.hour)),
                                  axis=1)
        self._dataframe['row_ok'] =\
            self._dataframe.apply(lambda x: x.datetime in self._datetimes, axis=1)
        self._dataframe = self._dataframe[self._dataframe['row_ok']]
        self._dataframe = self._dataframe.drop(['datetime', 'row_ok'], axis=1)
        logger.info("Rows dropped")


    def _add_history(self):
        '''
        Integrate the numbers of interventions observed at previous time steps
        as features.
        '''
        logger.info("Integrating previous nb of interventions as features")
        nb_lines = self._config['HISTORY_KNOWLEDGE'].getint('nb_lines')
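        # Column 'history_j' holds the target value observed nb_lines - j + 1
        # time steps before the current row ('history_<nb_lines>' is the most
        # recent past value, 'history_1' the oldest). The first nb_lines rows,
        # which lack a complete history, are dropped just below.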
        for k in range(1, nb_lines + 1):
            name = f'history_{nb_lines - k + 1}'
            self._dataframe[name] = \
                [np.nan] * k + list(self._dict_target.values())[:-k]
            self._numerical_columns.append(name)
        self._dataframe = self._dataframe[nb_lines:]



    def _standardize(self):
        '''
        Standardize the numerical features (zero mean and unit variance).
        '''
        logger.info("Standardizing numerical values in the feature dataframe")
        # We operate only on numerical columns
        self._dataframe[self._numerical_columns] =\
            preprocessing.scale(self._dataframe[self._numerical_columns])


    def _one_hot_encoding(self):
        '''
        Apply one-hot encoding to the categorical features.
        '''
        logger.info("One hot encoding for categorical features")

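        # For example, a hypothetical categorical column 'weekday' with values
        # 0..6 would be replaced by dummy columns 'weekday_0', ..., 'weekday_6'
        # produced by pd.get_dummies below.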
        # We store numerical columns
        df_out = pd.DataFrame()
        for col in self._numerical_columns:
            df_out[col] = self._dataframe[col]
        # The one hot encoding
        for col in self._categorical_columns:
            pd1 = pd.get_dummies(self._dataframe[col], prefix=col)
            for col1 in pd1.columns:
                df_out[col1] = pd1[col1]
        self._dataframe = df_out


    @property
    def dataframe(self):
        '''
        Return the feature dataframe, after creating it if needed.
        '''
        if self._dataframe is None:
            logger.info("Creating feature dataframe from feature dictionary")
            self._dataframe = pd.DataFrame.from_dict(self.full_dict,
                                                     orient='index')
            # Dealing with NaN values
            self._fill_nan()
            # Adding previous (historical) nb_interventions as features
            self._add_history()
            # Normalizing numerical values
            self._standardize()
            # Dealing with categorical features
            self._one_hot_encoding()
        return self._dataframe


    @dataframe.setter
    def dataframe(self, df):
        self._dataframe = df
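
# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module): a minimal illustration of
# how Preprocessing is meant to be driven. It assumes a ConfigParser object
# holding the [DATETIME], [PREPROCESSING] and [HISTORY_KNOWLEDGE] sections
# read by the methods above; the file name 'learn.cfg' and the feature values
# are purely hypothetical.
#
#     from configparser import ConfigParser
#     from datetime import datetime
#     from pathlib import Path
#
#     config = ConfigParser()
#     config.read(Path.cwd() / 'config' / 'learn.cfg')   # hypothetical file
#
#     # dict_features maps each datetime to a {feature_name: value} dict,
#     # dict_target maps each datetime to the observed number of interventions.
#     features = {datetime(2019, 1, 1, 0): {'temperature': 3.2, 'hour': 0}}
#     target = {datetime(2019, 1, 1, 0): 5}
#
#     process = Preprocessing(config_file=config,
#                             dict_features=features,
#                             dict_target=target)
#     print(process.dataframe.head())
# ---------------------------------------------------------------------------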