AND Private Git Repository - predictops.git/blobdiff - lib/source/meteofrance.py
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Adding calendar features
[predictops.git] / lib / source / meteofrance.py
index 315aac3edef79e2178ad344ba52eea345f54d768..c524089e6362d0ec48c6d6aab57596b9a95887bf 100644 (file)
@@ -1,22 +1,27 @@
 from configparser import ConfigParser
 from csv import DictReader
 from configparser import ConfigParser
 from csv import DictReader
-from geopy import distance
-from pathlib import Path
-from shutil import rmtree
-
+from datetime import datetime
+from geopy.distance import vincenty
+from logging import getLogger
 from logging.config import fileConfig
 from logging.config import fileConfig
+from os import listdir, remove, system
 from os.path import isfile, basename
 from os.path import isfile, basename
+from pathlib import Path
+from shutil import rmtree
 from urllib.request import urlretrieve
 
 from urllib.request import urlretrieve
 
-import logging
+import gzip
 
 
 fileConfig((Path.cwd() / 'config') / 'logging.cfg')
 
 
 fileConfig((Path.cwd() / 'config') / 'logging.cfg')
-logger = logging.getLogger()
+logger = getLogger()
 
 class MeteoFrance:
 
 
 class MeteoFrance:
 
-    def __init__(self, latitude = 47.25, longitude = 6.0333, nb_stations = 3):
+    def __init__(self, latitude = 47.25, longitude = 6.0333, nb_stations = 3,
+                 start = datetime.strptime('19960101000000', '%Y%m%d%H%M%S'),
+                 end = datetime.now(),
+                 features = []):
         '''
         Constructor of the MeteoFrance source of feature.
 
         '''
         Constructor of the MeteoFrance source of feature.
 
@@ -32,14 +37,21 @@ class MeteoFrance:
             latitude (float): The latitude from which we want the meteo features.
             longitude (float): The longitude from which we want the meteo features.
             nb_stations (int): Number of closest stations to consider.
             latitude (float): The latitude from which we want the meteo features.
             longitude (float): The longitude from which we want the meteo features.
             nb_stations (int): Number of closest stations to consider.
+            features (list): Weather features that have to be integrated, according
+                  to their names in meteofrance_features.csv (cf. config directory)
 
         '''
         self._latitude = latitude
         self._longitude = longitude
         self._nb_stations = nb_stations
 
         '''
         self._latitude = latitude
         self._longitude = longitude
         self._nb_stations = nb_stations
+        self._start = start
+        self._end = end
+        self._features = features
 
         self._data_directory = (Path.cwd() / 'data') / 'meteo_france'
 
 
         self._data_directory = (Path.cwd() / 'data') / 'meteo_france'
 
+        self._dated_features = None
+
         # Re-creating data directory architecture for MeteoFrance, if asked
         config = ConfigParser()
         config.read((Path.cwd() / 'config') / 'features.cfg')
         # Re-creating data directory architecture for MeteoFrance, if asked
         config = ConfigParser()
         config.read((Path.cwd() / 'config') / 'features.cfg')
@@ -48,7 +60,6 @@ class MeteoFrance:
 
         # Collecting the closest meteo station
         self._stations = self._get_stations()
 
         # Collecting the closest meteo station
         self._stations = self._get_stations()
-        print(self._stations)
 
 
 
 
 
 
@@ -87,33 +98,124 @@ class MeteoFrance:
             urlretrieve(link, csv_file)
 
         # A dictionary for the meteo stations is created
             urlretrieve(link, csv_file)
 
         # A dictionary for the meteo stations is created
-        dict_stations = {}
+        self._dict_stations = {}
         logger.info('Collecting information about meteo stations')
         with open(csv_file, "r") as f:
             reader = DictReader(f, delimiter=';')
             for row in reader:
                 latitude, longitude = eval(row['Latitude']), eval(row['Longitude'])
         logger.info('Collecting information about meteo stations')
         with open(csv_file, "r") as f:
             reader = DictReader(f, delimiter=';')
             for row in reader:
                 latitude, longitude = eval(row['Latitude']), eval(row['Longitude'])
