From 34192be628efe7e95de1abb5e14253899a6081fd Mon Sep 17 00:00:00 2001 From: Christophe Guyeux Date: Mon, 10 Feb 2020 11:00:54 +0100 Subject: [PATCH] Nouvelle approche : csv -> dico -> dataframe, au lieu de postgresql, en cours. --- README.md | 6 +- config/features/parameters.csv | 4 - lib/source/__init__.py | 26 +-- lib/source/meteofrance.py | 317 +++++++++------------------------ main.py | 27 ++- requirements.txt | 5 +- 6 files changed, 106 insertions(+), 279 deletions(-) delete mode 100644 config/features/parameters.csv diff --git a/README.md b/README.md index b4cbde9..660b7ef 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ Creer un environnement : pip install virtualenv -virtualenv predictops +python -m venv ~/.venvs/predictops activer l'environnement : source ~/.venvs/predictops/bin/activate @@ -9,9 +9,9 @@ installer un package : pip install celery pip freeze > requirements.txt -Lancer +Lancer celery -A test_celery worker --loglevel=info -puis +puis python -m test_celery.run_tasks diff --git a/config/features/parameters.csv b/config/features/parameters.csv deleted file mode 100644 index 2d70417..0000000 --- a/config/features/parameters.csv +++ /dev/null @@ -1,4 +0,0 @@ -PARAM_NAME -Numerical -Categorical -Both diff --git a/lib/source/__init__.py b/lib/source/__init__.py index d2c2a0f..527538d 100644 --- a/lib/source/__init__.py +++ b/lib/source/__init__.py @@ -1,25 +1 @@ -from .meteofrance import MeteoFrance - -from extomeAI.lib.connector import PostgreSQLDBConnection - -from csv import DictReader -from logging.config import fileConfig -from pathlib import Path - -import logging - -fileConfig((Path.cwd() / 'config') / 'logging.cfg') -logger = logging.getLogger() - -with PostgreSQLDBConnection.Instance() as db: - db.cursor.execute('SELECT count(*) FROM "PARAMETER";') - nb_parameters = db.cursor.fetchone()[0] - if not nb_parameters: - logger.info('Inserting PARAMETER values from parameters.csv') - csv_file = Path.cwd() / 'config' / 'features' / 'parameters.csv' - with open(csv_file, "r") as f: - reader = DictReader(f, delimiter=',') - for row in reader: - request = f"""INSERT INTO "PARAMETER" ("PARAM_NAME") - VALUES ('{row['PARAM_NAME']}');""" - db.cursor.execute(request) +from .meteofrance import MeteoFrance \ No newline at end of file diff --git a/lib/source/meteofrance.py b/lib/source/meteofrance.py index 79c2546..315aac3 100644 --- a/lib/source/meteofrance.py +++ b/lib/source/meteofrance.py @@ -1,262 +1,119 @@ -from extomeAI.lib.connector import PostgreSQLDBConnection - from configparser import ConfigParser from csv import DictReader -from datetime import datetime, timedelta -from os import remove, system, listdir +from geopy import distance from pathlib import Path from shutil import rmtree -from timezonefinder import TimezoneFinder from logging.config import fileConfig from os.path import isfile, basename from urllib.request import urlretrieve import logging -import gzip -import pytz -import tempfile + fileConfig((Path.cwd() / 'config') / 'logging.cfg') logger = logging.getLogger() class MeteoFrance: - def __init__(self): - ''' - Constructor - See: https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32 - ''' - self.__data_directory = (Path.cwd() / 'data') / 'meteo_france' - # Re-creating data directory architecture for MeteoFrance, if asked - config = ConfigParser() - config.read((Path.cwd() / 'config') / 'features.cfg') - if eval(config['meteofrance']['regenerate']): - logger.info("Regenerating meteofrance data directory") - try: - rmtree(self.__data_directory) - except: - pass - p = Path(self.__data_directory / 'historical') - p.mkdir(exist_ok=True, parents=True) - p = Path(self.__data_directory / 'config') - p.mkdir(exist_ok=True, parents=True) - if eval(config['meteofrance']['reinsert']): - logger.info("Reinserting meteofrance database") - with PostgreSQLDBConnection.Instance() as db: - db.cursor.execute(f'DELETE FROM "METEO_FEATURE";') - db.cursor.execute(f'ALTER SEQUENCE "METEO_FEATURE_METFT_ID_seq" RESTART WITH 1;') - db.cursor.execute(f'DELETE FROM "METEO_STATION";') - db.cursor.execute(f'ALTER SEQUENCE "METEO_STATION_METST_ID_seq" RESTART WITH 1;') - db.cursor.execute(f'DELETE FROM "METEO_FEATURE_VALUE";') - db.cursor.execute(f'ALTER SEQUENCE "METEO_FEATURE_VALUE_METVA_ID_seq" RESTART WITH 1;') - self.__generate() - - def __collect_stations(self): - ''' - Filling METEO_STATION table from location schema + def __init__(self, latitude = 47.25, longitude = 6.0333, nb_stations = 3): ''' - tf = TimezoneFinder(in_memory=True) - - with PostgreSQLDBConnection.Instance() as db: - link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv' - p = Path(self.__data_directory / 'config' ) - csv_file = p / basename(link) - if not isfile(csv_file): - logger.info('Downloading location stations from MeteoFrance') - urlretrieve(link, csv_file) - with open(csv_file, "r") as f: - reader = DictReader(f, delimiter=';') - logger.info(f'Inserting location stations in {db.dbname} database') - for row in reader: - longitude, latitude = eval(row['Longitude']), eval(row['Latitude']) - point = (longitude, latitude) - timezone_name = tf.timezone_at(lng = eval(row['Longitude']), lat = eval(row['Latitude'])) - if timezone_name is None: - timezone_name = tf.closest_timezone_at(lng = eval(row['Longitude']), - lat = eval(row['Latitude']), - delta_degree = 5) - cet = pytz.timezone(timezone_name) - dt = datetime.now() - offset = cet.utcoffset(dt, is_dst = True) - shift = int(offset / timedelta(hours=1)) - request = f"""INSERT INTO "METEO_STATION" ("METST_NAME", "METST_IDNAME", "METST_LOCATION", "METST_TIMEZONE") - VALUES ('{row['Nom'].replace("'",'’')}', '{row['ID']}', - point({row['Latitude']}, {row['Longitude']}), {shift});""" - db.cursor.execute(request) + Constructor of the MeteoFrance source of feature. + - It will reinitiate the data directory, if asked in the config + features.cfg file. + - It searches for the nb_stations meteo stations closest to the provided + point (longitude and latitude) - def __insert_features(self): - logger.info('Inserting MeteoFrance list of features from meteo_features.csv') - csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv' - with PostgreSQLDBConnection.Instance() as db: - with open(csv_file, "r") as f: - reader = DictReader(f, delimiter=',') - next(reader) - for row in reader: - request = f"""INSERT INTO "METEO_FEATURE" ("METFT_NAME", "PARAM_ID_PARAMETER") - VALUES ('{row['METFT_NAME']}', {row['PARAM_ID_PARAMETER']});""" - db.cursor.execute(request) - - - - def __collect_historical_data(self): - ''' - We collect all csv files from January 1996 until the month - before now. The argument in the url to download are of the - form 201001 for January 2010. We start by computing all these - patterns, in historical list. - ''' - # List of year-months to consider - historical = [] - date_end = datetime.now() - for year in range(1996, date_end.year+1): - for month in range(1,13): - date = datetime(year, month, 1) - if date <= date_end: - historical.append(date.strftime("%Y%m")) - - # We download all csv files from meteofrance that are not in - # the data repository - meteo_data = self.__data_directory / 'historical' - p = Path(meteo_data) - p.mkdir(exist_ok=True, parents=True) - for date in historical: - if not isfile(meteo_data / ('synop.'+date+'.csv')): - link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.' - link += date + '.csv.gz' - download_path = meteo_data / basename(link) - urlretrieve(link, download_path) - with gzip.open(download_path, 'rb') as f: - csv_file = meteo_data / basename(link[:-3]) - with open(csv_file, 'w') as g: - g.write(f.read().decode()) - remove(meteo_data / basename(link)) - - - def __from_date_to_datetz(self, date, a, b): - if not hasattr(self, '__meteo_station_tz'): - self.__meteo_station_tz = {} - tf = TimezoneFinder(in_memory=True) - with PostgreSQLDBConnection.Instance() as db: - db.cursor.execute('select "METST_IDNAME", "METST_LOCATION" from "METEO_STATION";') - list_of_rows = db.cursor.fetchall() - for k in list_of_rows: - print('\n',k) - longitude, latitude = eval(k[1]) - print(longitude, latitude) - print(type(longitude)) - timezone_name = tf.timezone_at(lng = longitude, lat = latitude) - if timezone_name is None: - timezone_name = tf.closest_timezone_at(lng = longitude, - lat = latitude, - delta_degree = 13, - exact_computation=True, - #return_distances=True, - force_evaluation=True) - cet = pytz.timezone(timezone_name) - dt = datetime.now() - offset = cet.utcoffset(dt, is_dst = True) - shift = int(offset / timedelta(hours=1)) - self.__meteo_station_tz[k[0]] = shift + For more information about this source of feature, see: + https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32 - print(self.__meteo_station_tz) - exit() - '''longitude, latitude = eval(row['Longitude']), eval(row['Latitude']) - point = (longitude, latitude) - timezone_name = tf.timezone_at(lng = eval(row['Longitude']), lat = eval(row['Latitude'])) - if timezone_name is None: - timezone_name = tf.closest_timezone_at(lng = eval(row['Longitude']), - lat = eval(row['Latitude']), - delta_degree = 5) - cet = pytz.timezone(timezone_name) - dt = datetime.now() - offset = cet.utcoffset(dt, is_dst = True) - shift = int(offset / timedelta(hours=1)) - - self.__meteo_station_tz''' - exit() - return date[:4]+'-'+date[4:6]+'-'+date[6:8]+' '+date[8:10]+':00:00+01' + Parameters: + latitude (float): The latitude from which we want the meteo features. + longitude (float): The longitude from which we want the meteo features. + nb_stations (int): Number of closest stations to consider. + ''' + self._latitude = latitude + self._longitude = longitude + self._nb_stations = nb_stations + + self._data_directory = (Path.cwd() / 'data') / 'meteo_france' - def __insert_historical_data(self): - csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv' - with PostgreSQLDBConnection.Instance() as db: - db.cursor.execute('SELECT * from "METEO_FEATURE";') - list_of_rows = db.cursor.fetchall() - dico = {u[1]:u[0] for u in list_of_rows} + # Re-creating data directory architecture for MeteoFrance, if asked + config = ConfigParser() + config.read((Path.cwd() / 'config') / 'features.cfg') + if eval(config['meteofrance']['regenerate']): + self._regenerate_directory() - with open(csv_file, "r") as f: - reader = DictReader(f, delimiter=',') - next(reader) - dico_features = {row["abbreviation"]:dico[row["METFT_NAME"]] for row in reader} + # Collecting the closest meteo station + self._stations = self._get_stations() + print(self._stations) - with PostgreSQLDBConnection.Instance() as db: - db.cursor.execute('SELECT * from "METEO_STATION";') - list_of_rows = db.cursor.fetchall() - dico_station = {u[2]:u[0] for u in list_of_rows} - for feature in dico_features: - logger.info(f'Integrating {[u for u in dico if dico[u]==dico_features[feature]][0]} feature') - for station in dico_station: - logger.info(f' - Dealing with meteo station n°: {station}') - csv_file = tempfile.NamedTemporaryFile('w') - dir_data = Path.cwd() / 'data' / 'meteo_france' / 'historical' - for csv_meteo in listdir(dir_data): - with open(dir_data / csv_meteo, "r") as f: - reader = DictReader(f, delimiter=';') - csv_file.write(''.join([row[feature]+",'"+self.__from_date_to_datetz(row["date"], station, dico_station[station])+"',"+str(dico_features[feature])+','+str(dico_station[station])+'\n' for row in reader if row['numer_sta'] == station])) - csv_file.flush() - with open(csv_file.name, 'r') as f: - with PostgreSQLDBConnection.Instance() as db: - db.cursor.copy_from(f, '"METEO_FEATURE_VALUE"', sep=',', null='mq', - columns=['"METVA_VALUE"','"METVA_DATETIME"','"METFT_ID_METEO_FEATURE"','"METST_ID_METEO_STATION"']) - + def _regenerate_directory(self): + ''' + Re-creating data directory architecture for MeteoFrance + ''' + logger.info("Regenerating meteofrance data directory") + try: + rmtree(self._data_directory) + except: + pass + p = Path(self._data_directory / 'historical') + p.mkdir(exist_ok=True, parents=True) + p = Path(self._data_directory / 'config') + p.mkdir(exist_ok=True, parents=True) - - - - def __generate(self): - # Meteo stations must be collected first, if not in the database - with PostgreSQLDBConnection.Instance() as db: - db.cursor.execute('SELECT count(*) FROM "METEO_STATION";') - updated_meteo_station = db.cursor.fetchone()[0] - if not updated_meteo_station: - self.__collect_stations() - # Features from data/meteo_france/config/meteo_features.csv - # must be inserted in the database, if not already done - with PostgreSQLDBConnection.Instance() as db: - db.cursor.execute('SELECT count(*) FROM "METEO_FEATURE";') - updated_meteo_features = db.cursor.fetchone()[0] - if not updated_meteo_features: - self.__insert_features() - - # Downloading meteofrance historical csv files - logger.info('Downloading historical csv files from MeteoFrance, if needed') - self.__collect_historical_data() - - self.__insert_historical_data() + def _get_stations(self): + ''' + Collect (after downloading them, if needed) the stations and their + locations in a dictionary - def update(self): + Returns: + list: The self._nb_stations closest station IDs, starting by the + closest one + ''' + # The csv file of meteo stations (names, ids and locations) if downloaded, + # if not available in the config directory within data / meteo_france + link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv' + p = Path(self._data_directory / 'config' ) + csv_file = p / basename(link) + if not isfile(csv_file): + logger.info('Downloading location stations from MeteoFrance') + urlretrieve(link, csv_file) + + # A dictionary for the meteo stations is created + dict_stations = {} + logger.info('Collecting information about meteo stations') + with open(csv_file, "r") as f: + reader = DictReader(f, delimiter=';') + for row in reader: + latitude, longitude = eval(row['Latitude']), eval(row['Longitude']) + dict_stations[row['Nom'].replace("'",'’')] = { + 'id' : row['ID'], + 'longitude' : longitude, + 'latitude' : latitude, + 'distance' : distance.vincenty( + (self._latitude, self._longitude), + (latitude, longitude)).km + } + + # Find the closest stations + logger.info('Finding the closest stations') + stations_by_distance = sorted(dict_stations.keys(), + key = lambda x: dict_stations[x]['distance']) + logger.info(f'The {self._nb_stations} closest stations are: ' + f'{", ".join(stations_by_distance[:self._nb_stations])}.') + return [dict_stations[sta]['id'] for sta in stations_by_distance][:self._nb_stations] + + + + def _get_feature(self): ''' - Update the MeteoFrance features with the last available data + TODO ''' - # We collect archive files from MeteoFrance, until the current month - # by using the same method than for data generation : this is currently - # based on the presence of a synop.+date+.csv' file in the - # data/meteo_france/historical directory. The file corresponding to the - # current month is deleted first, so that its most recent version will be downloaded - # by colling self.__collect_historical_data - # TODO: updates according to a PostgreSQL request ? - logger.info('Update historical csv files from MeteoFrance, if needed') - today = datetime.now() - todel = 'synop.'+today.strftime("%Y%m")+".csv" - remove(self.__data_directory / 'historical' / todel) - system("touch "+todel) - self.__collect_historical_data() - logger.info('Inserting csv files in database') - self.__insert_historical_data() - + pass diff --git a/main.py b/main.py index 3d3739f..6810673 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,5 @@ -from extomeAI.source import MeteoFrance +from lib.source import MeteoFrance -from celery import Celery from configparser import ConfigParser from logging.config import fileConfig from logging import getLogger @@ -13,14 +12,14 @@ fileConfig((Path.cwd() / 'config') / 'logging.cfg') logger = getLogger() -class ExtomeEngine: +class Engine: def __init__(self, clean = False): - logger.info("Extome-IA engine launched") + logger.info("Predictops engine launched") if clean: self.clean() print("Ne pas oublier d'exporter la BDD dans pgModeler") print("Ni de copier l'archive dans la data") - + def clean(self): # Cleaning the data directory logger.info("Cleaning and restoring data directory") @@ -29,30 +28,30 @@ class ExtomeEngine: rmtree(directory) p = Path(Path.cwd() / 'data') p.mkdir() - + # Cleaning the postgresql database config = ConfigParser() config.read((Path.cwd() / 'config') / 'main.cfg') - + host = config['postgresql']['host'] user = config['postgresql']['user'] port = config['postgresql']['port'] dbname = config['postgresql']['dbname'] - + logger.info("PostgreSQL database deletion") command = ['dropdb', '-h', host, '-U', user, '-p', port, dbname] process = Popen(command, stdout=PIPE, stderr=PIPE) process.communicate() - + logger.info("PostgreSQL database creation") command = ['createdb', '-h', host, '-U', user, '-p', port, dbname] process = Popen(command, stdout=PIPE, stderr=PIPE) - process.communicate() - + process.communicate() + def add_meteofrance(self): self.meteofrance = MeteoFrance() - - -engine = ExtomeEngine(clean = False) + + +engine = Engine(clean = False) engine.add_meteofrance() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 70af5d5..0cf261a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,2 @@ -numpy==1.18.1 -scipy==1.4.1 -xgboost==0.90 +geographiclib==1.50 +geopy==1.21.0 -- 2.39.5