AND Private Git Repository - predictops.git/blobdiff - lib/source/meteofrance.py
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Adding calendar features
[predictops.git] / lib / source / meteofrance.py
index 79c2546596e6a8e6f35a8e1ee6af40351d32116a..c524089e6362d0ec48c6d6aab57596b9a95887bf 100644 (file)
-from extomeAI.lib.connector import PostgreSQLDBConnection
-
 from configparser import ConfigParser
 from csv import DictReader
 from configparser import ConfigParser
 from csv import DictReader
-from datetime import datetime, timedelta
-from os import remove, system, listdir
-from pathlib import Path
-from shutil import rmtree
-from timezonefinder import TimezoneFinder
-
+from datetime import datetime
+from geopy.distance import vincenty
+from logging import getLogger
 from logging.config import fileConfig
 from logging.config import fileConfig
+from os import listdir, remove, system
 from os.path import isfile, basename
 from os.path import isfile, basename
+from pathlib import Path
+from shutil import rmtree
 from urllib.request import urlretrieve
 
 from urllib.request import urlretrieve
 
-import logging
 import gzip
 import gzip
-import pytz
-import tempfile
+
 
 fileConfig((Path.cwd() / 'config') / 'logging.cfg')
 
 fileConfig((Path.cwd() / 'config') / 'logging.cfg')
-logger = logging.getLogger()
+logger = getLogger()
 
 class MeteoFrance:
 
 class MeteoFrance:
-    def __init__(self):
+
+    def __init__(self, latitude = 47.25, longitude = 6.0333, nb_stations = 3,
+                 start = datetime.strptime('19960101000000', '%Y%m%d%H%M%S'),
+                 end = datetime.now(),
+                 features = []):
+        '''
+        Constructor of the MeteoFrance feature source.
+
+        - It will re-create the data directory, if asked in the config
+          features.cfg file.
+        - It searches for the nb_stations meteo stations closest to the provided
+          point (longitude and latitude)
+
+        For more information about this feature source, see:
+    https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
+
+        Parameters:
+            latitude (float): The latitude from which we want the meteo features.
+            longitude (float): The longitude from which we want the meteo features.
+            nb_stations (int): Number of closest stations to consider.
+            features (list): Weather features that have to be integrated, according
+                  to their names in meteofrance_features.csv (cf. config directory)
+
         '''
         '''
-        Constructor
-        See: https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
-        '''    
-        self.__data_directory = (Path.cwd() / 'data') / 'meteo_france'
+        self._latitude = latitude
+        self._longitude = longitude
+        self._nb_stations = nb_stations
+        self._start = start
+        self._end = end
+        self._features = features
+
+        self._data_directory = (Path.cwd() / 'data') / 'meteo_france'
+
+        self._dated_features = None
+
         # Re-creating data directory architecture for MeteoFrance, if asked
         config = ConfigParser()
         config.read((Path.cwd() / 'config') / 'features.cfg')
         if eval(config['meteofrance']['regenerate']):
         # Re-creating data directory architecture for MeteoFrance, if asked
         config = ConfigParser()
         config.read((Path.cwd() / 'config') / 'features.cfg')
         if eval(config['meteofrance']['regenerate']):
-            logger.info("Regenerating meteofrance data directory")
-            try:
-                rmtree(self.__data_directory)
-            except:
-                pass
-            p = Path(self.__data_directory  / 'historical')
-            p.mkdir(exist_ok=True, parents=True)
-            p = Path(self.__data_directory  / 'config')
-            p.mkdir(exist_ok=True, parents=True)
-        if eval(config['meteofrance']['reinsert']):
-            logger.info("Reinserting meteofrance database")
-            with PostgreSQLDBConnection.Instance() as db:
-                db.cursor.execute(f'DELETE FROM "METEO_FEATURE";')
-                db.cursor.execute(f'ALTER SEQUENCE "METEO_FEATURE_METFT_ID_seq" RESTART WITH 1;')
-                db.cursor.execute(f'DELETE FROM "METEO_STATION";')
-                db.cursor.execute(f'ALTER SEQUENCE "METEO_STATION_METST_ID_seq" RESTART WITH 1;')
-                db.cursor.execute(f'DELETE FROM "METEO_FEATURE_VALUE";')
-                db.cursor.execute(f'ALTER SEQUENCE "METEO_FEATURE_VALUE_METVA_ID_seq" RESTART WITH 1;')
-            self.__generate()
-        
-
-    def __collect_stations(self):
+            self._regenerate_directory()
+
+        # Collecting the closest meteo station
+        self._stations = self._get_stations()
+
+
+
+    def _regenerate_directory(self):
         '''
         '''
