1 from configparser import ConfigParser
2 from csv import DictReader
3 from datetime import datetime
4 from geopy.distance import vincenty
5 from logging import getLogger
6 from logging.config import fileConfig
7 from os import listdir, remove
8 from os.path import isfile, basename
9 from pathlib import Path
10 from shutil import rmtree
11 from urllib.request import urlretrieve
# Configure logging from config/logging.cfg, resolved against the current
# working directory when this module is executed.
fileConfig(Path.cwd().joinpath('config', 'logging.cfg'))
21 def __init__(self, latitude = 47.25, longitude = 6.0333, nb_stations = 3,
22 start = datetime.strptime('19960101000000', '%Y%m%d%H%M%S'),
26 Constructor of the MeteoFrance source of feature.
28 - It will reinitiate the data directory, if asked in the config
30 - It searches for the nb_stations meteo stations closest to the provided
31 point (longitude and latitude)
33 For more information about this source of feature, see:
34 https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
37 latitude (float): The latitude from which we want the meteo features.
38 longitude (float): The longitude from which we want the meteo features.
39 nb_stations (int): Number of closest stations to consider.
40 features (list): Weather features that have to be integrated, according
41 to their names in meteofrance_features.csv (cf. config directory)
44 self._latitude = latitude
45 self._longitude = longitude
46 self._nb_stations = nb_stations
49 self._features = features
51 self._data_directory = (Path.cwd() / 'data') / 'features' / 'meteo_france'
53 self._dated_features = None
55 # Re-creating data directory architecture for MeteoFrance, if asked
56 config = ConfigParser()
57 config.read((Path.cwd() / 'config') / 'features.cfg')
58 if eval(config['meteofrance']['regenerate']):
59 self._regenerate_directory()
61 # Collecting the closest meteo station
62 self._stations = self._get_stations()
def _regenerate_directory(self):
    """
    Re-create the data directory architecture for MeteoFrance.

    The whole tree under ``self._data_directory`` is deleted (when it
    exists), then the empty ``historical`` and ``config`` sub-directories
    are created again.
    """
    logger.info("Regenerating meteofrance data directory")

    # On a first run there is nothing to delete yet: a missing directory
    # must not abort the regeneration. Any other failure is still raised.
    try:
        rmtree(self._data_directory)
    except FileNotFoundError:
        pass

    for sub_directory in ('historical', 'config'):
        (self._data_directory / sub_directory).mkdir(exist_ok=True,
                                                     parents=True)
def _get_stations(self):
    """
    Collect (after downloading them, if needed) the stations and their
    locations in a dictionary, and return the closest ones.

    The stations csv file is looked for in the ``config`` sub-directory of
    ``self._data_directory`` and downloaded from MeteoFrance when absent.
    ``self._dict_stations`` is (re)built with one entry per station, keyed
    by the station name (with ``'`` replaced by ``’``), holding its id, its
    coordinates and its distance in km to the point
    (``self._latitude``, ``self._longitude``).

    Returns:
        list: The self._nb_stations closest station IDs, starting by the
              closest one.
    """
    link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
    csv_file = self._data_directory / 'config' / basename(link)
    if not csv_file.is_file():
        logger.info('Downloading location stations from MeteoFrance')
        # urlretrieve writes to an existing directory only.
        csv_file.parent.mkdir(exist_ok=True, parents=True)
        urlretrieve(link, csv_file)

    # A dictionary for the meteo stations is created
    reference_point = (self._latitude, self._longitude)
    self._dict_stations = {}
    logger.info('Collecting information about meteo stations')
    with open(csv_file, "r") as f:
        for row in DictReader(f, delimiter=';'):
            # The file comes from the network: coordinates are parsed as
            # numbers with float() rather than evaluated with eval().
            latitude = float(row['Latitude'])
            longitude = float(row['Longitude'])
            self._dict_stations[row['Nom'].replace("'", '’')] = {
                # NOTE(review): assumes the station identifier column is
                # named 'ID' - confirm against postesSynop.csv.
                'id': row['ID'],
                'longitude': longitude,
                'latitude': latitude,
                # NOTE: vincenty was removed in geopy 2.0 (geodesic is its
                # replacement); it is kept because it is the name imported
                # at the top of this file.
                'distance': vincenty(reference_point,
                                     (latitude, longitude)).km,
            }

    # Find the closest stations
    logger.info('Finding the closest stations')
    stations_by_distance = sorted(
        self._dict_stations,
        key=lambda name: self._dict_stations[name]['distance'])
    closest = stations_by_distance[:self._nb_stations]
    logger.info(f'The {self._nb_stations} closest stations are: '
                f'{", ".join(closest)}.')
    return [self._dict_stations[name]['id'] for name in closest]
def _collect_historical_data(self):
    """
    Download the monthly MeteoFrance csv files missing from the data
    directory.

    One file per month is expected in the ``historical`` sub-directory of
    ``self._data_directory``, named ``synop.YYYYMM.csv`` (e.g.
    ``synop.201001.csv`` for January 2010). For every considered month
    whose csv file is absent, the gzip archive is downloaded, uncompressed
    next to it, and the archive is deleted.
    """
    import gzip  # not among the imports at the top of this file

    # List of year-months to consider. Months are compared as
    # (year, month) pairs so that the month containing self._start is kept
    # even when self._start is not the very first instant of that month.
    # NOTE(review): collection is assumed to run up to the current month -
    # confirm against the intended end date.
    date_end = datetime.now()
    first_month = (self._start.year, self._start.month)
    last_month = (date_end.year, date_end.month)
    historical = []
    for year in range(self._start.year, date_end.year + 1):
        for month in range(1, 13):
            if first_month <= (year, month) <= last_month:
                historical.append(datetime(year, month, 1).strftime("%Y%m"))

    # We download all csv files from meteofrance that are not in
    # the data repository
    meteo_data = self._data_directory / 'historical'
    meteo_data.mkdir(exist_ok=True, parents=True)
    for date in historical:
        csv_file = meteo_data / ('synop.' + date + '.csv')
        if csv_file.is_file():
            continue
        link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
        link += date + '.csv.gz'
        download_path = meteo_data / basename(link)
        urlretrieve(link, download_path)
        try:
            # Uncompress fully before creating the csv file: a corrupt
            # archive must not leave an empty csv behind, since an existing
            # csv is what marks the month as already downloaded.
            with gzip.open(download_path, 'rb') as f:
                content = f.read().decode()
            with open(csv_file, 'w') as g:
                g.write(content)
        finally:
            # The archive is never kept, whether uncompressing worked or not.
            remove(download_path)
163 Update the MeteoFrance features with the last available data
165 # We collect archive files from MeteoFrance, until the current month
166 # by using the same method than for data generation : this is currently
167 # based on the presence of a synop.+date+.csv' file in the
168 # data/meteo_france/historical directory. The file corresponding to the
169 # current month is deleted first, so that its most recent version will
170 # be downloaded by calling self._collect_historical_data
172 logger.info('Update historical csv files from MeteoFrance, if needed')
173 today = datetime.now()
174 todel = 'synop.'+today.strftime("%Y%m")+".csv"
176 remove(self._data_directory / 'historical' / todel)
178 logger.warning(f"{self._data_directory / 'historical' / todel} not found")
179 self._collect_historical_data()
def dated_features(self):
    """
    Return the dictionary of features per datestamp, creating it if needed.

    If the attribute ``_dated_features`` is None, it is built as a
    dictionary with datestamps as keys and {feature: value} as values:
    - considered features are the rows of meteofrance_features.csv (in
      config/features, under the current working directory) whose name is
      in ``self._features``
    - only rows of the stations listed in ``self._stations`` are kept; each
      key is the feature name suffixed by ``_<rank>``, the position of the
      station in that list
    - a value containing ``mq`` is stored as None

    Returns:
        dict: the dictionary of features per datestamp
    """
    # Safe replacement for eval(): only literals (numbers, None) are
    # accepted, which matters because the values come from downloaded files.
    from ast import literal_eval

    if self._dated_features is not None:
        return self._dated_features

    csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance_features.csv'
    logger.info(f'Collecting meteo feature information from {csv_file}')
    # A dictionary for the features
    with open(csv_file, "r") as f:
        dico_features = {
            row["abbreviation"]: {
                'name': row['name'],  # feature name
                'type': row['type'],  # qualitative (2) or quantitative (1)
            }
            for row in DictReader(f, delimiter=',')
            if row['name'] in self._features
        }

    # Rank of each station in self._stations (first occurrence wins, as
    # list.index does), computed once instead of once per csv row.
    station_rank = {}
    for rank, station_id in enumerate(self._stations):
        station_rank.setdefault(station_id, rank)

    # Months are compared as (year, month) pairs so that the file of the
    # month containing self._start is not dropped when self._start is not
    # the very first instant of that month.
    first_month = (self._start.year, self._start.month)
    last_month = (self._end.year, self._end.month)

    dir_data = Path.cwd() / 'data' / 'features' / 'meteo_france' / 'historical'
    # Built locally and stored only once complete, so that a failure while
    # reading does not leave a partial cache returned by later calls.
    dated_features = {}
    for csv_meteo in sorted(listdir(dir_data)):
        # Only synop.YYYYMM.csv files are data files; anything else (e.g. a
        # leftover archive) is ignored.
        name_parts = csv_meteo.split('.')
        if (len(name_parts) != 3 or name_parts[0] != 'synop'
                or name_parts[2] != 'csv'):
            continue
        try:
            month = datetime.strptime(name_parts[1], '%Y%m')
        except ValueError:
            continue
        if not first_month <= (month.year, month.month) <= last_month:
            continue

        logger.info(f'Inserting {csv_meteo} in intervention dictionary')
        with open(dir_data / csv_meteo, "r") as f:
            for row in DictReader(f, delimiter=';'):
                rank = station_rank.get(row['numer_sta'])
                if rank is None:
                    continue
                date = datetime.strptime(row['date'], '%Y%m%d%H%M%S')
                dated_features.setdefault(date, {}).update({
                    feature['name'] + '_' + str(rank):
                        literal_eval(row[abbreviation].replace('mq', 'None'))
                    for abbreviation, feature in dico_features.items()
                })

    self._dated_features = dated_features
    return self._dated_features