]> AND Private Git Repository - predictops.git/blob - predictops/source/meteofrance.py
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
XGBoost integrated
[predictops.git] / predictops / source / meteofrance.py
1 from .source import Source
2
3 from configparser import ConfigParser
4 from csv import DictReader
5 from datetime import datetime
6 from geopy.distance import vincenty
7 from logging import getLogger
8 from logging.config import fileConfig
9 from os import listdir, remove
10 from os.path import isfile, basename
11 from pathlib import Path
12 from shutil import rmtree
13 from urllib.request import urlretrieve
14
15 import gzip
16
17
18 fileConfig((Path.cwd() / 'config') / 'logging.cfg')
19 logger = getLogger()
20
21 CSV_FILE = Path.cwd() / 'config' / 'features' / 'meteofrance_features.csv'
22
23
24 class MeteoFrance(Source):
25
26     _latitude    = None
27     _longitude   = None
28     _nb_stations = None
29     _start       = None
30     _end         = None
31     _features    = None
32
33     def __init__(self, config_file):
34         '''
35         Constructor of the MeteoFrance source of feature.
36
37         - It will reinitiate the data directory, if asked in the config
38           features.cfg file.
39         - It searches for the nb_stations meteo stations closest to the provided
40           point (longitude and latitude)
41
42         For more information about this source of feature, see:
43     https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
44
45         Parameters:
46           - in config file:
47             latitude (float): The latitude from which we want the meteo features.
48             longitude (float): The longitude from which we want the meteo features.
49             nb_stations (int): Number of closest stations to consider.
50           - provided to the constructor
51             features (list): Weather features that have to be integrated, according
52                   to their names in meteofrance_features.csv (cf. config directory)
53
54         '''
55         # Check for the integrity of feature names
56         Source.__init__(self)
57
58         self._config = ConfigParser()
59         self._config.read(config_file)
60
61         self._latitude = self._config['POSITION'].getfloat('latitude')
62         self._longitude = self._config['POSITION'].getfloat('longitude')
63
64         self._data_directory = (Path.cwd() / 'data') / 'features' / 'meteo_france'
65
66         self._dated_features = None
67
68         # Re-creating data directory architecture for MeteoFrance, if asked
69         if self._config['GENERAL'].getboolean('regenerate'):
70             self._regenerate_directory()
71
72         # Collecting the closest meteo station
73         self._nb_stations = self._config['STATIONS'].getint('nb_stations')
74         self._stations = self._get_stations()
75
76         # Collecting meteofrance features
77         with open(CSV_FILE, "r") as f:
78             reader = DictReader(f, delimiter=',')
79             self._features = [row['name'] for row in reader
80                               if self._config['FEATURES'].getboolean(row['name'])]
81
82
83     @property
84     def start(self):
85         return self._start
86
87     @start.setter
88     def start(self, x):
89         self._start = x
90
91
92     @property
93     def end(self):
94         return self._end
95
96     @end.setter
97     def end(self, x):
98         self._end = x
99
100
101     @property
102     def latitude(self):
103         return self._latitude
104
105     @latitude.setter
106     def latitude(self, x):
107         self._latitude = x
108
109
110     @property
111     def longitude(self):
112         return self._longitude
113
114     @longitude.setter
115     def longitude(self, x):
116         self._longitude = x
117
118
119     @property
120     def nb_stations(self):
121         return self._nb_stations
122
123     @nb_stations.setter
124     def nb_stations(self, x):
125         self._nb_stations = x
126
127
128     def _regenerate_directory(self):
129         '''
130         Re-creating data directory architecture for MeteoFrance
131         '''
132         logger.info("Regenerating meteofrance data directory")
133         try:
134             rmtree(self._data_directory)
135         except:
136             pass
137         p = Path(self._data_directory / 'historical')
138         p.mkdir(exist_ok=True, parents=True)
139         p = Path(self._data_directory / 'config')
140         p.mkdir(exist_ok=True, parents=True)
141
142
143
144     def _get_stations(self):
145         '''
146         Collect (after downloading them, if needed) the stations and their
147         locations in a dictionary
148
149         Returns:
150             list: The self._nb_stations closest station IDs, starting by the
151                   closest one
152         '''
153         # The csv file of meteo stations (names, ids and locations) if downloaded,
154         # if not available in the config directory within data / meteo_france
155         link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
156         p = Path(self._data_directory / 'config' )
157         csv_file = p / basename(link)
158         if not isfile(csv_file):
159             logger.info('Downloading location stations from MeteoFrance')
160             urlretrieve(link, csv_file)
161
162         # A dictionary for the meteo stations is created
163         self._dict_stations = {}
164         logger.info('Collecting information about meteo stations')
165         with open(csv_file, "r") as f:
166             reader = DictReader(f, delimiter=';')
167             for row in reader:
168                 latitude, longitude = eval(row['Latitude']), eval(row['Longitude'])
169                 self._dict_stations[row['Nom'].replace("'",'’')] = {
170                     'id' : row['ID'],
171                     'longitude' : longitude,
172                     'latitude' : latitude,
173                     'distance' : vincenty(
174                         (self._latitude, self._longitude),
175                         (latitude, longitude)).km
176                 }
177
178         # Find the closest stations
179         logger.info('Finding the closest stations')
180         stations_by_distance = sorted(self._dict_stations.keys(),
181                                       key = lambda x: self._dict_stations[x]['distance'])
182         logger.info(f'The {self._nb_stations} closest stations are: '
183                     f'{", ".join(stations_by_distance[:self._nb_stations])}.')
184         return [self._dict_stations[sta]['id'] for sta in stations_by_distance][:self._nb_stations]
185
186
187
188     def _collect_historical_data(self):
189         '''
190         We collect all csv files from January 1996 until the month
191         before now. The argument in the url to download are of the
192         form 201001 for January 2010. We start by computing all these
193         patterns, in historical list.
194         '''
195         # List of year-months to consider
196         historical = []
197         date_end = self._end
198         for year in range(self._start.year, date_end.year+1):
199             for month in range(1,13):
200                 date = datetime(year, month, 1)
201                 if date >= self._start and date <= date_end:
202                     historical.append(date.strftime("%Y%m"))
203
204         # We download all csv files from meteofrance that are not in
205         # the data repository
206         meteo_data = self._data_directory / 'historical'
207         p = Path(meteo_data)
208         p.mkdir(exist_ok=True, parents=True)
209         for date in historical:
210             if not isfile(meteo_data / ('synop.'+date+'.csv')):
211                 link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
212                 link += date + '.csv.gz'
213                 download_path = meteo_data / basename(link)
214                 urlretrieve(link, download_path)
215                 with gzip.open(download_path, 'rb') as f:
216                     csv_file = meteo_data / basename(link[:-3])
217                     with open(csv_file, 'w') as g:
218                         g.write(f.read().decode())
219                         remove(meteo_data / basename(link))
220
221
222
223     def update(self):
224         '''
225         Update the MeteoFrance features with the last available data
226         '''
227         # We collect archive files from MeteoFrance, until the current month
228         # by using the same method than for data generation : this is currently
229         # based on the presence of a synop.+date+.csv' file in the
230         # data/meteo_france/historical directory. The file corresponding to the
231         # current month is deleted first, so that its most recent version will
232         # be downloaded by calling self._collect_historical_data
233
234         logger.info('Update historical csv files from MeteoFrance, if needed')
235         today = datetime.now()
236         todel = 'synop.'+today.strftime("%Y%m")+".csv"
237         try:
238             remove(self._data_directory / 'historical' / todel)
239         except:
240             logger.warning(f"{self._data_directory / 'historical' / todel} not found")
241         self._collect_historical_data()
242
243
244
245     @property
246     def dated_features(self):
247         '''
248         If the attribute dated_features is None, then we create it: a dictionary
249         with datestamps as keys, and {features: values} as values.
250          - considered features are the ones from meteofrance_features.csv, found
251            in config/features/meteofrance directory
252          - only the closest meteo stations are considered
253
254         Returns:
255             dict: the dictionary of features per datestamp
256         '''
257         if self._dated_features == None:
258             logger.info(f'Collecting meteo feature information from {CSV_FILE}')
259             # A dictionary for the features
260             with open(CSV_FILE, "r") as f:
261                 reader = DictReader(f, delimiter=',')
262                 dico_features = {row["abbreviation"]:
263                                    {
264                                        'name': row['name'], # feature name
265                                        'type': row['type']  # qualitative (2) or quantitative (1)
266                                     }
267                                 for row in reader if row['name'] in self._features}
268                 #print([row for row in reader])
269                 #print([row for row in reader if row['name'] in self._features])
270             dir_data = Path.cwd() / 'data' / 'features' / 'meteo_france' / 'historical'
271             self._dated_features = {}
272             for csv_meteo in listdir(dir_data):
273                 date = datetime.strptime(csv_meteo.split('.')[1], '%Y%m')
274                 if (date >= self._start and date <= self._end)\
275                 or (date.year == self._start.year and date.month == self._start.month)\
276                 or (date.year == self._end.year and date.month == self._end.month):
277                     logger.info(f'Inserting {csv_meteo} in intervention dictionary')
278                     with open(dir_data / csv_meteo, "r") as f:
279                         reader = DictReader(f, delimiter=';')
280                         for row in reader:
281                             if row['numer_sta'] in self._stations:
282                                 date = datetime.strptime(row['date'], '%Y%m%d%H%M%S')
283                                 if date  >= self._start and date <= self._end:
284                                     self._dated_features.setdefault(date,{}).update({dico_features[feat]['name']+'_'+str(self._stations.index(row['numer_sta'])): eval(row[feat].replace('mq','None')) for feat in dico_features})
285         return self._dated_features
286