-        Filling METEO_STATION table from location schema
+        Re-creating data directory architecture for MeteoFrance
         '''
         '''
-        tf = TimezoneFinder(in_memory=True)
-        
-        with PostgreSQLDBConnection.Instance() as db:
-            link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
-            p = Path(self.__data_directory / 'config' )
-            csv_file = p / basename(link)
-            if not isfile(csv_file):
-                logger.info('Downloading location stations from MeteoFrance')                    
-                urlretrieve(link, csv_file)
-            with open(csv_file, "r") as f:                
-                reader = DictReader(f, delimiter=';')
-                logger.info(f'Inserting location stations in {db.dbname} database')                    
-                for row in reader:
-                    longitude, latitude = eval(row['Longitude']), eval(row['Latitude'])
-                    point = (longitude, latitude)
-                    timezone_name = tf.timezone_at(lng = eval(row['Longitude']), lat = eval(row['Latitude']))
-                    if timezone_name is None:
-                        timezone_name = tf.closest_timezone_at(lng = eval(row['Longitude']), 
-                                                               lat = eval(row['Latitude']),
-                                                               delta_degree = 5) 
-                    cet = pytz.timezone(timezone_name)
-                    dt = datetime.now()
-                    offset = cet.utcoffset(dt, is_dst = True)
-                    shift = int(offset / timedelta(hours=1))
-                    request = f"""INSERT INTO "METEO_STATION" ("METST_NAME", "METST_IDNAME", "METST_LOCATION", "METST_TIMEZONE") 
-                                  VALUES ('{row['Nom'].replace("'",'’')}', '{row['ID']}',
-                                  point({row['Latitude']}, {row['Longitude']}), {shift});"""
-                    db.cursor.execute(request)
-
-
-    def __insert_features(self):
-        logger.info('Inserting MeteoFrance list of features from meteo_features.csv')            
-        csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
-        with PostgreSQLDBConnection.Instance() as db:
-            with open(csv_file, "r") as f:                
-                reader = DictReader(f, delimiter=',')
-                next(reader)
-                for row in reader:
-                    request = f"""INSERT INTO "METEO_FEATURE" ("METFT_NAME", "PARAM_ID_PARAMETER")
-                                  VALUES ('{row['METFT_NAME']}', {row['PARAM_ID_PARAMETER']});"""
-                    db.cursor.execute(request)
-        
-        
-            
-    def __collect_historical_data(self):
+        logger.info("Regenerating meteofrance data directory")
+        try:
+            rmtree(self._data_directory)
+        except:
+            pass
+        p = Path(self._data_directory / 'historical')
+        p.mkdir(exist_ok=True, parents=True)
+        p = Path(self._data_directory / 'config')
+        p.mkdir(exist_ok=True, parents=True)
+
+
+
+    def _get_stations(self):
+        '''
+        Collect (after downloading them, if needed) the stations and their
+        locations in a dictionary
+
+        Returns:
+            list: The self._nb_stations closest station IDs, starting with the
+                  closest one
+        '''
+        # The csv file of meteo stations (names, ids and locations) is downloaded,
+        # if not available in the config directory within data / meteo_france
+        link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
+        p = Path(self._data_directory / 'config' )
+        csv_file = p / basename(link)
+        if not isfile(csv_file):
+            logger.info('Downloading location stations from MeteoFrance')
+            urlretrieve(link, csv_file)
+
+        # A dictionary for the meteo stations is created
+        self._dict_stations = {}
+        logger.info('Collecting information about meteo stations')
+        with open(csv_file, "r") as f:
+            reader = DictReader(f, delimiter=';')
+            for row in reader:
+                latitude, longitude = eval(row['Latitude']), eval(row['Longitude'])
+                self._dict_stations[row['Nom'].replace("'",'’')] = {
+                    'id' : row['ID'],
+                    'longitude' : longitude,
+                    'latitude' : latitude,
+                    'distance' : vincenty(
+                        (self._latitude, self._longitude),
+                        (latitude, longitude)).km
+                }
+
+        # Find the closest stations
+        logger.info('Finding the closest stations')
+        stations_by_distance = sorted(self._dict_stations.keys(),
+                                      key = lambda x: self._dict_stations[x]['distance'])
+        logger.info(f'The {self._nb_stations} closest stations are: '
+                    f'{", ".join(stations_by_distance[:self._nb_stations])}.')
+        return [self._dict_stations[sta]['id'] for sta in stations_by_distance][:self._nb_stations]
+
+
+
+    def _collect_historical_data(self):
         '''
         We collect all csv files from January 1996 until the month
         before now. The argument in the url to download are of the
         '''
         We collect all csv files from January 1996 until the month
         before now. The argument in the url to download are of the
