]> AND Private Git Repository - predictops.git/blob - predictops/source/meteofrance.py
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
New version now drived by config files
[predictops.git] / predictops / source / meteofrance.py
1 from configparser import ConfigParser
2 from csv import DictReader
3 from datetime import datetime
4 from geopy.distance import vincenty
5 from logging import getLogger
6 from logging.config import fileConfig
7 from os import listdir, remove
8 from os.path import isfile, basename
9 from pathlib import Path
10 from shutil import rmtree
11 from urllib.request import urlretrieve
12
13 import gzip
14
15
16 fileConfig((Path.cwd() / 'config') / 'logging.cfg')
17 logger = getLogger()
18
19 CSV_FILE = Path.cwd() / 'config' / 'features' / 'meteofrance_features.csv'
20
21
22 class MeteoFrance:
23
24     _latitude    = None
25     _longitude   = None
26     _nb_stations = None
27     _start       = None
28     _end         = None
29     _features    = None
30
31     def __init__(self, config_file):
32         '''
33         Constructor of the MeteoFrance source of feature.
34
35         - It will reinitiate the data directory, if asked in the config
36           features.cfg file.
37         - It searches for the nb_stations meteo stations closest to the provided
38           point (longitude and latitude)
39
40         For more information about this source of feature, see:
41     https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
42
43         Parameters:
44           - in config file:
45             latitude (float): The latitude from which we want the meteo features.
46             longitude (float): The longitude from which we want the meteo features.
47             nb_stations (int): Number of closest stations to consider.
48           - provided to the constructor
49             features (list): Weather features that have to be integrated, according
50                   to their names in meteofrance_features.csv (cf. config directory)
51
52         '''
53         self._config = ConfigParser()
54         self._config.read(config_file)
55
56         self._data_directory = (Path.cwd() / 'data') / 'features' / 'meteo_france'
57
58         self._dated_features = None
59
60         # Re-creating data directory architecture for MeteoFrance, if asked
61         if self._config['GENERAL'].getboolean('regenerate'):
62             self._regenerate_directory()
63
64         # Collecting the closest meteo station
65         self._nb_stations = self._config['STATIONS'].getint('nb_stations')
66         self._stations = self._get_stations()
67
68         # Collecting meteofrance features
69         with open(CSV_FILE, "r") as f:
70             reader = DictReader(f, delimiter=',')
71             self._features = [row['name'] for row in reader
72                               if self._config['FEATURES'].getboolean(row['name'])]
73
74
75     @property
76     def start(self):
77         return self._start
78
79     @start.setter
80     def start(self, x):
81         self._start = x
82
83
84     @property
85     def end(self):
86         return self._end
87
88     @end.setter
89     def end(self, x):
90         self._end = x
91
92
93     @property
94     def latitude(self):
95         return self._latitude
96
97     @latitude.setter
98     def latitude(self, x):
99         self._latitude = x
100
101
102     @property
103     def longitude(self):
104         return self._longitude
105
106     @longitude.setter
107     def longitude(self, x):
108         self._longitude = x
109
110
111     @property
112     def nb_stations(self):
113         return self._nb_stations
114
115     @nb_stations.setter
116     def nb_stations(self, x):
117         self._nb_stations = x
118
119
120     def _regenerate_directory(self):
121         '''
122         Re-creating data directory architecture for MeteoFrance
123         '''
124         logger.info("Regenerating meteofrance data directory")
125         try:
126             rmtree(self._data_directory)
127         except:
128             pass
129         p = Path(self._data_directory / 'historical')
130         p.mkdir(exist_ok=True, parents=True)
131         p = Path(self._data_directory / 'config')
132         p.mkdir(exist_ok=True, parents=True)
133
134
135
136     def _get_stations(self):
137         '''
138         Collect (after downloading them, if needed) the stations and their
139         locations in a dictionary
140
141         Returns:
142             list: The self._nb_stations closest station IDs, starting by the
143                   closest one
144         '''
145         # The csv file of meteo stations (names, ids and locations) if downloaded,
146         # if not available in the config directory within data / meteo_france
147         link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
148         p = Path(self._data_directory / 'config' )
149         csv_file = p / basename(link)
150         if not isfile(csv_file):
151             logger.info('Downloading location stations from MeteoFrance')
152             urlretrieve(link, csv_file)
153
154         # A dictionary for the meteo stations is created
155         self._dict_stations = {}
156         logger.info('Collecting information about meteo stations')
157         with open(csv_file, "r") as f:
158             reader = DictReader(f, delimiter=';')
159             for row in reader:
160                 latitude, longitude = eval(row['Latitude']), eval(row['Longitude'])
161                 self._dict_stations[row['Nom'].replace("'",'’')] = {
162                     'id' : row['ID'],
163                     'longitude' : longitude,
164                     'latitude' : latitude,
165                     'distance' : vincenty(
166                         (self._latitude, self._longitude),
167                         (latitude, longitude)).km
168                 }
169
170         # Find the closest stations
171         logger.info('Finding the closest stations')
172         stations_by_distance = sorted(self._dict_stations.keys(),
173                                       key = lambda x: self._dict_stations[x]['distance'])
174         logger.info(f'The {self._nb_stations} closest stations are: '
175                     f'{", ".join(stations_by_distance[:self._nb_stations])}.')
176         return [self._dict_stations[sta]['id'] for sta in stations_by_distance][:self._nb_stations]
177
178
179
180     def _collect_historical_data(self):
181         '''
182         We collect all csv files from January 1996 until the month
183         before now. The argument in the url to download are of the
184         form 201001 for January 2010. We start by computing all these
185         patterns, in historical list.
186         '''
187         # List of year-months to consider
188         historical = []
189         date_end = self._end
190         for year in range(self._start.year, date_end.year+1):
191             for month in range(1,13):
192                 date = datetime(year, month, 1)
193                 if date >= self._start and date <= date_end:
194                     historical.append(date.strftime("%Y%m"))
195
196         # We download all csv files from meteofrance that are not in
197         # the data repository
198         meteo_data = self._data_directory / 'historical'
199         p = Path(meteo_data)
200         p.mkdir(exist_ok=True, parents=True)
201         for date in historical:
202             if not isfile(meteo_data / ('synop.'+date+'.csv')):
203                 link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
204                 link += date + '.csv.gz'
205                 download_path = meteo_data / basename(link)
206                 urlretrieve(link, download_path)
207                 with gzip.open(download_path, 'rb') as f:
208                     csv_file = meteo_data / basename(link[:-3])
209                     with open(csv_file, 'w') as g:
210                         g.write(f.read().decode())
211                         remove(meteo_data / basename(link))
212
213
214
215     def update(self):
216         '''
217         Update the MeteoFrance features with the last available data
218         '''
219         # We collect archive files from MeteoFrance, until the current month
220         # by using the same method than for data generation : this is currently
221         # based on the presence of a synop.+date+.csv' file in the
222         # data/meteo_france/historical directory. The file corresponding to the
223         # current month is deleted first, so that its most recent version will
224         # be downloaded by calling self._collect_historical_data
225
226         logger.info('Update historical csv files from MeteoFrance, if needed')
227         today = datetime.now()
228         todel = 'synop.'+today.strftime("%Y%m")+".csv"
229         try:
230             remove(self._data_directory / 'historical' / todel)
231         except:
232             logger.warning(f"{self._data_directory / 'historical' / todel} not found")
233         self._collect_historical_data()
234
235
236
237     @property
238     def dated_features(self):
239         '''
240         If the attribute dated_features is None, then we create it: a dictionary
241         with datestamps as keys, and {features: values} as values.
242          - considered features are the ones from meteofrance_features.csv, found
243            in config/features/meteofrance directory
244          - only the closest meteo stations are considered
245
246         Returns:
247             dict: the dictionary of features per datestamp
248         '''
249         if self._dated_features == None:
250             logger.info(f'Collecting meteo feature information from {CSV_FILE}')
251             # A dictionary for the features
252             with open(CSV_FILE, "r") as f:
253                 reader = DictReader(f, delimiter=',')
254                 dico_features = {row["abbreviation"]:
255                                    {
256                                        'name': row['name'], # feature name
257                                        'type': row['type']  # qualitative (2) or quantitative (1)
258                                     }
259                                 for row in reader if row['name'] in self._features}
260                 #print([row for row in reader])
261                 #print([row for row in reader if row['name'] in self._features])
262             dir_data = Path.cwd() / 'data' / 'features' / 'meteo_france' / 'historical'
263             self._dated_features = {}
264             for csv_meteo in listdir(dir_data):
265                 date = datetime.strptime(csv_meteo.split('.')[1], '%Y%m')
266                 if (date >= self._start and date <= self._end)\
267                 or (date.year == self._start.year and date.month == self._start.month)\
268                 or (date.year == self._end.year and date.month == self._end.month):
269                     logger.info(f'Inserting {csv_meteo} in intervention dictionary')
270                     with open(dir_data / csv_meteo, "r") as f:
271                         reader = DictReader(f, delimiter=';')
272                         for row in reader:
273                             if row['numer_sta'] in self._stations:
274                                 date = datetime.strptime(row['date'], '%Y%m%d%H%M%S')
275                                 if date  >= self._start and date <= self._end:
276                                     self._dated_features.setdefault(date,{}).update({dico_features[feat]['name']+'_'+str(self._stations.index(row['numer_sta'])): eval(row[feat].replace('mq','None')) for feat in dico_features})
277         return self._dated_features
278