]> AND Private Git Repository - predictops.git/blob - predictops/source/meteofrance.py
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Integrating historical features
[predictops.git] / predictops / source / meteofrance.py
import gzip
from ast import literal_eval
from configparser import ConfigParser
from csv import DictReader
from datetime import datetime
from logging import getLogger
from logging.config import fileConfig
from os import listdir, remove
from os.path import isfile, basename
from pathlib import Path
from shutil import rmtree
from urllib.request import urlretrieve

from geopy.distance import vincenty

from .source import Source
16
17
# Logging is configured as soon as the module is imported; the configuration
# file is looked up relative to the current working directory.
fileConfig(Path.cwd().joinpath('config', 'logging.cfg'))
logger = getLogger()

# CSV file describing the MeteoFrance features; this module reads its 'name',
# 'abbreviation' and 'type' columns.
CSV_FILE = Path.cwd().joinpath('config', 'features', 'meteofrance_features.csv')
22
23
24 class MeteoFrance(Source):
25
26     _latitude    = None
27     _longitude   = None
28     _nb_stations = None
29     _start       = None
30     _end         = None
31     _features    = None
32
33     def __init__(self, config_file):
34         '''
35         Constructor of the MeteoFrance source of feature.
36
37         - It will reinitiate the data directory, if asked in the config
38           features.cfg file.
39         - It searches for the nb_stations meteo stations closest to the provided
40           point (longitude and latitude)
41
42         For more information about this source of feature, see:
43     https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
44
45         Parameters:
46           - in config file:
47             latitude (float): The latitude from which we want the meteo features.
48             longitude (float): The longitude from which we want the meteo features.
49             nb_stations (int): Number of closest stations to consider.
50           - provided to the constructor
51             features (list): Weather features that have to be integrated, according
52                   to their names in meteofrance_features.csv (cf. config directory)
53
54         '''
55         # Check for the integrity of feature names
56         Source.__init__(self)
57
58         self._config = ConfigParser()
59         self._config.read(config_file)
60
61         self._data_directory = (Path.cwd() / 'data') / 'features' / 'meteo_france'
62
63         self._dated_features = None
64
65         # Re-creating data directory architecture for MeteoFrance, if asked
66         if self._config['GENERAL'].getboolean('regenerate'):
67             self._regenerate_directory()
68
69         # Collecting the closest meteo station
70         self._nb_stations = self._config['STATIONS'].getint('nb_stations')
71         self._stations = self._get_stations()
72
73         # Collecting meteofrance features
74         with open(CSV_FILE, "r") as f:
75             reader = DictReader(f, delimiter=',')
76             self._features = [row['name'] for row in reader
77                               if self._config['FEATURES'].getboolean(row['name'])]
78
79
80     @property
81     def start(self):
82         return self._start
83
84     @start.setter
85     def start(self, x):
86         self._start = x
87
88
89     @property
90     def end(self):
91         return self._end
92
93     @end.setter
94     def end(self, x):
95         self._end = x
96
97
98     @property
99     def latitude(self):
100         return self._latitude
101
102     @latitude.setter
103     def latitude(self, x):
104         self._latitude = x
105
106
107     @property
108     def longitude(self):
109         return self._longitude
110
111     @longitude.setter
112     def longitude(self, x):
113         self._longitude = x
114
115
116     @property
117     def nb_stations(self):
118         return self._nb_stations
119
120     @nb_stations.setter
121     def nb_stations(self, x):
122         self._nb_stations = x
123
124
125     def _regenerate_directory(self):
126         '''
127         Re-creating data directory architecture for MeteoFrance
128         '''
129         logger.info("Regenerating meteofrance data directory")
130         try:
131             rmtree(self._data_directory)
132         except:
133             pass
134         p = Path(self._data_directory / 'historical')
135         p.mkdir(exist_ok=True, parents=True)
136         p = Path(self._data_directory / 'config')
137         p.mkdir(exist_ok=True, parents=True)
138
139
140
141     def _get_stations(self):
142         '''
143         Collect (after downloading them, if needed) the stations and their
144         locations in a dictionary
145
146         Returns:
147             list: The self._nb_stations closest station IDs, starting by the
148                   closest one
149         '''
150         # The csv file of meteo stations (names, ids and locations) if downloaded,
151         # if not available in the config directory within data / meteo_france
152         link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
153         p = Path(self._data_directory / 'config' )
154         csv_file = p / basename(link)
155         if not isfile(csv_file):
156             logger.info('Downloading location stations from MeteoFrance')
157             urlretrieve(link, csv_file)
158
159         # A dictionary for the meteo stations is created
160         self._dict_stations = {}
161         logger.info('Collecting information about meteo stations')
162         with open(csv_file, "r") as f:
163             reader = DictReader(f, delimiter=';')
164             for row in reader:
165                 latitude, longitude = eval(row['Latitude']), eval(row['Longitude'])
166                 self._dict_stations[row['Nom'].replace("'",'’')] = {
167                     'id' : row['ID'],
168                     'longitude' : longitude,
169                     'latitude' : latitude,
170                     'distance' : vincenty(
171                         (self._latitude, self._longitude),
172                         (latitude, longitude)).km
173                 }
174
175         # Find the closest stations
176         logger.info('Finding the closest stations')
177         stations_by_distance = sorted(self._dict_stations.keys(),
178                                       key = lambda x: self._dict_stations[x]['distance'])
179         logger.info(f'The {self._nb_stations} closest stations are: '
180                     f'{", ".join(stations_by_distance[:self._nb_stations])}.')
181         return [self._dict_stations[sta]['id'] for sta in stations_by_distance][:self._nb_stations]
182
183
184
185     def _collect_historical_data(self):
186         '''
187         We collect all csv files from January 1996 until the month
188         before now. The argument in the url to download are of the
189         form 201001 for January 2010. We start by computing all these
190         patterns, in historical list.
191         '''
192         # List of year-months to consider
193         historical = []
194         date_end = self._end
195         for year in range(self._start.year, date_end.year+1):
196             for month in range(1,13):
197                 date = datetime(year, month, 1)
198                 if date >= self._start and date <= date_end:
199                     historical.append(date.strftime("%Y%m"))
200
201         # We download all csv files from meteofrance that are not in
202         # the data repository
203         meteo_data = self._data_directory / 'historical'
204         p = Path(meteo_data)
205         p.mkdir(exist_ok=True, parents=True)
206         for date in historical:
207             if not isfile(meteo_data / ('synop.'+date+'.csv')):
208                 link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
209                 link += date + '.csv.gz'
210                 download_path = meteo_data / basename(link)
211                 urlretrieve(link, download_path)
212                 with gzip.open(download_path, 'rb') as f:
213                     csv_file = meteo_data / basename(link[:-3])
214                     with open(csv_file, 'w') as g:
215                         g.write(f.read().decode())
216                         remove(meteo_data / basename(link))
217
218
219
220     def update(self):
221         '''
222         Update the MeteoFrance features with the last available data
223         '''
224         # We collect archive files from MeteoFrance, until the current month
225         # by using the same method than for data generation : this is currently
226         # based on the presence of a synop.+date+.csv' file in the
227         # data/meteo_france/historical directory. The file corresponding to the
228         # current month is deleted first, so that its most recent version will
229         # be downloaded by calling self._collect_historical_data
230
231         logger.info('Update historical csv files from MeteoFrance, if needed')
232         today = datetime.now()
233         todel = 'synop.'+today.strftime("%Y%m")+".csv"
234         try:
235             remove(self._data_directory / 'historical' / todel)
236         except:
237             logger.warning(f"{self._data_directory / 'historical' / todel} not found")
238         self._collect_historical_data()
239
240
241
242     @property
243     def dated_features(self):
244         '''
245         If the attribute dated_features is None, then we create it: a dictionary
246         with datestamps as keys, and {features: values} as values.
247          - considered features are the ones from meteofrance_features.csv, found
248            in config/features/meteofrance directory
249          - only the closest meteo stations are considered
250
251         Returns:
252             dict: the dictionary of features per datestamp
253         '''
254         if self._dated_features == None:
255             logger.info(f'Collecting meteo feature information from {CSV_FILE}')
256             # A dictionary for the features
257             with open(CSV_FILE, "r") as f:
258                 reader = DictReader(f, delimiter=',')
259                 dico_features = {row["abbreviation"]:
260                                    {
261                                        'name': row['name'], # feature name
262                                        'type': row['type']  # qualitative (2) or quantitative (1)
263                                     }
264                                 for row in reader if row['name'] in self._features}
265                 #print([row for row in reader])
266                 #print([row for row in reader if row['name'] in self._features])
267             dir_data = Path.cwd() / 'data' / 'features' / 'meteo_france' / 'historical'
268             self._dated_features = {}
269             for csv_meteo in listdir(dir_data):
270                 date = datetime.strptime(csv_meteo.split('.')[1], '%Y%m')
271                 if (date >= self._start and date <= self._end)\
272                 or (date.year == self._start.year and date.month == self._start.month)\
273                 or (date.year == self._end.year and date.month == self._end.month):
274                     logger.info(f'Inserting {csv_meteo} in intervention dictionary')
275                     with open(dir_data / csv_meteo, "r") as f:
276                         reader = DictReader(f, delimiter=';')
277                         for row in reader:
278                             if row['numer_sta'] in self._stations:
279                                 date = datetime.strptime(row['date'], '%Y%m%d%H%M%S')
280                                 if date  >= self._start and date <= self._end:
281                                     self._dated_features.setdefault(date,{}).update({dico_features[feat]['name']+'_'+str(self._stations.index(row['numer_sta'])): eval(row[feat].replace('mq','None')) for feat in dico_features})
282         return self._dated_features
283