]> AND Private Git Repository - predictops.git/commitdiff
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Nouvelle approche : csv -> dico -> dataframe, au lieu de postgresql, en
authorChristophe Guyeux <christophe.guyeux@univ-fcomte.fr>
Mon, 10 Feb 2020 10:00:54 +0000 (11:00 +0100)
committerChristophe Guyeux <christophe.guyeux@univ-fcomte.fr>
Mon, 10 Feb 2020 10:00:54 +0000 (11:00 +0100)
cours.

README.md
config/features/parameters.csv [deleted file]
lib/source/__init__.py
lib/source/meteofrance.py
main.py
requirements.txt

index b4cbde94c5926841fae2d14f8e15b7ac1e6f7375..660b7ef9940ff03411bb3d4b826008583bdf9fd7 100644 (file)
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 Creer un environnement :
 pip install virtualenv
 Creer un environnement :
 pip install virtualenv
-virtualenv predictops
+python -m venv ~/.venvs/predictops
 
 activer l'environnement :
 source ~/.venvs/predictops/bin/activate
 
 activer l'environnement :
 source ~/.venvs/predictops/bin/activate
@@ -9,9 +9,9 @@ installer un package :
 pip install celery
 pip freeze > requirements.txt
 
 pip install celery
 pip freeze > requirements.txt
 
-Lancer 
+Lancer
 celery -A test_celery worker --loglevel=info
 celery -A test_celery worker --loglevel=info
-puis 
+puis
 python -m test_celery.run_tasks
 
 
 python -m test_celery.run_tasks
 
 
diff --git a/config/features/parameters.csv b/config/features/parameters.csv
deleted file mode 100644 (file)
index 2d70417..0000000
+++ /dev/null
@@ -1,4 +0,0 @@
-PARAM_NAME
-Numerical
-Categorical
-Both
index d2c2a0f9910e5981106b55c3a539e2b30480dc6e..527538dc22068b2b132b65859781c166abdd8a3b 100644 (file)
@@ -1,25 +1 @@
-from .meteofrance import MeteoFrance
-
-from extomeAI.lib.connector import PostgreSQLDBConnection
-
-from csv import DictReader
-from logging.config import fileConfig
-from pathlib import Path
-
-import logging
-
-fileConfig((Path.cwd() / 'config') / 'logging.cfg')
-logger = logging.getLogger()
-
-with PostgreSQLDBConnection.Instance() as db:
-    db.cursor.execute('SELECT count(*) FROM "PARAMETER";')
-    nb_parameters = db.cursor.fetchone()[0]
-    if not nb_parameters:
-        logger.info('Inserting PARAMETER values from parameters.csv')            
-        csv_file = Path.cwd() / 'config' / 'features' / 'parameters.csv'
-        with open(csv_file, "r") as f:                
-            reader = DictReader(f, delimiter=',')
-            for row in reader:
-                request = f"""INSERT INTO "PARAMETER" ("PARAM_NAME")
-                              VALUES ('{row['PARAM_NAME']}');"""
-                db.cursor.execute(request)
+from .meteofrance import MeteoFrance
\ No newline at end of file
index 79c2546596e6a8e6f35a8e1ee6af40351d32116a..315aac3edef79e2178ad344ba52eea345f54d768 100644 (file)
-from extomeAI.lib.connector import PostgreSQLDBConnection
-
 from configparser import ConfigParser
 from csv import DictReader
 from configparser import ConfigParser
 from csv import DictReader
-from datetime import datetime, timedelta
-from os import remove, system, listdir
+from geopy import distance
 from pathlib import Path
 from shutil import rmtree
 from pathlib import Path
 from shutil import rmtree
-from timezonefinder import TimezoneFinder
 
 from logging.config import fileConfig
 from os.path import isfile, basename
 from urllib.request import urlretrieve
 
 import logging
 
 from logging.config import fileConfig
 from os.path import isfile, basename
 from urllib.request import urlretrieve
 
 import logging
-import gzip
-import pytz
-import tempfile
+
 
 fileConfig((Path.cwd() / 'config') / 'logging.cfg')
 logger = logging.getLogger()
 
 class MeteoFrance:
 
 fileConfig((Path.cwd() / 'config') / 'logging.cfg')
 logger = logging.getLogger()
 
 class MeteoFrance:
