import gzip
from ast import literal_eval
from configparser import ConfigParser
from csv import DictReader
from datetime import datetime
from logging import getLogger
from logging.config import fileConfig
from os import listdir, remove
from os.path import isfile, basename
from pathlib import Path
from shutil import rmtree
from urllib.request import urlretrieve

from geopy.distance import vincenty

from .source import Source
# Logging is configured from the project's config/logging.cfg, resolved
# against the current working directory, before the module logger is created.
fileConfig((Path.cwd() / 'config') / 'logging.cfg')
# Module-level logger used by every method of the MeteoFrance class below.
logger = getLogger(__name__)

# CSV catalogue of the MeteoFrance features; this module reads its 'name',
# 'abbreviation' and 'type' columns.
CSV_FILE = Path.cwd() / 'config' / 'features' / 'meteofrance_features.csv'
class MeteoFrance(Source):
    """
    Source of weather features built from the MeteoFrance SYNOP csv files.

    The class reads its settings from an INI file, keeps the identifiers of
    the ``nb_stations`` stations closest to the configured position, downloads
    the monthly SYNOP archives it needs and exposes the selected features as a
    dictionary indexed by datetime (see ``dated_features``).

    NOTE(review): ``self._start`` and ``self._end`` are read by several
    methods but are never assigned in this class; they are presumably
    datetimes provided by ``Source`` -- verify against the base class.
    """

    def __init__(self, config_file):
        """
        Constructor of the MeteoFrance source of feature.

        - It will reinitiate the data directory, if asked in the config
        - It searches for the nb_stations meteo stations closest to the
          provided point (longitude and latitude)

        For more information about this source of feature, see:
        https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32

        Parameters:
            config_file: path of an INI file, read with ConfigParser, with
              - [POSITION] latitude (float): The latitude from which we want
                the meteo features.
              - [POSITION] longitude (float): The longitude from which we
                want the meteo features.
              - [STATIONS] nb_stations (int): Number of closest stations to
                consider.
              - [GENERAL] regenerate (bool): wipe and recreate the data
                directory when true.
              - [FEATURES] one boolean per feature name, according to the
                names in meteofrance_features.csv (cf. config directory).
        """
        # Check for the integrity of feature names
        # NOTE(review): the statement that followed this comment was not
        # visible in the reviewed source; a plain base-class initialisation
        # is assumed -- confirm the signature of Source.__init__.
        super().__init__()

        self._config = ConfigParser()
        self._config.read(config_file)

        self._latitude = self._config['POSITION'].getfloat('latitude')
        self._longitude = self._config['POSITION'].getfloat('longitude')

        self._data_directory = (Path.cwd() / 'data') / 'features' / 'meteo_france'

        # Cache filled lazily by the dated_features property
        self._dated_features = None

        # Re-creating data directory architecture for MeteoFrance, if asked
        if self._config['GENERAL'].getboolean('regenerate'):
            self._regenerate_directory()

        # Collecting the closest meteo station
        self._nb_stations = self._config['STATIONS'].getint('nb_stations')
        self._stations = self._get_stations()

        # Collecting meteofrance features: a feature is kept when its name
        # is set to a true value in the [FEATURES] section (a name missing
        # from the section yields None, hence is not kept).
        wanted = self._config['FEATURES']
        with open(CSV_FILE, "r") as f:
            self._features = [row['name']
                              for row in DictReader(f, delimiter=',')
                              if wanted.getboolean(row['name'])]

    # NOTE(review): the decorators and the bodies of the latitude/longitude
    # setters were not visible in the reviewed source; they are rebuilt on
    # the model of the fully visible nb_stations setter. None of the setters
    # recomputes self._stations.
    @property
    def latitude(self):
        """Latitude of the point of interest."""
        return self._latitude

    @latitude.setter
    def latitude(self, x):
        self._latitude = x

    @property
    def longitude(self):
        """Longitude of the point of interest."""
        return self._longitude

    @longitude.setter
    def longitude(self, x):
        self._longitude = x

    @property
    def nb_stations(self):
        """Number of closest stations to consider."""
        return self._nb_stations

    @nb_stations.setter
    def nb_stations(self, x):
        self._nb_stations = x

    def _regenerate_directory(self):
        """
        Re-creating data directory architecture for MeteoFrance

        The whole data directory is removed, then its 'historical' and
        'config' sub-directories are created again.
        """
        logger.info("Regenerating meteofrance data directory")
        try:
            rmtree(self._data_directory)
        except FileNotFoundError:
            # Nothing to wipe yet (e.g. very first run): not an error.
            pass
        for sub_directory in ('historical', 'config'):
            (self._data_directory / sub_directory).mkdir(exist_ok=True,
                                                         parents=True)

    def _get_stations(self):
        """
        Collect (after downloading them, if needed) the stations and their
        locations in a dictionary

        Side effect: self._dict_stations maps each station name to a dict
        with its 'id', 'longitude', 'latitude' and 'distance' (in km) to the
        configured position.

        Returns:
            list: The self._nb_stations closest station IDs, starting by the
                  closest one
        """
        # The csv file of meteo stations (names, ids and locations) is
        # downloaded, if not available in the config directory within
        # data / meteo_france
        link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
        config_directory = self._data_directory / 'config'
        csv_file = config_directory / basename(link)
        if not isfile(csv_file):
            logger.info('Downloading location stations from MeteoFrance')
            # The target directory may not exist when 'regenerate' is off
            config_directory.mkdir(exist_ok=True, parents=True)
            urlretrieve(link, csv_file)

        # A dictionary for the meteo stations is created
        self._dict_stations = {}
        logger.info('Collecting information about meteo stations')
        origin = (self._latitude, self._longitude)
        with open(csv_file, "r") as f:
            for row in DictReader(f, delimiter=';'):
                # The file is downloaded content: parse the coordinates as
                # numbers instead of eval()-uating them.
                latitude = float(row['Latitude'])
                longitude = float(row['Longitude'])
                self._dict_stations[row['Nom'].replace("'", '’')] = {
                    # NOTE(review): this entry was not visible in the
                    # reviewed source; 'ID' is assumed to be the name of the
                    # identifier column of postesSynop.csv -- confirm.
                    'id': row['ID'],
                    'longitude': longitude,
                    'latitude': latitude,
                    # NOTE(review): vincenty was removed in geopy 2.0
                    # (geodesic is its replacement) -- check the pinned
                    # geopy version.
                    'distance': vincenty(origin, (latitude, longitude)).km,
                }

        # Find the closest stations
        logger.info('Finding the closest stations')
        stations_by_distance = sorted(
            self._dict_stations,
            key=lambda name: self._dict_stations[name]['distance'])
        closest = stations_by_distance[:self._nb_stations]
        logger.info(f'The {self._nb_stations} closest stations are: '
                    f'{", ".join(closest)}.')
        return [self._dict_stations[name]['id'] for name in closest]

    @staticmethod
    def _year_months(start, end):
        """Return the 'YYYYMM' stamp of every month from start's to end's, both included."""
        stamps = []
        year, month = start.year, start.month
        while (year, month) <= (end.year, end.month):
            stamps.append(f'{year:04d}{month:02d}')
            year, month = (year + 1, 1) if month == 12 else (year, month + 1)
        return stamps

    @staticmethod
    def _same_month(first, second):
        """Tell whether two datetimes fall in the same month of the same year."""
        return (first.year, first.month) == (second.year, second.month)

    def _collect_historical_data(self):
        """
        Download the monthly SYNOP archives that are missing locally.

        The argument in the url to download are of the form 201001 for
        January 2010. We start by computing all these patterns, from the
        month of self._start onwards, in the historical list; each missing
        'synop.<pattern>.csv' is then downloaded as a .csv.gz, decompressed
        in data/features/meteo_france/historical, and the archive removed.
        """
        # List of year-months to consider. Months are compared as a whole,
        # so that the month containing self._start is kept even when
        # self._start is not its very first instant.
        # NOTE(review): the binding of date_end was not visible in the
        # reviewed source; "now" is assumed, so that update() can refresh
        # the current month -- confirm (self._end is the other candidate).
        date_end = datetime.now()
        historical = self._year_months(self._start, date_end)

        # We download all csv files from meteofrance that are not in
        # the data repository
        meteo_data = self._data_directory / 'historical'
        meteo_data.mkdir(exist_ok=True, parents=True)
        for date in historical:
            if isfile(meteo_data / ('synop.'+date+'.csv')):
                continue
            link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
            link += date + '.csv.gz'
            download_path = meteo_data / basename(link)
            urlretrieve(link, download_path)
            # Decompress entirely before creating the csv, so that a corrupt
            # archive does not leave an empty csv that would later be taken
            # for an already downloaded month.
            with gzip.open(download_path, 'rb') as f:
                content = f.read().decode()
            csv_file = meteo_data / basename(link[:-3])
            with open(csv_file, 'w') as g:
                g.write(content)
            remove(download_path)

    # NOTE(review): the 'def' line of this method was not visible in the
    # reviewed source; the name 'update' is inferred from its docstring --
    # confirm against the callers / the Source base class.
    def update(self):
        """
        Update the MeteoFrance features with the last available data
        """
        # We collect archive files from MeteoFrance, until the current month
        # by using the same method than for data generation : this is
        # currently based on the presence of a synop.+date+.csv' file in the
        # data/meteo_france/historical directory. The file corresponding to
        # the current month is deleted first, so that its most recent version
        # will be downloaded by calling self._collect_historical_data
        logger.info('Update historical csv files from MeteoFrance, if needed')
        today = datetime.now()
        todel = 'synop.'+today.strftime("%Y%m")+".csv"
        try:
            remove(self._data_directory / 'historical' / todel)
        except FileNotFoundError:
            logger.warning(f"{self._data_directory / 'historical' / todel} not found")
        self._collect_historical_data()

    @property
    def dated_features(self):
        """
        If the attribute dated_features is None, then we create it: a
        dictionary with datestamps as keys, and {features: values} as values.
         - considered features are the ones from meteofrance_features.csv,
           found in config/features/meteofrance directory
         - only the closest meteo stations are considered

        Each key of the inner dictionaries is '<feature name>_<rank>', where
        rank is the position of the station in self._stations; the value 'mq'
        found in the csv files is stored as None.

        Returns:
            dict: the dictionary of features per datestamp
        """
        if self._dated_features is None:
            # Built aside and stored only once complete, so that a failure
            # while parsing does not leave a partial cache behind.
            self._dated_features = self._build_dated_features()
        return self._dated_features

    def _build_dated_features(self):
        """Parse the historical csv files into {datetime: {feature_rank: value}}."""
        logger.info(f'Collecting meteo feature information from {CSV_FILE}')
        # A dictionary for the features, indexed by their csv abbreviation
        selected = set(self._features)
        with open(CSV_FILE, "r") as f:
            dico_features = {
                row["abbreviation"]: {
                    'name': row['name'],  # feature name
                    'type': row['type'],  # qualitative (2) or quantitative (1)
                }
                for row in DictReader(f, delimiter=',')
                if row['name'] in selected}

        # Rank of each station in self._stations, computed once instead of
        # calling list.index() for every csv row.
        rank = {}
        for position, station in enumerate(self._stations):
            rank.setdefault(station, position)

        dir_data = self._data_directory / 'historical'
        dated = {}
        for csv_meteo in sorted(listdir(dir_data)):
            # Only 'synop.YYYYMM.csv' files are archives to read; anything
            # else found in the directory is ignored.
            if not csv_meteo.endswith('.csv'):
                continue
            try:
                month = datetime.strptime(csv_meteo.split('.')[1], '%Y%m')
            except (IndexError, ValueError):
                continue
            # A file is relevant when its month lies in [start, end], or is
            # the (possibly partial) month of one of the two bounds.
            if not (self._start <= month <= self._end
                    or self._same_month(month, self._start)
                    or self._same_month(month, self._end)):
                continue
            logger.info(f'Inserting {csv_meteo} in intervention dictionary')
            with open(dir_data / csv_meteo, "r") as f:
                for row in DictReader(f, delimiter=';'):
                    station = row['numer_sta']
                    if station not in rank:
                        continue
                    date = datetime.strptime(row['date'], '%Y%m%d%H%M%S')
                    if not self._start <= date <= self._end:
                        continue
                    suffix = '_' + str(rank[station])
                    # The files are downloaded content: literal_eval only
                    # accepts Python literals, where eval() would execute any
                    # expression found in a cell.
                    dated.setdefault(date, {}).update({
                        description['name'] + suffix:
                            literal_eval(row[feat].strip().replace('mq', 'None'))
                        for feat, description in dico_features.items()})
        return dated