+from extomeAI.lib.connector import PostgreSQLDBConnection
+
+from configparser import ConfigParser
+from csv import DictReader
+from datetime import datetime, timedelta
+from os import remove, system, listdir
+from pathlib import Path
+from shutil import rmtree
+from timezonefinder import TimezoneFinder
+
+from logging.config import fileConfig
+from os.path import isfile, basename
+from urllib.request import urlretrieve
+
+import logging
+import gzip
+import pytz
+import tempfile
+
# Initialise logging from the project-wide configuration file and expose
# the root logger for this module.
_logging_cfg = Path.cwd() / 'config' / 'logging.cfg'
fileConfig(_logging_cfg)
logger = logging.getLogger()
+
+class MeteoFrance:
+ def __init__(self):
+ '''
+ Constructor
+ See: https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
+ '''
+ self.__data_directory = (Path.cwd() / 'data') / 'meteo_france'
+ # Re-creating data directory architecture for MeteoFrance, if asked
+ config = ConfigParser()
+ config.read((Path.cwd() / 'config') / 'features.cfg')
+ if eval(config['meteofrance']['regenerate']):
+ logger.info("Regenerating meteofrance data directory")
+ try:
+ rmtree(self.__data_directory)
+ except:
+ pass
+ p = Path(self.__data_directory / 'historical')
+ p.mkdir(exist_ok=True, parents=True)
+ p = Path(self.__data_directory / 'config')
+ p.mkdir(exist_ok=True, parents=True)
+ if eval(config['meteofrance']['reinsert']):
+ logger.info("Reinserting meteofrance database")
+ with PostgreSQLDBConnection.Instance() as db:
+ db.cursor.execute(f'DELETE FROM "METEO_FEATURE";')
+ db.cursor.execute(f'ALTER SEQUENCE "METEO_FEATURE_METFT_ID_seq" RESTART WITH 1;')
+ db.cursor.execute(f'DELETE FROM "METEO_STATION";')
+ db.cursor.execute(f'ALTER SEQUENCE "METEO_STATION_METST_ID_seq" RESTART WITH 1;')
+ db.cursor.execute(f'DELETE FROM "METEO_FEATURE_VALUE";')
+ db.cursor.execute(f'ALTER SEQUENCE "METEO_FEATURE_VALUE_METVA_ID_seq" RESTART WITH 1;')
+ self.__generate()
+
+
+ def __collect_stations(self):
+ '''
+ Filling METEO_STATION table from location schema
+ '''
+ tf = TimezoneFinder(in_memory=True)
+
+ with PostgreSQLDBConnection.Instance() as db:
+ link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
+ p = Path(self.__data_directory / 'config' )
+ csv_file = p / basename(link)
+ if not isfile(csv_file):
+ logger.info('Downloading location stations from MeteoFrance')
+ urlretrieve(link, csv_file)
+ with open(csv_file, "r") as f:
+ reader = DictReader(f, delimiter=';')
+ logger.info(f'Inserting location stations in {db.dbname} database')
+ for row in reader:
+ longitude, latitude = eval(row['Longitude']), eval(row['Latitude'])
+ point = (longitude, latitude)
+ timezone_name = tf.timezone_at(lng = eval(row['Longitude']), lat = eval(row['Latitude']))
+ if timezone_name is None:
+ timezone_name = tf.closest_timezone_at(lng = eval(row['Longitude']),
+ lat = eval(row['Latitude']),
+ delta_degree = 5)
+ cet = pytz.timezone(timezone_name)
+ dt = datetime.now()
+ offset = cet.utcoffset(dt, is_dst = True)
+ shift = int(offset / timedelta(hours=1))
+ request = f"""INSERT INTO "METEO_STATION" ("METST_NAME", "METST_IDNAME", "METST_LOCATION", "METST_TIMEZONE")
+ VALUES ('{row['Nom'].replace("'",'’')}', '{row['ID']}',
+ point({row['Latitude']}, {row['Longitude']}), {shift});"""
+ db.cursor.execute(request)
+
+
+ def __insert_features(self):
+ logger.info('Inserting MeteoFrance list of features from meteo_features.csv')
+ csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
+ with PostgreSQLDBConnection.Instance() as db:
+ with open(csv_file, "r") as f:
+ reader = DictReader(f, delimiter=',')
+ next(reader)
+ for row in reader:
+ request = f"""INSERT INTO "METEO_FEATURE" ("METFT_NAME", "PARAM_ID_PARAMETER")
+ VALUES ('{row['METFT_NAME']}', {row['PARAM_ID_PARAMETER']});"""
+ db.cursor.execute(request)
+
+
+
+ def __collect_historical_data(self):
+ '''
+ We collect all csv files from January 1996 until the month
+ before now. The argument in the url to download are of the
+ form 201001 for January 2010. We start by computing all these
+ patterns, in historical list.
+ '''
+ # List of year-months to consider
+ historical = []
+ date_end = datetime.now()
+ for year in range(1996, date_end.year+1):
+ for month in range(1,13):
+ date = datetime(year, month, 1)
+ if date <= date_end:
+ historical.append(date.strftime("%Y%m"))
+
+ # We download all csv files from meteofrance that are not in
+ # the data repository
+ meteo_data = self.__data_directory / 'historical'
+ p = Path(meteo_data)
+ p.mkdir(exist_ok=True, parents=True)
+ for date in historical:
+ if not isfile(meteo_data / ('synop.'+date+'.csv')):
+ link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
+ link += date + '.csv.gz'
+ download_path = meteo_data / basename(link)
+ urlretrieve(link, download_path)
+ with gzip.open(download_path, 'rb') as f:
+ csv_file = meteo_data / basename(link[:-3])
+ with open(csv_file, 'w') as g:
+ g.write(f.read().decode())
+ remove(meteo_data / basename(link))
+
+
+ def __from_date_to_datetz(self, date, a, b):
+ if not hasattr(self, '__meteo_station_tz'):
+ self.__meteo_station_tz = {}
+ tf = TimezoneFinder(in_memory=True)
+ with PostgreSQLDBConnection.Instance() as db:
+ db.cursor.execute('select "METST_IDNAME", "METST_LOCATION" from "METEO_STATION";')
+ list_of_rows = db.cursor.fetchall()
+ for k in list_of_rows:
+ print('\n',k)
+ longitude, latitude = eval(k[1])
+ print(longitude, latitude)
+ print(type(longitude))
+ timezone_name = tf.timezone_at(lng = longitude, lat = latitude)
+ if timezone_name is None:
+ timezone_name = tf.closest_timezone_at(lng = longitude,
+ lat = latitude,
+ delta_degree = 13,
+ exact_computation=True,
+ #return_distances=True,
+ force_evaluation=True)
+ cet = pytz.timezone(timezone_name)
+ dt = datetime.now()
+ offset = cet.utcoffset(dt, is_dst = True)
+ shift = int(offset / timedelta(hours=1))
+ self.__meteo_station_tz[k[0]] = shift
+
+ print(self.__meteo_station_tz)
+ exit()
+ '''longitude, latitude = eval(row['Longitude']), eval(row['Latitude'])
+ point = (longitude, latitude)
+ timezone_name = tf.timezone_at(lng = eval(row['Longitude']), lat = eval(row['Latitude']))
+ if timezone_name is None:
+ timezone_name = tf.closest_timezone_at(lng = eval(row['Longitude']),
+ lat = eval(row['Latitude']),
+ delta_degree = 5)
+ cet = pytz.timezone(timezone_name)
+ dt = datetime.now()
+ offset = cet.utcoffset(dt, is_dst = True)
+ shift = int(offset / timedelta(hours=1))
+
+ self.__meteo_station_tz'''
+ exit()
+ return date[:4]+'-'+date[4:6]+'-'+date[6:8]+' '+date[8:10]+':00:00+01'
+
+
+ def __insert_historical_data(self):
+ csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
+ with PostgreSQLDBConnection.Instance() as db:
+ db.cursor.execute('SELECT * from "METEO_FEATURE";')
+ list_of_rows = db.cursor.fetchall()
+ dico = {u[1]:u[0] for u in list_of_rows}
+
+ with open(csv_file, "r") as f:
+ reader = DictReader(f, delimiter=',')
+ next(reader)
+ dico_features = {row["abbreviation"]:dico[row["METFT_NAME"]] for row in reader}
+
+ with PostgreSQLDBConnection.Instance() as db:
+ db.cursor.execute('SELECT * from "METEO_STATION";')
+ list_of_rows = db.cursor.fetchall()
+ dico_station = {u[2]:u[0] for u in list_of_rows}
+
+ for feature in dico_features:
+ logger.info(f'Integrating {[u for u in dico if dico[u]==dico_features[feature]][0]} feature')
+ for station in dico_station:
+ logger.info(f' - Dealing with meteo station n°: {station}')
+ csv_file = tempfile.NamedTemporaryFile('w')
+ dir_data = Path.cwd() / 'data' / 'meteo_france' / 'historical'
+ for csv_meteo in listdir(dir_data):
+ with open(dir_data / csv_meteo, "r") as f:
+ reader = DictReader(f, delimiter=';')
+ csv_file.write(''.join([row[feature]+",'"+self.__from_date_to_datetz(row["date"], station, dico_station[station])+"',"+str(dico_features[feature])+','+str(dico_station[station])+'\n' for row in reader if row['numer_sta'] == station]))
+ csv_file.flush()
+ with open(csv_file.name, 'r') as f:
+ with PostgreSQLDBConnection.Instance() as db:
+ db.cursor.copy_from(f, '"METEO_FEATURE_VALUE"', sep=',', null='mq',
+ columns=['"METVA_VALUE"','"METVA_DATETIME"','"METFT_ID_METEO_FEATURE"','"METST_ID_METEO_STATION"'])
+
+
+
+
+
+
+
+ def __generate(self):
+ # Meteo stations must be collected first, if not in the database
+ with PostgreSQLDBConnection.Instance() as db:
+ db.cursor.execute('SELECT count(*) FROM "METEO_STATION";')
+ updated_meteo_station = db.cursor.fetchone()[0]
+ if not updated_meteo_station:
+ self.__collect_stations()
+
+ # Features from data/meteo_france/config/meteo_features.csv
+ # must be inserted in the database, if not already done
+ with PostgreSQLDBConnection.Instance() as db:
+ db.cursor.execute('SELECT count(*) FROM "METEO_FEATURE";')
+ updated_meteo_features = db.cursor.fetchone()[0]
+ if not updated_meteo_features:
+ self.__insert_features()
+
+ # Downloading meteofrance historical csv files
+ logger.info('Downloading historical csv files from MeteoFrance, if needed')
+ self.__collect_historical_data()
+
+ self.__insert_historical_data()
+
+ def update(self):
+ '''
+ Update the MeteoFrance features with the last available data
+ '''
+ # We collect archive files from MeteoFrance, until the current month
+ # by using the same method than for data generation : this is currently
+ # based on the presence of a synop.+date+.csv' file in the
+ # data/meteo_france/historical directory. The file corresponding to the
+ # current month is deleted first, so that its most recent version will be downloaded
+ # by colling self.__collect_historical_data
+ # TODO: updates according to a PostgreSQL request ?
+ logger.info('Update historical csv files from MeteoFrance, if needed')
+ today = datetime.now()
+ todel = 'synop.'+today.strftime("%Y%m")+".csv"
+ remove(self.__data_directory / 'historical' / todel)
+ system("touch "+todel)
+ self.__collect_historical_data()
+ logger.info('Inserting csv files in database')
+ self.__insert_historical_data()
+