]> AND Private Git Repository - predictops.git/blob - predictops/source/meteofrance.py
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Refactoring, fin du lever/coucher de soleil, et début de sentinelles
[predictops.git] / predictops / source / meteofrance.py
1 from configparser import ConfigParser
2 from csv import DictReader
3 from datetime import datetime
4 from geopy.distance import vincenty
5 from logging import getLogger
6 from logging.config import fileConfig
7 from os import listdir, remove
8 from os.path import isfile, basename
9 from pathlib import Path
10 from shutil import rmtree
11 from urllib.request import urlretrieve
12
13 import gzip
14
15
# Logging is configured once, at import time, from the 'config/logging.cfg'
# file located under the current working directory; the root logger is then
# shared by everything defined in this module.
fileConfig(Path.cwd().joinpath('config', 'logging.cfg'))
logger = getLogger()
18
19
class MeteoFrance:
    '''
    Source of features built from the MeteoFrance SYNOP open-data archives.

    The instance is configured from an ini file: it selects the
    `nb_stations` stations closest to the configured position, downloads the
    monthly archives covering [start, end] on demand, and exposes the values
    of the configured features per timestamp through `dated_features`.
    '''

    _latitude = None
    _longitude = None
    _nb_stations = None
    _start = None
    _end = None
    _features = None

    def __init__(self, config_file):
        '''
        Constructor of the MeteoFrance source of feature.

        Args:
            config_file: path of the configuration file, read with
                ConfigParser. The sections POSITION (latitude, longitude),
                GENERAL (regenerate) and STATIONS (nb_stations) are read
                here; every section owning a 'numerical' option is treated
                as the description of a feature.
        '''
        self._config = ConfigParser()
        self._config.read(config_file)

        self._latitude = self._config['POSITION'].getfloat('latitude')
        self._longitude = self._config['POSITION'].getfloat('longitude')

        self._data_directory = (Path.cwd() / 'data') / 'features' / 'meteo_france'

        # Cache filled lazily by the dated_features property
        self._dated_features = None

        # Re-creating data directory architecture for MeteoFrance, if asked
        if self._config['GENERAL'].getboolean('regenerate'):
            self._regenerate_directory()

        # Collecting the closest meteo station
        self._nb_stations = self._config['STATIONS'].getint('nb_stations')
        self._stations = self._get_stations()

        # Collecting meteofrance features
        # NOTE(review): the three options are read as raw strings, so any
        # non-empty value (including the text 'False') is truthy here --
        # confirm whether getboolean() was intended.
        self._features = [section for section in self._config
                          if self._config.has_option(section, 'numerical')
                          and (self._config[section]['binary'] or
                               self._config[section]['categorical'] or
                               self._config[section]['numerical'])]

    @property
    def start(self):
        '''First datetime (inclusive) of the period of interest.'''
        return self._start

    @start.setter
    def start(self, x):
        self._start = x

    @property
    def end(self):
        '''Last datetime (inclusive) of the period of interest.'''
        return self._end

    @end.setter
    def end(self, x):
        self._end = x

    @property
    def latitude(self):
        '''Latitude of the reference position.'''
        return self._latitude

    @latitude.setter
    def latitude(self, x):
        self._latitude = x

    @property
    def longitude(self):
        '''Longitude of the reference position.'''
        return self._longitude

    @longitude.setter
    def longitude(self, x):
        self._longitude = x

    @property
    def nb_stations(self):
        '''Number of closest stations to consider.'''
        return self._nb_stations

    @nb_stations.setter
    def nb_stations(self, x):
        self._nb_stations = x

    def _regenerate_directory(self):
        '''
        Re-creating data directory architecture for MeteoFrance: the existing
        tree is removed (best effort), then the 'historical' and 'config'
        sub-directories are created.
        '''
        logger.info("Regenerating meteofrance data directory")
        try:
            rmtree(self._data_directory)
        except FileNotFoundError:
            # Nothing to clean: the directory has never been created
            pass
        except OSError as err:
            # Still best effort, but a failed clean-up is no longer silent
            logger.warning(f"Unable to fully remove {self._data_directory}: {err}")
        for subdirectory in ('historical', 'config'):
            (self._data_directory / subdirectory).mkdir(exist_ok=True, parents=True)

    def _get_stations(self):
        '''
        Collect (after downloading them, if needed) the stations and their
        locations in a dictionary

        Side effect: self._dict_stations maps each station name to its id,
        longitude, latitude and distance (km) to the reference position.

        Returns:
            list: The self._nb_stations closest station IDs, starting by the
                  closest one
        '''
        # The csv file of meteo stations (names, ids and locations) is
        # downloaded if it is not available in the config directory within
        # data / features / meteo_france
        link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
        config_dir = Path(self._data_directory / 'config')
        csv_file = config_dir / basename(link)
        if not isfile(csv_file):
            # The directory may not exist yet when 'regenerate' is disabled
            config_dir.mkdir(exist_ok=True, parents=True)
            logger.info('Downloading location stations from MeteoFrance')
            urlretrieve(link, csv_file)

        # A dictionary for the meteo stations is created
        self._dict_stations = {}
        logger.info('Collecting information about meteo stations')
        with open(csv_file, "r") as f:
            reader = DictReader(f, delimiter=';')
            for row in reader:
                # SECURITY: these values come from a downloaded file; they
                # were previously passed to eval(), they are now parsed as
                # plain floats.
                latitude = float(row['Latitude'])
                longitude = float(row['Longitude'])
                # NOTE(review): geopy dropped `vincenty` in favour of
                # `geodesic` in its 2.0 release -- confirm the pinned version.
                self._dict_stations[row['Nom'].replace("'", '’')] = {
                    'id': row['ID'],
                    'longitude': longitude,
                    'latitude': latitude,
                    'distance': vincenty(
                        (self._latitude, self._longitude),
                        (latitude, longitude)).km
                }

        # Find the closest stations
        logger.info('Finding the closest stations')
        stations_by_distance = sorted(
            self._dict_stations,
            key=lambda name: self._dict_stations[name]['distance'])
        closest = stations_by_distance[:self._nb_stations]
        logger.info(f'The {self._nb_stations} closest stations are: '
                    f'{", ".join(closest)}.')
        return [self._dict_stations[name]['id'] for name in closest]

    def _historical_months(self):
        '''
        List the months whose archive is needed to cover [start, end].

        The month containing self._start is always included, even when
        self._start is not the very first instant of that month.

        Returns:
            list: 'YYYYMM' strings in chronological order (e.g. '201001' for
                  January 2010)
        '''
        first_month = datetime(self._start.year, self._start.month, 1)
        months = []
        for year in range(self._start.year, self._end.year + 1):
            for month in range(1, 13):
                date = datetime(year, month, 1)
                if first_month <= date <= self._end:
                    months.append(date.strftime("%Y%m"))
        return months

    def _collect_historical_data(self):
        '''
        Download the monthly csv archives covering the period from
        self._start to self._end that are not yet in the 'historical' data
        directory. The argument in the url to download are of the form
        201001 for January 2010. Each archive is fetched gzipped, expanded
        next to it as synop.<YYYYMM>.csv, and the gzipped file is removed.
        '''
        # We download all csv files from meteofrance that are not in
        # the data repository
        meteo_data = Path(self._data_directory / 'historical')
        meteo_data.mkdir(exist_ok=True, parents=True)
        for date in self._historical_months():
            if isfile(meteo_data / ('synop.' + date + '.csv')):
                continue
            link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
            link += date + '.csv.gz'
            download_path = meteo_data / basename(link)
            urlretrieve(link, download_path)
            # basename(link[:-3]) drops the '.gz' suffix
            csv_file = meteo_data / basename(link[:-3])
            with gzip.open(download_path, 'rb') as f, open(csv_file, 'w') as g:
                g.write(f.read().decode())
            # The archive is deleted only once both handles are closed
            remove(download_path)

    def update(self):
        '''
        Update the MeteoFrance features with the last available data
        '''
        # We collect archive files from MeteoFrance, until the current month
        # by using the same method than for data generation : this is currently
        # based on the presence of a synop.+date+.csv' file in the
        # data/meteo_france/historical directory. The file corresponding to the
        # current month is deleted first, so that its most recent version will
        # be downloaded by calling self._collect_historical_data

        logger.info('Update historical csv files from MeteoFrance, if needed')
        today = datetime.now()
        todel = 'synop.' + today.strftime("%Y%m") + ".csv"
        try:
            remove(self._data_directory / 'historical' / todel)
        except FileNotFoundError:
            logger.warning(f"{self._data_directory / 'historical' / todel} not found")
        except OSError as err:
            logger.warning(f"{self._data_directory / 'historical' / todel} "
                           f"could not be removed: {err}")
        self._collect_historical_data()

    @staticmethod
    def _parse_value(raw):
        '''
        Convert one csv cell into a Python value.

        Args:
            raw (str): the text of the cell; 'mq' stands for a missing value

        Returns:
            The literal held by the cell (int, float, ...), or None for 'mq'

        Raises:
            ValueError, SyntaxError: if the cell is not a plain literal
        '''
        # SECURITY: cells come from downloaded files; they used to go through
        # eval(), literal_eval only accepts literals and never runs code.
        from ast import literal_eval
        return literal_eval(raw.strip().replace('mq', 'None'))

    @property
    def dated_features(self):
        '''
        If the attribute dated_features is None, then we create it: a dictionary
        with datestamps as keys, and {features: values} as values.
         - considered features are the sections of the configuration file
           selected at construction time
         - only the closest meteo stations are considered; the key of a value
           is '<feature name>_<rank of the station>', rank 0 being the closest
         - only the synop.<YYYYMM>.csv files of the 'historical' data
           directory are read

        Returns:
            dict: the dictionary of features per datestamp
        '''
        if self._dated_features is None:
            logger.info('Collecting meteofrance feature information')
            # A dictionary for the features
            dico_features = {
                self._config[section]["abbreviation"]: {
                    'name': section,  # feature name
                    'numerical': self._config[section]['numerical'],
                    'categorical': self._config[section]['categorical'],
                }
                for section in self._features}
            # Same location as the one filled by _collect_historical_data
            dir_data = self._data_directory / 'historical'

            # Rank of each station (first occurrence), computed once instead
            # of searching the list for every row
            station_rank = {}
            for rank, station in enumerate(self._stations):
                station_rank.setdefault(station, rank)

            # Built locally and published at the end, so that a failure does
            # not leave a partially filled cache behind
            collected = {}
            for csv_meteo in sorted(listdir(dir_data)):
                # Skip anything that is not a synop.<YYYYMM>.csv file (e.g. a
                # gzipped archive left by an interrupted download)
                parts = csv_meteo.split('.')
                if len(parts) != 3 or parts[0] != 'synop' or parts[2] != 'csv':
                    continue
                try:
                    month = datetime.strptime(parts[1], '%Y%m')
                except ValueError:
                    continue
                in_period = self._start <= month <= self._end
                is_start_month = (month.year == self._start.year
                                  and month.month == self._start.month)
                is_end_month = (month.year == self._end.year
                                and month.month == self._end.month)
                if not (in_period or is_start_month or is_end_month):
                    continue
                logger.info(f'Adding meteofrance features from {csv_meteo}')
                with open(dir_data / csv_meteo, "r") as f:
                    reader = DictReader(f, delimiter=';')
                    for row in reader:
                        rank = station_rank.get(row['numer_sta'])
                        if rank is None:
                            continue
                        date = datetime.strptime(row['date'], '%Y%m%d%H%M%S')
                        if not (self._start <= date <= self._end):
                            continue
                        collected.setdefault(date, {}).update({
                            dico_features[feat]['name'] + '_' + str(rank):
                                self._parse_value(row[feat])
                            for feat in dico_features})
            self._dated_features = collected
        return self._dated_features
246