1 from configparser import ConfigParser
2 from csv import DictReader
3 from datetime import datetime, timedelta
4 from itertools import chain
5 from logging import getLogger
6 from logging.config import fileConfig
8 from pathlib import Path
# Load the logging configuration from config/logging.cfg, with the path
# resolved against the current working directory.
13 fileConfig((Path.cwd() / 'config') / 'logging.cfg')
18 Generate a pandas dataframe from a dictionary of features per datetime, which
19 respects the starting and ending dates of the study, and its precision (the
20 time step) as passed to the constructor. Missing feature values are completed.
22 - Missing datetimes are added first with np.NaN feature values,
23 - The dataframe is then constructed based on the filled feature dictionary,
24 - NaN values are then filled: numerical features per the configured fill method, categorical features with the last known value.
27 def __init__(self, config_file = None, dict_features = None, features = None):
29 Constructor that defines all needed attributes and collects features.
# Parameters, as used below:
#   config_file   -- handed to ConfigParser.read(); the DATETIME section
#                    (keys 'start', 'end', 'hourStep') is read unconditionally.
#   dict_features -- mapping whose values are themselves feature -> value
#                    mappings (only .values() and .keys() are used here).
#   features      -- optional collection of feature names.
# NOTE(review): despite the None default, config_file must point to a
# configuration providing [DATETIME], otherwise the lookups below fail.
31 self._config = ConfigParser()
32 self._config.read(config_file)
# Study window and step: 'start' and 'end' are parsed with
# datetime.strptime (the format argument is on a line not visible in this
# excerpt); 'hourStep' is read as a float number of hours.
34 self._start = datetime.strptime(self._config['DATETIME']['start'],
36 self._end = datetime.strptime(self._config['DATETIME']['end'],
38 self._timestep = timedelta(hours =
39 self._config['DATETIME'].getfloat('hourStep'))
# The input dictionary is stored as given; _full_dict and _dataframe start
# as None.
40 self._dict_features = dict_features
41 self._full_dict = None
42 self._dataframe = None
44 # If features are not provided to the constructor, then we collect
45 # any existing feature in the dictionary
# NOTE(review): the two assignments to self._features below are presumably
# the two branches of an if/else on `features` whose header lines are not
# visible in this excerpt -- confirm against the full file. The second one
# is the union of every key found in any of the per-datetime mappings.
47 self._features = features
49 self._features = set(chain.from_iterable([tuple(u.keys())
50 for u in [*dict_features.values()]]))
# Feature description files are looked up in <cwd>/config/features.
51 feature_files = Path.cwd() / 'config' / 'features'
# From here on self._features maps each feature name to a metadata dict,
# initialised with 'numerical': False.
52 self._features = {feat : {'numerical': False} for feat in self._features}
# NOTE(review): listdir is not among the imports visible in this excerpt --
# confirm it is imported (e.g. from os).
53 for feature_file in listdir(feature_files):
# CSV files: each row provides a 'name' and a 'type' (converted to int).
# A feature is matched on the part of its name before the first underscore.
54 if feature_file.endswith('csv'):
55 with open(feature_files / feature_file , "r") as f:
56 reader = DictReader(f, delimiter=',')
57 typed_names = {row['name']: row['type'] for row in reader}
# NOTE(review): 'type' is only set for features matched here. A feature
# matched by no CSV file keeps no 'type' key, and the ['type'] lookups done
# when the dataframe is built would then raise KeyError.
58 for feature in self._features:
59 if feature.split('_')[0] in typed_names:
60 self._features[feature]['type'] = int(typed_names[feature.split('_')[0]])
# CFG files: each section carrying a 'numerical' option sets that flag on
# the feature named exactly like the section (full name, no prefix match).
61 elif feature_file.endswith('cfg'):
62 config = ConfigParser()
63 config.read(feature_files / feature_file)
# NOTE(review): iterating a ConfigParser also yields its DEFAULT section,
# and self._features[section] raises KeyError for any section that is not
# a known feature -- consider guarding with `section in self._features`.
64 for section in config:
65 if config.has_option(section, 'numerical'):
66 self._features[section]['numerical'] = config[section].getboolean('numerical')
93 def timestep(self, x):
99 Add datetime keys in the dated feature dictionary that are missing. The
100 features are then set to np.NaN. Add missing features in existing datetimes
# Works in two passes over self._dict_features, then stores a key-sorted
# copy of it in self._full_dict.
103 logger.info("Adding missing dates and filling missing features with NaN values")
# Pass 1: walk from self._start to self._end (inclusive) in steps of
# self._timestep, recording every visited datetime in self._datetimes.
# NOTE(review): self._datetimes is not initialised in the visible part of
# the constructor -- confirm it is created there, and that this code cannot
# run twice (append would then duplicate entries).
104 current = self._start
105 while current <= self._end:
106 self._datetimes.append(current)
# A datetime absent from the input gets a row with every feature set to NaN.
# NOTE(review): the np.NaN alias was removed in NumPy 2.0; np.nan is the
# portable spelling.
107 if current not in self._dict_features:
108 self._dict_features[current] = {feature:np.NaN
109 for feature in self._features}
# NOTE(review): the three statements below presumably sit under an `else:`
# that is not visible in this excerpt. They complete an existing row with
# NaN for its missing features; values already present take precedence.
111 null_dict = {feature:np.NaN
112 for feature in self._features}
113 null_dict.update(self._dict_features[current])
114 self._dict_features[current] = null_dict
115 current += self._timestep
# Pass 2: apply the same NaN completion to every key of the dictionary,
# including datetimes outside the [start, end] window. Rows already handled
# in pass 1 are processed again here.
116 for k in self._dict_features:
117 null_dict = {feature:np.NaN
118 for feature in self._features}
119 null_dict.update(self._dict_features[k])
120 self._dict_features[k] = null_dict
# Rebuild the dictionary with its keys in ascending order.
122 self._full_dict = {k: self._dict_features[k]
123 for k in sorted(self._dict_features.keys())}
130 Returns the fully filled dated feature dictionary, ordered by datetimes
# Returns the cached self._full_dict.
# NOTE(review): the statement guarded by the `is None` test is on a line not
# visible in this excerpt; presumably it triggers the fill step that assigns
# self._full_dict -- confirm against the full file.
132 if self._full_dict is None:
134 return self._full_dict
141 Returns the feature dataframe, after creating it if needed.
# The result is cached in self._dataframe. Indentation is lost in this
# excerpt; presumably everything down to the final return is guarded by the
# `is None` test below, so the frame is built only once.
143 if self._dataframe is None:
144 logger.info("Creating feature dataframe from feature dictionary")
# Built from self.full_dict. NOTE(review): the remaining from_dict arguments
# are on a line not visible here; the row drop at the end treats the index
# labels as datetimes, which suggests orient='index' -- confirm.
145 self._dataframe = pd.DataFrame.from_dict(self.full_dict,
147 logger.info("Filling NaN numerical values in the feature dataframe")
148 # We interpolate (linearly or with splines) only numerical columns
# 'type' codes as used here: 1 is treated as numerical, 2 as categorical,
# and 3 follows the feature's 'numerical' flag.
# NOTE(review): the ['type'] lookup raises KeyError for any feature that was
# never given a 'type' entry.
149 numerical_columns = [k for k in self._features if self._features[k]['type'] == 1
150 or (self._features[k]['type'] == 3 and self._features[k]['numerical'])]
# [PREPROCESSING] fill_method selects how numerical NaNs are filled:
#   'propagate' -> forward fill,
#   'linear'    -> interpolate() with its default method,
#   'spline'    -> spline interpolation of the configured integer 'order'.
# No other value is handled in the visible code, in which case numerical
# NaNs are left as they are.
# NOTE(review): fillna(method=...) is deprecated in recent pandas releases
# in favour of DataFrame.ffill() / DataFrame.bfill().
152 if self._config['PREPROCESSING']['fill_method'] == 'propagate':
153 self._dataframe[numerical_columns] =\
154 self._dataframe[numerical_columns].fillna(method='ffill')
155 elif self._config['PREPROCESSING']['fill_method'] == 'linear':
156 self._dataframe[numerical_columns] =\
157 self._dataframe[numerical_columns].interpolate()
158 elif self._config['PREPROCESSING']['fill_method'] == 'spline':
159 self._dataframe[numerical_columns] =\
160 self._dataframe[numerical_columns].interpolate(method='spline',
161 order=self._config['PREPROCESSING'].getint('order'))
163 # For the categorical columns, NaN values are filled by duplicating
164 # the last known value (forward fill method)
165 logger.info("Filling NaN categorical values in the feature dataframe")
166 categorical_columns = [k for k in self._features if self._features[k]['type'] == 2
167 or (self._features[k]['type'] == 3 and not self._features[k]['numerical'])]
168 self._dataframe[categorical_columns] =\
169 self._dataframe[categorical_columns].fillna(method='ffill')
171 # Uncomment this line to fill NaN values at the beginning of the
172 # dataframe. This may not be a good idea, especially for features
173 # that are available only for recent years, e.g., air quality
174 #self._dataframe = self._dataframe.fillna(method='bfill')
176 # Dropping rows that are not related to our datetime window (start/
# Iterating the transposed frame yields the index labels of the frame; every
# label not recorded in self._datetimes is dropped.
# NOTE(review): the membership test runs once per row; if self._datetimes is
# a list this is a linear scan each time -- a set would avoid it.
178 self._dataframe = self._dataframe.drop([k.to_pydatetime()
179 for k in self._dataframe.T
180 if k not in self._datetimes])
181 return self._dataframe
185 def dataframe(self, df):