]> AND Private Git Repository - predictops.git/blob - predictops/source/meteofrance.py
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
lightgbm is now working
[predictops.git] / predictops / source / meteofrance.py
import gzip
from ast import literal_eval
from configparser import ConfigParser
from csv import DictReader
from datetime import datetime
from logging import getLogger
from logging.config import fileConfig
from os import listdir, remove
from os.path import isfile, basename
from pathlib import Path
from shutil import rmtree
from urllib.request import urlretrieve

from geopy.distance import vincenty
14
15
# Logging is configured at import time from <cwd>/config/logging.cfg; the
# module-wide ``logger`` is the root logger (getLogger() without a name).
fileConfig(Path.cwd().joinpath('config', 'logging.cfg'))
logger = getLogger()
18
19
class MeteoFrance:
    '''
    MeteoFrance source of features.

    The class reads a configuration file, finds the meteo stations that are
    the closest to a given position, downloads the monthly SYNOP archives
    for a period [start, end] and exposes the selected measurements as a
    dictionary indexed by datetime (see dated_features).

    The period is not read from the configuration file: `start` and `end`
    must be set (as datetime objects) through the properties before calling
    update() or reading dated_features.
    '''

    _latitude    = None
    _longitude   = None
    _nb_stations = None
    _start       = None
    _end         = None
    _features    = None

    def __init__(self, config_file):
        '''
        Constructor of the MeteoFrance source of feature.

        Args:
            config_file: path of an ini file (read with ConfigParser) that
                provides the sections POSITION (latitude, longitude),
                GENERAL (regenerate), STATIONS (nb_stations) and one
                section per feature (abbreviation, numerical, categorical).
        '''
        self._config = ConfigParser()
        self._config.read(config_file)

        self._latitude = self._config['POSITION'].getfloat('latitude')
        self._longitude = self._config['POSITION'].getfloat('longitude')

        self._data_directory = (Path.cwd() / 'data') / 'features' / 'meteo_france'

        self._dated_features = None

        # Re-creating data directory architecture for MeteoFrance, if asked
        if self._config['GENERAL'].getboolean('regenerate'):
            self._regenerate_directory()

        # Collecting the closest meteo station
        self._nb_stations = self._config['STATIONS'].getint('nb_stations')
        self._stations = self._get_stations()

        # Collecting meteofrance features
        self._features = self._select_features()


    def _is_enabled(self, section, option):
        '''
        Tell whether a flag of the configuration file is switched on.

        The value is parsed as a boolean ('False', '0', 'no', 'off' are
        False). A missing option is False. A text that is not a boolean
        keeps the historical rule: any non-empty text is True.

        Returns:
            bool: the state of the flag
        '''
        try:
            return self._config.getboolean(section, option, fallback=False)
        except ValueError:
            return bool(self._config.get(section, option, fallback=''))


    def _select_features(self):
        '''
        List the sections of the configuration file describing a feature
        (they own a 'numerical' option) flagged as numerical or categorical.

        Returns:
            list: the names of the selected sections, in file order
        '''
        return [section for section in self._config
                if self._config.has_option(section, 'numerical')
                and (self._is_enabled(section, 'numerical')
                     or self._is_enabled(section, 'categorical'))]


    @property
    def start(self):
        '''Beginning of the considered period (datetime).'''
        return self._start

    @start.setter
    def start(self, x):
        self._start = x


    @property
    def end(self):
        '''End of the considered period (datetime).'''
        return self._end

    @end.setter
    def end(self, x):
        self._end = x


    @property
    def latitude(self):
        '''Latitude of the position of interest.'''
        return self._latitude

    @latitude.setter
    def latitude(self, x):
        self._latitude = x


    @property
    def longitude(self):
        '''Longitude of the position of interest.'''
        return self._longitude

    @longitude.setter
    def longitude(self, x):
        self._longitude = x


    @property
    def nb_stations(self):
        '''Number of closest stations to consider.'''
        return self._nb_stations

    @nb_stations.setter
    def nb_stations(self, x):
        self._nb_stations = x


    def _regenerate_directory(self):
        '''
        Re-creating data directory architecture for MeteoFrance: the
        existing tree is dropped, then empty 'historical' and 'config'
        subdirectories are created.
        '''
        logger.info("Regenerating meteofrance data directory")
        # Best effort: a missing or partially removable tree is not an error
        rmtree(self._data_directory, ignore_errors=True)
        for subdirectory in ('historical', 'config'):
            (self._data_directory / subdirectory).mkdir(exist_ok=True,
                                                        parents=True)


    def _get_stations(self):
        '''
        Collect (after downloading them, if needed) the stations and their
        locations in the dictionary self._dict_stations, indexed by station
        name.

        Returns:
            list: The self._nb_stations closest station IDs, starting by the
                  closest one
        '''
        # The csv file of meteo stations (names, ids and locations) is
        # downloaded, if not available in the config directory
        link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
        config_directory = self._data_directory / 'config'
        # The directory may not exist yet when no regeneration was asked
        config_directory.mkdir(exist_ok=True, parents=True)
        csv_file = config_directory / basename(link)
        if not isfile(csv_file):
            logger.info('Downloading location stations from MeteoFrance')
            urlretrieve(link, csv_file)

        # A dictionary for the meteo stations is created
        self._dict_stations = {}
        logger.info('Collecting information about meteo stations')
        position = (self._latitude, self._longitude)
        with open(csv_file, "r") as f:
            for row in DictReader(f, delimiter=';'):
                # float() instead of eval(): the file is downloaded, its
                # content must never be executed
                latitude = float(row['Latitude'])
                longitude = float(row['Longitude'])
                # NOTE(review): vincenty was removed in geopy 2.0, geodesic
                # is its replacement -- to be switched with the geopy pin
                self._dict_stations[row['Nom'].replace("'",'’')] = {
                    'id' : row['ID'],
                    'longitude' : longitude,
                    'latitude' : latitude,
                    'distance' : vincenty(position,
                                          (latitude, longitude)).km
                }

        # Find the closest stations
        logger.info('Finding the closest stations')
        stations_by_distance = sorted(
            self._dict_stations,
            key = lambda name: self._dict_stations[name]['distance'])
        closest = stations_by_distance[:self._nb_stations]
        logger.info(f'The {self._nb_stations} closest stations are: '
                    f'{", ".join(closest)}.')
        return [self._dict_stations[name]['id'] for name in closest]


    def _year_months(self):
        '''
        Compute the year-month patterns (201001 for January 2010) of every
        month overlapping [self._start, self._end]. The month of
        self._start is always part of the list, even when the period
        begins in the middle of it.

        Returns:
            list: the patterns, in chronological order
        '''
        first = (self._start.year, self._start.month)
        last = (self._end.year, self._end.month)
        return [datetime(year, month, 1).strftime("%Y%m")
                for year in range(first[0], last[0] + 1)
                for month in range(1, 13)
                if first <= (year, month) <= last]


    def _collect_historical_data(self):
        '''
        We collect the monthly csv files covering [self._start, self._end].
        The argument in the url to download are of the form 201001 for
        January 2010. Only the files that are not yet in the 'historical'
        directory are downloaded, then uncompressed.
        '''
        meteo_data = self._data_directory / 'historical'
        meteo_data.mkdir(exist_ok=True, parents=True)
        for year_month in self._year_months():
            csv_file = meteo_data / ('synop.' + year_month + '.csv')
            if isfile(csv_file):
                continue
            link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
            link += year_month + '.csv.gz'
            download_path = meteo_data / basename(link)
            urlretrieve(link, download_path)
            # The archive is read before the csv file is created: a corrupted
            # download must not leave an empty csv file, that would be taken
            # for an already collected month
            with gzip.open(download_path, 'rb') as archive:
                content = archive.read().decode()
            with open(csv_file, 'w') as target:
                target.write(content)
            # Removed once closed only
            remove(download_path)


    def update(self):
        '''
        Update the MeteoFrance features with the last available data
        '''
        # We collect archive files from MeteoFrance by using the same method
        # than for data generation : this is currently based on the presence
        # of a synop.+date+.csv file in the historical directory. The file
        # corresponding to the current month is deleted first, so that
        # self._collect_historical_data downloads it again.

        logger.info('Update historical csv files from MeteoFrance, if needed')
        todel = 'synop.'+datetime.now().strftime("%Y%m")+".csv"
        stale = self._data_directory / 'historical' / todel
        try:
            remove(stale)
        except FileNotFoundError:
            logger.warning(f"{stale} not found")
        except OSError as error:
            # Best effort, as before: the collection is attempted anyway
            logger.warning(f"{stale} could not be removed: {error}")
        self._collect_historical_data()


    @staticmethod
    def _file_month(filename):
        '''
        Extract the month of a file named synop.YYYYMM.csv.

        Returns:
            tuple: (year, month), or None for any other file name
        '''
        parts = filename.split('.')
        if len(parts) != 3 or parts[0] != 'synop' or parts[2] != 'csv':
            return None
        try:
            month = datetime.strptime(parts[1], '%Y%m')
        except ValueError:
            return None
        return (month.year, month.month)


    @staticmethod
    def _parse_value(raw):
        '''
        Convert a cell of a csv file to a Python value. 'mq' (the marker
        replaced by None in the historical code) and the empty cell lead to
        None, a number to an int or a float.

        literal_eval only accepts literals: contrary to eval, a downloaded
        file cannot make us execute code.

        Raises:
            ValueError or SyntaxError: if the cell is not a literal
        '''
        text = raw.replace('mq', 'None').strip()
        if not text:
            return None
        return literal_eval(text)


    @property
    def dated_features(self):
        '''
        If the attribute dated_features is None, then we create it: a
        dictionary with datetimes as keys, and {features: values} as values.
         - considered features are the sections selected in the
           configuration file; the column read in the csv files is their
           'abbreviation'
         - only the closest meteo stations are considered
         - a feature is named <section>_<rank>, where rank is the position
           of the station in the list of closest stations (0 is the closest)

        Returns:
            dict: the dictionary of features per datetime
        '''
        if self._dated_features is None:
            logger.info('Collecting meteofrance feature information')
            # csv column -> feature name
            names = {self._config[section]["abbreviation"]: section
                     for section in self._features}
            # station id -> rank, computed once instead of once per row
            ranks = {}
            for rank, station in enumerate(self._stations):
                ranks.setdefault(station, rank)

            first = (self._start.year, self._start.month)
            last = (self._end.year, self._end.month)
            dir_data = self._data_directory / 'historical'
            # Filled in a local dictionary: a failure in the middle of the
            # collection must not leave a partial result in the cache
            dated_features = {}
            for csv_meteo in sorted(listdir(dir_data)):
                month = self._file_month(csv_meteo)
                # Other files (leftover archives, ...) and months out of the
                # period are ignored
                if month is None or not first <= month <= last:
                    continue
                logger.info(f'Adding meteofrance features from {csv_meteo}')
                with open(dir_data / csv_meteo, "r") as f:
                    for row in DictReader(f, delimiter=';'):
                        rank = ranks.get(row['numer_sta'])
                        if rank is None:
                            continue
                        date = datetime.strptime(row['date'], '%Y%m%d%H%M%S')
                        if not self._start <= date <= self._end:
                            continue
                        dated_features.setdefault(date, {}).update(
                            {name + '_' + str(rank): self._parse_value(row[column])
                             for column, name in names.items()})
            self._dated_features = dated_features
        return self._dated_features
259