@@ -109,22 +132,22 @@ class MeteoFrance:
         '''
         # List of year-months to consider
         historical = []
         '''
         # List of year-months to consider
         historical = []
-        date_end = datetime.now()
-        for year in range(1996, date_end.year+1):
+        date_end = self._end
+        for year in range(self._start.year, date_end.year+1):
             for month in range(1,13):
                 date = datetime(year, month, 1)
             for month in range(1,13):
                 date = datetime(year, month, 1)
-                if date <= date_end:
+                if date >= self._start and date <= date_end:
                     historical.append(date.strftime("%Y%m"))
                     historical.append(date.strftime("%Y%m"))
-                    
-        # We download all csv files from meteofrance that are not in 
+
+        # We download all csv files from meteofrance that are not in
         # the data repository
         # the data repository
-        meteo_data = self.__data_directory / 'historical' 
+        meteo_data = self._data_directory / 'historical'
         p = Path(meteo_data)
         p = Path(meteo_data)
-        p.mkdir(exist_ok=True, parents=True)                
+        p.mkdir(exist_ok=True, parents=True)
         for date in historical:
             if not isfile(meteo_data / ('synop.'+date+'.csv')):
                 link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
         for date in historical:
             if not isfile(meteo_data / ('synop.'+date+'.csv')):
                 link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
-                link += date + '.csv.gz' 
+                link += date + '.csv.gz'
                 download_path = meteo_data / basename(link)
                 urlretrieve(link, download_path)
                 with gzip.open(download_path, 'rb') as f:
                 download_path = meteo_data / basename(link)
                 urlretrieve(link, download_path)
                 with gzip.open(download_path, 'rb') as f:
@@ -132,113 +155,8 @@ class MeteoFrance:
                     with open(csv_file, 'w') as g:
                         g.write(f.read().decode())
                         remove(meteo_data / basename(link))
                     with open(csv_file, 'w') as g:
                         g.write(f.read().decode())
                         remove(meteo_data / basename(link))
