From: Christophe Guyeux Date: Mon, 10 Feb 2020 07:41:15 +0000 (+0100) Subject: On importe les travaux précédents. X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/predictops.git/commitdiff_plain/844b558b71ac568d904e8845ce203d31b68c776d?ds=inline;hp=27d766f401d49503c949ee75bb14a1109732ecac On importe les travaux précédents. --- diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1e0f3bb --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ + +**.py[cod] +**$py.class +data/ +archives/ + +bonnes_pratiques.txt +celerybeat* +.~lock* diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..5b1fafa --- /dev/null +++ b/Makefile @@ -0,0 +1,13 @@ +run: + #@python -m test_celery.run_tasks + @python main.py + +clean-db: + @python extomeAI/lib/cleaner.py db + +clean-data: + @python extomeAI/lib/cleaner.py data + +clean-all: + @python extomeAI/lib/cleaner.py all + diff --git a/config/features.cfg b/config/features.cfg new file mode 100644 index 0000000..927fb92 --- /dev/null +++ b/config/features.cfg @@ -0,0 +1,3 @@ +[meteofrance] +regenerate = False +reinsert = True diff --git a/config/features/meteofrance/meteofrance_features.csv b/config/features/meteofrance/meteofrance_features.csv new file mode 100644 index 0000000..e8d60f7 --- /dev/null +++ b/config/features/meteofrance/meteofrance_features.csv @@ -0,0 +1,14 @@ +abbreviation,METFT_NAME,unit,type,PARAM_ID_PARAMETER +t,temperature,K,real,1 +pres,pressure,Pa,integer,1 +tend,pressureVariation,Pa,integer,1 +cod_tend,BarometricTrend,code,integer,2 +u,humidity,%,integer,1 +td,dewPoint,K,real,1 +rr1,lastHourRainfall,mm,real,1 +rr3,last3hHourRainfall,mm,real,1 +ff,meanWindSpeed10min,m/s,real,1 +dd,meanWindDirection10min,degré,integer,1 +rafper,gustsOverAPeriod,m/s,real,1 +vv,horizontalVisibility,m,real,1 +ww,currentWeather,code,integer,2 diff --git a/config/features/parameters.csv b/config/features/parameters.csv new file mode 100644 index 
0000000..2d70417 --- /dev/null +++ b/config/features/parameters.csv @@ -0,0 +1,4 @@ +PARAM_NAME +Numerical +Categorical +Both diff --git a/config/logging.cfg b/config/logging.cfg new file mode 100644 index 0000000..cecd031 --- /dev/null +++ b/config/logging.cfg @@ -0,0 +1,21 @@ +[loggers] +keys=root + +[handlers] +keys=stream_handler + +[formatters] +keys=formatter + +[logger_root] +level=DEBUG +handlers=stream_handler + +[handler_stream_handler] +class=StreamHandler +level=DEBUG +formatter=formatter +args=(sys.stderr,) + +[formatter_formatter] +format=%(asctime)s %(name)-12s %(levelname)-8s %(message)s \ No newline at end of file diff --git a/config/main.cfg b/config/main.cfg new file mode 100644 index 0000000..942ef96 --- /dev/null +++ b/config/main.cfg @@ -0,0 +1,5 @@ +[postgresql] +host = localhost +user = christophe +port = 5432 +dbname = extome diff --git a/doc/predictops.model b/doc/predictops.model new file mode 100644 index 0000000..0f26b6e --- /dev/null +++ b/doc/predictops.model @@ -0,0 +1,121 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ + + + + + + + + + + + + + +
+ + + + + + + + + + + + + + + + + +
+ + + + + + + + + + + + + + +
+ + + + + + + + + +
diff --git a/lib/__init__.py b/lib/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/lib/source/__init__.py b/lib/source/__init__.py
new file mode 100644
index 0000000..d2c2a0f
--- /dev/null
+++ b/lib/source/__init__.py
@@ -0,0 +1,25 @@
+from .meteofrance import MeteoFrance
+
+from extomeAI.lib.connector import PostgreSQLDBConnection
+
+from csv import DictReader
+from logging.config import fileConfig
+from pathlib import Path
+
+import logging
+
+fileConfig((Path.cwd() / 'config') / 'logging.cfg')
+logger = logging.getLogger()
+
+with PostgreSQLDBConnection.Instance() as db:
+    db.cursor.execute('SELECT count(*) FROM "PARAMETER";')
+    nb_parameters = db.cursor.fetchone()[0]
+    if not nb_parameters:
+        logger.info('Inserting PARAMETER values from parameters.csv')
+        csv_file = Path.cwd() / 'config' / 'features' / 'parameters.csv'
+        with open(csv_file, "r", newline='') as f:
+            reader = DictReader(f, delimiter=',')
+            for row in reader:
+                # Parameterized query: psycopg2 quotes the value itself
+                db.cursor.execute('INSERT INTO "PARAMETER" ("PARAM_NAME") VALUES (%s);',
+                                  (row['PARAM_NAME'],))
diff --git a/lib/source/meteofrance.py b/lib/source/meteofrance.py
new file mode 100644
index 0000000..79c2546
--- /dev/null
+++ b/lib/source/meteofrance.py
@@ -0,0 +1,268 @@
+from extomeAI.lib.connector import PostgreSQLDBConnection
+
+from configparser import ConfigParser
+from csv import DictReader
+from datetime import datetime, timedelta
+from os import remove
+from pathlib import Path
+from shutil import rmtree
+from timezonefinder import TimezoneFinder
+
+from logging.config import fileConfig
+from os.path import isfile, basename
+from urllib.request import urlretrieve
+
+import logging
+import gzip
+import pytz
+import tempfile
+
+fileConfig((Path.cwd() / 'config') / 'logging.cfg')
+logger = logging.getLogger()
+
+class MeteoFrance:
+    def __init__(self):
+        '''
+        Constructor
+        See: https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
+        '''
+        self.__data_directory = (Path.cwd() / 'data') / 'meteo_france'
+        # Station identifier (METST_IDNAME) -> UTC shift in hours, loaded
+        # lazily from METEO_STATION by __from_date_to_datetz
+        self.__meteo_station_tz = None
+        config = ConfigParser()
+        config.read((Path.cwd() / 'config') / 'features.cfg')
+        # Re-creating data directory architecture for MeteoFrance, if asked
+        # (getboolean() parses True/False without eval()-ing the config)
+        if config.getboolean('meteofrance', 'regenerate'):
+            logger.info("Regenerating meteofrance data directory")
+            try:
+                rmtree(self.__data_directory)
+            except FileNotFoundError:
+                # Nothing to remove on a first run
+                pass
+        # Sub-directories are always ensured: stations and archives are
+        # downloaded into them even when regenerate is False
+        for subdirectory in ('historical', 'config'):
+            (self.__data_directory / subdirectory).mkdir(exist_ok=True, parents=True)
+        if config.getboolean('meteofrance', 'reinsert'):
+            logger.info("Reinserting meteofrance database")
+            with PostgreSQLDBConnection.Instance() as db:
+                # Values are emptied first: their METFT_ID_METEO_FEATURE and
+                # METST_ID_METEO_STATION columns look like foreign keys to
+                # the two other tables (schema not visible here)
+                db.cursor.execute('DELETE FROM "METEO_FEATURE_VALUE";')
+                db.cursor.execute('ALTER SEQUENCE "METEO_FEATURE_VALUE_METVA_ID_seq" RESTART WITH 1;')
+                db.cursor.execute('DELETE FROM "METEO_FEATURE";')
+                db.cursor.execute('ALTER SEQUENCE "METEO_FEATURE_METFT_ID_seq" RESTART WITH 1;')
+                db.cursor.execute('DELETE FROM "METEO_STATION";')
+                db.cursor.execute('ALTER SEQUENCE "METEO_STATION_METST_ID_seq" RESTART WITH 1;')
+        self.__generate()
+
+
+    def __collect_stations(self):
+        '''
+        Filling METEO_STATION table from MeteoFrance's list of SYNOP
+        stations (postesSynop.csv, downloaded if not already present)
+        '''
+        tf = TimezoneFinder(in_memory=True)
+
+        with PostgreSQLDBConnection.Instance() as db:
+            link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
+            p = Path(self.__data_directory / 'config')
+            csv_file = p / basename(link)
+            if not isfile(csv_file):
+                logger.info('Downloading location stations from MeteoFrance')
+                urlretrieve(link, csv_file)
+            with open(csv_file, "r", newline='') as f:
+                reader = DictReader(f, delimiter=';')
+                logger.info(f'Inserting location stations in {db.dbname} database')
+                for row in reader:
+                    # float() rather than eval(): the file is downloaded
+                    # from the network and must never be executed
+                    longitude, latitude = float(row['Longitude']), float(row['Latitude'])
+                    timezone_name = tf.timezone_at(lng=longitude, lat=latitude)
+                    if timezone_name is None:
+                        timezone_name = tf.closest_timezone_at(lng=longitude,
+                                                               lat=latitude,
+                                                               delta_degree=5)
+                    cet = pytz.timezone(timezone_name)
+                    dt = datetime.now()
+                    offset = cet.utcoffset(dt, is_dst=True)
+                    shift = int(offset / timedelta(hours=1))
+                    # Parameterized query; the apostrophe substitution is
+                    # kept so that stored names do not change
+                    db.cursor.execute(
+                        'INSERT INTO "METEO_STATION" ("METST_NAME", "METST_IDNAME", '
+                        '"METST_LOCATION", "METST_TIMEZONE") '
+                        'VALUES (%s, %s, point(%s, %s), %s);',
+                        (row['Nom'].replace("'", '’'), row['ID'],
+                         latitude, longitude, shift))
+
+
+    def __insert_features(self):
+        '''
+        Filling METEO_FEATURE table from meteofrance_features.csv
+        '''
+        logger.info('Inserting MeteoFrance list of features from meteofrance_features.csv')
+        csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
+        with PostgreSQLDBConnection.Instance() as db:
+            with open(csv_file, "r", newline='') as f:
+                # DictReader consumes the header line itself: skipping one
+                # more row by hand would drop the first feature (temperature)
+                reader = DictReader(f, delimiter=',')
+                for row in reader:
+                    db.cursor.execute(
+                        'INSERT INTO "METEO_FEATURE" ("METFT_NAME", "PARAM_ID_PARAMETER") '
+                        'VALUES (%s, %s);',
+                        (row['METFT_NAME'], int(row['PARAM_ID_PARAMETER'])))
+
+
+    def __collect_historical_data(self):
+        '''
+        We collect all csv files from January 1996 until the current
+        month. Archive urls end with the year-month, in the form 201001
+        for January 2010: all these patterns are computed first, in the
+        historical list.
+        '''
+        # List of year-months to consider
+        historical = []
+        date_end = datetime.now()
+        for year in range(1996, date_end.year + 1):
+            for month in range(1, 13):
+                date = datetime(year, month, 1)
+                if date <= date_end:
+                    historical.append(date.strftime("%Y%m"))
+
+        # We download all csv files from meteofrance that are not in
+        # the data repository
+        meteo_data = self.__data_directory / 'historical'
+        meteo_data.mkdir(exist_ok=True, parents=True)
+        for date in historical:
+            csv_file = meteo_data / ('synop.' + date + '.csv')
+            if isfile(csv_file):
+                continue
+            link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
+            link += date + '.csv.gz'
+            download_path = meteo_data / basename(link)
+            urlretrieve(link, download_path)
+            # Decompressed bytes are copied unchanged: no decode/re-encode
+            # round trip depending on the locale encoding
+            with gzip.open(download_path, 'rb') as f, open(csv_file, 'wb') as g:
+                g.write(f.read())
+            remove(download_path)
+
+
+    def __from_date_to_datetz(self, date, station):
+        '''
+        Convert a MeteoFrance date string (AAAAMMJJHH...) into a timestamp
+        with time zone literal, e.g. '2010-01-31 12:00:00+01', using the
+        UTC shift stored in METEO_STATION for the station whose identifier
+        (METST_IDNAME) is given.
+        NOTE(review): if SYNOP dates are already expressed in UTC, the
+        suffix should rather be +00 - to be confirmed.
+        '''
+        if self.__meteo_station_tz is None:
+            with PostgreSQLDBConnection.Instance() as db:
+                db.cursor.execute('SELECT "METST_IDNAME", "METST_TIMEZONE" FROM "METEO_STATION";')
+                self.__meteo_station_tz = {idname: int(shift)
+                                           for idname, shift in db.cursor.fetchall()}
+        shift = self.__meteo_station_tz[station]
+        return f'{date[:4]}-{date[4:6]}-{date[6:8]} {date[8:10]}:00:00{shift:+03d}'
+
+
+    def __insert_historical_data(self):
+        '''
+        Fill METEO_FEATURE_VALUE with the values of every feature listed
+        in meteofrance_features.csv, for every station of METEO_STATION,
+        read from the downloaded synop.*.csv files (one COPY per file)
+        '''
+        csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
+        with PostgreSQLDBConnection.Instance() as db:
+            db.cursor.execute('SELECT * from "METEO_FEATURE";')
+            list_of_rows = db.cursor.fetchall()
+            dico = {u[1]: u[0] for u in list_of_rows}
+
+        with open(csv_file, "r", newline='') as f:
+            reader = DictReader(f, delimiter=',')
+            # Column name in the synop files -> METEO_FEATURE id (the header
+            # line is consumed by DictReader: no row must be skipped)
+            dico_features = {row["abbreviation"]: dico[row["METFT_NAME"]]
+                             for row in reader if row["METFT_NAME"] in dico}
+
+        with PostgreSQLDBConnection.Instance() as db:
+            db.cursor.execute('SELECT * from "METEO_STATION";')
+            list_of_rows = db.cursor.fetchall()
+            # Station identifier (numer_sta in the synop files) -> METEO_STATION id
+            dico_station = {u[2]: u[0] for u in list_of_rows}
+
+        dir_data = self.__data_directory / 'historical'
+        # Each file is read once for all features and stations (rather than
+        # once per feature and per station), then sent with a single COPY
+        for csv_meteo in sorted(dir_data.glob('synop.*.csv')):
+            logger.info(f'Integrating {csv_meteo.name}')
+            with tempfile.TemporaryFile('w+') as values, open(csv_meteo, "r", newline='') as f:
+                for row in DictReader(f, delimiter=';'):
+                    station = row['numer_sta']
+                    if station not in dico_station:
+                        continue
+                    datetz = self.__from_date_to_datetz(row['date'], station)
+                    for feature, feature_id in dico_features.items():
+                        # COPY text format: fields must not be quoted
+                        values.write(f"{row[feature]},{datetz},{feature_id},{dico_station[station]}\n")
+                values.seek(0)
+                with PostgreSQLDBConnection.Instance() as db:
+                    db.cursor.copy_from(values, '"METEO_FEATURE_VALUE"', sep=',', null='mq',
+                                        columns=['"METVA_VALUE"', '"METVA_DATETIME"',
+                                                 '"METFT_ID_METEO_FEATURE"', '"METST_ID_METEO_STATION"'])
+
+
+    def __generate(self):
+        # Meteo stations must be collected first, if not in the database.
+        # Counts are read before calling the collectors, which reopen the
+        # shared (singleton) connection themselves
+        with PostgreSQLDBConnection.Instance() as db:
+            db.cursor.execute('SELECT count(*) FROM "METEO_STATION";')
+            updated_meteo_station = db.cursor.fetchone()[0]
+        if not updated_meteo_station:
+            self.__collect_stations()
+
+        # Features from config/features/meteofrance/meteofrance_features.csv
+        # must be inserted in the database, if not already done
+        with PostgreSQLDBConnection.Instance() as db:
+            db.cursor.execute('SELECT count(*) FROM "METEO_FEATURE";')
+            updated_meteo_features = db.cursor.fetchone()[0]
+        if not updated_meteo_features:
+            self.__insert_features()
+
+        # Downloading meteofrance historical csv files
+        logger.info('Downloading historical csv files from MeteoFrance, if needed')
+        self.__collect_historical_data()
+
+        self.__insert_historical_data()
+
+    def update(self):
+        '''
+        Update the MeteoFrance features with the last available data
+        '''
+        # We collect archive files from MeteoFrance, until the current month,
+        # by using the same method as for data generation: this is currently
+        # based on the presence of a synop.<date>.csv file in the
+        # data/meteo_france/historical directory. The file corresponding to the
+        # current month is deleted first, so that its most recent version will
+        # be downloaded by calling self.__collect_historical_data
+        # TODO: updates according to a PostgreSQL request ?
+        # NOTE(review): __insert_historical_data inserts every file again,
+        # including values already present in the database
+        logger.info('Update historical csv files from MeteoFrance, if needed')
+        today = datetime.now()
+        todel = 'synop.' + today.strftime("%Y%m") + ".csv"
+        try:
+            remove(self.__data_directory / 'historical' / todel)
+        except FileNotFoundError:
+            # Current month not downloaded yet: nothing to refresh
+            pass
+        self.__collect_historical_data()
+        logger.info('Inserting csv files in database')
+        self.__insert_historical_data()
+
diff --git a/lib/tools/cleaner.py b/lib/tools/cleaner.py
new file mode 100644
index 0000000..1ee1ba4
--- /dev/null
+++ b/lib/tools/cleaner.py
@@ -0,0 +1,41 @@
+from pathlib import Path
+from shutil import rmtree
+from configparser import ConfigParser
+from os import remove
+from subprocess import Popen, PIPE
+from sys import argv
+import logging
+from logging.config import fileConfig
+
+fileConfig((Path.cwd() / 'config') / 'logging.cfg')
+logger = logging.getLogger()
+
+argument = argv[-1]
+
+if argument in ['data', 'all']:
+    logger.info("Cleaning and restoring data directory")
+    directory = Path.cwd() / 'data'
+    if directory.is_dir():
+        rmtree(directory)
+    p = Path(Path.cwd() / 'data')
+    p.mkdir()
+
+# Cleaning the postgresql database
+if argument in ['db', 'all']:
+    config = ConfigParser()
+    config.read((Path.cwd() / 'config') / 'main.cfg')
+
+    host = config['postgresql']['host']
+    user = config['postgresql']['user']
+    port = config['postgresql']['port']
+    dbname = config['postgresql']['dbname']
+
+    logger.info("PostgreSQL database deletion")
+    command = ['dropdb', '-h', host, '-U', user, '-p', port, dbname]
+    process = Popen(command, stdout=PIPE, stderr=PIPE)
+    stdout, stderr = process.communicate()
+
+    logger.info("PostgreSQL database creation")
+    command = ['createdb', '-h', host, '-U', user, '-p', port, dbname]
+    process = Popen(command, stdout=PIPE, stderr=PIPE)
+    stdout, stderr = process.communicate()
diff --git a/lib/tools/connector.py b/lib/tools/connector.py
new file mode 100644
index 0000000..a0cc0d5
--- /dev/null
+++ b/lib/tools/connector.py
@@ -0,0 +1,62 @@
+from pathlib import Path
+import psycopg2
+import configparser
+
+class Singleton:
+
+    def __init__(self, cls):
+        self._cls = cls
+
+    def Instance(self):
+        try:
+            return self._instance
+        except AttributeError:
+            self._instance = self._cls()
+            return self._instance
+
+    def __call__(self):
+        raise TypeError('Singletons must be accessed through `Instance()`.')
+
+    def __instancecheck__(self, inst):
+        return isinstance(inst, self._cls)
+
+@Singleton
+class PostgreSQLDBConnection(object):
+    """Postgresql database connection"""
+
+    def __init__(self, connection_string = ''):
+        if connection_string == '':
+            # We're retrieving information related to the database in config.ini
+            config = configparser.ConfigParser()
+            config.read((Path.cwd() / 'config') / 'main.cfg')
+
+            host = config['postgresql']['host']
+            user = config['postgresql']['user']
+            port = config['postgresql']['port']
+            self.dbname = config['postgresql']['dbname']
+
+            self.connection_string = f"host={host} port={port} dbname={self.dbname} user={user}"
+
+        else:
+            self.connection_string = connection_string
+            self.dbname = ''
+
+
+    def __enter__(self):
+        self.connection = psycopg2.connect(self.connection_string)
+        self.connection.autocommit = True
+        self.cursor = self.connection.cursor()
+        return self
+
+    @property
+    def name(self):
+        return self.dbname
+
+    def __str__(self):
+        return 'Database connection object'
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        #self.connection.commit()
+        self.cursor.close()
+        self.connection.close()
+
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..3d3739f
--- /dev/null
+++ b/main.py
@@ -0,0 +1,58 @@
+from extomeAI.source import MeteoFrance
+
+from celery import Celery
+from configparser import ConfigParser
+from logging.config import fileConfig
+from logging import getLogger
+from pathlib import Path
+from shutil import rmtree
+from subprocess import Popen, PIPE
+
+
+fileConfig((Path.cwd() / 'config') / 'logging.cfg')
+logger = getLogger()
+
+
+class ExtomeEngine:
+    def __init__(self, clean = False):
+        logger.info("Extome-IA engine launched")
+        if clean:
+            self.clean()
+            print("Ne pas oublier d'exporter la BDD dans pgModeler")
+            print("Ni de copier l'archive dans la data")
+
+    def clean(self):
+        # Cleaning the data directory
+        logger.info("Cleaning and restoring data directory")
+        directory = Path.cwd() / 'data'
+        if directory.is_dir():
+            rmtree(directory)
+        p = Path(Path.cwd() / 'data')
+        p.mkdir()
+
+        # Cleaning the postgresql database
+        config = ConfigParser()
+        config.read((Path.cwd() / 'config') / 'main.cfg')
+
+        host = config['postgresql']['host']
+        user = config['postgresql']['user']
+        port = config['postgresql']['port']
+        dbname = config['postgresql']['dbname']
+
+        logger.info("PostgreSQL database deletion")
+        command = ['dropdb', '-h', host, '-U', user, '-p', port, dbname]
+        process = Popen(command, stdout=PIPE, stderr=PIPE)
+        process.communicate()
+
+        logger.info("PostgreSQL database creation")
+        command = ['createdb', '-h', host, '-U', user, '-p', port, dbname]
+        process = Popen(command, stdout=PIPE, stderr=PIPE)
+        process.communicate()
+
+    def add_meteofrance(self):
+        self.meteofrance = MeteoFrance()
+
+
+
+engine = ExtomeEngine(clean = False)
+engine.add_meteofrance()
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..70af5d5
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+numpy==1.18.1
+scipy==1.4.1
+xgboost==0.90
diff --git a/test_celery/__init__.py b/test_celery/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/test_celery/celery.py b/test_celery/celery.py
new file mode 100644
index 0000000..1b8344b
--- /dev/null
+++ b/test_celery/celery.py
@@ -0,0 +1,7 @@
+from __future__ import absolute_import
+from celery import Celery
+
+app = Celery('test_celery',
+             backend='amqp',
+             broker='amqp://guest@localhost//',
+             include=['test_celery.tasks'])
diff --git a/test_celery/run_tasks.py b/test_celery/run_tasks.py
new file mode 100644
index 0000000..0017032
--- /dev/null
+++ b/test_celery/run_tasks.py
@@ -0,0 +1,13 @@
+from .tasks import longtime_add
+import time
+
+if __name__ == '__main__':
+    result = longtime_add.delay(1,2)
+    # at this time, our task is not finished, so it will return False
+    print ('Task finished? ', result.ready())
+    print ('Task result: ', result.result)
+    # sleep 10 seconds to ensure the task has been finished
+    time.sleep(10)
+    # now the task should be finished and ready method will return True
+    print ('Task finished? ', result.ready())
+    print ('Task result: ', result.result)
\ No newline at end of file
diff --git a/test_celery/tasks.py b/test_celery/tasks.py
new file mode 100644
index 0000000..a8382e3
--- /dev/null
+++ b/test_celery/tasks.py
@@ -0,0 +1,12 @@
+from __future__ import absolute_import
+from test_celery.celery import app
+import time
+
+
+@app.task
+def longtime_add(x, y):
+    print ('long time task begins')
+    # sleep 5 seconds
+    time.sleep(5)
+    print ('long time task finished')
+    return x + y
\ No newline at end of file