-    def __init__(self):
-        '''
-        Constructor
-        See: https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
-        '''    
-        self.__data_directory = (Path.cwd() / 'data') / 'meteo_france'
-        # Re-creating data directory architecture for MeteoFrance, if asked
-        config = ConfigParser()
-        config.read((Path.cwd() / 'config') / 'features.cfg')
-        if eval(config['meteofrance']['regenerate']):
-            logger.info("Regenerating meteofrance data directory")
-            try:
-                rmtree(self.__data_directory)
-            except:
-                pass
-            p = Path(self.__data_directory  / 'historical')
-            p.mkdir(exist_ok=True, parents=True)
-            p = Path(self.__data_directory  / 'config')
-            p.mkdir(exist_ok=True, parents=True)
-        if eval(config['meteofrance']['reinsert']):
-            logger.info("Reinserting meteofrance database")
-            with PostgreSQLDBConnection.Instance() as db:
-                db.cursor.execute(f'DELETE FROM "METEO_FEATURE";')
-                db.cursor.execute(f'ALTER SEQUENCE "METEO_FEATURE_METFT_ID_seq" RESTART WITH 1;')
-                db.cursor.execute(f'DELETE FROM "METEO_STATION";')
-                db.cursor.execute(f'ALTER SEQUENCE "METEO_STATION_METST_ID_seq" RESTART WITH 1;')
-                db.cursor.execute(f'DELETE FROM "METEO_FEATURE_VALUE";')
-                db.cursor.execute(f'ALTER SEQUENCE "METEO_FEATURE_VALUE_METVA_ID_seq" RESTART WITH 1;')
-            self.__generate()
-        
 
 
-    def __collect_stations(self):
-        '''
-        Filling METEO_STATION table from location schema
+    def __init__(self, latitude = 47.25, longitude = 6.0333, nb_stations = 3):
         '''
         '''
-        tf = TimezoneFinder(in_memory=True)
-        
-        with PostgreSQLDBConnection.Instance() as db:
-            link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
-            p = Path(self.__data_directory / 'config' )
-            csv_file = p / basename(link)
-            if not isfile(csv_file):
-                logger.info('Downloading location stations from MeteoFrance')                    
-                urlretrieve(link, csv_file)
-            with open(csv_file, "r") as f:                
-                reader = DictReader(f, delimiter=';')
-                logger.info(f'Inserting location stations in {db.dbname} database')                    
-                for row in reader:
-                    longitude, latitude = eval(row['Longitude']), eval(row['Latitude'])
-                    point = (longitude, latitude)
-                    timezone_name = tf.timezone_at(lng = eval(row['Longitude']), lat = eval(row['Latitude']))
-                    if timezone_name is None:
-                        timezone_name = tf.closest_timezone_at(lng = eval(row['Longitude']), 
-                                                               lat = eval(row['Latitude']),
-                                                               delta_degree = 5) 
-                    cet = pytz.timezone(timezone_name)
-                    dt = datetime.now()
-                    offset = cet.utcoffset(dt, is_dst = True)
-                    shift = int(offset / timedelta(hours=1))
-                    request = f"""INSERT INTO "METEO_STATION" ("METST_NAME", "METST_IDNAME", "METST_LOCATION", "METST_TIMEZONE") 
-                                  VALUES ('{row['Nom'].replace("'",'’')}', '{row['ID']}',
-                                  point({row['Latitude']}, {row['Longitude']}), {shift});"""
-                    db.cursor.execute(request)
+        Constructor of the MeteoFrance source of features.
 
 
+        - It will reinitialize the data directory, if asked in the config
+          features.cfg file.
+        - It searches for the nb_stations meteo stations closest to the provided
+          point (longitude and latitude)
 
 
-    def __insert_features(self):
-        logger.info('Inserting MeteoFrance list of features from meteo_features.csv')            
-        csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
-        with PostgreSQLDBConnection.Instance() as db:
-            with open(csv_file, "r") as f:                
-                reader = DictReader(f, delimiter=',')
-                next(reader)
-                for row in reader:
-                    request = f"""INSERT INTO "METEO_FEATURE" ("METFT_NAME", "PARAM_ID_PARAMETER")
-                                  VALUES ('{row['METFT_NAME']}', {row['PARAM_ID_PARAMETER']});"""
-                    db.cursor.execute(request)
-        
-        
-            
-    def __collect_historical_data(self):
-        '''
-        We collect all csv files from January 1996 until the month
-        before now. The argument in the url to download are of the
-        form 201001 for January 2010. We start by computing all these
-        patterns, in historical list.
-        '''
-        # List of year-months to consider
-        historical = []
-        date_end = datetime.now()
-        for year in range(1996, date_end.year+1):
-            for month in range(1,13):
-                date = datetime(year, month, 1)
-                if date <= date_end:
-                    historical.append(date.strftime("%Y%m"))
-                    
-        # We download all csv files from meteofrance that are not in 
-        # the data repository
-        meteo_data = self.__data_directory / 'historical' 
-        p = Path(meteo_data)
-        p.mkdir(exist_ok=True, parents=True)                
-        for date in historical:
-            if not isfile(meteo_data / ('synop.'+date+'.csv')):
-                link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
-                link += date + '.csv.gz' 
-                download_path = meteo_data / basename(link)
-                urlretrieve(link, download_path)
-                with gzip.open(download_path, 'rb') as f:
-                    csv_file = meteo_data / basename(link[:-3])
-                    with open(csv_file, 'w') as g:
-                        g.write(f.read().decode())
-                        remove(meteo_data / basename(link))
-    
-    
-    def __from_date_to_datetz(self, date, a, b):          
-        if not hasattr(self, '__meteo_station_tz'):
-            self.__meteo_station_tz = {}
-            tf = TimezoneFinder(in_memory=True)
-            with PostgreSQLDBConnection.Instance() as db:
-                db.cursor.execute('select "METST_IDNAME", "METST_LOCATION" from "METEO_STATION";')
-                list_of_rows = db.cursor.fetchall()                
-            for k in list_of_rows:
-                print('\n',k)
-                longitude, latitude = eval(k[1])
-                print(longitude, latitude)
-                print(type(longitude))
-                timezone_name = tf.timezone_at(lng = longitude, lat = latitude)
-                if timezone_name is None:
-                    timezone_name = tf.closest_timezone_at(lng = longitude, 
-                                                           lat = latitude,
-                                                           delta_degree = 13,
-                                                           exact_computation=True, 
-                                                           #return_distances=True,
-                                                           force_evaluation=True) 
-                cet = pytz.timezone(timezone_name)
-                dt = datetime.now()
-                offset = cet.utcoffset(dt, is_dst = True)
-                shift = int(offset / timedelta(hours=1))
-                self.__meteo_station_tz[k[0]] = shift
+        For more information about this source of features, see:
+    https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
 
 
-            print(self.__meteo_station_tz)
-            exit()
-            '''longitude, latitude = eval(row['Longitude']), eval(row['Latitude'])
-            point = (longitude, latitude)
-            timezone_name = tf.timezone_at(lng = eval(row['Longitude']), lat = eval(row['Latitude']))
-            if timezone_name is None:
-                timezone_name = tf.closest_timezone_at(lng = eval(row['Longitude']), 
-                                                       lat = eval(row['Latitude']),
-                                                       delta_degree = 5) 
-            cet = pytz.timezone(timezone_name)
-            dt = datetime.now()
-            offset = cet.utcoffset(dt, is_dst = True)
-            shift = int(offset / timedelta(hours=1))
-            
-            self.__meteo_station_tz'''
-        exit()
-        return date[:4]+'-'+date[4:6]+'-'+date[6:8]+' '+date[8:10]+':00:00+01'
+        Parameters:
+            latitude (float): The latitude from which we want the meteo features.
+            longitude (float): The longitude from which we want the meteo features.
+            nb_stations (int): Number of closest stations to consider.
 
 
+        '''
+        self._latitude = latitude
+        self._longitude = longitude
+        self._nb_stations = nb_stations
+
+        self._data_directory = (Path.cwd() / 'data') / 'meteo_france'
 
 
-    def __insert_historical_data(self):
-        csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
-        with PostgreSQLDBConnection.Instance() as db:
-            db.cursor.execute('SELECT * from "METEO_FEATURE";')
-            list_of_rows = db.cursor.fetchall()
-            dico = {u[1]:u[0] for u in list_of_rows}
+        # Re-creating data directory architecture for MeteoFrance, if asked
+        config = ConfigParser()
+        config.read((Path.cwd() / 'config') / 'features.cfg')
+        if eval(config['meteofrance']['regenerate']):
+            self._regenerate_directory()
 
 
-        with open(csv_file, "r") as f:                
-            reader = DictReader(f, delimiter=',')
-            next(reader)
-            dico_features = {row["abbreviation"]:dico[row["METFT_NAME"]] for row in reader}
+        # Collecting the closest meteo station
+        self._stations = self._get_stations()
+        print(self._stations)
 
 
-        with PostgreSQLDBConnection.Instance() as db:
-            db.cursor.execute('SELECT * from "METEO_STATION";')
-            list_of_rows = db.cursor.fetchall()
-            dico_station = {u[2]:u[0] for u in list_of_rows}
 
 
-        for feature in dico_features:
-            logger.info(f'Integrating {[u for u in dico if dico[u]==dico_features[feature]][0]} feature')
-            for station in dico_station:
-                logger.info(f'  - Dealing with meteo station n°: {station}')                            
-                csv_file = tempfile.NamedTemporaryFile('w')
-                dir_data = Path.cwd() / 'data' / 'meteo_france' / 'historical'
-                for csv_meteo in listdir(dir_data):
-                    with open(dir_data / csv_meteo, "r") as f:                
-                        reader = DictReader(f, delimiter=';')
-                        csv_file.write(''.join([row[feature]+",'"+self.__from_date_to_datetz(row["date"], station, dico_station[station])+"',"+str(dico_features[feature])+','+str(dico_station[station])+'\n' for row in reader if row['numer_sta'] == station]))
-                csv_file.flush()
-                with open(csv_file.name, 'r') as f:
-                    with PostgreSQLDBConnection.Instance() as db:
-                        db.cursor.copy_from(f, '"METEO_FEATURE_VALUE"', sep=',', null='mq',
-                                            columns=['"METVA_VALUE"','"METVA_DATETIME"','"METFT_ID_METEO_FEATURE"','"METST_ID_METEO_STATION"'])
 
 
-                        
+    def _regenerate_directory(self):
+        '''
+        Re-creating data directory architecture for MeteoFrance
+        '''
+        logger.info("Regenerating meteofrance data directory")
+        try:
+            rmtree(self._data_directory)
+        except:
+            pass
+        p = Path(self._data_directory / 'historical')
+        p.mkdir(exist_ok=True, parents=True)
+        p = Path(self._data_directory / 'config')
+        p.mkdir(exist_ok=True, parents=True)
 
 
 
 
-        
-            
-    
-    def __generate(self):
-        # Meteo stations must be collected first, if not in the database
-        with PostgreSQLDBConnection.Instance() as db:
-            db.cursor.execute('SELECT count(*) FROM "METEO_STATION";')
-            updated_meteo_station = db.cursor.fetchone()[0]
-        if not updated_meteo_station:
-            self.__collect_stations()
 
 
-        # Features from data/meteo_france/config/meteo_features.csv
-        # must be inserted in the database, if not already done
-        with PostgreSQLDBConnection.Instance() as db:
-            db.cursor.execute('SELECT count(*) FROM "METEO_FEATURE";')
-            updated_meteo_features = db.cursor.fetchone()[0]
-        if not updated_meteo_features:
-            self.__insert_features()
-        
-        # Downloading meteofrance historical csv files
-        logger.info('Downloading historical csv files from MeteoFrance, if needed')            
-        self.__collect_historical_data()
-        
-        self.__insert_historical_data()
+    def _get_stations(self):
+        '''
+        Collect (after downloading them, if needed) the stations and their
+        locations in a dictionary
 
 
-    def update(self):
+        Returns:
+            list: The self._nb_stations closest station IDs, starting with the
+                  closest one
+        '''
+        # The csv file of meteo stations (names, ids and locations) is downloaded,
+        # if not available in the config directory within data / meteo_france
+        link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
+        p = Path(self._data_directory / 'config' )
+        csv_file = p / basename(link)
+        if not isfile(csv_file):
+            logger.info('Downloading location stations from MeteoFrance')
+            urlretrieve(link, csv_file)
+
+        # A dictionary for the meteo stations is created
+        dict_stations = {}
+        logger.info('Collecting information about meteo stations')
+        with open(csv_file, "r") as f:
+            reader = DictReader(f, delimiter=';')
+            for row in reader:
+                latitude, longitude = eval(row['Latitude']), eval(row['Longitude'])
+                dict_stations[row['Nom'].replace("'",'’')] = {
+                    'id' : row['ID'],
+                    'longitude' : longitude,
+                    'latitude' : latitude,
+                    'distance' : distance.vincenty(
+                        (self._latitude, self._longitude),
+                        (latitude, longitude)).km
+                }
+
+        # Find the closest stations
+        logger.info('Finding the closest stations')
+        stations_by_distance = sorted(dict_stations.keys(),
+                                      key = lambda x: dict_stations[x]['distance'])
+        logger.info(f'The {self._nb_stations} closest stations are: '
+                    f'{", ".join(stations_by_distance[:self._nb_stations])}.')
+        return [dict_stations[sta]['id'] for sta in stations_by_distance][:self._nb_stations]
+
+
+
+    def _get_feature(self):
         '''
         '''
