`python -m venv ~/.venvs/predictops`
-activate the environment:
+- activate the environment:
`source ~/.venvs/predictops/bin/activate`
2. Detail the processing of each feature family in its associated cfg file
(feature_ephemeris.cfg, feature_meteo.cfg, etc.), in accordance with the
-associated csv files in the features directory.
+associated csv files in the features directory. In the latter, the type
+specifies whether the variable is numerical (1), qualitative (2), or can be
+considered as either type (3), such as the day of the year; a short sketch
+is given below.
+
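As an illustration, a feature csv file and its associated cfg could look as
follows; the name and type columns and the numerical option are the only
fields read by the code, while the feature names and values below are
hypothetical:

    name,type
    dayInYear,3
    holiday,2
    temperature,1

and, in the associated cfg (e.g. feature_ephemeris.cfg), each type-3 feature
states whether it should be handled as a numerical variable:

    [dayInYear]
    numerical = True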
Execution
else:
self._features = set(chain.from_iterable([tuple(u.keys())
for u in [*dict_features.values()]]))
- csv_files = Path.cwd() / 'config' / 'features'
- self._features = {feat : None for feat in self._features}
- for csv_file in listdir(csv_files):
- with open(csv_files / csv_file, "r") as f:
- reader = DictReader(f, delimiter=',')
- for row in reader:
- if row['name'] in self._features:
- self._features[row['name']] = row['type']
- print(self._features)
- exit()
+ feature_files = Path.cwd() / 'config' / 'features'
+ self._features = {feat : {'numerical': False} for feat in self._features}
+ for feature_file in listdir(feature_files):
+ if feature_file.endswith('csv'):
+ with open(feature_files / feature_file , "r") as f:
+ reader = DictReader(f, delimiter=',')
+ typed_names = {row['name']: row['type'] for row in reader}
+ for feature in self._features:
+ if feature.split('_')[0] in typed_names:
+ self._features[feature]['type'] = int(typed_names[feature.split('_')[0]])
+ elif feature_file.endswith('cfg'):
+ config = ConfigParser()
+ config.read(feature_files / feature_file)
+ for section in config:
+ if config.has_option(section, 'numerical'):
+ self._features[section]['numerical'] = config[section].getboolean('numerical')
+
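For reference, here is a minimal sketch of the dictionary built above, with
hypothetical feature names (only the day-of-year example comes from the
documentation):

    # A minimal sketch of self._features after the loop above
    # (feature names are hypothetical, except the day-of-year example)
    features = {
        'temperature': {'numerical': False, 'type': 1},  # purely numerical (1)
        'holiday': {'numerical': False, 'type': 2},       # purely categorical (2)
        'dayInYear': {'numerical': True, 'type': 3},      # mixed (3), handled as numerical via its cfg
    }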
@property
logger.info("Creating feature dataframe from feature dictionary")
self._dataframe = pd.DataFrame.from_dict(self.full_dict,
orient='index')
- logger.info("Filling NaN values in the feature dataframe")
-
+ logger.info("Filling NaN numerical values in the feature dataframe")
+ # We interpolate (linearly or with splines) only numerical columns
+ numerical_columns = [k for k in self._features if self._features[k]['type'] == 1
+ or (self._features[k]['type'] == 3 and self._features[k]['numerical'])]
+ # The interpolation
if self._config['PREPROCESSING']['fill_method'] == 'propagate':
- self._dataframe = self._dataframe.fillna(method='ffill')
+ self._dataframe[numerical_columns] =\
+ self._dataframe[numerical_columns].fillna(method='ffill')
elif self._config['PREPROCESSING']['fill_method'] == 'linear':
- self._dataframe = self._dataframe.interpolate()
+ self._dataframe[numerical_columns] =\
+ self._dataframe[numerical_columns].interpolate()
elif self._config['PREPROCESSING']['fill_method'] == 'spline':
- self._dataframe = self._dataframe.interpolate(method='spline',
- order=self._config['PREPROCESSING'].getint('order'))
+ self._dataframe[numerical_columns] =\
+ self._dataframe[numerical_columns].interpolate(method='spline',
+ order=self._config['PREPROCESSING'].getint('order'))
+
+ # For the categorical columns, NaN values are filled by duplicating
+ # the last known value (forward fill method)
+ logger.info("Filling NaN categorical values in the feature dataframe")
+ categorical_columns = [k for k in self._features if self._features[k]['type'] == 2
+ or (self._features[k]['type'] == 3 and not self._features[k]['numerical'])]
+ self._dataframe[categorical_columns] =\
+ self._dataframe[categorical_columns].fillna(method='ffill')
# Uncomment this line to fill NaN values at the beginning of the
# dataframe. This may not be a good idea, especially for features
# that are available only for recent years, e.g., air quality
#self._dataframe = self._dataframe.fillna(method='bfill')
+        # Drop the rows that fall outside our datetime window
+        # (start / step / end)
self._dataframe = self._dataframe.drop([k.to_pydatetime()
for k in self._dataframe.T
if k not in self._datetimes])
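For reference, the PREPROCESSING section of the main configuration read above
could look as follows; the section name, the fill_method values (propagate,
linear, spline) and the order key are the ones handled by the code, the chosen
values are only an example:

    [PREPROCESSING]
    # One of: propagate, linear, spline
    fill_method = spline
    # Only used with the spline method
    order = 3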
+from configparser import ConfigParser
from csv import DictReader
from logging import getLogger
from logging.config import fileConfig
class Source:
def __init__(self):
'''
- Check if the same feature name is used in two different feature sources
+        Check that the same feature name is not used in two different feature
+        sources, and that every feature of type 3 (which can be either
+        categorical or numerical) has its type specified in the feature_...cfg file
'''
        logger.info('Check for redundant feature names')
- csv_files = Path.cwd() / 'config' / 'features'
+ feature_files = Path.cwd() / 'config' / 'features'
list_of_names = []
- for csv_file in listdir(csv_files):
- with open(csv_files / csv_file, "r") as f:
- reader = DictReader(f, delimiter=',')
- list_of_names.extend([row['name'] for row in reader])
+ for file_name in listdir(feature_files ):
+ if file_name.endswith('csv'):
+ with open(feature_files / file_name, "r") as f:
+ reader = DictReader(f, delimiter=',')
+ list_of_names.extend([row['name'] for row in reader])
+
if len(list_of_names) != len(set(list_of_names)):
- raise ValueError("At least two features have the same name")
\ No newline at end of file
+ raise ValueError("At least two features have the same name")
+
+ logger.info('Check for specified feature types')
+ names_of_mixed_types = []
+ for file_name in listdir(feature_files):
+ if file_name.endswith('csv'):
+ with open(feature_files / file_name, "r") as f:
+ reader = DictReader(f, delimiter=',')
+ names_of_mixed_types.extend([row['name'] for row in reader
+ if row['type'] == '3'])
+
+ cfg_names_of_mixed_types = []
+ for file_name in listdir(feature_files):
+ if file_name.endswith('cfg'):
+ config = ConfigParser()
+ config.read(feature_files / file_name)
+ for section in config:
+ if config.has_option(section, 'numerical'):
+ cfg_names_of_mixed_types.append(section)
+
+ if sorted(names_of_mixed_types) != sorted(cfg_names_of_mixed_types):
+ raise ValueError(f"Problem with features of mixed types: "
+ f"{set(names_of_mixed_types).symmetric_difference(cfg_names_of_mixed_types)}")
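For instance, with a hypothetical dayInYear feature declared with type 3 in a
csv file but without a numerical option in any cfg file, this check raises:
ValueError: Problem with features of mixed types: {'dayInYear'}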