-    
-    
-    def __from_date_to_datetz(self, date, a, b):          
-        if not hasattr(self, '__meteo_station_tz'):
-            self.__meteo_station_tz = {}
-            tf = TimezoneFinder(in_memory=True)
-            with PostgreSQLDBConnection.Instance() as db:
-                db.cursor.execute('select "METST_IDNAME", "METST_LOCATION" from "METEO_STATION";')
-                list_of_rows = db.cursor.fetchall()                
-            for k in list_of_rows:
-                print('\n',k)
-                longitude, latitude = eval(k[1])
-                print(longitude, latitude)
-                print(type(longitude))
-                timezone_name = tf.timezone_at(lng = longitude, lat = latitude)
-                if timezone_name is None:
-                    timezone_name = tf.closest_timezone_at(lng = longitude, 
-                                                           lat = latitude,
-                                                           delta_degree = 13,
-                                                           exact_computation=True, 
-                                                           #return_distances=True,
-                                                           force_evaluation=True) 
-                cet = pytz.timezone(timezone_name)
-                dt = datetime.now()
-                offset = cet.utcoffset(dt, is_dst = True)
-                shift = int(offset / timedelta(hours=1))
-                self.__meteo_station_tz[k[0]] = shift
-
-            print(self.__meteo_station_tz)
-            exit()
-            '''longitude, latitude = eval(row['Longitude']), eval(row['Latitude'])
-            point = (longitude, latitude)
-            timezone_name = tf.timezone_at(lng = eval(row['Longitude']), lat = eval(row['Latitude']))
-            if timezone_name is None:
-                timezone_name = tf.closest_timezone_at(lng = eval(row['Longitude']), 
-                                                       lat = eval(row['Latitude']),
-                                                       delta_degree = 5) 
-            cet = pytz.timezone(timezone_name)
-            dt = datetime.now()
-            offset = cet.utcoffset(dt, is_dst = True)
-            shift = int(offset / timedelta(hours=1))
-            
-            self.__meteo_station_tz'''
-        exit()
-        return date[:4]+'-'+date[4:6]+'-'+date[6:8]+' '+date[8:10]+':00:00+01'
-
-
-    def __insert_historical_data(self):
-        csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
-        with PostgreSQLDBConnection.Instance() as db:
-            db.cursor.execute('SELECT * from "METEO_FEATURE";')
-            list_of_rows = db.cursor.fetchall()
-            dico = {u[1]:u[0] for u in list_of_rows}
-
-        with open(csv_file, "r") as f:                
-            reader = DictReader(f, delimiter=',')
-            next(reader)
-            dico_features = {row["abbreviation"]:dico[row["METFT_NAME"]] for row in reader}
-
-        with PostgreSQLDBConnection.Instance() as db:
-            db.cursor.execute('SELECT * from "METEO_STATION";')
-            list_of_rows = db.cursor.fetchall()
-            dico_station = {u[2]:u[0] for u in list_of_rows}
-
-        for feature in dico_features:
-            logger.info(f'Integrating {[u for u in dico if dico[u]==dico_features[feature]][0]} feature')
-            for station in dico_station:
-                logger.info(f'  - Dealing with meteo station n°: {station}')                            
-                csv_file = tempfile.NamedTemporaryFile('w')
-                dir_data = Path.cwd() / 'data' / 'meteo_france' / 'historical'
-                for csv_meteo in listdir(dir_data):
-                    with open(dir_data / csv_meteo, "r") as f:                
-                        reader = DictReader(f, delimiter=';')
-                        csv_file.write(''.join([row[feature]+",'"+self.__from_date_to_datetz(row["date"], station, dico_station[station])+"',"+str(dico_features[feature])+','+str(dico_station[station])+'\n' for row in reader if row['numer_sta'] == station]))
-                csv_file.flush()
-                with open(csv_file.name, 'r') as f:
-                    with PostgreSQLDBConnection.Instance() as db:
-                        db.cursor.copy_from(f, '"METEO_FEATURE_VALUE"', sep=',', null='mq',
-                                            columns=['"METVA_VALUE"','"METVA_DATETIME"','"METFT_ID_METEO_FEATURE"','"METST_ID_METEO_STATION"'])
-
-                        
-
-
-        
-            
-    
-    def __generate(self):
-        # Meteo stations must be collected first, if not in the database
-        with PostgreSQLDBConnection.Instance() as db:
-            db.cursor.execute('SELECT count(*) FROM "METEO_STATION";')
-            updated_meteo_station = db.cursor.fetchone()[0]
-        if not updated_meteo_station:
-            self.__collect_stations()
-
-        # Features from data/meteo_france/config/meteo_features.csv
-        # must be inserted in the database, if not already done
-        with PostgreSQLDBConnection.Instance() as db:
-            db.cursor.execute('SELECT count(*) FROM "METEO_FEATURE";')
-            updated_meteo_features = db.cursor.fetchone()[0]
-        if not updated_meteo_features:
-            self.__insert_features()
-        
-        # Downloading meteofrance historical csv files
-        logger.info('Downloading historical csv files from MeteoFrance, if needed')            
-        self.__collect_historical_data()
-        
-        self.__insert_historical_data()
+
+
 
     def update(self):
         '''
 
     def update(self):
         '''