-        Update the MeteoFrance features with the last available data
+        TODO
         '''
         '''
-        # We collect archive files from MeteoFrance, until the current month
-        # by using the same method than for data generation : this is currently
-        # based on the presence of a synop.+date+.csv' file in the 
-        # data/meteo_france/historical directory. The file corresponding to the 
-        # current month is deleted first, so that its most recent version will be downloaded
-        # by calling self.__collect_historical_data
-        # TODO: updates according to a PostgreSQL request ?
-        logger.info('Update historical csv files from MeteoFrance, if needed')
-        today = datetime.now()
-        todel = 'synop.'+today.strftime("%Y%m")+".csv"
-        remove(self.__data_directory / 'historical' / todel)
-        system("touch "+todel)
-        self.__collect_historical_data()
-        logger.info('Inserting csv files in database')
-        self.__insert_historical_data()
-        
+        pass
diff --git a/main.py b/main.py
index 3d3739fa46c6016c995f341d6140aec3bb58af49..6810673765688db13b0a2b8ec6e3cef5ff53bb1f 100644 (file)
--- a/main.py
+++ b/main.py
@@ -1,6 +1,5 @@
-from extomeAI.source import MeteoFrance
+from lib.source import MeteoFrance
 
 
-from celery import Celery
 from configparser import ConfigParser
 from logging.config import fileConfig
 from logging import getLogger
 from configparser import ConfigParser
 from logging.config import fileConfig
 from logging import getLogger
