AND Private Git Repository - predictops.git/blob - lib/source/meteofrance.py
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
meteofrance module now operational, with the production of a dictionary
[predictops.git] / lib / source / meteofrance.py
1 from configparser import ConfigParser
2 from csv import DictReader
3 from datetime import datetime
4 from geopy.distance import vincenty
5 from logging import getLogger
6 from logging.config import fileConfig
7 from os import listdir, remove, system
8 from os.path import isfile, basename
9 from pathlib import Path
10 from shutil import rmtree
11 from urllib.request import urlretrieve
12
13 import gzip
14
# Logging is configured from the 'config/logging.cfg' file located under the
# current working directory; the root logger is then used as the logger of
# this module.
fileConfig(Path.cwd().joinpath('config', 'logging.cfg'))
logger = getLogger()
17
class MeteoFrance:
    '''
    Source of features built from the MeteoFrance SYNOP csv archives.

    The object locates the meteo stations closest to a given point, keeps
    the monthly csv archives up to date in the data/meteo_france directory
    (relative to the current working directory), and exposes their content
    as a dictionary indexed by date (see the dated_features property).
    '''

    def __init__(self, latitude=47.25, longitude=6.0333, nb_stations=3):
        '''
        Constructor of the MeteoFrance source of feature.

        - It will reinitiate the data directory, if asked in the config
          features.cfg file.
        - It searches for the nb_stations meteo stations closest to the provided
          point (longitude and latitude)

        For more information about this source of feature, see:
    https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32

        Parameters:
            latitude (float): The latitude from which we want the meteo features.
            longitude (float): The longitude from which we want the meteo features.
            nb_stations (int): Number of closest stations to consider.

        Raises:
            KeyError: if config/features.cfg has no [meteofrance] section or
                      no 'regenerate' option in it.
        '''
        self._latitude = latitude
        self._longitude = longitude
        self._nb_stations = nb_stations

        self._data_directory = (Path.cwd() / 'data') / 'meteo_france'

        # Cache of the dated_features property, filled on first access
        self._dated_features = None

        # Re-creating data directory architecture for MeteoFrance, if asked
        config = ConfigParser()
        config.read((Path.cwd() / 'config') / 'features.cfg')
        # NOTE(review): eval() executes whatever expression is written in the
        # configuration file. Kept as is because the file is local and the
        # accepted syntax (Python expression) would change with getboolean();
        # consider migrating once the config format is confirmed.
        if eval(config['meteofrance']['regenerate']):
            self._regenerate_directory()

        # Collecting the closest meteo station
        self._stations = self._get_stations()

    def _regenerate_directory(self):
        '''
        Re-creating data directory architecture for MeteoFrance

        The whole data directory is deleted (best effort: a missing
        directory or a deletion error is ignored), then its 'historical'
        and 'config' subdirectories are created again.
        '''
        logger.info("Regenerating meteofrance data directory")
        # ignore_errors keeps the deletion best-effort without a bare except
        rmtree(self._data_directory, ignore_errors=True)
        for subdirectory in ('historical', 'config'):
            (self._data_directory / subdirectory).mkdir(exist_ok=True,
                                                        parents=True)

    def _get_stations(self):
        '''
        Collect (after downloading them, if needed) the stations and their
        locations in a dictionary

        The dictionary is stored in self._dict_stations: it maps each station
        name to its id, longitude, latitude and distance (in km) to the
        point given to the constructor.

        Returns:
            list: The self._nb_stations closest station IDs, starting by the
                  closest one
        '''
        # The csv file of meteo stations (names, ids and locations) is
        # downloaded, if not available in the config directory within
        # data / meteo_france
        link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
        config_directory = self._data_directory / 'config'
        # The directory does not exist yet when regeneration was not asked
        # on a first run: the download would fail without it
        config_directory.mkdir(exist_ok=True, parents=True)
        csv_file = config_directory / basename(link)
        if not isfile(csv_file):
            logger.info('Downloading location stations from MeteoFrance')
            urlretrieve(link, csv_file)

        # A dictionary for the meteo stations is created
        self._dict_stations = {}
        logger.info('Collecting information about meteo stations')
        origin = (self._latitude, self._longitude)
        with open(csv_file, "r") as f:
            reader = DictReader(f, delimiter=';')
            for row in reader:
                # float() rather than eval(): the file is downloaded, its
                # cells must not be executed as Python code
                latitude = float(row['Latitude'])
                longitude = float(row['Longitude'])
                # NOTE(review): vincenty was removed from geopy 2.0; the
                # module-level import presumably requires geopy < 2 -- confirm
                self._dict_stations[row['Nom'].replace("'", '’')] = {
                    'id': row['ID'],
                    'longitude': longitude,
                    'latitude': latitude,
                    'distance': vincenty(origin, (latitude, longitude)).km
                }

        # Find the closest stations
        logger.info('Finding the closest stations')
        closest = sorted(
            self._dict_stations,
            key=lambda name: self._dict_stations[name]['distance']
        )[:self._nb_stations]
        names = ", ".join(closest)
        logger.info(f'The {self._nb_stations} closest stations are: {names}.')
        return [self._dict_stations[name]['id'] for name in closest]

    @staticmethod
    def _historical_months(date_end):
        '''
        List the year-month patterns of the archives to consider.

        Parameters:
            date_end (datetime): last instant to cover.

        Returns:
            list: strings of the form 201001 (for January 2010), from
                  January 1996 to the month containing date_end, included.
        '''
        historical = []
        for year in range(1996, date_end.year + 1):
            for month in range(1, 13):
                first_day = datetime(year, month, 1)
                if first_day <= date_end:
                    historical.append(first_day.strftime("%Y%m"))
        return historical

    def _collect_historical_data(self):
        '''
        We collect all csv files from January 1996 until the current month
        (included). The argument in the url to download are of the
        form 201001 for January 2010.

        Only the months whose synop.<year-month>.csv file is missing from
        the 'historical' data directory are downloaded: the gzip archive is
        fetched, decompressed into the csv file, then deleted.
        '''
        # We download all csv files from meteofrance that are not in
        # the data repository
        meteo_data = self._data_directory / 'historical'
        meteo_data.mkdir(exist_ok=True, parents=True)
        for year_month in self._historical_months(datetime.now()):
            csv_file = meteo_data / ('synop.' + year_month + '.csv')
            if isfile(csv_file):
                continue
            link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
            link += year_month + '.csv.gz'
            download_path = meteo_data / basename(link)
            urlretrieve(link, download_path)
            try:
                # The archive is fully decompressed before the csv file is
                # created, so that a corrupted archive does not leave an
                # empty csv that would later be considered as downloaded
                with gzip.open(download_path, 'rb') as f:
                    content = f.read().decode()
                with open(csv_file, 'w') as g:
                    g.write(content)
            finally:
                # The archive is removed once closed, whatever happened
                remove(download_path)

    def update(self):
        '''
        Update the MeteoFrance features with the last available data
        '''
        # We collect archive files from MeteoFrance, until the current month
        # by using the same method than for data generation : this is currently
        # based on the presence of a synop.+date+.csv' file in the
        # data/meteo_france/historical directory. The file corresponding to the
        # current month is deleted first, so that its most recent version will
        # be downloaded by calling self._collect_historical_data

        logger.info('Update historical csv files from MeteoFrance, if needed')
        todel = 'synop.' + datetime.now().strftime("%Y%m") + ".csv"
        current_month_file = self._data_directory / 'historical' / todel
        try:
            remove(current_month_file)
        except FileNotFoundError:
            logger.warning(f"{current_month_file} not found")
        self._collect_historical_data()

    @property
    def dated_features(self):
        '''
        Dictionary of the meteo features of the selected stations.

        It is built on first access from the csv files of the 'historical'
        data directory, then cached in self._dated_features. Only the rows
        whose station ('numer_sta' column) is in self._stations are kept.

        Returns:
            dict: maps each value of the 'date' column to a dictionary
                  {<feature name>_<rank of the station in self._stations>:
                  value}, where 'mq' cells are turned into None.
        '''
        if self._dated_features is None:
            csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
            logger.info(f'Collecting meteo feature information from {csv_file}')
            # A dictionary for the features
            with open(csv_file, "r") as f:
                reader = DictReader(f, delimiter=',')
                # NOTE(review): DictReader already consumes the header line,
                # so this skips the first described feature -- presumably on
                # purpose (station id column?), to be confirmed
                next(reader)
                dico_features = {
                    row["abbreviation"]: {
                        'name': row['name'],  # feature name
                        'type': row['type']   # qualitative (2) or quantitative (1)
                    }
                    for row in reader
                }

            # Rank of each selected station (first occurrence, as list.index)
            rank = {}
            for index, station in enumerate(self._stations):
                rank.setdefault(station, index)

            dir_data = self._data_directory / 'historical'
            # Built locally and published only when complete, so that an
            # error while parsing does not leave a partial cache behind
            dated_features = {}
            for csv_meteo in sorted(listdir(dir_data)):
                # Ignore anything that is not a decompressed archive (e.g. a
                # .csv.gz left by an interrupted download)
                if not csv_meteo.endswith('.csv'):
                    continue
                logger.info(f'Inserting {csv_meteo} in intervention dictionary')
                with open(dir_data / csv_meteo, "r") as f:
                    reader = DictReader(f, delimiter=';')
                    for row in reader:
                        station = row['numer_sta']
                        if station not in rank:
                            continue
                        suffix = '_' + str(rank[station])
                        # NOTE(review): eval() on downloaded data is unsafe;
                        # it is kept to preserve the int / float / None typing
                        # of the values, ast.literal_eval would be a safer
                        # equivalent for such literals
                        dated_features.setdefault(row['date'], {}).update({
                            feature['name'] + suffix:
                                eval(row[abbreviation].replace('mq', 'None'))
                            for abbreviation, feature in dico_features.items()
                        })
            self._dated_features = dated_features
        return self._dated_features
203