X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/predictops.git/blobdiff_plain/844b558b71ac568d904e8845ce203d31b68c776d..6a7cf0e5dd962e7da028cb0d82c6682ac87b0540:/lib/source/meteofrance.py diff --git a/lib/source/meteofrance.py b/lib/source/meteofrance.py index 79c2546..3f1c3eb 100644 --- a/lib/source/meteofrance.py +++ b/lib/source/meteofrance.py @@ -1,106 +1,120 @@ -from extomeAI.lib.connector import PostgreSQLDBConnection - from configparser import ConfigParser from csv import DictReader -from datetime import datetime, timedelta -from os import remove, system, listdir -from pathlib import Path -from shutil import rmtree -from timezonefinder import TimezoneFinder - +from datetime import datetime +from geopy.distance import vincenty +from logging import getLogger from logging.config import fileConfig +from os import listdir, remove, system from os.path import isfile, basename +from pathlib import Path +from shutil import rmtree from urllib.request import urlretrieve -import logging import gzip -import pytz -import tempfile fileConfig((Path.cwd() / 'config') / 'logging.cfg') -logger = logging.getLogger() +logger = getLogger() class MeteoFrance: - def __init__(self): + + def __init__(self, latitude = 47.25, longitude = 6.0333, nb_stations = 3): + ''' + Constructor of the MeteoFrance source of feature. + + - It will reinitiate the data directory, if asked in the config + features.cfg file. + - It searches for the nb_stations meteo stations closest to the provided + point (longitude and latitude) + + For more information about this source of feature, see: + https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32 + + Parameters: + latitude (float): The latitude from which we want the meteo features. + longitude (float): The longitude from which we want the meteo features. + nb_stations (int): Number of closest stations to consider. + ''' - Constructor - See: https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32 - ''' - self.__data_directory = (Path.cwd() / 'data') / 'meteo_france' + self._latitude = latitude + self._longitude = longitude + self._nb_stations = nb_stations + + self._data_directory = (Path.cwd() / 'data') / 'meteo_france' + + self._dated_features = None + # Re-creating data directory architecture for MeteoFrance, if asked config = ConfigParser() config.read((Path.cwd() / 'config') / 'features.cfg') if eval(config['meteofrance']['regenerate']): - logger.info("Regenerating meteofrance data directory") - try: - rmtree(self.__data_directory) - except: - pass - p = Path(self.__data_directory / 'historical') - p.mkdir(exist_ok=True, parents=True) - p = Path(self.__data_directory / 'config') - p.mkdir(exist_ok=True, parents=True) - if eval(config['meteofrance']['reinsert']): - logger.info("Reinserting meteofrance database") - with PostgreSQLDBConnection.Instance() as db: - db.cursor.execute(f'DELETE FROM "METEO_FEATURE";') - db.cursor.execute(f'ALTER SEQUENCE "METEO_FEATURE_METFT_ID_seq" RESTART WITH 1;') - db.cursor.execute(f'DELETE FROM "METEO_STATION";') - db.cursor.execute(f'ALTER SEQUENCE "METEO_STATION_METST_ID_seq" RESTART WITH 1;') - db.cursor.execute(f'DELETE FROM "METEO_FEATURE_VALUE";') - db.cursor.execute(f'ALTER SEQUENCE "METEO_FEATURE_VALUE_METVA_ID_seq" RESTART WITH 1;') - self.__generate() - - - def __collect_stations(self): + self._regenerate_directory() + + # Collecting the closest meteo station + self._stations = self._get_stations() + + + + def _regenerate_directory(self): ''' - Filling METEO_STATION table from location schema + Re-creating data directory architecture for MeteoFrance ''' - tf = TimezoneFinder(in_memory=True) - - with PostgreSQLDBConnection.Instance() as db: - link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv' - p = Path(self.__data_directory / 'config' ) - csv_file = p / basename(link) - if not isfile(csv_file): - logger.info('Downloading location stations from MeteoFrance') - urlretrieve(link, csv_file) - with open(csv_file, "r") as f: - reader = DictReader(f, delimiter=';') - logger.info(f'Inserting location stations in {db.dbname} database') - for row in reader: - longitude, latitude = eval(row['Longitude']), eval(row['Latitude']) - point = (longitude, latitude) - timezone_name = tf.timezone_at(lng = eval(row['Longitude']), lat = eval(row['Latitude'])) - if timezone_name is None: - timezone_name = tf.closest_timezone_at(lng = eval(row['Longitude']), - lat = eval(row['Latitude']), - delta_degree = 5) - cet = pytz.timezone(timezone_name) - dt = datetime.now() - offset = cet.utcoffset(dt, is_dst = True) - shift = int(offset / timedelta(hours=1)) - request = f"""INSERT INTO "METEO_STATION" ("METST_NAME", "METST_IDNAME", "METST_LOCATION", "METST_TIMEZONE") - VALUES ('{row['Nom'].replace("'",'’')}', '{row['ID']}', - point({row['Latitude']}, {row['Longitude']}), {shift});""" - db.cursor.execute(request) - - - def __insert_features(self): - logger.info('Inserting MeteoFrance list of features from meteo_features.csv') - csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv' - with PostgreSQLDBConnection.Instance() as db: - with open(csv_file, "r") as f: - reader = DictReader(f, delimiter=',') - next(reader) - for row in reader: - request = f"""INSERT INTO "METEO_FEATURE" ("METFT_NAME", "PARAM_ID_PARAMETER") - VALUES ('{row['METFT_NAME']}', {row['PARAM_ID_PARAMETER']});""" - db.cursor.execute(request) - - - - def __collect_historical_data(self): + logger.info("Regenerating meteofrance data directory") + try: + rmtree(self._data_directory) + except: + pass + p = Path(self._data_directory / 'historical') + p.mkdir(exist_ok=True, parents=True) + p = Path(self._data_directory / 'config') + p.mkdir(exist_ok=True, parents=True) + + + + def _get_stations(self): + ''' + Collect (after downloading them, if needed) the stations and their + locations in a dictionary + + Returns: + list: The self._nb_stations closest station IDs, starting by the + closest one + ''' + # The csv file of meteo stations (names, ids and locations) if downloaded, + # if not available in the config directory within data / meteo_france + link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv' + p = Path(self._data_directory / 'config' ) + csv_file = p / basename(link) + if not isfile(csv_file): + logger.info('Downloading location stations from MeteoFrance') + urlretrieve(link, csv_file) + + # A dictionary for the meteo stations is created + self._dict_stations = {} + logger.info('Collecting information about meteo stations') + with open(csv_file, "r") as f: + reader = DictReader(f, delimiter=';') + for row in reader: + latitude, longitude = eval(row['Latitude']), eval(row['Longitude']) + self._dict_stations[row['Nom'].replace("'",'’')] = { + 'id' : row['ID'], + 'longitude' : longitude, + 'latitude' : latitude, + 'distance' : vincenty( + (self._latitude, self._longitude), + (latitude, longitude)).km + } + + # Find the closest stations + logger.info('Finding the closest stations') + stations_by_distance = sorted(self._dict_stations.keys(), + key = lambda x: self._dict_stations[x]['distance']) + logger.info(f'The {self._nb_stations} closest stations are: ' + f'{", ".join(stations_by_distance[:self._nb_stations])}.') + return [self._dict_stations[sta]['id'] for sta in stations_by_distance][:self._nb_stations] + + + + def _collect_historical_data(self): ''' We collect all csv files from January 1996 until the month before now. The argument in the url to download are of the @@ -115,16 +129,16 @@ class MeteoFrance: date = datetime(year, month, 1) if date <= date_end: historical.append(date.strftime("%Y%m")) - - # We download all csv files from meteofrance that are not in + + # We download all csv files from meteofrance that are not in # the data repository - meteo_data = self.__data_directory / 'historical' + meteo_data = self._data_directory / 'historical' p = Path(meteo_data) - p.mkdir(exist_ok=True, parents=True) + p.mkdir(exist_ok=True, parents=True) for date in historical: if not isfile(meteo_data / ('synop.'+date+'.csv')): link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.' - link += date + '.csv.gz' + link += date + '.csv.gz' download_path = meteo_data / basename(link) urlretrieve(link, download_path) with gzip.open(download_path, 'rb') as f: @@ -132,113 +146,8 @@ class MeteoFrance: with open(csv_file, 'w') as g: g.write(f.read().decode()) remove(meteo_data / basename(link)) - - - def __from_date_to_datetz(self, date, a, b): - if not hasattr(self, '__meteo_station_tz'): - self.__meteo_station_tz = {} - tf = TimezoneFinder(in_memory=True) - with PostgreSQLDBConnection.Instance() as db: - db.cursor.execute('select "METST_IDNAME", "METST_LOCATION" from "METEO_STATION";') - list_of_rows = db.cursor.fetchall() - for k in list_of_rows: - print('\n',k) - longitude, latitude = eval(k[1]) - print(longitude, latitude) - print(type(longitude)) - timezone_name = tf.timezone_at(lng = longitude, lat = latitude) - if timezone_name is None: - timezone_name = tf.closest_timezone_at(lng = longitude, - lat = latitude, - delta_degree = 13, - exact_computation=True, - #return_distances=True, - force_evaluation=True) - cet = pytz.timezone(timezone_name) - dt = datetime.now() - offset = cet.utcoffset(dt, is_dst = True) - shift = int(offset / timedelta(hours=1)) - self.__meteo_station_tz[k[0]] = shift - - print(self.__meteo_station_tz) - exit() - '''longitude, latitude = eval(row['Longitude']), eval(row['Latitude']) - point = (longitude, latitude) - timezone_name = tf.timezone_at(lng = eval(row['Longitude']), lat = eval(row['Latitude'])) - if timezone_name is None: - timezone_name = tf.closest_timezone_at(lng = eval(row['Longitude']), - lat = eval(row['Latitude']), - delta_degree = 5) - cet = pytz.timezone(timezone_name) - dt = datetime.now() - offset = cet.utcoffset(dt, is_dst = True) - shift = int(offset / timedelta(hours=1)) - - self.__meteo_station_tz''' - exit() - return date[:4]+'-'+date[4:6]+'-'+date[6:8]+' '+date[8:10]+':00:00+01' - - - def __insert_historical_data(self): - csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv' - with PostgreSQLDBConnection.Instance() as db: - db.cursor.execute('SELECT * from "METEO_FEATURE";') - list_of_rows = db.cursor.fetchall() - dico = {u[1]:u[0] for u in list_of_rows} - - with open(csv_file, "r") as f: - reader = DictReader(f, delimiter=',') - next(reader) - dico_features = {row["abbreviation"]:dico[row["METFT_NAME"]] for row in reader} - - with PostgreSQLDBConnection.Instance() as db: - db.cursor.execute('SELECT * from "METEO_STATION";') - list_of_rows = db.cursor.fetchall() - dico_station = {u[2]:u[0] for u in list_of_rows} - - for feature in dico_features: - logger.info(f'Integrating {[u for u in dico if dico[u]==dico_features[feature]][0]} feature') - for station in dico_station: - logger.info(f' - Dealing with meteo station n°: {station}') - csv_file = tempfile.NamedTemporaryFile('w') - dir_data = Path.cwd() / 'data' / 'meteo_france' / 'historical' - for csv_meteo in listdir(dir_data): - with open(dir_data / csv_meteo, "r") as f: - reader = DictReader(f, delimiter=';') - csv_file.write(''.join([row[feature]+",'"+self.__from_date_to_datetz(row["date"], station, dico_station[station])+"',"+str(dico_features[feature])+','+str(dico_station[station])+'\n' for row in reader if row['numer_sta'] == station])) - csv_file.flush() - with open(csv_file.name, 'r') as f: - with PostgreSQLDBConnection.Instance() as db: - db.cursor.copy_from(f, '"METEO_FEATURE_VALUE"', sep=',', null='mq', - columns=['"METVA_VALUE"','"METVA_DATETIME"','"METFT_ID_METEO_FEATURE"','"METST_ID_METEO_STATION"']) - - - - - - - - def __generate(self): - # Meteo stations must be collected first, if not in the database - with PostgreSQLDBConnection.Instance() as db: - db.cursor.execute('SELECT count(*) FROM "METEO_STATION";') - updated_meteo_station = db.cursor.fetchone()[0] - if not updated_meteo_station: - self.__collect_stations() - - # Features from data/meteo_france/config/meteo_features.csv - # must be inserted in the database, if not already done - with PostgreSQLDBConnection.Instance() as db: - db.cursor.execute('SELECT count(*) FROM "METEO_FEATURE";') - updated_meteo_features = db.cursor.fetchone()[0] - if not updated_meteo_features: - self.__insert_features() - - # Downloading meteofrance historical csv files - logger.info('Downloading historical csv files from MeteoFrance, if needed') - self.__collect_historical_data() - - self.__insert_historical_data() + + def update(self): ''' @@ -246,17 +155,49 @@ class MeteoFrance: ''' # We collect archive files from MeteoFrance, until the current month # by using the same method than for data generation : this is currently - # based on the presence of a synop.+date+.csv' file in the - # data/meteo_france/historical directory. The file corresponding to the - # current month is deleted first, so that its most recent version will be downloaded - # by colling self.__collect_historical_data - # TODO: updates according to a PostgreSQL request ? + # based on the presence of a synop.+date+.csv' file in the + # data/meteo_france/historical directory. The file corresponding to the + # current month is deleted first, so that its most recent version will + # be downloaded by calling self._collect_historical_data + logger.info('Update historical csv files from MeteoFrance, if needed') today = datetime.now() todel = 'synop.'+today.strftime("%Y%m")+".csv" - remove(self.__data_directory / 'historical' / todel) + try: + remove(self._data_directory / 'historical' / todel) + except: + logger.warning(f"{self._data_directory / 'historical' / todel} not found") system("touch "+todel) - self.__collect_historical_data() - logger.info('Inserting csv files in database') - self.__insert_historical_data() - + self._collect_historical_data() + + + + @property + def dated_features(self): + ''' + ''' + if self._dated_features == None: + csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv' + logger.info(f'Collecting meteo feature information from {csv_file}') + # A dictionary for the features + with open(csv_file, "r") as f: + reader = DictReader(f, delimiter=',') + next(reader) + dico_features = {row["abbreviation"]: + { + 'name': row['name'], # feature name + 'type': row['type'] # qualitative (2) or quantitative (1) + } + for row in reader} + + dir_data = Path.cwd() / 'data' / 'meteo_france' / 'historical' + self._dated_features = {} + for csv_meteo in listdir(dir_data): + logger.info(f'Inserting {csv_meteo} in intervention dictionary') + with open(dir_data / csv_meteo, "r") as f: + reader = DictReader(f, delimiter=';') + for row in reader: + if row['numer_sta'] in self._stations: + self._dated_features.setdefault(row['date'],{}).update({dico_features[feat]['name']+'_'+str(self._stations.index(row['numer_sta'])): eval(row[feat].replace('mq','None')) for feat in dico_features}) + return self._dated_features +