from configparser import ConfigParser
from csv import DictReader
-from geopy import distance
-from pathlib import Path
-from shutil import rmtree
-
+from datetime import datetime
+from geopy.distance import vincenty
+from logging import getLogger
from logging.config import fileConfig
+from os import listdir, remove, system
from os.path import isfile, basename
+from pathlib import Path
+from shutil import rmtree
from urllib.request import urlretrieve
-import logging
+import gzip
# Logging is configured when the module is imported, from the file
# config/logging.cfg looked up under the current working directory.
# The module then logs through the root logger.
fileConfig(Path.cwd() / 'config' / 'logging.cfg')
logger = getLogger()
class MeteoFrance:
- def __init__(self, latitude = 47.25, longitude = 6.0333, nb_stations = 3):
+ def __init__(self, latitude = 47.25, longitude = 6.0333, nb_stations = 3,
+ start = datetime.strptime('19960101000000', '%Y%m%d%H%M%S'),
+ end = datetime.now(),
+ features = []):
'''
Constructor of the MeteoFrance source of feature.
latitude (float): The latitude from which we want the meteo features.
longitude (float): The longitude from which we want the meteo features.
nb_stations (int): Number of closest stations to consider.
+ features (list): Weather features that have to be integrated, according
+ to their names in meteofrance_features.csv (cf. config directory)
'''
self._latitude = latitude
self._longitude = longitude
self._nb_stations = nb_stations
+ self._start = start
+ self._end = end
+ self._features = features
self._data_directory = (Path.cwd() / 'data') / 'meteo_france'
+ self._dated_features = None
+
# Re-creating data directory architecture for MeteoFrance, if asked
config = ConfigParser()
config.read((Path.cwd() / 'config') / 'features.cfg')
# Collecting the closest meteo station
self._stations = self._get_stations()
- print(self._stations)
urlretrieve(link, csv_file)
# A dictionary for the meteo stations is created
- dict_stations = {}
+ self._dict_stations = {}
logger.info('Collecting information about meteo stations')
with open(csv_file, "r") as f:
reader = DictReader(f, delimiter=';')
for row in reader:
latitude, longitude = eval(row['Latitude']), eval(row['Longitude'])
- dict_stations[row['Nom'].replace("'",'’')] = {
+ self._dict_stations[row['Nom'].replace("'",'’')] = {
'id' : row['ID'],
'longitude' : longitude,
'latitude' : latitude,
- 'distance' : distance.vincenty(
+ 'distance' : vincenty(
(self._latitude, self._longitude),
(latitude, longitude)).km
}
# Find the closest stations
logger.info('Finding the closest stations')
- stations_by_distance = sorted(dict_stations.keys(),
- key = lambda x: dict_stations[x]['distance'])
+ stations_by_distance = sorted(self._dict_stations.keys(),
+ key = lambda x: self._dict_stations[x]['distance'])
logger.info(f'The {self._nb_stations} closest stations are: '
f'{", ".join(stations_by_distance[:self._nb_stations])}.')
- return [dict_stations[sta]['id'] for sta in stations_by_distance][:self._nb_stations]
+ return [self._dict_stations[sta]['id'] for sta in stations_by_distance][:self._nb_stations]
+
+
+
def _collect_historical_data(self):
    '''
    Download the monthly archive files that are missing from the data
    repository.

    One csv file per month is expected in the 'historical' subdirectory of
    self._data_directory, named synop.YYYYMM.csv (e.g. synop.201001.csv for
    January 2010), for every month from the one containing self._start to
    the one containing self._end. Each missing file is downloaded as a
    .csv.gz archive, uncompressed next to it, and the archive is removed.

    Raises:
        Any exception raised by urlretrieve, gzip or the file system. In
        that case neither the .gz archive nor a partially written csv file
        is left in the directory.
    '''
    # Months are compared as (year, month) pairs, so that the month that
    # contains self._start is kept even when self._start is not exactly the
    # first day of that month at midnight.
    first_month = (self._start.year, self._start.month)
    last_month = (self._end.year, self._end.month)
    historical = [f'{year:04d}{month:02d}'
                  for year in range(first_month[0], last_month[0] + 1)
                  for month in range(1, 13)
                  if first_month <= (year, month) <= last_month]

    # We download all csv files from meteofrance that are not in
    # the data repository
    meteo_data = self._data_directory / 'historical'
    meteo_data.mkdir(exist_ok=True, parents=True)
    for date in historical:
        csv_file = meteo_data / ('synop.' + date + '.csv')
        if csv_file.is_file():
            continue
        link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
        link += date + '.csv.gz'
        download_path = meteo_data / basename(link)
        complete = False
        try:
            urlretrieve(link, download_path)
            # The archive is fully decoded before the csv file is created,
            # so a corrupted archive never produces a csv file.
            with gzip.open(download_path, 'rb') as f:
                content = f.read().decode()
            with open(csv_file, 'w') as g:
                g.write(content)
            complete = True
        finally:
            if download_path.is_file():
                download_path.unlink()
            # The presence of the csv file is what marks a month as already
            # collected: a truncated file must not survive a failure.
            if not complete and csv_file.is_file():
                csv_file.unlink()
def update(self):
    '''
    Update the MeteoFrance features with the last available data.

    The presence of a synop.YYYYMM.csv file in the 'historical' subdirectory
    of self._data_directory is what makes self._collect_historical_data skip
    the download of a month. The file of the current month is therefore
    deleted first, so that its most recent version is downloaded again.
    The cached feature dictionary is then dropped, so that it is rebuilt
    from the refreshed files the next time it is requested.

    Raises:
        OSError: if the file of the current month exists but cannot be
            deleted.
        Any exception raised by self._collect_historical_data.
    '''
    logger.info('Update historical csv files from MeteoFrance, if needed')
    today = datetime.now()
    todel = 'synop.'+today.strftime("%Y%m")+".csv"
    current_month_file = self._data_directory / 'historical' / todel
    try:
        current_month_file.unlink()
    except FileNotFoundError:
        # Only a missing file is expected here; any other failure means the
        # stale file is still in place and must not be reported as absent.
        logger.warning(f"{current_month_file} not found")
    # NOTE(review): this creates an empty file named like the archive in the
    # current working directory (not in the 'historical' directory), as the
    # former shell call to touch did. Nothing visible here reads that file;
    # confirm whether it is needed at all.
    try:
        Path(todel).touch()
    except OSError as err:
        logger.warning(f"Could not touch {todel}: {err}")
    self._collect_historical_data()
    # Forget the cached dictionary: it was built from the previous files.
    self._dated_features = None
+
+
+
@property
def dated_features(self):
    '''
    Dictionary of the selected weather features, indexed by datestamp.

    The dictionary is built on first access and cached in
    self._dated_features; later accesses return the cached object.
    - considered features are the rows of meteofrance_features.csv (read
      from the config/features directory under the current working
      directory) whose name is listed in self._features
    - only the rows whose station is listed in self._stations are kept;
      each value is stored under the key <feature name>_<position of the
      station in self._stations>
    - every synop.YYYYMM.csv file of the 'historical' data directory whose
      month lies between the month of self._start and the month of
      self._end is read entirely (rows are not filtered individually
      against these two dates); other files of the directory are ignored
    - the missing-value marker 'mq' (or an empty cell) is stored as None,
      other cells as int or float

    Returns:
        dict: the dictionary of features per datestamp

    Raises:
        ValueError: if a selected cell is neither 'mq', empty nor a number.
    '''
    if self._dated_features is not None:
        return self._dated_features

    def parse_value(raw):
        # The csv files are downloaded from the network: cells are
        # converted explicitly instead of being passed to eval().
        if raw is None:
            return None
        raw = raw.strip()
        if raw in ('', 'mq'):
            return None
        try:
            return int(raw)
        except ValueError:
            return float(raw)

    csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance_features.csv'
    logger.info(f'Collecting meteo feature information from {csv_file}')
    # A dictionary for the features
    with open(csv_file, "r") as f:
        dico_features = {
            row['abbreviation']: {
                'name': row['name'],  # feature name
                'type': row['type'],  # qualitative (2) or quantitative (1)
            }
            for row in DictReader(f, delimiter=',')
            if row['name'] in self._features
        }

    # Position of each station in self._stations (first occurrence wins),
    # computed once instead of searching the list for every cell.
    station_rank = {}
    for rank, station in enumerate(self._stations):
        station_rank.setdefault(station, rank)

    # Months are compared as (year, month) pairs so that the file of the
    # month containing self._start is not skipped.
    first_month = (self._start.year, self._start.month)
    last_month = (self._end.year, self._end.month)

    dir_data = self._data_directory / 'historical'
    # Built in a local dictionary: the cache is only set once complete, so
    # a failure while reading never leaves a partial result behind.
    dated = {}
    for csv_meteo in sorted(listdir(dir_data)):
        parts = csv_meteo.split('.')
        if len(parts) != 3 or parts[0] != 'synop' or parts[2] != 'csv':
            continue
        try:
            month = datetime.strptime(parts[1], '%Y%m')
        except ValueError:
            continue
        if not first_month <= (month.year, month.month) <= last_month:
            continue
        logger.info(f'Inserting {csv_meteo} in intervention dictionary')
        with open(dir_data / csv_meteo, "r") as f:
            for row in DictReader(f, delimiter=';'):
                rank = station_rank.get(row['numer_sta'])
                if rank is None:
                    continue
                date = datetime.strptime(row['date'], '%Y%m%d%H%M%S')
                dated.setdefault(date, {}).update({
                    feature['name'] + '_' + str(rank): parse_value(row[abbr])
                    for abbr, feature in dico_features.items()
                })
    self._dated_features = dated
    return self._dated_features
+