X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/predictops.git/blobdiff_plain/34192be628efe7e95de1abb5e14253899a6081fd..dc10b29095dc3e2d74b82b208fd4f4677e6a9c30:/lib/source/meteofrance.py?ds=inline diff --git a/lib/source/meteofrance.py b/lib/source/meteofrance.py index 315aac3..cc2eff1 100644 --- a/lib/source/meteofrance.py +++ b/lib/source/meteofrance.py @@ -1,18 +1,19 @@ from configparser import ConfigParser from csv import DictReader -from geopy import distance -from pathlib import Path -from shutil import rmtree - +from datetime import datetime +from geopy.distance import vincenty +from logging import getLogger from logging.config import fileConfig +from os import listdir, remove, system from os.path import isfile, basename +from pathlib import Path +from shutil import rmtree from urllib.request import urlretrieve -import logging - +import gzip fileConfig((Path.cwd() / 'config') / 'logging.cfg') -logger = logging.getLogger() +logger = getLogger() class MeteoFrance: @@ -40,6 +41,8 @@ class MeteoFrance: self._data_directory = (Path.cwd() / 'data') / 'meteo_france' + self._dated_features = None + # Re-creating data directory architecture for MeteoFrance, if asked config = ConfigParser() config.read((Path.cwd() / 'config') / 'features.cfg') @@ -48,7 +51,6 @@ class MeteoFrance: # Collecting the closest meteo station self._stations = self._get_stations() - print(self._stations) @@ -87,33 +89,123 @@ class MeteoFrance: urlretrieve(link, csv_file) # A dictionary for the meteo stations is created - dict_stations = {} + self._dict_stations = {} logger.info('Collecting information about meteo stations') with open(csv_file, "r") as f: reader = DictReader(f, delimiter=';') for row in reader: latitude, longitude = eval(row['Latitude']), eval(row['Longitude']) - dict_stations[row['Nom'].replace("'",'’')] = { + self._dict_stations[row['Nom'].replace("'",'’')] = { 'id' : row['ID'], 'longitude' : longitude, 'latitude' : latitude, - 'distance' : distance.vincenty( + 
# Methods of ``class MeteoFrance``: historical archive collection, refresh of
# the current month, and the lazily built per-datestamp feature dictionary.

def _collect_historical_data(self):
    '''
    Download every monthly SYNOP archive that is not available locally.

    Archives are identified by their month, written as 201001 for
    January 2010, and range from January 1996 up to and including the
    current month. Each missing ``synop.<YYYYMM>.csv.gz`` archive is
    fetched from MeteoFrance and decompressed to ``synop.<YYYYMM>.csv`` in
    the ``historical`` sub-directory of the data directory. A month whose
    csv file is already present is skipped.
    '''
    from shutil import copyfileobj

    # List of year-months to consider (current month included)
    now = datetime.now()
    historical = [f'{year}{month:02d}'
                  for year in range(1996, now.year + 1)
                  for month in range(1, 13)
                  if (year, month) <= (now.year, now.month)]

    meteo_data = self._data_directory / 'historical'
    meteo_data.mkdir(exist_ok=True, parents=True)
    for date in historical:
        csv_file = meteo_data / ('synop.' + date + '.csv')
        if isfile(csv_file):
            continue
        link = ('https://donneespubliques.meteofrance.fr/donnees_libres/'
                'Txt/Synop/Archive/synop.' + date + '.csv.gz')
        download_path = meteo_data / basename(link)
        # The presence of the csv file is what marks a month as collected,
        # so the archive is decompressed under a temporary name and only
        # renamed once complete: an interrupted run never leaves behind a
        # truncated csv that would be mistaken for a finished one.
        partial_csv = meteo_data / (csv_file.name + '.part')
        try:
            urlretrieve(link, download_path)
            # Byte-for-byte copy: no decoding/re-encoding of the content
            with gzip.open(download_path, 'rb') as archive, \
                    open(partial_csv, 'wb') as target:
                copyfileobj(archive, target)
            partial_csv.replace(csv_file)
        finally:
            # Neither the archive nor an unfinished csv is kept around
            for leftover in (download_path, partial_csv):
                if isfile(leftover):
                    remove(leftover)