@@ -13,14 +12,14 @@ fileConfig((Path.cwd() / 'config') / 'logging.cfg')
 logger = getLogger()
 
 
 logger = getLogger()
 
 
-class ExtomeEngine:
+class Engine:
     def __init__(self, clean = False):
     def __init__(self, clean = False):
-        logger.info("Extome-IA engine launched")
+        logger.info("Predictops engine launched")
         if clean:
             self.clean()
             print("Ne pas oublier d'exporter la BDD dans pgModeler")
             print("Ni de copier l'archive dans la data")
         if clean:
             self.clean()
             print("Ne pas oublier d'exporter la BDD dans pgModeler")
             print("Ni de copier l'archive dans la data")
-    
+
     def clean(self):
         # Cleaning the data directory
         logger.info("Cleaning and restoring data directory")
     def clean(self):
         # Cleaning the data directory
         logger.info("Cleaning and restoring data directory")
@@ -29,30 +28,30 @@ class ExtomeEngine:
             rmtree(directory)
         p = Path(Path.cwd() / 'data')
         p.mkdir()
             rmtree(directory)
         p = Path(Path.cwd() / 'data')
         p.mkdir()
-    
+
         # Cleaning the postgresql database
         config = ConfigParser()
         config.read((Path.cwd() / 'config') / 'main.cfg')
         # Cleaning the postgresql database
         config = ConfigParser()
         config.read((Path.cwd() / 'config') / 'main.cfg')
