]> AND Private Git Repository - predictops.git/blob - predictops/learn/preprocessing.py
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
51ecb4e162ff77b804a6a9e4790c4e9da34e1410
[predictops.git] / predictops / learn / preprocessing.py
1 from configparser import ConfigParser
2 from csv import DictReader
3 from datetime import datetime, timedelta
4 from itertools import chain
5 from logging import getLogger
6 from logging.config import fileConfig
7 from os import listdir
8 from pathlib import Path
9 from sklearn import preprocessing
10
11 import numpy as np
12 import pandas as pd
13
# Logging is configured from <cwd>/config/logging.cfg when that file exists.
# When it does not (e.g. the module is imported from another working
# directory), the current logging configuration is kept instead of failing at
# import time.
_logging_config = (Path.cwd() / 'config') / 'logging.cfg'
if _logging_config.is_file():
    fileConfig(_logging_config)
logger = getLogger()
16
class Preprocessing:
    '''
    Generate a pandas dataframe from a dictionary of features per datetime, which
    respects the starting and ending dates of the study, and its precision (the
    time step) as read from the configuration. Missing feature values are
    completed.

     - Missing datetimes are added first with NaN feature values,
     - The dataframe is then constructed based on the filled feature dictionary,
     - NaN values are then filled (forward propagation or interpolation for
       numerical features, forward propagation for categorical ones),
     - Previous target values are appended as "history_<i>" features,
     - Numerical features are standardized and categorical features are one
       hot encoded.
    '''

    def __init__(self, config_file = None,
                 dict_features = None, dict_target = None):
        '''
        Constructor that defines all needed attributes and collects features.

        config_file:   ConfigParser-like object providing the DATETIME
                       (start, end, hourStep), PREPROCESSING (fill_method,
                       order) and HISTORY_KNOWLEDGE (nb_lines) sections.
        dict_features: dictionary {datetime: {feature name: value}}. It is
                       completed in place when the full dictionary is built.
                       None is handled as an empty dictionary.
        dict_target:   dictionary whose values are used, in order, to build
                       the history features (presumably one value per datetime
                       of the study window -- to be confirmed by the caller).

        Feature descriptions are read from <cwd>/config/features:
         - *csv files give the type of a feature (columns "name" and "type"),
           matched on the part of the feature name before the first "_";
           1 is numerical, 2 is categorical, 3 depends on the cfg files,
         - *cfg files have one section per feature, whose boolean "numerical"
           option tells how a type 3 feature must be handled.
        '''
        self._config = config_file

        self._start = datetime.strptime(self._config['DATETIME']['start'],
                                        '%m/%d/%Y %H:%M:%S')
        self._end = datetime.strptime(self._config['DATETIME']['end'],
                                      '%m/%d/%Y %H:%M:%S')
        self._timestep = timedelta(hours =
                                   self._config['DATETIME'].getfloat('hourStep'))
        self._dict_features = {} if dict_features is None else dict_features
        self._dict_target = dict_target

        self._full_dict = None
        self._dataframe = None
        self._datetimes = []

        # Sorted, so that the column order does not depend on the hash seed
        feature_names = sorted(set(chain.from_iterable(
            self._dict_features.values())))
        self._features = {feat : {'numerical': False} for feat in feature_names}

        feature_files = Path.cwd() / 'config' / 'features'
        for feature_file in sorted(listdir(feature_files)):
            if feature_file.endswith('csv'):
                # newline='' is what the csv module expects for its input files
                with open(feature_files / feature_file, "r", newline='') as f:
                    reader = DictReader(f, delimiter=',')
                    typed_names = {row['name']: row['type'] for row in reader}
                for feature, description in self._features.items():
                    base_name = feature.split('_')[0]
                    if base_name in typed_names:
                        description['type'] = int(typed_names[base_name])
            elif feature_file.endswith('cfg'):
                config = ConfigParser()
                config.read(feature_files / feature_file)
                for section in config:
                    # Sections describing features that are not in the data
                    # (and the DEFAULT section) are skipped
                    if (section in self._features
                            and config.has_option(section, 'numerical')):
                        self._features[section]['numerical'] =\
                            config[section].getboolean('numerical')

        # A feature without any known type is neither numerical nor
        # categorical: it is reported and left out of the final dataframe
        untyped = [k for k in self._features if 'type' not in self._features[k]]
        if untyped:
            logger.warning("No type found for features %s: they are ignored",
                           untyped)

        self._numerical_columns = [k for k in self._features
                   if self._features[k].get('type') == 1
                   or (self._features[k].get('type') == 3
                       and self._features[k]['numerical'])]

        self._categorical_columns = [k for k in self._features
                   if self._features[k].get('type') == 2
                   or (self._features[k].get('type') == 3
                       and not self._features[k]['numerical'])]


    @property
    def start(self):
        '''
        First datetime of the study window
        '''
        return self._start

    @start.setter
    def start(self, x):
        self._start = x


    @property
    def end(self):
        '''
        Last datetime (included) of the study window
        '''
        return self._end

    @end.setter
    def end(self, x):
        self._end = x


    @property
    def timestep(self):
        '''
        Time step (timedelta) between two consecutive datetimes of the window
        '''
        return self._timestep

    @timestep.setter
    def timestep(self, x):
        self._timestep = x


    def _present_columns(self, columns):
        '''
        Returns the names of `columns` that exist in the current dataframe
        '''
        return [col for col in columns if col in self._dataframe.columns]


    def _fill_dict(self):
        '''
        Add datetime keys in the dated feature dictionary that are missing. The
        features are then set to NaN. Add missing features in existing datetimes
        too.

        Raises ValueError if the time step is not strictly positive, as the
        window could then never be covered.
        '''
        if self._timestep <= timedelta(0):
            raise ValueError("The time step must be strictly positive")
        logger.info("Adding missing dates and filling missing features with NaN values")
        # Rebuilt from scratch, so that a second call cannot duplicate entries
        self._datetimes = []
        current = self._start
        while current <= self._end:
            self._datetimes.append(current)
            self._dict_features.setdefault(current, {})
            current += self._timestep

        # Every datetime (inside or outside the window) receives all the
        # features, known values having priority over NaN
        null_dict = dict.fromkeys(self._features, np.nan)
        for k, values in self._dict_features.items():
            self._dict_features[k] = {**null_dict, **values}

        self._full_dict = {k: self._dict_features[k]
                           for k in sorted(self._dict_features)}


    @property
    def full_dict(self):
        '''
        Returns the fully filled dated feature dictionary, ordered by datetimes
        '''
        if self._full_dict is None:
            self._fill_dict()
        return self._full_dict


    def _fill_nan(self):
        '''
        Fill NaN values, either by propagation or by interpolation (linear or
        splines), then drop the rows that are outside the datetime window.

        The method is given by the fill_method option of the PREPROCESSING
        section: 'propagate', 'linear' or 'spline' (whose order is read from
        the 'order' option). Any other value leaves numerical NaN untouched.
        '''
        logger.info("Filling NaN numerical values in the feature dataframe")
        # We interpolate (linearly or with splines) only numerical columns
        fill_method = self._config['PREPROCESSING']['fill_method']
        numerical = self._present_columns(self._numerical_columns)
        if numerical:
            if fill_method == 'propagate':
                self._dataframe[numerical] = self._dataframe[numerical].ffill()
            elif fill_method == 'linear':
                self._dataframe[numerical] =\
                    self._dataframe[numerical].interpolate()
            elif fill_method == 'spline':
                self._dataframe[numerical] =\
                    self._dataframe[numerical].interpolate(method='spline',
                         order=self._config['PREPROCESSING'].getint('order'))
            else:
                logger.warning("Unknown fill_method %r: numerical NaN values are kept",
                               fill_method)

        # For the categorical columns, NaN values are filled by duplicating
        # the last known value (forward fill method)
        logger.info("Filling NaN categorical values in the feature dataframe")
        categorical = self._present_columns(self._categorical_columns)
        if categorical:
            self._dataframe[categorical] = self._dataframe[categorical].ffill()

        # NaN values at the beginning of the dataframe are deliberately not
        # back filled: this may not be a good idea, especially for features
        # that are available only for recent years, e.g., air quality

        # Dropping rows that are not related to our datetime window (start/
        # step / end)
        window = set(self._datetimes)
        outside = [k for k in self._dataframe.index if k not in window]
        self._dataframe = self._dataframe.drop(outside)


    def _add_history(self):
        '''
        Integrating previous nb of interventions as features

        nb_lines (HISTORY_KNOWLEDGE section) columns are added: the column
        history_<nb_lines-k+1> holds the target values shifted by k rows. The
        first nb_lines rows, whose history is incomplete, are then removed.
        '''
        logger.info("Integrating previous nb of interventions as features")
        nb_lines = self._config['HISTORY_KNOWLEDGE'].getint('nb_lines')
        targets = list(self._dict_target.values())
        for k in range(1,nb_lines+1):
            name = 'history_'+str(nb_lines-k+1)
            self._dataframe[name] = [np.nan]*k + targets[:-k]
            # No duplicate if the dataframe is generated more than once
            if name not in self._numerical_columns:
                self._numerical_columns.append(name)
        # A copy, so that later column assignments do not target a slice
        self._dataframe = self._dataframe.iloc[nb_lines:].copy()


    def _standardize(self):
        '''
        Normalizing numerical features
        '''
        logger.info("Standardizing numerical values in the feature dataframe")
        # We operate only on numerical columns
        numerical = self._present_columns(self._numerical_columns)
        if numerical:
            self._dataframe[numerical] =\
                preprocessing.scale(self._dataframe[numerical])


    def _one_hot_encoding(self):
        '''
        Apply a one hot encoding for category features

        The resulting dataframe contains the numerical columns first, followed
        by the dummy columns of each categorical feature (prefixed with the
        feature name). Any other column is dropped.
        '''
        logger.info("One hot encoding for categorical feature")

        # We store numerical columns
        pieces = [self._dataframe[self._present_columns(self._numerical_columns)]]
        # The one hot encoding
        pieces.extend(pd.get_dummies(self._dataframe[col], prefix=col)
                      for col in self._present_columns(self._categorical_columns))
        # A single concatenation avoids the fragmentation caused by inserting
        # the columns one by one
        self._dataframe = pd.concat(pieces, axis=1)


    @property
    def dataframe(self):
        '''
        Returns the feature dataframe, after creating it if needed.
        '''
        if self._dataframe is None:
            logger.info("Creating feature dataframe from feature dictionary")
            self._dataframe = pd.DataFrame.from_dict(self.full_dict,
                                                     orient='index')
            # Dealing with NaN values
            self._fill_nan()
            # Adding previous (historical) nb_interventions as features
            self._add_history()
            # Normalizing numerical values
            self._standardize()
            # Dealing with categorical features
            self._one_hot_encoding()
        return self._dataframe


    @dataframe.setter
    def dataframe(self, df):
        self._dataframe = df
245
246