def update(self):
    '''
    Update the MeteoFrance features with the last available data.

    Archive files are collected by ``self._collect_historical_data``, which
    only downloads the months whose ``synop.<YYYYMM>.csv`` file is absent
    from the ``historical`` directory. The file of the current month is
    therefore deleted first, so that its most recent version gets
    downloaded again. The cached feature dictionary is then dropped so that
    the next access to ``dated_features`` is rebuilt from the new files.
    '''
    logger.info('Update historical csv files from MeteoFrance, if needed')
    today = datetime.now()
    todel = 'synop.'+today.strftime("%Y%m")+".csv"
    current_month_csv = self._data_directory / 'historical' / todel
    try:
        remove(current_month_csv)
    except FileNotFoundError:
        logger.warning(f"{current_month_csv} not found")
        # NOTE(review): this creates an empty file named after the current
        # month in the *working directory*, not in the historical
        # directory; kept as it was (without going through a shell), but
        # its purpose should be confirmed.
        Path(todel).touch()
    self._collect_historical_data()
    # The files on disk may have changed: invalidate the cached dictionary
    self._dated_features = None


@property
def dated_features(self):
    '''
    Dictionary of meteo features per datestamp, built on first access.

    If the attribute ``_dated_features`` is None, the dictionary is created
    and cached: datestamps as keys, and {feature: value} as values.
     - considered features are the ones from meteofrance_features.csv,
       found in the config/features/meteofrance directory
     - only the rows of the stations listed in ``self._stations`` are
       kept; each feature key is ``<feature name>_<i>``, ``i`` being the
       position of the station in that list
     - a missing value (``mq`` or an empty field) is stored as None

    Returns:
        dict: the dictionary of features per datestamp
    '''
    if self._dated_features is not None:
        return self._dated_features

    from ast import literal_eval

    csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
    logger.info(f'Collecting meteo feature information from {csv_file}')
    with open(csv_file, "r") as f:
        reader = DictReader(f, delimiter=',')
        # DictReader consumes the header line by itself, so this discards
        # the first row that follows it.
        # NOTE(review): presumably a row that does not describe a feature;
        # confirm against the csv file.
        next(reader)
        # feature abbreviation (column name in the synop files) -> name
        feature_names = {row['abbreviation']: row['name'] for row in reader}

    # Position of each station, computed once instead of once per value;
    # setdefault keeps the first position if an id were listed twice.
    station_rank = {}
    for rank, station in enumerate(self._stations):
        station_rank.setdefault(station, rank)

    dir_data = self._data_directory / 'historical'
    dated = {}
    # Only the monthly csv files are read: a leftover archive or any other
    # stray file in the directory is ignored.
    for csv_meteo in sorted(dir_data.glob('synop.*.csv')):
        logger.info(f'Inserting {csv_meteo.name} in intervention dictionary')
        with open(csv_meteo, "r") as f:
            for row in DictReader(f, delimiter=';'):
                rank = station_rank.get(row['numer_sta'])
                if rank is None:
                    continue
                features = dated.setdefault(row['date'], {})
                for abbreviation, name in feature_names.items():
                    raw = (row[abbreviation] or '').strip()
                    # SECURITY: these fields come from downloaded files, so
                    # they are parsed as plain literals (int/float) with
                    # literal_eval rather than executed with eval.
                    features[f'{name}_{rank}'] = (
                        None if raw in ('', 'mq') else literal_eval(raw))

    # Cached only once fully built, so that a failure while reading the
    # files cannot leave a partial dictionary behind.
    self._dated_features = dated
    return dated