-from .source import Source
-
from configparser import ConfigParser
from csv import DictReader
from datetime import datetime
# Logging is configured from the 'config/logging.cfg' file located under the
# current working directory, then the logger used by this module is fetched.
# NOTE(review): fileConfig, getLogger and Path are not imported in the visible
# part of the file — presumably logging.config.fileConfig, logging.getLogger
# and pathlib.Path; confirm the imports exist.
fileConfig((Path.cwd() / 'config') / 'logging.cfg')
# No name is given to getLogger(); with logging.getLogger that is the root
# logger.
logger = getLogger()
-CSV_FILE = Path.cwd() / 'config' / 'features' / 'meteofrance_features.csv'
-
-class MeteoFrance(Source):
+class MeteoFrance:
- _latitude = None
- _longitude = None
+ _latitude = None
+ _longitude = None
_nb_stations = None
- _start = None
- _end = None
- _features = None
+ _start = None
+ _end = None
+ _features = None
def __init__(self, config_file):
    '''
    Constructor of the MeteoFrance source of feature.

    Reads the configuration file, resolves the stations through
    _get_stations() and selects the features to collect: a section of the
    configuration is kept as a feature when it defines a 'numerical' option
    and at least one of its 'binary', 'categorical' or 'numerical' options
    is switched on.

    Parameters:
        config_file (str or path-like): path of the configuration file,
            handed to ConfigParser.read().

    Raises:
        ValueError: if one of the three flags holds a non-empty value that
            ConfigParser cannot interpret as a boolean.
    '''
    self._config = ConfigParser()
    self._config.read(config_file)
    self._stations = self._get_stations()

    def is_on(section, flag):
        # ConfigParser stores every value as a string, and the string
        # 'False' is truthy: the flag has to be converted explicitly.
        # A missing or empty flag counts as switched off.
        if not self._config.get(section, flag, fallback='').strip():
            return False
        return self._config.getboolean(section, flag)

    # Collecting meteofrance features
    flags = ('binary', 'categorical', 'numerical')
    self._features = [
        section for section in self._config
        if self._config.has_option(section, 'numerical')
        and any(is_on(section, flag) for flag in flags)
    ]
@property
def start(self):
    '''
    Lower bound of the period whose data are considered.
    '''
    return self._start

@start.setter
def start(self, x):
    # Without the setter decorator this second definition would replace
    # the property instead of completing it.
    self._start = x
-
@property
def end(self):
    '''
    Upper bound of the period whose data are considered.
    '''
    return self._end

@end.setter
def end(self, x):
    # Without the setter decorator this second definition would replace
    # the property instead of completing it.
    self._end = x
-
@property
def latitude(self):
    '''
    Latitude of the point the station distances are measured from.
    '''
    return self._latitude

@latitude.setter
def latitude(self, x):
    # Without the setter decorator this second definition would replace
    # the property instead of completing it.
    self._latitude = x
-
@property
def longitude(self):
    '''
    Longitude of the point the station distances are measured from.
    '''
    return self._longitude

@longitude.setter
def longitude(self, x):
    # Without the setter decorator this second definition would replace
    # the property instead of completing it.
    self._longitude = x
-
@property
def nb_stations(self):
    '''
    Number of closest stations that are kept.
    '''
    return self._nb_stations

@nb_stations.setter
def nb_stations(self, x):
    # Without the setter decorator this second definition would replace
    # the property instead of completing it.
    self._nb_stations = x
-
def _regenerate_directory(self):
    '''
    Re-creating data directory architecture for MeteoFrance

    Creates the 'config' sub-directory of self._data_directory, missing
    parents included. An already existing directory is left as it is.
    '''
    config_dir = Path(self._data_directory / 'config')
    config_dir.mkdir(exist_ok=True, parents=True)
