-from extomeAI.lib.connector import PostgreSQLDBConnection
-
from configparser import ConfigParser
from csv import DictReader
-from datetime import datetime, timedelta
-from os import remove, system, listdir
-from pathlib import Path
-from shutil import rmtree
-from timezonefinder import TimezoneFinder
-
+from datetime import datetime
+from geopy.distance import vincenty
+from logging import getLogger
from logging.config import fileConfig
+from os import listdir, remove, system
from os.path import isfile, basename
+from pathlib import Path
+from shutil import rmtree
from urllib.request import urlretrieve
-import logging
import gzip
-import pytz
-import tempfile
+
# Configure logging for the whole module from the project-level config file
# (must run before any logger is created), then expose the root logger.
fileConfig((Path.cwd() / 'config') / 'logging.cfg')
logger = getLogger()
class MeteoFrance:
    '''
    Weather-feature source backed by the Météo-France SYNOP open data.

    For more information about this source of feature, see:
    https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
    '''

    def __init__(self, latitude=47.25, longitude=6.0333, nb_stations=3,
                 start=None, end=None, features=None):
        '''
        Constructor of the MeteoFrance source of feature.

        - It will reinitiate the data directory, if asked in the config
          features.cfg file.
        - It searches for the nb_stations meteo stations closest to the
          provided point (longitude and latitude)

        Parameters:
            latitude (float): The latitude from which we want the meteo features.
            longitude (float): The longitude from which we want the meteo features.
            nb_stations (int): Number of closest stations to consider.
            start (datetime): First month of data to consider
                (default: 1996-01-01, the start of the SYNOP archive).
            end (datetime): Last month of data to consider
                (default: now, evaluated at call time).
            features (list): Weather features that have to be integrated,
                according to their names in meteofrance_features.csv
                (cf. config directory)
        '''
        self._latitude = latitude
        self._longitude = longitude
        self._nb_stations = nb_stations
        # Resolve time-dependent and mutable defaults here rather than in the
        # signature: a `datetime.now()` default would be frozen at import
        # time, and a `[]` default would be shared between instances.
        self._start = (start if start is not None
                       else datetime.strptime('19960101000000', '%Y%m%d%H%M%S'))
        self._end = end if end is not None else datetime.now()
        self._features = [] if features is None else features

        self._data_directory = (Path.cwd() / 'data') / 'meteo_france'

        # Lazily-built {datetime: {feature_name: value}} cache; see the
        # dated_features property.
        self._dated_features = None

        # Re-creating data directory architecture for MeteoFrance, if asked
        config = ConfigParser()
        config.read((Path.cwd() / 'config') / 'features.cfg')
        # getboolean() parses 'True'/'False'/'1'/'0'/'yes'/'no' safely,
        # unlike eval() which would execute arbitrary text from the file.
        if config.getboolean('meteofrance', 'regenerate'):
            self._regenerate_directory()

        # Collecting the closest meteo stations
        self._stations = self._get_stations()

    def _regenerate_directory(self):
        '''
        Re-create the data directory architecture for MeteoFrance.
        '''
        logger.info("Regenerating meteofrance data directory")
        # ignore_errors covers the "directory does not exist yet" case
        # without a bare except hiding unrelated failures.
        rmtree(self._data_directory, ignore_errors=True)
        (self._data_directory / 'historical').mkdir(exist_ok=True, parents=True)
        (self._data_directory / 'config').mkdir(exist_ok=True, parents=True)

    def _get_stations(self):
        '''
        Collect (after downloading them, if needed) the stations and their
        locations in a dictionary.

        Returns:
            list: The self._nb_stations closest station IDs, starting by the
                  closest one
        '''
        # The csv file of meteo stations (names, ids and locations) is
        # downloaded, if not available in the config directory within
        # data / meteo_france
        link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
        csv_file = self._data_directory / 'config' / basename(link)
        if not isfile(csv_file):
            logger.info('Downloading location stations from MeteoFrance')
            urlretrieve(link, csv_file)

        # A dictionary for the meteo stations is created
        self._dict_stations = {}
        logger.info('Collecting information about meteo stations')
        with open(csv_file, "r") as f:
            reader = DictReader(f, delimiter=';')
            for row in reader:
                # float() instead of eval(): the fields are plain decimal
                # numbers and must never be executed as code.
                latitude = float(row['Latitude'])
                longitude = float(row['Longitude'])
                # NOTE(review): geopy removed `vincenty` in 2.0; when geopy
                # is upgraded, switch to geopy.distance.geodesic (same call
                # shape) — TODO confirm the pinned geopy version.
                self._dict_stations[row['Nom'].replace("'", '’')] = {
                    'id': row['ID'],
                    'longitude': longitude,
                    'latitude': latitude,
                    'distance': vincenty(
                        (self._latitude, self._longitude),
                        (latitude, longitude)).km
                }

        # Find the closest stations
        logger.info('Finding the closest stations')
        stations_by_distance = sorted(self._dict_stations.keys(),
                                      key=lambda x: self._dict_stations[x]['distance'])
        logger.info(f'The {self._nb_stations} closest stations are: '
                    f'{", ".join(stations_by_distance[:self._nb_stations])}.')
        return [self._dict_stations[sta]['id']
                for sta in stations_by_distance[:self._nb_stations]]

    def _collect_historical_data(self):
        '''
        We collect all monthly csv files between self._start and self._end
        (inclusive) that are not already present in the data repository,
        then decompress each downloaded .gz archive in place.
        '''
        # List of year-months to consider
        historical = []
        for year in range(self._start.year, self._end.year + 1):
            for month in range(1, 13):
                date = datetime(year, month, 1)
                if self._start <= date <= self._end:
                    historical.append(date.strftime("%Y%m"))

        # We download all csv files from meteofrance that are not in
        # the data repository
        meteo_data = self._data_directory / 'historical'
        meteo_data.mkdir(exist_ok=True, parents=True)
        for date in historical:
            # BUG FIX: the decompressed content used to be written to an
            # undefined name `csv_file` (NameError on every fresh download);
            # it now goes to the exact file whose presence is tested below.
            csv_file = meteo_data / ('synop.' + date + '.csv')
            if not isfile(csv_file):
                link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
                link += date + '.csv.gz'
                download_path = meteo_data / basename(link)
                urlretrieve(link, download_path)
                with gzip.open(download_path, 'rb') as f:
                    with open(csv_file, 'w') as g:
                        g.write(f.read().decode())
                remove(download_path)

    def update(self):
        '''
        Refresh the historical csv files from MeteoFrance.

        We collect archive files until the current month by using the same
        method as for data generation: this is currently based on the
        presence of a 'synop.<yearmonth>.csv' file in the
        data/meteo_france/historical directory. The file corresponding to
        the current month is deleted first, so that its most recent version
        will be downloaded by calling self._collect_historical_data.
        '''
        logger.info('Update historical csv files from MeteoFrance, if needed')
        today = datetime.now()
        todel = 'synop.' + today.strftime("%Y%m") + ".csv"
        try:
            remove(self._data_directory / 'historical' / todel)
        except FileNotFoundError:
            logger.warning(f"{self._data_directory / 'historical' / todel} not found")
        # Path.touch() replaces the former `os.system("touch ...")` shell
        # call (same effect, no subshell).
        # NOTE(review): this creates an empty file in the *current working
        # directory*, not in the historical directory — looks unintentional,
        # TODO confirm before removing.
        Path(todel).touch()
        self._collect_historical_data()

    @staticmethod
    def _parse_value(raw):
        '''
        Parse one SYNOP csv field.

        'mq' (donnée manquante) becomes None; anything else is converted to
        int when possible, float otherwise. Replaces the original
        eval(raw.replace('mq', 'None')) call, which executed file content.
        '''
        if raw == 'mq':
            return None
        try:
            return int(raw)
        except ValueError:
            return float(raw)

    @property
    def dated_features(self):
        '''
        If the attribute dated_features is None, then we create it: a
        dictionary with datestamps as keys, and {features: values} as values.
        - considered features are the ones from meteofrance_features.csv,
          found in config/features/meteofrance directory
        - only the closest meteo stations are considered

        Returns:
            dict: the dictionary of features per datestamp
        '''
        if self._dated_features is None:
            # NOTE(review): this path omits the 'meteofrance' subdirectory
            # that the docstring (and older code) mentions — TODO confirm
            # which location is correct.
            csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance_features.csv'
            logger.info(f'Collecting meteo feature information from {csv_file}')
            # A dictionary for the features, keyed by csv column abbreviation
            with open(csv_file, "r") as f:
                reader = DictReader(f, delimiter=',')
                dico_features = {row["abbreviation"]:
                                 {
                                     'name': row['name'],  # feature name
                                     'type': row['type']   # qualitative (2) or quantitative (1)
                                 }
                                 for row in reader if row['name'] in self._features}
            dir_data = Path.cwd() / 'data' / 'meteo_france' / 'historical'
            self._dated_features = {}
            for csv_meteo in listdir(dir_data):
                # File names look like 'synop.YYYYMM.csv'; filter on month
                month = datetime.strptime(csv_meteo.split('.')[1], '%Y%m')
                if self._start <= month <= self._end:
                    logger.info(f'Inserting {csv_meteo} in intervention dictionary')
                    with open(dir_data / csv_meteo, "r") as f:
                        reader = DictReader(f, delimiter=';')
                        for row in reader:
                            if row['numer_sta'] not in self._stations:
                                continue
                            date = datetime.strptime(row['date'], '%Y%m%d%H%M%S')
                            # Feature names are suffixed with the rank of
                            # their station (0 = closest) so the same feature
                            # from different stations stays distinguishable.
                            rank = self._stations.index(row['numer_sta'])
                            self._dated_features.setdefault(date, {}).update(
                                {dico_features[feat]['name'] + '_' + str(rank):
                                     self._parse_value(row[feat])
                                 for feat in dico_features})
        return self._dated_features