AND Private Git Repository - predictops.git/commitdiff
AND Algorithmique Numérique Distribuée

Private GIT Repository
Nouvelle approche : csv -> dico -> dataframe, au lieu de postgresql, en
author Christophe Guyeux <christophe.guyeux@univ-fcomte.fr>
Mon, 10 Feb 2020 10:00:54 +0000 (11:00 +0100)
committer Christophe Guyeux <christophe.guyeux@univ-fcomte.fr>
Mon, 10 Feb 2020 10:00:54 +0000 (11:00 +0100)
cours.

README.md
config/features/parameters.csv [deleted file]
lib/source/__init__.py
lib/source/meteofrance.py
main.py
requirements.txt

diff --git a/README.md b/README.md
index b4cbde94c5926841fae2d14f8e15b7ac1e6f7375..660b7ef9940ff03411bb3d4b826008583bdf9fd7 100644 (file)
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 Creer un environnement :
 pip install virtualenv
-virtualenv predictops
+python -m venv ~/.venvs/predictops
 
 activer l'environnement :
 source ~/.venvs/predictops/bin/activate
@@ -9,9 +9,9 @@ installer un package :
 pip install celery
 pip freeze > requirements.txt
 
-Lancer 
+Lancer
 celery -A test_celery worker --loglevel=info
-puis 
+puis
 python -m test_celery.run_tasks
 
 
diff --git a/config/features/parameters.csv b/config/features/parameters.csv
deleted file mode 100644 (file)
index 2d70417..0000000
--- a/config/features/parameters.csv
+++ /dev/null
@@ -1,4 +0,0 @@
-PARAM_NAME
-Numerical
-Categorical
-Both
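diff --git a/lib/source/__init__.py b/lib/source/__init__.py
index d2c2a0f9910e5981106b55c3a539e2b30480dc6e..527538dc22068b2b132b65859781c166abdd8a3b 100644 (file)
--- a/lib/source/__init__.py
+++ b/lib/source/__init__.py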
@@ -1,25 +1 @@
-from .meteofrance import MeteoFrance
-
-from extomeAI.lib.connector import PostgreSQLDBConnection
-
-from csv import DictReader
-from logging.config import fileConfig
-from pathlib import Path
-
-import logging
-
-fileConfig((Path.cwd() / 'config') / 'logging.cfg')
-logger = logging.getLogger()
-
-with PostgreSQLDBConnection.Instance() as db:
-    db.cursor.execute('SELECT count(*) FROM "PARAMETER";')
-    nb_parameters = db.cursor.fetchone()[0]
-    if not nb_parameters:
-        logger.info('Inserting PARAMETER values from parameters.csv')            
-        csv_file = Path.cwd() / 'config' / 'features' / 'parameters.csv'
-        with open(csv_file, "r") as f:                
-            reader = DictReader(f, delimiter=',')
-            for row in reader:
-                request = f"""INSERT INTO "PARAMETER" ("PARAM_NAME")
-                              VALUES ('{row['PARAM_NAME']}');"""
-                db.cursor.execute(request)
+from .meteofrance import MeteoFrance
\ No newline at end of file
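diff --git a/lib/source/meteofrance.py b/lib/source/meteofrance.py
index 79c2546596e6a8e6f35a8e1ee6af40351d32116a..315aac3edef79e2178ad344ba52eea345f54d768 100644 (file)
--- a/lib/source/meteofrance.py
+++ b/lib/source/meteofrance.py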
-from extomeAI.lib.connector import PostgreSQLDBConnection
-
 from configparser import ConfigParser
 from csv import DictReader
-from datetime import datetime, timedelta
-from os import remove, system, listdir
+from geopy import distance
 from pathlib import Path
 from shutil import rmtree
-from timezonefinder import TimezoneFinder
 
 from logging.config import fileConfig
 from os.path import isfile, basename
 from urllib.request import urlretrieve
 
 import logging
-import gzip
-import pytz
-import tempfile
+
 
 fileConfig((Path.cwd() / 'config') / 'logging.cfg')
 logger = logging.getLogger()
 
 class MeteoFrance:
-    def __init__(self):
-        '''
-        Constructor
-        See: https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
-        '''    
-        self.__data_directory = (Path.cwd() / 'data') / 'meteo_france'
-        # Re-creating data directory architecture for MeteoFrance, if asked
-        config = ConfigParser()
-        config.read((Path.cwd() / 'config') / 'features.cfg')
-        if eval(config['meteofrance']['regenerate']):
-            logger.info("Regenerating meteofrance data directory")
-            try:
-                rmtree(self.__data_directory)
-            except:
-                pass
-            p = Path(self.__data_directory  / 'historical')
-            p.mkdir(exist_ok=True, parents=True)
-            p = Path(self.__data_directory  / 'config')
-            p.mkdir(exist_ok=True, parents=True)
-        if eval(config['meteofrance']['reinsert']):
-            logger.info("Reinserting meteofrance database")
-            with PostgreSQLDBConnection.Instance() as db:
-                db.cursor.execute(f'DELETE FROM "METEO_FEATURE";')
-                db.cursor.execute(f'ALTER SEQUENCE "METEO_FEATURE_METFT_ID_seq" RESTART WITH 1;')
-                db.cursor.execute(f'DELETE FROM "METEO_STATION";')
-                db.cursor.execute(f'ALTER SEQUENCE "METEO_STATION_METST_ID_seq" RESTART WITH 1;')
-                db.cursor.execute(f'DELETE FROM "METEO_FEATURE_VALUE";')
-                db.cursor.execute(f'ALTER SEQUENCE "METEO_FEATURE_VALUE_METVA_ID_seq" RESTART WITH 1;')
-            self.__generate()
-        
 
-    def __collect_stations(self):
-        '''
-        Filling METEO_STATION table from location schema
+    def __init__(self, latitude = 47.25, longitude = 6.0333, nb_stations = 3):
         '''
-        tf = TimezoneFinder(in_memory=True)
-        
-        with PostgreSQLDBConnection.Instance() as db:
-            link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
-            p = Path(self.__data_directory / 'config' )
-            csv_file = p / basename(link)
-            if not isfile(csv_file):
-                logger.info('Downloading location stations from MeteoFrance')                    
-                urlretrieve(link, csv_file)
-            with open(csv_file, "r") as f:                
-                reader = DictReader(f, delimiter=';')
-                logger.info(f'Inserting location stations in {db.dbname} database')                    
-                for row in reader:
-                    longitude, latitude = eval(row['Longitude']), eval(row['Latitude'])
-                    point = (longitude, latitude)
-                    timezone_name = tf.timezone_at(lng = eval(row['Longitude']), lat = eval(row['Latitude']))
-                    if timezone_name is None:
-                        timezone_name = tf.closest_timezone_at(lng = eval(row['Longitude']), 
-                                                               lat = eval(row['Latitude']),
-                                                               delta_degree = 5) 
-                    cet = pytz.timezone(timezone_name)
-                    dt = datetime.now()
-                    offset = cet.utcoffset(dt, is_dst = True)
-                    shift = int(offset / timedelta(hours=1))
-                    request = f"""INSERT INTO "METEO_STATION" ("METST_NAME", "METST_IDNAME", "METST_LOCATION", "METST_TIMEZONE") 
-                                  VALUES ('{row['Nom'].replace("'",'’')}', '{row['ID']}',
-                                  point({row['Latitude']}, {row['Longitude']}), {shift});"""
-                    db.cursor.execute(request)
+        Constructor of the MeteoFrance source of feature.
 
+        - It will reinitiate the data directory, if asked in the config
+          features.cfg file.
+        - It searches for the nb_stations meteo stations closest to the provided
+          point (longitude and latitude)
 
-    def __insert_features(self):
-        logger.info('Inserting MeteoFrance list of features from meteo_features.csv')            
-        csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
-        with PostgreSQLDBConnection.Instance() as db:
-            with open(csv_file, "r") as f:                
-                reader = DictReader(f, delimiter=',')
-                next(reader)
-                for row in reader:
-                    request = f"""INSERT INTO "METEO_FEATURE" ("METFT_NAME", "PARAM_ID_PARAMETER")
-                                  VALUES ('{row['METFT_NAME']}', {row['PARAM_ID_PARAMETER']});"""
-                    db.cursor.execute(request)
-        
-        
-            
-    def __collect_historical_data(self):
-        '''
-        We collect all csv files from January 1996 until the month
-        before now. The argument in the url to download are of the
-        form 201001 for January 2010. We start by computing all these
-        patterns, in historical list.
-        '''
-        # List of year-months to consider
-        historical = []
-        date_end = datetime.now()
-        for year in range(1996, date_end.year+1):
-            for month in range(1,13):
-                date = datetime(year, month, 1)
-                if date <= date_end:
-                    historical.append(date.strftime("%Y%m"))
-                    
-        # We download all csv files from meteofrance that are not in 
-        # the data repository
-        meteo_data = self.__data_directory / 'historical' 
-        p = Path(meteo_data)
-        p.mkdir(exist_ok=True, parents=True)                
-        for date in historical:
-            if not isfile(meteo_data / ('synop.'+date+'.csv')):
-                link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
-                link += date + '.csv.gz' 
-                download_path = meteo_data / basename(link)
-                urlretrieve(link, download_path)
-                with gzip.open(download_path, 'rb') as f:
-                    csv_file = meteo_data / basename(link[:-3])
-                    with open(csv_file, 'w') as g:
-                        g.write(f.read().decode())
-                        remove(meteo_data / basename(link))
-    
-    
-    def __from_date_to_datetz(self, date, a, b):          
-        if not hasattr(self, '__meteo_station_tz'):
-            self.__meteo_station_tz = {}
-            tf = TimezoneFinder(in_memory=True)
-            with PostgreSQLDBConnection.Instance() as db:
-                db.cursor.execute('select "METST_IDNAME", "METST_LOCATION" from "METEO_STATION";')
-                list_of_rows = db.cursor.fetchall()                
-            for k in list_of_rows:
-                print('\n',k)
-                longitude, latitude = eval(k[1])
-                print(longitude, latitude)
-                print(type(longitude))
-                timezone_name = tf.timezone_at(lng = longitude, lat = latitude)
-                if timezone_name is None:
-                    timezone_name = tf.closest_timezone_at(lng = longitude, 
-                                                           lat = latitude,
-                                                           delta_degree = 13,
-                                                           exact_computation=True, 
-                                                           #return_distances=True,
-                                                           force_evaluation=True) 
-                cet = pytz.timezone(timezone_name)
-                dt = datetime.now()
-                offset = cet.utcoffset(dt, is_dst = True)
-                shift = int(offset / timedelta(hours=1))
-                self.__meteo_station_tz[k[0]] = shift
+        For more information about this source of feature, see:
+    https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
 
-            print(self.__meteo_station_tz)
-            exit()
-            '''longitude, latitude = eval(row['Longitude']), eval(row['Latitude'])
-            point = (longitude, latitude)
-            timezone_name = tf.timezone_at(lng = eval(row['Longitude']), lat = eval(row['Latitude']))
-            if timezone_name is None:
-                timezone_name = tf.closest_timezone_at(lng = eval(row['Longitude']), 
-                                                       lat = eval(row['Latitude']),
-                                                       delta_degree = 5) 
-            cet = pytz.timezone(timezone_name)
-            dt = datetime.now()
-            offset = cet.utcoffset(dt, is_dst = True)
-            shift = int(offset / timedelta(hours=1))
-            
-            self.__meteo_station_tz'''
-        exit()
-        return date[:4]+'-'+date[4:6]+'-'+date[6:8]+' '+date[8:10]+':00:00+01'
+        Parameters:
+            latitude (float): The latitude from which we want the meteo features.
+            longitude (float): The longitude from which we want the meteo features.
+            nb_stations (int): Number of closest stations to consider.
 
+        '''
+        self._latitude = latitude
+        self._longitude = longitude
+        self._nb_stations = nb_stations
+
+        self._data_directory = (Path.cwd() / 'data') / 'meteo_france'
 
-    def __insert_historical_data(self):
-        csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
-        with PostgreSQLDBConnection.Instance() as db:
-            db.cursor.execute('SELECT * from "METEO_FEATURE";')
-            list_of_rows = db.cursor.fetchall()
-            dico = {u[1]:u[0] for u in list_of_rows}
+        # Re-creating data directory architecture for MeteoFrance, if asked
+        config = ConfigParser()
+        config.read((Path.cwd() / 'config') / 'features.cfg')
+        if eval(config['meteofrance']['regenerate']):
+            self._regenerate_directory()
 
-        with open(csv_file, "r") as f:                
-            reader = DictReader(f, delimiter=',')
-            next(reader)
-            dico_features = {row["abbreviation"]:dico[row["METFT_NAME"]] for row in reader}
+        # Collecting the closest meteo station
+        self._stations = self._get_stations()
+        print(self._stations)
 
-        with PostgreSQLDBConnection.Instance() as db:
-            db.cursor.execute('SELECT * from "METEO_STATION";')
-            list_of_rows = db.cursor.fetchall()
-            dico_station = {u[2]:u[0] for u in list_of_rows}
 
-        for feature in dico_features:
-            logger.info(f'Integrating {[u for u in dico if dico[u]==dico_features[feature]][0]} feature')
-            for station in dico_station:
-                logger.info(f'  - Dealing with meteo station n°: {station}')                            
-                csv_file = tempfile.NamedTemporaryFile('w')
-                dir_data = Path.cwd() / 'data' / 'meteo_france' / 'historical'
-                for csv_meteo in listdir(dir_data):
-                    with open(dir_data / csv_meteo, "r") as f:                
-                        reader = DictReader(f, delimiter=';')
-                        csv_file.write(''.join([row[feature]+",'"+self.__from_date_to_datetz(row["date"], station, dico_station[station])+"',"+str(dico_features[feature])+','+str(dico_station[station])+'\n' for row in reader if row['numer_sta'] == station]))
-                csv_file.flush()
-                with open(csv_file.name, 'r') as f:
-                    with PostgreSQLDBConnection.Instance() as db:
-                        db.cursor.copy_from(f, '"METEO_FEATURE_VALUE"', sep=',', null='mq',
-                                            columns=['"METVA_VALUE"','"METVA_DATETIME"','"METFT_ID_METEO_FEATURE"','"METST_ID_METEO_STATION"'])
 
-                        
+    def _regenerate_directory(self):
+        '''
+        Re-creating data directory architecture for MeteoFrance
+        '''
+        logger.info("Regenerating meteofrance data directory")
+        try:
+            rmtree(self._data_directory)
+        except:
+            pass
+        p = Path(self._data_directory / 'historical')
+        p.mkdir(exist_ok=True, parents=True)
+        p = Path(self._data_directory / 'config')
+        p.mkdir(exist_ok=True, parents=True)
 
 
-        
-            
-    
-    def __generate(self):
-        # Meteo stations must be collected first, if not in the database
-        with PostgreSQLDBConnection.Instance() as db:
-            db.cursor.execute('SELECT count(*) FROM "METEO_STATION";')
-            updated_meteo_station = db.cursor.fetchone()[0]
-        if not updated_meteo_station:
-            self.__collect_stations()
 
-        # Features from data/meteo_france/config/meteo_features.csv
-        # must be inserted in the database, if not already done
-        with PostgreSQLDBConnection.Instance() as db:
-            db.cursor.execute('SELECT count(*) FROM "METEO_FEATURE";')
-            updated_meteo_features = db.cursor.fetchone()[0]
-        if not updated_meteo_features:
-            self.__insert_features()
-        
-        # Downloading meteofrance historical csv files
-        logger.info('Downloading historical csv files from MeteoFrance, if needed')            
-        self.__collect_historical_data()
-        
-        self.__insert_historical_data()
+    def _get_stations(self):
+        '''
+        Collect (after downloading them, if needed) the stations and their
+        locations in a dictionary
 
-    def update(self):
+        Returns:
+            list: The self._nb_stations closest station IDs, starting by the
+                  closest one
+        '''
+        # The csv file of meteo stations (names, ids and locations) if downloaded,
+        # if not available in the config directory within data / meteo_france
+        link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
+        p = Path(self._data_directory / 'config' )
+        csv_file = p / basename(link)
+        if not isfile(csv_file):
+            logger.info('Downloading location stations from MeteoFrance')
+            urlretrieve(link, csv_file)
+
+        # A dictionary for the meteo stations is created
+        dict_stations = {}
+        logger.info('Collecting information about meteo stations')
+        with open(csv_file, "r") as f:
+            reader = DictReader(f, delimiter=';')
+            for row in reader:
+                latitude, longitude = eval(row['Latitude']), eval(row['Longitude'])
+                dict_stations[row['Nom'].replace("'",'’')] = {
+                    'id' : row['ID'],
+                    'longitude' : longitude,
+                    'latitude' : latitude,
+                    'distance' : distance.vincenty(
+                        (self._latitude, self._longitude),
+                        (latitude, longitude)).km
+                }
+
+        # Find the closest stations
+        logger.info('Finding the closest stations')
+        stations_by_distance = sorted(dict_stations.keys(),
+                                      key = lambda x: dict_stations[x]['distance'])
+        logger.info(f'The {self._nb_stations} closest stations are: '
+                    f'{", ".join(stations_by_distance[:self._nb_stations])}.')
+        return [dict_stations[sta]['id'] for sta in stations_by_distance][:self._nb_stations]
+
+
+
+    def _get_feature(self):
         '''
-        Update the MeteoFrance features with the last available data
+        TODO
         '''
-        # We collect archive files from MeteoFrance, until the current month
-        # by using the same method than for data generation : this is currently
-        # based on the presence of a synop.+date+.csv' file in the 
-        # data/meteo_france/historical directory. The file corresponding to the 
-        # current month is deleted first, so that its most recent version will be downloaded
-        # by colling self.__collect_historical_data
-        # TODO: updates according to a PostgreSQL request ?
-        logger.info('Update historical csv files from MeteoFrance, if needed')
-        today = datetime.now()
-        todel = 'synop.'+today.strftime("%Y%m")+".csv"
-        remove(self.__data_directory / 'historical' / todel)
-        system("touch "+todel)
-        self.__collect_historical_data()
-        logger.info('Inserting csv files in database')
-        self.__insert_historical_data()
-        
+        pass
diff --git a/main.py b/main.py
index 3d3739fa46c6016c995f341d6140aec3bb58af49..6810673765688db13b0a2b8ec6e3cef5ff53bb1f 100644 (file)
--- a/main.py
+++ b/main.py
@@ -1,6 +1,5 @@
-from extomeAI.source import MeteoFrance
+from lib.source import MeteoFrance
 
-from celery import Celery
 from configparser import ConfigParser
 from logging.config import fileConfig
 from logging import getLogger
@@ -13,14 +12,14 @@ fileConfig((Path.cwd() / 'config') / 'logging.cfg')
 logger = getLogger()
 
 
-class ExtomeEngine:
+class Engine:
     def __init__(self, clean = False):
-        logger.info("Extome-IA engine launched")
+        logger.info("Predictops engine launched")
         if clean:
             self.clean()
             print("Ne pas oublier d'exporter la BDD dans pgModeler")
             print("Ni de copier l'archive dans la data")
-    
+
     def clean(self):
         # Cleaning the data directory
         logger.info("Cleaning and restoring data directory")
@@ -29,30 +28,30 @@ class ExtomeEngine:
             rmtree(directory)
         p = Path(Path.cwd() / 'data')
         p.mkdir()
-    
+
         # Cleaning the postgresql database
         config = ConfigParser()
         config.read((Path.cwd() / 'config') / 'main.cfg')
-        
+
         host   = config['postgresql']['host']
         user   = config['postgresql']['user']
         port   = config['postgresql']['port']
         dbname = config['postgresql']['dbname']
-        
+
         logger.info("PostgreSQL database deletion")
         command = ['dropdb', '-h', host, '-U', user, '-p', port, dbname]
         process = Popen(command, stdout=PIPE, stderr=PIPE)
         process.communicate()
-        
+
         logger.info("PostgreSQL database creation")
         command = ['createdb', '-h', host, '-U', user, '-p', port, dbname]
         process = Popen(command, stdout=PIPE, stderr=PIPE)
-        process.communicate() 
-    
+        process.communicate()
+
     def add_meteofrance(self):
         self.meteofrance = MeteoFrance()
-        
-        
 
-engine = ExtomeEngine(clean = False)
+
+
+engine = Engine(clean = False)
 engine.add_meteofrance()
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 70af5d511e7d7f5ecd05b3333f0a2093ed030895..0cf261a94bfed271531a07f4352b5a382a75de63 100644 (file)
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,2 @@
-numpy==1.18.1
-scipy==1.4.1
-xgboost==0.90
+geographiclib==1.50
+geopy==1.21.0