AND Private Git Repository - predictops.git/blobdiff - lib/source/meteofrance.py
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Nouvelle approche : csv -> dico -> dataframe, au lieu de postgresql, en
[predictops.git] / lib / source / meteofrance.py
index 79c2546596e6a8e6f35a8e1ee6af40351d32116a..315aac3edef79e2178ad344ba52eea345f54d768 100644 (file)
-from extomeAI.lib.connector import PostgreSQLDBConnection
-
 from configparser import ConfigParser
 from csv import DictReader
 from configparser import ConfigParser
 from csv import DictReader
-from datetime import datetime, timedelta
-from os import remove, system, listdir
+from geopy import distance
 from pathlib import Path
 from shutil import rmtree
 from pathlib import Path
 from shutil import rmtree
-from timezonefinder import TimezoneFinder
 
 from logging.config import fileConfig
 from os.path import isfile, basename
 from urllib.request import urlretrieve
 
 import logging
 
 from logging.config import fileConfig
 from os.path import isfile, basename
 from urllib.request import urlretrieve
 
 import logging
-import gzip
-import pytz
-import tempfile
+
 
 fileConfig((Path.cwd() / 'config') / 'logging.cfg')
 logger = logging.getLogger()
 
 class MeteoFrance:
 
 fileConfig((Path.cwd() / 'config') / 'logging.cfg')
 logger = logging.getLogger()
 
 class MeteoFrance:
