1 from configparser import ConfigParser
2 from csv import DictReader
3 from datetime import datetime, timedelta
4 from itertools import chain
5 from logging import getLogger
6 from logging.config import fileConfig
8 from pathlib import Path
9 from sklearn import preprocessing
14 fileConfig((Path.cwd() / 'config') / 'logging.cfg')
19 Generate a pandas dataframe from a dictionary of features per datetime, which
20 respects the starting and ending dates of the study, and its precision (the
21 time step) as passed to the constructor. Missing feature values are completed.
23 - Missing datetimes are added first with np.NaN feature values,
24 - The dataframe is then constructed based on the filled feature dictionary,
25 - NaN values are then filled with last known values.
    def __init__(self, config_file = None,
                 dict_features = None, dict_target = None):
        """
        Constructor that defines all needed attributes and collects features.

        config_file   : parsed configuration (indexed by section, e.g.
                        config_file['DATETIME']) — presumably a ConfigParser;
                        TODO confirm with the caller.
        dict_features : dict mapping datetimes to {feature_name: value} dicts.
        dict_target   : dict mapping datetimes to target values.
        """
        self._config = config_file
        # Study window boundaries, parsed from the [DATETIME] section.
        # NOTE(review): the strptime format arguments are on lines missing
        # from this view — confirm against the configured date format.
        self._start = datetime.strptime(self._config['DATETIME']['start'],
        self._end = datetime.strptime(self._config['DATETIME']['end'],
        # Time step (precision) of the study, expressed in hours.
        self._timestep = timedelta(hours =
                                   self._config['DATETIME'].getfloat('hourStep'))
        self._dict_features = dict_features
        self._dict_target = dict_target
        # Lazily-built caches, populated on first access of the
        # corresponding properties.
        self._full_dict = None
        self._dataframe = None
        # Union of every feature name occurring in any datetime entry.
        self._features = set(chain.from_iterable([tuple(u.keys())
                                                  for u in [*dict_features.values()]]))
        # Read per-feature metadata from config/features/*.csv (type codes)
        # and config/features/*.cfg (numerical flag).
        feature_files = Path.cwd() / 'config' / 'features'
        self._features = {feat : {'numerical': False} for feat in self._features}
        for feature_file in listdir(feature_files):
            if feature_file.endswith('csv'):
                with open(feature_files / feature_file , "r") as f:
                    reader = DictReader(f, delimiter=',')
                    typed_names = {row['name']: row['type'] for row in reader}
                # Features are matched on their prefix before the first '_'
                # (e.g. 'temperature_3' matches csv row 'temperature').
                for feature in self._features:
                    if feature.split('_')[0] in typed_names:
                        self._features[feature]['type'] = int(typed_names[feature.split('_')[0]])
            elif feature_file.endswith('cfg'):
                config = ConfigParser()
                config.read(feature_files / feature_file)
                # NOTE(review): iterating a ConfigParser yields DEFAULT plus
                # every section name; self._features[section] raises KeyError
                # for sections that are not known features — confirm the cfg
                # files only declare feature sections.
                for section in config:
                    if config.has_option(section, 'numerical'):
                        self._features[section]['numerical'] = config[section].getboolean('numerical')
        # Partition columns by declared type code: 1 = numerical,
        # 2 = categorical, 3 = either (disambiguated by the 'numerical' flag).
        # NOTE(review): a feature whose prefix matched no csv row has no
        # 'type' key and would raise KeyError here — confirm csv coverage.
        self._numerical_columns = [k for k in self._features if self._features[k]['type'] == 1
                                   or (self._features[k]['type'] == 3 and self._features[k]['numerical'])]
        self._categorical_columns = [k for k in self._features if self._features[k]['type'] == 2
                                     or (self._features[k]['type'] == 3 and not self._features[k]['numerical'])]
99 def timestep(self, x):
    def _fill_dict(self):
        """
        Add datetime keys in the dated feature dictionary that are missing. The
        features are then set to np.NaN. Add missing features in existing datetimes.
        """
        logger.info("Adding missing dates and filling missing features with NaN values")
        # Walk the study window one timestep at a time, recording every
        # expected datetime and creating an all-NaN feature entry for the
        # missing ones.
        current = self._start
        while current <= self._end:
            self._datetimes.append(current)
            if current not in self._dict_features:
                self._dict_features[current] = {feature:np.NaN
                                                for feature in self._features}
            # NOTE(review): an `else:` line is missing from this view just
            # above this block — presumably entries that already exist are
            # completed with NaN for their missing features; confirm.
                null_dict = {feature:np.NaN
                             for feature in self._features}
                null_dict.update(self._dict_features[current])
                self._dict_features[current] = null_dict
            current += self._timestep
        # Second pass over *every* entry (including datetimes outside the
        # start/end window, if any) so each one carries the full feature set.
        for k in self._dict_features:
            null_dict = {feature:np.NaN
                         for feature in self._features}
            null_dict.update(self._dict_features[k])
            self._dict_features[k] = null_dict
        # Cache the completed dictionary, ordered chronologically.
        self._full_dict = {k: self._dict_features[k]
                           for k in sorted(self._dict_features.keys())}
        """
        Returns the fully filled dated feature dictionary, ordered by datetimes.
        """
        # Lazily build the dictionary on first access.
        if self._full_dict is None:
        # NOTE(review): the line that populates _full_dict (presumably a call
        # to self._fill_dict()) is missing from this view — confirm.
        return self._full_dict
        """
        Fill NaN values, either by propagation or by interpolation (linear or splines).
        """
        # NOTE(review): the `def` line of this method is missing from this
        # view; the body reads self._config / self._dataframe, so it is an
        # instance method (presumably named _fill_nan) — confirm.
        logger.info("Filling NaN numerical values in the feature dataframe")
        # We interpolate (linearly or with splines) only numerical columns;
        # the strategy is selected by [PREPROCESSING] fill_method.
        if self._config['PREPROCESSING']['fill_method'] == 'propagate':
            # NOTE(review): fillna(method='ffill') is deprecated in recent
            # pandas (use .ffill()) — kept as-is here.
            self._dataframe[self._numerical_columns] =\
                self._dataframe[self._numerical_columns].fillna(method='ffill')
        elif self._config['PREPROCESSING']['fill_method'] == 'linear':
            self._dataframe[self._numerical_columns] =\
                self._dataframe[self._numerical_columns].interpolate()
        elif self._config['PREPROCESSING']['fill_method'] == 'spline':
            self._dataframe[self._numerical_columns] =\
                self._dataframe[self._numerical_columns].interpolate(method='spline',
                    order=self._config['PREPROCESSING'].getint('order'))

        # For the categorical columns, NaN values are filled by duplicating
        # the last known value (forward fill method)
        logger.info("Filling NaN categorical values in the feature dataframe")
        self._dataframe[self._categorical_columns] =\
            self._dataframe[self._categorical_columns].fillna(method='ffill')

        # Uncomment this line to fill NaN values at the beginning of the
        # dataframe. This may not be a good idea, especially for features
        # that are available only for recent years, e.g., air quality
        #self._dataframe = self._dataframe.fillna(method='bfill')

        # Dropping rows that are not related to our datetime window (start/
        # end); the datetime is rebuilt from the year/month/dayInMonth/hour
        # columns — assumes those columns exist in the dataframe (TODO confirm).
        logger.info("Dropping rows that are not related to our datetime window")
        self._dataframe['datetime'] =\
            self._dataframe.apply(lambda x: datetime(int(x.year), int(x.month), int(x.dayInMonth), int(x.hour)), axis=1)
        # NOTE(review): membership test against self._datetimes (a list) is
        # O(len) per row; a set would be faster — left unchanged here.
        self._dataframe['row_ok'] =\
            self._dataframe.apply(lambda x:x.datetime in self._datetimes, axis=1)
        self._dataframe = self._dataframe[self._dataframe['row_ok']]
        # Remove the two helper columns once filtering is done.
        self._dataframe = self._dataframe.drop(['datetime', 'row_ok'], axis=1)
        logger.info("Rows dropped")
184 def _add_history(self):
186 Integrating previous nb of interventions as features
188 logger.info("Integrating previous nb of interventions as features")
189 nb_lines = self._config['HISTORY_KNOWLEDGE'].getint('nb_lines')
190 for k in range(1,nb_lines+1):
191 name = 'history_'+str(nb_lines-k+1)
192 self._dataframe[name] = [np.NaN]*k + list(self._dict_target.values())[:-k]
193 self._numerical_columns.append(name)
194 self._dataframe = self._dataframe[nb_lines:]
198 def _standardize(self):
200 Normalizing numerical features
202 logger.info("Standardizing numerical values in the feature dataframe")
203 # We operate only on numerical columns
204 self._dataframe[self._numerical_columns] =\
205 preprocessing.scale(self._dataframe[self._numerical_columns])
209 def _one_hot_encoding(self):
211 Apply a one hot encoding for category features
213 logger.info("One hot encoding for categorical feature")
215 # We store numerical columns
216 df_out = pd.DataFrame()
217 for col in self._numerical_columns:
218 df_out[col] = self._dataframe[col]
219 # The one hot encoding
220 for col in self._categorical_columns:
221 pd1 = pd.get_dummies(self._dataframe[col],prefix=col)
222 for col1 in pd1.columns:
223 df_out[col1] = pd1[col1]
224 self._dataframe = df_out
        """
        Returns the feature dataframe, after creating it if needed.
        """
        # Lazily build (and cache) the dataframe on first access.
        if self._dataframe is None:
            logger.info("Creating feature dataframe from feature dictionary")
            # NOTE(review): the closing argument(s) of from_dict (presumably
            # orient='index') are on a line missing from this view — confirm.
            self._dataframe = pd.DataFrame.from_dict(self.full_dict,
            # Dealing with NaN values
            # Adding previous (historical) nb_interventions as features
            # Normalizing numerical values
            # NOTE(review): the method calls matching the three comments
            # above (presumably _fill_nan, _add_history and _standardize)
            # are on lines missing from this view — confirm the pipeline.
            # Dealing with categorical features
            self._one_hot_encoding()
        return self._dataframe
248 def dataframe(self, df):