-        
+
         host   = config['postgresql']['host']
         user   = config['postgresql']['user']
         port   = config['postgresql']['port']
         dbname = config['postgresql']['dbname']
         host   = config['postgresql']['host']
         user   = config['postgresql']['user']
         port   = config['postgresql']['port']
         dbname = config['postgresql']['dbname']
-        
+
         logger.info("PostgreSQL database deletion")
         command = ['dropdb', '-h', host, '-U', user, '-p', port, dbname]
         process = Popen(command, stdout=PIPE, stderr=PIPE)
         process.communicate()
         logger.info("PostgreSQL database deletion")
         command = ['dropdb', '-h', host, '-U', user, '-p', port, dbname]
         process = Popen(command, stdout=PIPE, stderr=PIPE)
         process.communicate()
-        
+
         logger.info("PostgreSQL database creation")
         command = ['createdb', '-h', host, '-U', user, '-p', port, dbname]
         process = Popen(command, stdout=PIPE, stderr=PIPE)
         logger.info("PostgreSQL database creation")
         command = ['createdb', '-h', host, '-U', user, '-p', port, dbname]
         process = Popen(command, stdout=PIPE, stderr=PIPE)
-        process.communicate() 
-    
+        process.communicate()
+
     def add_meteofrance(self):
         self.meteofrance = MeteoFrance()
     def add_meteofrance(self):
         self.meteofrance = MeteoFrance()
-        
-        
 
 
-engine = ExtomeEngine(clean = False)
+
+
+engine = Engine(clean = False)
 engine.add_meteofrance()
\ No newline at end of file
 engine.add_meteofrance()
\ No newline at end of file
index 70af5d511e7d7f5ecd05b3333f0a2093ed030895..0cf261a94bfed271531a07f4352b5a382a75de63 100644 (file)
@@ -1,3 +1,2 @@
-numpy==1.18.1
-scipy==1.4.1
-xgboost==0.90
+geographiclib==1.50
+geopy==1.21.0