@@ -246,17 +164,58 @@ class MeteoFrance:
         '''
         # We collect archive files from MeteoFrance, until the current month
         # by using the same method than for data generation : this is currently
         '''
         # We collect archive files from MeteoFrance, until the current month
         # by using the same method than for data generation : this is currently
-        # based on the presence of a synop.+date+.csv' file in the 
-        # data/meteo_france/historical directory. The file corresponding to the 
-        # current month is deleted first, so that its most recent version will be downloaded
-        # by colling self.__collect_historical_data
-        # TODO: updates according to a PostgreSQL request ?
+        # based on the presence of a 'synop.'+date+'.csv' file in the
+        # data/meteo_france/historical directory. The file corresponding to the
+        # current month is deleted first, so that its most recent version will
+        # be downloaded by calling self._collect_historical_data
+
         logger.info('Update historical csv files from MeteoFrance, if needed')
         today = datetime.now()
         todel = 'synop.'+today.strftime("%Y%m")+".csv"
         logger.info('Update historical csv files from MeteoFrance, if needed')
         today = datetime.now()
         todel = 'synop.'+today.strftime("%Y%m")+".csv"
-        remove(self.__data_directory / 'historical' / todel)
+        try:
+            remove(self._data_directory / 'historical' / todel)
+        except:
+            logger.warning(f"{self._data_directory / 'historical' / todel} not found")
         system("touch "+todel)
         system("touch "+todel)
-        self.__collect_historical_data()
-        logger.info('Inserting csv files in database')
-        self.__insert_historical_data()
-        
+        self._collect_historical_data()
+
+
+
+    @property
+    def dated_features(self):
+        '''
+        If the attribute dated_features is None, then we create it: a dictionary
+        with datestamps as keys, and {features: values} as values.
+         - considered features are the ones from meteofrance_features.csv, found
+           in config/features/meteofrance directory
+         - only the closest meteo stations are considered
+
+        Returns:
+            dict: the dictionary of features per datestamp
+        '''
+        if self._dated_features == None:
+            csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance_features.csv'
+            logger.info(f'Collecting meteo feature information from {csv_file}')
+            # A dictionary for the features
+            with open(csv_file, "r") as f:
+                reader = DictReader(f, delimiter=',')
+                dico_features = {row["abbreviation"]:
+                                   {
+                                       'name': row['name'], # feature name
+                                       'type': row['type']  # qualitative (2) or quantitative (1)
+                                    }
+                                for row in reader if row['name'] in self._features}
+            dir_data = Path.cwd() / 'data' / 'meteo_france' / 'historical'
+            self._dated_features = {}
+            for csv_meteo in listdir(dir_data):
+                date = datetime.strptime(csv_meteo.split('.')[1], '%Y%m')
+                if date >= self._start and date <= self._end:
+                    logger.info(f'Inserting {csv_meteo} in intervention dictionary')
+                    with open(dir_data / csv_meteo, "r") as f:
+                        reader = DictReader(f, delimiter=';')
+                        for row in reader:
+                            if row['numer_sta'] in self._stations:
+                                date = datetime.strptime(row['date'], '%Y%m%d%H%M%S')
+                                self._dated_features.setdefault(date,{}).update({dico_features[feat]['name']+'_'+str(self._stations.index(row['numer_sta'])): eval(row[feat].replace('mq','None')) for feat in dico_features})
+        return self._dated_features
+