-                dict_stations[row['Nom'].replace("'",'’')] = {
+                self._dict_stations[row['Nom'].replace("'",'’')] = {
                     'id' : row['ID'],
                     'longitude' : longitude,
                     'latitude' : latitude,
                     'id' : row['ID'],
                     'longitude' : longitude,
                     'latitude' : latitude,
-                    'distance' : distance.vincenty(
+                    'distance' : vincenty(
                         (self._latitude, self._longitude),
                         (latitude, longitude)).km
                 }
 
         # Find the closest stations
         logger.info('Finding the closest stations')
                         (self._latitude, self._longitude),
                         (latitude, longitude)).km
                 }
 
         # Find the closest stations
         logger.info('Finding the closest stations')
-        stations_by_distance = sorted(dict_stations.keys(),
-                                      key = lambda x: dict_stations[x]['distance'])
+        stations_by_distance = sorted(self._dict_stations.keys(),
+                                      key = lambda x: self._dict_stations[x]['distance'])
         logger.info(f'The {self._nb_stations} closest stations are: '
                     f'{", ".join(stations_by_distance[:self._nb_stations])}.')
         logger.info(f'The {self._nb_stations} closest stations are: '
                     f'{", ".join(stations_by_distance[:self._nb_stations])}.')
-        return [dict_stations[sta]['id'] for sta in stations_by_distance][:self._nb_stations]
+        return [self._dict_stations[sta]['id'] for sta in stations_by_distance][:self._nb_stations]
+
+
+
+    def _collect_historical_data(self):
+        '''
+        Download the monthly SYNOP archives that are not yet on disk.
+
+        One archive is considered per calendar month, from the month that
+        contains self._start to the month that contains self._end (both
+        included). The url of an archive contains a year-month pattern, of
+        the form 201001 for January 2010. An archive is downloaded only when
+        synop.<pattern>.csv is missing from the 'historical' subdirectory of
+        self._data_directory; it is then decompressed in that subdirectory.
+
+        Raises:
+            Errors of urlretrieve, gzip and of the filesystem are not caught
+            here; the downloaded archive and any partially written csv are
+            deleted before the error propagates.
+        '''
+        from shutil import copyfileobj
+
+        # List of year-months to consider. Months are compared as
+        # (year, month) pairs, so that the month containing self._start is
+        # kept even when self._start is not the very first instant of it.
+        first = (self._start.year, self._start.month)
+        last = (self._end.year, self._end.month)
+        historical = [datetime(year, month, 1).strftime("%Y%m")
+                      for year in range(first[0], last[0] + 1)
+                      for month in range(1, 13)
+                      if first <= (year, month) <= last]
+
+        # We download all csv files from meteofrance that are not in
+        # the data repository
+        meteo_data = self._data_directory / 'historical'
+        meteo_data.mkdir(exist_ok=True, parents=True)
+        base_url = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/'
+        for date in historical:
+            csv_file = meteo_data / f'synop.{date}.csv'
+            if csv_file.is_file():
+                continue
+            archive = meteo_data / (csv_file.name + '.gz')
+            partial = meteo_data / (csv_file.name + '.part')
+            try:
+                urlretrieve(base_url + archive.name, archive)
+                # Bytes are streamed as they are: the content is neither
+                # loaded at once in memory, nor decoded with one encoding
+                # and written back with the locale one.
+                with gzip.open(archive, 'rb') as src, open(partial, 'wb') as dst:
+                    copyfileobj(src, dst)
+                # The csv gets its final name only once it is complete, as
+                # the presence of this name is what prevents a new download
+                partial.replace(csv_file)
+            finally:
+                # Both files are closed at this point, so they can be deleted
+                for leftover in (archive, partial):
+                    if leftover.exists():
+                        leftover.unlink()
 
 
 
 
 
 
-    def _get_feature(self):
+    def update(self):
         '''
         '''
-        TODO
+        Update the MeteoFrance features with the last available data
+
+        The csv file of the current month is deleted from the 'historical'
+        subdirectory of self._data_directory, then
+        self._collect_historical_data is called.
+
+        Raises:
+            OSError: if the csv file of the current month exists but cannot
+                be deleted.
         '''
         '''