-    def __init__(self):
-        '''
-        Constructor
-        See: https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
-        '''    
-        self.__data_directory = (Path.cwd() / 'data') / 'meteo_france'
-        # Re-creating data directory architecture for MeteoFrance, if asked
-        config = ConfigParser()
-        config.read((Path.cwd() / 'config') / 'features.cfg')
-        if eval(config['meteofrance']['regenerate']):
-            logger.info("Regenerating meteofrance data directory")
-            try:
-                rmtree(self.__data_directory)
-            except:
-                pass
-            p = Path(self.__data_directory  / 'historical')
-            p.mkdir(exist_ok=True, parents=True)
-            p = Path(self.__data_directory  / 'config')
-            p.mkdir(exist_ok=True, parents=True)
-        if eval(config['meteofrance']['reinsert']):
-            logger.info("Reinserting meteofrance database")
-            with PostgreSQLDBConnection.Instance() as db:
-                db.cursor.execute(f'DELETE FROM "METEO_FEATURE";')
-                db.cursor.execute(f'ALTER SEQUENCE "METEO_FEATURE_METFT_ID_seq" RESTART WITH 1;')
-                db.cursor.execute(f'DELETE FROM "METEO_STATION";')
-                db.cursor.execute(f'ALTER SEQUENCE "METEO_STATION_METST_ID_seq" RESTART WITH 1;')
-                db.cursor.execute(f'DELETE FROM "METEO_FEATURE_VALUE";')
-                db.cursor.execute(f'ALTER SEQUENCE "METEO_FEATURE_VALUE_METVA_ID_seq" RESTART WITH 1;')
-            self.__generate()
-        
 
 
-    def __collect_stations(self):
-        '''
-        Filling METEO_STATION table from location schema
+    def __init__(self, latitude = 47.25, longitude = 6.0333, nb_stations = 3):
         '''
         '''
-        tf = TimezoneFinder(in_memory=True)
-        
-        with PostgreSQLDBConnection.Instance() as db:
-            link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
-            p = Path(self.__data_directory / 'config' )
-            csv_file = p / basename(link)
-            if not isfile(csv_file):
-                logger.info('Downloading location stations from MeteoFrance')                    
-                urlretrieve(link, csv_file)
-            with open(csv_file, "r") as f:                
-                reader = DictReader(f, delimiter=';')
-                logger.info(f'Inserting location stations in {db.dbname} database')                    
-                for row in reader:
-                    longitude, latitude = eval(row['Longitude']), eval(row['Latitude'])
-                    point = (longitude, latitude)
-                    timezone_name = tf.timezone_at(lng = eval(row['Longitude']), lat = eval(row['Latitude']))
-                    if timezone_name is None:
-                        timezone_name = tf.closest_timezone_at(lng = eval(row['Longitude']), 
-                                                               lat = eval(row['Latitude']),
-                                                               delta_degree = 5) 
-                    cet = pytz.timezone(timezone_name)
-                    dt = datetime.now()
-                    offset = cet.utcoffset(dt, is_dst = True)
-                    shift = int(offset / timedelta(hours=1))
-                    request = f"""INSERT INTO "METEO_STATION" ("METST_NAME", "METST_IDNAME", "METST_LOCATION", "METST_TIMEZONE") 
-                                  VALUES ('{row['Nom'].replace("'",'’')}', '{row['ID']}',
-                                  point({row['Latitude']}, {row['Longitude']}), {shift});"""
-                    db.cursor.execute(request)
+        Constructor of the MeteoFrance source of features.
 
 
+        - It re-creates the data directory, if asked in the config
+          features.cfg file.
+        - It searches for the nb_stations meteo stations closest to the provided
+          point (longitude and latitude)
 
 
-    def __insert_features(self):
-        logger.info('Inserting MeteoFrance list of features from meteo_features.csv')            
-        csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
-        with PostgreSQLDBConnection.Instance() as db:
-            with open(csv_file, "r") as f:                
-                reader = DictReader(f, delimiter=',')
-                next(reader)
-                for row in reader:
-                    request = f"""INSERT INTO "METEO_FEATURE" ("METFT_NAME", "PARAM_ID_PARAMETER")
-                                  VALUES ('{row['METFT_NAME']}', {row['PARAM_ID_PARAMETER']});"""
-                    db.cursor.execute(request)
-        
-        
-            
-    def __collect_historical_data(self):
-        '''
-        We collect all csv files from January 1996 until the month
-        before now. The argument in the url to download are of the
-        form 201001 for January 2010. We start by computing all these
-        patterns, in historical list.
-        '''
-        # List of year-months to consider
-        historical = []
-        date_end = datetime.now()
-        for year in range(1996, date_end.year+1):
-            for month in range(1,13):
-                date = datetime(year, month, 1)
-                if date <= date_end:
-                    historical.append(date.strftime("%Y%m"))
-                    
-        # We download all csv files from meteofrance that are not in 
-        # the data repository
-        meteo_data = self.__data_directory / 'historical' 
-        p = Path(meteo_data)
-        p.mkdir(exist_ok=True, parents=True)                
-        for date in historical:
-            if not isfile(meteo_data / ('synop.'+date+'.csv')):
-                link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
-                link += date + '.csv.gz' 
-                download_path = meteo_data / basename(link)
-                urlretrieve(link, download_path)
-                with gzip.open(download_path, 'rb') as f:
-                    csv_file = meteo_data / basename(link[:-3])
-                    with open(csv_file, 'w') as g:
-                        g.write(f.read().decode())
-                        remove(meteo_data / basename(link))
-    
-    
-    def __from_date_to_datetz(self, date, a, b):          
-        if not hasattr(self, '__meteo_station_tz'):
-            self.__meteo_station_tz = {}
-            tf = TimezoneFinder(in_memory=True)
-            with PostgreSQLDBConnection.Instance() as db:
-                db.cursor.execute('select "METST_IDNAME", "METST_LOCATION" from "METEO_STATION";')
-                list_of_rows = db.cursor.fetchall()                
-            for k in list_of_rows:
-                print('\n',k)
-                longitude, latitude = eval(k[1])
-                print(longitude, latitude)
-                print(type(longitude))
-                timezone_name = tf.timezone_at(lng = longitude, lat = latitude)
-                if timezone_name is None:
-                    timezone_name = tf.closest_timezone_at(lng = longitude, 
-                                                           lat = latitude,
-                                                           delta_degree = 13,
-                                                           exact_computation=True, 
-                                                           #return_distances=True,
-                                                           force_evaluation=True) 
-                cet = pytz.timezone(timezone_name)
-                dt = datetime.now()
-                offset = cet.utcoffset(dt, is_dst = True)
-                shift = int(offset / timedelta(hours=1))
-                self.__meteo_station_tz[k[0]] = shift
+        For more information about this source of features, see:
+        https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
 
 
-            print(self.__meteo_station_tz)
-            exit()
-            '''longitude, latitude = eval(row['Longitude']), eval(row['Latitude'])
-            point = (longitude, latitude)
-            timezone_name = tf.timezone_at(lng = eval(row['Longitude']), lat = eval(row['Latitude']))
-            if timezone_name is None:
-                timezone_name = tf.closest_timezone_at(lng = eval(row['Longitude']), 
-                                                       lat = eval(row['Latitude']),
-                                                       delta_degree = 5) 
-            cet = pytz.timezone(timezone_name)
-            dt = datetime.now()
-            offset = cet.utcoffset(dt, is_dst = True)
-            shift = int(offset / timedelta(hours=1))
-            
-            self.__meteo_station_tz'''
-        exit()
-        return date[:4]+'-'+date[4:6]+'-'+date[6:8]+' '+date[8:10]+':00:00+01'
+        Parameters:
+            latitude (float): The latitude from which we want the meteo features.
+            longitude (float): The longitude from which we want the meteo features.
+            nb_stations (int): Number of closest stations to consider.
 
 
+        '''
+        self._latitude = latitude
+        self._longitude = longitude
+        self._nb_stations = nb_stations
+
+        self._data_directory = (Path.cwd() / 'data') / 'meteo_france'
 
 
-    def __insert_historical_data(self):
-        csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
-        with PostgreSQLDBConnection.Instance() as db:
-            db.cursor.execute('SELECT * from "METEO_FEATURE";')
-            list_of_rows = db.cursor.fetchall()
-            dico = {u[1]:u[0] for u in list_of_rows}
+        # Re-creating data directory architecture for MeteoFrance, if asked.
+        # getboolean() parses the flag without evaluating config text as code.
+        config = ConfigParser()
+        config.read((Path.cwd() / 'config') / 'features.cfg')
+        if config.getboolean('meteofrance', 'regenerate'):
+            self._regenerate_directory()
 
 
-        with open(csv_file, "r") as f:                
-            reader = DictReader(f, delimiter=',')
-            next(reader)
-            dico_features = {row["abbreviation"]:dico[row["METFT_NAME"]] for row in reader}
+        # Collecting the closest meteo stations (IDs, closest one first)
+        self._stations = self._get_stations()
+        logger.debug('Closest station IDs: %s', self._stations)
 
 
-        with PostgreSQLDBConnection.Instance() as db:
-            db.cursor.execute('SELECT * from "METEO_STATION";')
-            list_of_rows = db.cursor.fetchall()
-            dico_station = {u[2]:u[0] for u in list_of_rows}
 
 
-        for feature in dico_features:
-            logger.info(f'Integrating {[u for u in dico if dico[u]==dico_features[feature]][0]} feature')
-            for station in dico_station:
-                logger.info(f'  - Dealing with meteo station n°: {station}')                            
-                csv_file = tempfile.NamedTemporaryFile('w')
-                dir_data = Path.cwd() / 'data' / 'meteo_france' / 'historical'
-                for csv_meteo in listdir(dir_data):
-                    with open(dir_data / csv_meteo, "r") as f:                
-                        reader = DictReader(f, delimiter=';')
-                        csv_file.write(''.join([row[feature]+",'"+self.__from_date_to_datetz(row["date"], station, dico_station[station])+"',"+str(dico_features[feature])+','+str(dico_station[station])+'\n' for row in reader if row['numer_sta'] == station]))
-                csv_file.flush()
-                with open(csv_file.name, 'r') as f:
-                    with PostgreSQLDBConnection.Instance() as db:
-                        db.cursor.copy_from(f, '"METEO_FEATURE_VALUE"', sep=',', null='mq',
-                                            columns=['"METVA_VALUE"','"METVA_DATETIME"','"METFT_ID_METEO_FEATURE"','"METST_ID_METEO_STATION"'])
 
 
-                        
+    def _regenerate_directory(self):
+        '''
+        Re-create the data directory architecture for MeteoFrance.
+
+        Any existing data directory (self._data_directory) is deleted, then
+        empty 'historical' and 'config' sub-directories are created in it.
+        '''
+        logger.info("Regenerating meteofrance data directory")
+        try:
+            rmtree(self._data_directory)
+        except FileNotFoundError:
+            # Nothing to delete yet (e.g. first run): this is expected
+            pass
+        except OSError as err:
+            # Still best effort, but a failed deletion is no longer silent and
+            # KeyboardInterrupt / SystemExit are no longer swallowed
+            logger.warning("Could not fully remove %s: %s",
+                           self._data_directory, err)
+        for subdirectory in ('historical', 'config'):
+            (self._data_directory / subdirectory).mkdir(exist_ok=True, parents=True)
 
 
 
 
-        
-            
-    
-    def __generate(self):
-        # Meteo stations must be collected first, if not in the database
-        with PostgreSQLDBConnection.Instance() as db:
-            db.cursor.execute('SELECT count(*) FROM "METEO_STATION";')
-            updated_meteo_station = db.cursor.fetchone()[0]
-        if not updated_meteo_station:
-            self.__collect_stations()
 
 
-        # Features from data/meteo_france/config/meteo_features.csv
-        # must be inserted in the database, if not already done
-        with PostgreSQLDBConnection.Instance() as db:
-            db.cursor.execute('SELECT count(*) FROM "METEO_FEATURE";')
-            updated_meteo_features = db.cursor.fetchone()[0]
-        if not updated_meteo_features:
-            self.__insert_features()
-        
-        # Downloading meteofrance historical csv files
-        logger.info('Downloading historical csv files from MeteoFrance, if needed')            
-        self.__collect_historical_data()
-        
-        self.__insert_historical_data()
+    def _get_stations(self):
+        '''
+        Find the meteo stations closest to the point given to the constructor.
+
+        The MeteoFrance station list (postesSynop.csv) is downloaded into the
+        'config' sub-directory of the data directory when it is not already
+        there. Each station is then ranked by its geodesic distance to
+        (self._latitude, self._longitude).
 
 
-    def update(self):
+        Returns:
+            list: The self._nb_stations closest station IDs, starting by the
+                  closest one
+        '''
+        # The csv file of meteo stations (names, ids and locations) is downloaded,
+        # if not available in the config directory within data / meteo_france
+        link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
+        csv_file = self._data_directory / 'config' / basename(link)
+        if not isfile(csv_file):
+            logger.info('Downloading location stations from MeteoFrance')
+            # The target directory only exists after a regeneration: create it
+            # here so that the download cannot fail on a missing directory
+            csv_file.parent.mkdir(exist_ok=True, parents=True)
+            urlretrieve(link, csv_file)
+
+        # A dictionary for the meteo stations is created, indexed by station name
+        dict_stations = {}
+        origin = (self._latitude, self._longitude)
+        logger.info('Collecting information about meteo stations')
+        with open(csv_file, "r") as f:
+            reader = DictReader(f, delimiter=';')
+            for row in reader:
+                # float() rather than eval(): the file comes from the network
+                # and must never be executed as Python code
+                latitude, longitude = float(row['Latitude']), float(row['Longitude'])
+                dict_stations[row['Nom'].replace("'",'’')] = {
+                    'id' : row['ID'],
+                    'longitude' : longitude,
+                    'latitude' : latitude,
+                    # vincenty() was removed in geopy 2.0; geodesic() is its
+                    # replacement (available since geopy 1.13)
+                    'distance' : distance.geodesic(
+                        origin,
+                        (latitude, longitude)).km
+                }
+
+        # Find the closest stations
+        logger.info('Finding the closest stations')
+        stations_by_distance = sorted(dict_stations,
+                                      key = lambda x: dict_stations[x]['distance'])
+        closest = stations_by_distance[:self._nb_stations]
+        logger.info(f'The {self._nb_stations} closest stations are: '
+                    f'{", ".join(closest)}.')
+        return [dict_stations[sta]['id'] for sta in closest]
+
+
+
+    def _get_feature(self):
         '''
         '''
-        Update the MeteoFrance features with the last available data
+        TODO: feature collection is not implemented yet; this method is a
+        placeholder that currently does nothing.
         '''
         '''
-        # We collect archive files from MeteoFrance, until the current month
-        # by using the same method than for data generation : this is currently
-        # based on the presence of a synop.+date+.csv' file in the 
-        # data/meteo_france/historical directory. The file corresponding to the 
-        # current month is deleted first, so that its most recent version will be downloaded
-        # by colling self.__collect_historical_data
-        # TODO: updates according to a PostgreSQL request ?
-        logger.info('Update historical csv files from MeteoFrance, if needed')
-        today = datetime.now()
-        todel = 'synop.'+today.strftime("%Y%m")+".csv"
-        remove(self.__data_directory / 'historical' / todel)
-        system("touch "+todel)
-        self.__collect_historical_data()
-        logger.info('Inserting csv files in database')
-        self.__insert_historical_data()
-        
+        pass