-
-
# Returns the ids of the self._nb_stations stations closest to the point
# (self._latitude, self._longitude), closest first. Every station read from
# postesSynop.csv is also recorded in self._dict_stations under its name,
# together with its id, its coordinates and its distance to that point in km.
# NOTE(review): the statements that download csv_file, open it as `f` and
# initialise self._dict_stations are not visible in this excerpt — confirm
# against the complete file.
def _get_stations(self):
'''
Collect (after downloading them, if needed) the stations and their
# The csv file of meteo stations (names, ids and locations) if downloaded,
# if not available in the config directory within data / meteo_france
link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
- p = Path(self._data_directory / 'config' )
+ p = Path(self._data_directory / 'config')
csv_file = p / basename(link)
if not isfile(csv_file):
logger.info('Downloading location stations from MeteoFrance')
reader = DictReader(f, delimiter=';')
for row in reader:
# NOTE(review): eval() executes whatever the downloaded csv contains;
# float() would presumably be enough for coordinates — confirm and replace.
latitude, longitude = eval(row['Latitude']), eval(row['Longitude'])
- self._dict_stations[row['Nom'].replace("'",'’')] = {
- 'id' : row['ID'],
- 'longitude' : longitude,
- 'latitude' : latitude,
- 'distance' : vincenty(
+ self._dict_stations[row['Nom'].replace("'", '’')] = {
+ 'id': row['ID'],
+ 'longitude': longitude,
+ 'latitude': latitude,
+ 'distance': vincenty(
(self._latitude, self._longitude),
(latitude, longitude)).km
}
# Find the closest stations
logger.info('Finding the closest stations')
# Station names ordered by increasing distance to the reference point.
stations_by_distance = sorted(self._dict_stations.keys(),
- key = lambda x: self._dict_stations[x]['distance'])
+ key=lambda x: self._dict_stations[x]['distance'])
logger.info(f'The {self._nb_stations} closest stations are: '
f'{", ".join(stations_by_distance[:self._nb_stations])}.')
# Names are only used for logging; callers receive the station ids.
return [self._dict_stations[sta]['id'] for sta in stations_by_distance][:self._nb_stations]
-
-
# Makes sure that a 'synop.<YYYYMM>.csv' file exists in meteo_data for every
# month whose first day lies between self._start and self._end.
# NOTE(review): meteo_data and the file handles `f` and `g` are defined by
# statements that are not visible in this excerpt — confirm against the
# complete file.
# NOTE(review): a month is listed only when its first day (at midnight) is
# >= self._start, so the month containing self._start is skipped unless
# _start is exactly that first day, whereas dated_features explicitly reads
# that month — confirm whether it should be collected here too.
def _collect_historical_data(self):
'''
We collect all csv files from January 1996 until the month
# List of year-months to consider
historical = []
date_end = self._end
- for year in range(self._start.year, date_end.year+1):
- for month in range(1,13):
+ for year in range(self._start.year, date_end.year + 1):
+ for month in range(1, 13):
date = datetime(year, month, 1)
if date >= self._start and date <= date_end:
historical.append(date.strftime("%Y%m"))
p = Path(meteo_data)
p.mkdir(exist_ok=True, parents=True)
for date in historical:
# Only the months whose csv file is missing locally are processed.
- if not isfile(meteo_data / ('synop.'+date+'.csv')):
+ if not isfile(meteo_data / ('synop.' + date + '.csv')):
link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
link += date + '.csv.gz'
download_path = meteo_data / basename(link)
g.write(f.read().decode())
# The '.csv.gz' file is deleted once its decoded content has been written.
remove(meteo_data / basename(link))
-
-
def update(self):
    '''
    Update the MeteoFrance features with the last available data

    The csv file of the current month is removed from the 'historical'
    sub-directory of self._data_directory, then
    _collect_historical_data() is called. A file that cannot be removed is
    only reported with a warning, and the collection still runs.
    '''
    logger.info('Update historical csv files from MeteoFrance, if needed')
    today = datetime.now()
    todel = 'synop.' + today.strftime("%Y%m") + ".csv"
    stale = self._data_directory / 'historical' / todel
    # Only file-system errors are tolerated here; a bare except would also
    # swallow KeyboardInterrupt and programming errors.
    try:
        remove(stale)
    except FileNotFoundError:
        logger.warning(f"{stale} not found")
    except OSError as err:
        logger.warning(f"{stale} could not be removed: {err}")
    self._collect_historical_data()
-
-
@property
def dated_features(self):
    '''
    dict: the dictionary of features per datestamp

    The dictionary is built on first access and cached in
    self._dated_features. Every monthly csv file of the 'historical'
    directory that overlaps the period [self._start, self._end] is read.
    Rows of the stations listed in self._stations and dated within that
    period contribute one entry per feature, keyed
    '<feature name>_<rank>', where rank is the position of the station in
    self._stations.

    Raises:
        ValueError, SyntaxError: if a cell is neither empty, 'mq' nor a
            Python literal.
    '''
    # Local import, so that the module-level imports are left untouched.
    from ast import literal_eval

    def parse_cell(raw):
        # 'mq' is mapped to None, and so are empty cells.
        # literal_eval replaces the former eval(): the csv files are
        # downloaded, and their content must never be executed.
        text = raw.replace('mq', 'None').strip()
        return literal_eval(text) if text else None

    # getattr(): nothing visible in the class initialises the cache, so a
    # plain attribute access could fail on the very first call.
    if getattr(self, '_dated_features', None) is None:
        logger.info('Collecting meteofrance feature information')
        # A dictionary for the features
        dico_features = {
            self._config[section]["abbreviation"]: {
                'name': section,  # feature name
                'numerical': self._config[section]['numerical'],
                'categorical': self._config[section]['categorical'],
            }
            for section in self._features
        }
        dir_data = Path.cwd() / 'data' / 'features' / 'meteo_france' / 'historical'
        self._dated_features = {}

        for csv_meteo in sorted(listdir(dir_data)):
            month = datetime.strptime(csv_meteo.split('.')[1], '%Y%m')
            # The months of both bounds are read even when their first day
            # falls outside the period.
            overlaps = (
                self._start <= month <= self._end
                or (month.year, month.month) == (self._start.year, self._start.month)
                or (month.year, month.month) == (self._end.year, self._end.month)
            )
            if not overlaps:
                continue
            logger.info(f'Adding meteofrance features from {csv_meteo}')
            with open(dir_data / csv_meteo, "r") as f:
                reader = DictReader(f, delimiter=';')
                for row in reader:
                    station = row['numer_sta']
                    if station not in self._stations:
                        continue
                    stamp = datetime.strptime(row['date'], '%Y%m%d%H%M%S')
                    if not self._start <= stamp <= self._end:
                        continue
                    rank = str(self._stations.index(station))
                    self._dated_features.setdefault(stamp, {}).update({
                        feature['name'] + '_' + rank: parse_cell(row[abbreviation])
                        for abbreviation, feature in dico_features.items()
                    })
    return self._dated_features