-        pass
+        # We collect archive files from MeteoFrance, until the current month,
+        # by using the same method as for data generation: this is currently
+        # based on the presence of a 'synop.'+date+'.csv' file in the
+        # data/meteo_france/historical directory. The file corresponding to the
+        # current month is deleted first, so that its most recent version will
+        # be downloaded by calling self._collect_historical_data
+
+        logger.info('Update historical csv files from MeteoFrance, if needed')
+        today = datetime.now()
+        todel = 'synop.'+today.strftime("%Y%m")+".csv"
+        current = self._data_directory / 'historical' / todel
+        try:
+            current.unlink()
+        except FileNotFoundError:
+            # Only a missing file is expected here. Any other error (e.g. a
+            # permission problem) is raised, rather than reported as
+            # "not found" before downloading nothing.
+            logger.warning(f"{current} not found")
+        # Same effect as the former shell command "touch <todel>": an empty
+        # file is created (or its modification time refreshed) in the current
+        # working directory, now without spawning a shell.
+        # NOTE(review): nothing visible here reads this file -- confirm that
+        # it is still needed.
+        try:
+            Path(todel).touch()
+        except OSError as error:
+            # A failure of the shell command was silently ignored: the update
+            # is not aborted for it
+            logger.warning(f"Cannot touch {todel}: {error}")
+        self._collect_historical_data()
+
+
+
+    @property
+    def dated_features(self):
+        '''
+        Dictionary of the weather features per datestamp, built on first access.
+
+        If the attribute _dated_features is None, then we create it: a
+        dictionary with datestamps as keys, and {features: values} as values.
+         - considered features are the ones of self._features, as named in
+           config/features/meteofrance_features.csv
+         - only the stations of self._stations are considered; the name of a
+           feature is suffixed by the position of its station in that list
+         - 'mq' values of the archives are stored as None
+         - archives are selected per month, from the month of self._start to
+           the month of self._end (both included)
+
+        Returns:
+            dict: the dictionary of features per datestamp
+        '''
+        if self._dated_features is not None:
+            return self._dated_features
+
+        # literal_eval only accepts Python literals: unlike eval, it cannot
+        # execute code found in a downloaded file
+        from ast import literal_eval
+
+        csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance_features.csv'
+        logger.info(f'Collecting meteo feature information from {csv_file}')
+        # abbreviation (column name in the archives) -> feature name
+        with open(csv_file, "r") as f:
+            reader = DictReader(f, delimiter=',')
+            dico_features = {row["abbreviation"]: row['name']
+                             for row in reader if row['name'] in self._features}
+
+        # Position of each station in self._stations (first occurrence wins,
+        # as with list.index), computed once instead of for each csv row
+        ranks = {}
+        for rank, station in enumerate(self._stations):
+            ranks.setdefault(station, rank)
+
+        # Months are compared as (year, month) pairs, so that the archive of
+        # the month containing self._start is kept even when self._start is
+        # not the very first instant of that month
+        first = (self._start.year, self._start.month)
+        last = (self._end.year, self._end.month)
+
+        dir_data = self._data_directory / 'historical'
+        # The dictionary is built in a local variable and stored only when
+        # complete: a failure while reading does not leave a partial
+        # dictionary to be returned by the next accesses
+        dated_features = {}
+        for csv_meteo in sorted(listdir(dir_data)):
+            # Anything that is not named synop.<YYYYMM>.csv is ignored
+            parts = csv_meteo.split('.')
+            if len(parts) != 3 or parts[0] != 'synop' or parts[2] != 'csv':
+                continue
+            try:
+                month = datetime.strptime(parts[1], '%Y%m')
+            except ValueError:
+                continue
+            if not first <= (month.year, month.month) <= last:
+                continue
+            logger.info(f'Inserting {csv_meteo} in intervention dictionary')
+            with open(dir_data / csv_meteo, "r") as f:
+                reader = DictReader(f, delimiter=';')
+                for row in reader:
+                    rank = ranks.get(row['numer_sta'])
+                    if rank is None:
+                        continue
+                    date = datetime.strptime(row['date'], '%Y%m%d%H%M%S')
+                    values = dated_features.setdefault(date, {})
+                    for feat, name in dico_features.items():
+                        values[f'{name}_{rank}'] = literal_eval(
+                            row[feat].replace('mq', 'None'))
+        self._dated_features = dated_features
+        return self._dated_features
+