]> AND Private Git Repository - predictops.git/blob - predictops/source/meteofrance.py
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Improving csv -> dataframe module
[predictops.git] / predictops / source / meteofrance.py
1 from configparser import ConfigParser
2 from csv import DictReader
3 from datetime import datetime
4 from geopy.distance import vincenty
5 from logging import getLogger
6 from logging.config import fileConfig
7 from os import listdir, remove
8 from os.path import isfile, basename
9 from pathlib import Path
10 from shutil import rmtree
11 from urllib.request import urlretrieve
12
13 import gzip
14
15
16 fileConfig((Path.cwd() / 'config') / 'logging.cfg')
17 logger = getLogger()
18
19 class MeteoFrance:
20
21     def __init__(self, latitude = 47.25, longitude = 6.0333, nb_stations = 3,
22                  start = datetime.strptime('19960101000000', '%Y%m%d%H%M%S'),
23                  end = datetime.now(),
24                  features = []):
25         '''
26         Constructor of the MeteoFrance source of feature.
27
28         - It will reinitiate the data directory, if asked in the config
29           features.cfg file.
30         - It searches for the nb_stations meteo stations closest to the provided
31           point (longitude and latitude)
32
33         For more information about this source of feature, see:
34     https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
35
36         Parameters:
37             latitude (float): The latitude from which we want the meteo features.
38             longitude (float): The longitude from which we want the meteo features.
39             nb_stations (int): Number of closest stations to consider.
40             features (list): Weather features that have to be integrated, according
41                   to their names in meteofrance_features.csv (cf. config directory)
42
43         '''
44         self._latitude = latitude
45         self._longitude = longitude
46         self._nb_stations = nb_stations
47         self._start = start
48         self._end = end
49         self._features = features
50
51         self._data_directory = (Path.cwd() / 'data') / 'features' / 'meteo_france'
52
53         self._dated_features = None
54
55         # Re-creating data directory architecture for MeteoFrance, if asked
56         config = ConfigParser()
57         config.read((Path.cwd() / 'config') / 'features.cfg')
58         if eval(config['meteofrance']['regenerate']):
59             self._regenerate_directory()
60
61         # Collecting the closest meteo station
62         self._stations = self._get_stations()
63
64
65
66     def _regenerate_directory(self):
67         '''
68         Re-creating data directory architecture for MeteoFrance
69         '''
70         logger.info("Regenerating meteofrance data directory")
71         try:
72             rmtree(self._data_directory)
73         except:
74             pass
75         p = Path(self._data_directory / 'historical')
76         p.mkdir(exist_ok=True, parents=True)
77         p = Path(self._data_directory / 'config')
78         p.mkdir(exist_ok=True, parents=True)
79
80
81
82     def _get_stations(self):
83         '''
84         Collect (after downloading them, if needed) the stations and their
85         locations in a dictionary
86
87         Returns:
88             list: The self._nb_stations closest station IDs, starting by the
89                   closest one
90         '''
91         # The csv file of meteo stations (names, ids and locations) if downloaded,
92         # if not available in the config directory within data / meteo_france
93         link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
94         p = Path(self._data_directory / 'config' )
95         csv_file = p / basename(link)
96         if not isfile(csv_file):
97             logger.info('Downloading location stations from MeteoFrance')
98             urlretrieve(link, csv_file)
99
100         # A dictionary for the meteo stations is created
101         self._dict_stations = {}
102         logger.info('Collecting information about meteo stations')
103         with open(csv_file, "r") as f:
104             reader = DictReader(f, delimiter=';')
105             for row in reader:
106                 latitude, longitude = eval(row['Latitude']), eval(row['Longitude'])
107                 self._dict_stations[row['Nom'].replace("'",'’')] = {
108                     'id' : row['ID'],
109                     'longitude' : longitude,
110                     'latitude' : latitude,
111                     'distance' : vincenty(
112                         (self._latitude, self._longitude),
113                         (latitude, longitude)).km
114                 }
115
116         # Find the closest stations
117         logger.info('Finding the closest stations')
118         stations_by_distance = sorted(self._dict_stations.keys(),
119                                       key = lambda x: self._dict_stations[x]['distance'])
120         logger.info(f'The {self._nb_stations} closest stations are: '
121                     f'{", ".join(stations_by_distance[:self._nb_stations])}.')
122         return [self._dict_stations[sta]['id'] for sta in stations_by_distance][:self._nb_stations]
123
124
125
126     def _collect_historical_data(self):
127         '''
128         We collect all csv files from January 1996 until the month
129         before now. The argument in the url to download are of the
130         form 201001 for January 2010. We start by computing all these
131         patterns, in historical list.
132         '''
133         # List of year-months to consider
134         historical = []
135         date_end = self._end
136         for year in range(self._start.year, date_end.year+1):
137             for month in range(1,13):
138                 date = datetime(year, month, 1)
139                 if date >= self._start and date <= date_end:
140                     historical.append(date.strftime("%Y%m"))
141
142         # We download all csv files from meteofrance that are not in
143         # the data repository
144         meteo_data = self._data_directory / 'historical'
145         p = Path(meteo_data)
146         p.mkdir(exist_ok=True, parents=True)
147         for date in historical:
148             if not isfile(meteo_data / ('synop.'+date+'.csv')):
149                 link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
150                 link += date + '.csv.gz'
151                 download_path = meteo_data / basename(link)
152                 urlretrieve(link, download_path)
153                 with gzip.open(download_path, 'rb') as f:
154                     csv_file = meteo_data / basename(link[:-3])
155                     with open(csv_file, 'w') as g:
156                         g.write(f.read().decode())
157                         remove(meteo_data / basename(link))
158
159
160
161     def update(self):
162         '''
163         Update the MeteoFrance features with the last available data
164         '''
165         # We collect archive files from MeteoFrance, until the current month
166         # by using the same method than for data generation : this is currently
167         # based on the presence of a synop.+date+.csv' file in the
168         # data/meteo_france/historical directory. The file corresponding to the
169         # current month is deleted first, so that its most recent version will
170         # be downloaded by calling self._collect_historical_data
171
172         logger.info('Update historical csv files from MeteoFrance, if needed')
173         today = datetime.now()
174         todel = 'synop.'+today.strftime("%Y%m")+".csv"
175         try:
176             remove(self._data_directory / 'historical' / todel)
177         except:
178             logger.warning(f"{self._data_directory / 'historical' / todel} not found")
179         self._collect_historical_data()
180
181
182
183     @property
184     def dated_features(self):
185         '''
186         If the attribute dated_features is None, then we create it: a dictionary
187         with datestamps as keys, and {features: values} as values.
188          - considered features are the ones from meteofrance_features.csv, found
189            in config/features/meteofrance directory
190          - only the closest meteo stations are considered
191
192         Returns:
193             dict: the dictionary of features per datestamp
194         '''
195         if self._dated_features == None:
196             csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance_features.csv'
197             logger.info(f'Collecting meteo feature information from {csv_file}')
198             # A dictionary for the features
199             with open(csv_file, "r") as f:
200                 reader = DictReader(f, delimiter=',')
201                 dico_features = {row["abbreviation"]:
202                                    {
203                                        'name': row['name'], # feature name
204                                        'type': row['type']  # qualitative (2) or quantitative (1)
205                                     }
206                                 for row in reader if row['name'] in self._features}
207             dir_data = Path.cwd() / 'data' / 'features' / 'meteo_france' / 'historical'
208             self._dated_features = {}
209             for csv_meteo in listdir(dir_data):
210                 date = datetime.strptime(csv_meteo.split('.')[1], '%Y%m')
211                 if (date >= self._start and date <= self._end)\
212                 or (date.year == self._start.year and date.month == self._start.month)\
213                 or (date.year == self._end.year and date.month == self._end.month):
214                     logger.info(f'Inserting {csv_meteo} in intervention dictionary')
215                     with open(dir_data / csv_meteo, "r") as f:
216                         reader = DictReader(f, delimiter=';')
217                         for row in reader:
218                             if row['numer_sta'] in self._stations:
219                                 date = datetime.strptime(row['date'], '%Y%m%d%H%M%S')
220                                 if date  >= self._start and date <= self._end:
221                                     self._dated_features.setdefault(date,{}).update({dico_features[feat]['name']+'_'+str(self._stations.index(row['numer_sta'])): eval(row[feat].replace('mq','None')) for feat in dico_features})
222         return self._dated_features
223