[predictops.git] / lib / source / meteofrance.py

from extomeAI.lib.connector import PostgreSQLDBConnection

from configparser import ConfigParser
from csv import DictReader
from datetime import datetime, timedelta
from os import remove, listdir
from pathlib import Path
from shutil import rmtree
from timezonefinder import TimezoneFinder

from logging.config import fileConfig
from os.path import isfile, basename
from urllib.request import urlretrieve

import logging
import gzip
import pytz
import tempfile

fileConfig((Path.cwd() / 'config') / 'logging.cfg')
logger = logging.getLogger()

class MeteoFrance:
    def __init__(self):
        '''
        Constructor
        See: https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
        '''
        self.__data_directory = (Path.cwd() / 'data') / 'meteo_france'
        # Re-create the data directory architecture for MeteoFrance, if asked
        config = ConfigParser()
        config.read((Path.cwd() / 'config') / 'features.cfg')
        if eval(config['meteofrance']['regenerate']):
            logger.info("Regenerating meteofrance data directory")
            try:
                rmtree(self.__data_directory)
            except FileNotFoundError:
                # The directory may not exist yet
                pass
            p = self.__data_directory / 'historical'
            p.mkdir(exist_ok=True, parents=True)
            p = self.__data_directory / 'config'
            p.mkdir(exist_ok=True, parents=True)
        if eval(config['meteofrance']['reinsert']):
            logger.info("Reinserting meteofrance database")
            with PostgreSQLDBConnection.Instance() as db:
                db.cursor.execute('DELETE FROM "METEO_FEATURE";')
                db.cursor.execute('ALTER SEQUENCE "METEO_FEATURE_METFT_ID_seq" RESTART WITH 1;')
                db.cursor.execute('DELETE FROM "METEO_STATION";')
                db.cursor.execute('ALTER SEQUENCE "METEO_STATION_METST_ID_seq" RESTART WITH 1;')
                db.cursor.execute('DELETE FROM "METEO_FEATURE_VALUE";')
                db.cursor.execute('ALTER SEQUENCE "METEO_FEATURE_VALUE_METVA_ID_seq" RESTART WITH 1;')
            self.__generate()

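    # Illustrative sketch of the [meteofrance] section that __init__ expects in
    # config/features.cfg; the two keys are the ones read above, and the values
    # shown here are hypothetical examples (they must be Python literals, since
    # they are passed to eval):
    #
    #   [meteofrance]
    #   regenerate = False
    #   reinsert = False
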
    def __collect_stations(self):
        '''
        Filling the METEO_STATION table from the MeteoFrance list of SYNOP
        stations (postesSynop.csv)
        '''
        tf = TimezoneFinder(in_memory=True)

        with PostgreSQLDBConnection.Instance() as db:
            link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
            p = Path(self.__data_directory / 'config')
            csv_file = p / basename(link)
            if not isfile(csv_file):
                logger.info('Downloading location stations from MeteoFrance')
                urlretrieve(link, csv_file)
            with open(csv_file, "r") as f:
                reader = DictReader(f, delimiter=';')
                logger.info(f'Inserting location stations in {db.dbname} database')
                for row in reader:
                    longitude, latitude = float(row['Longitude']), float(row['Latitude'])
                    timezone_name = tf.timezone_at(lng=longitude, lat=latitude)
                    if timezone_name is None:
                        timezone_name = tf.closest_timezone_at(lng=longitude,
                                                               lat=latitude,
                                                               delta_degree=5)
                    # Current UTC offset of the station, as an integer number of hours
                    cet = pytz.timezone(timezone_name)
                    dt = datetime.now()
                    offset = cet.utcoffset(dt, is_dst=True)
                    shift = int(offset / timedelta(hours=1))
                    request = f"""INSERT INTO "METEO_STATION" ("METST_NAME", "METST_IDNAME", "METST_LOCATION", "METST_TIMEZONE")
                                  VALUES ('{row['Nom'].replace("'", '’')}', '{row['ID']}',
                                  point({row['Latitude']}, {row['Longitude']}), {shift});"""
                    db.cursor.execute(request)

    def __insert_features(self):
        '''
        Filling the METEO_FEATURE table from meteofrance_features.csv
        '''
        logger.info('Inserting MeteoFrance list of features from meteofrance_features.csv')
        csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
        with PostgreSQLDBConnection.Instance() as db:
            with open(csv_file, "r") as f:
                reader = DictReader(f, delimiter=',')
                # Skip the first record after the header row
                next(reader)
                for row in reader:
                    request = f"""INSERT INTO "METEO_FEATURE" ("METFT_NAME", "PARAM_ID_PARAMETER")
                                  VALUES ('{row['METFT_NAME']}', {row['PARAM_ID_PARAMETER']});"""
                    db.cursor.execute(request)

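    # Illustrative layout of config/features/meteofrance/meteofrance_features.csv,
    # inferred from the columns read here and in __insert_historical_data
    # (the exact column order and the rows shown are hypothetical):
    #
    #   METFT_NAME,abbreviation,PARAM_ID_PARAMETER
    #   temperature,t,1
    #   humidity,u,2
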
    def __collect_historical_data(self):
        '''
        We collect all csv files from January 1996 up to the current month.
        The argument in the URL to download is of the form 201001 for
        January 2010. We start by computing all these patterns, stored in
        the historical list.
        '''
        # List of year-months to consider
        historical = []
        date_end = datetime.now()
        for year in range(1996, date_end.year+1):
            for month in range(1, 13):
                date = datetime(year, month, 1)
                if date <= date_end:
                    historical.append(date.strftime("%Y%m"))

        # We download all csv files from MeteoFrance that are not already
        # in the data repository
        meteo_data = self.__data_directory / 'historical'
        p = Path(meteo_data)
        p.mkdir(exist_ok=True, parents=True)
        for date in historical:
            if not isfile(meteo_data / ('synop.'+date+'.csv')):
                link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
                link += date + '.csv.gz'
                download_path = meteo_data / basename(link)
                urlretrieve(link, download_path)
                # Decompress the archive into synop.<date>.csv, then delete the .gz file
                with gzip.open(download_path, 'rb') as f:
                    csv_file = meteo_data / basename(link[:-3])
                    with open(csv_file, 'w') as g:
                        g.write(f.read().decode())
                remove(download_path)

    def __from_date_to_datetz(self, date, station_idname, station_id):
        '''
        Convert a SYNOP date string such as '20100101030000' into a PostgreSQL
        timestamp carrying the UTC offset of the given station, for instance
        '2010-01-01 03:00:00+01' for a station whose offset is +1 hour.
        The per-station offsets are computed once and cached in
        self.__meteo_station_tz (station_id is currently unused).
        '''
        # hasattr must be given the name-mangled attribute name, otherwise the
        # cache would be rebuilt on every call
        if not hasattr(self, '_MeteoFrance__meteo_station_tz'):
            self.__meteo_station_tz = {}
            tf = TimezoneFinder(in_memory=True)
            with PostgreSQLDBConnection.Instance() as db:
                db.cursor.execute('select "METST_IDNAME", "METST_LOCATION" from "METEO_STATION";')
                list_of_rows = db.cursor.fetchall()
            for k in list_of_rows:
                # METST_LOCATION is stored as point(latitude, longitude)
                latitude, longitude = eval(k[1])
                timezone_name = tf.timezone_at(lng=longitude, lat=latitude)
                if timezone_name is None:
                    timezone_name = tf.closest_timezone_at(lng=longitude,
                                                           lat=latitude,
                                                           delta_degree=13,
                                                           exact_computation=True,
                                                           force_evaluation=True)
                # Current UTC offset of the station, as an integer number of hours
                cet = pytz.timezone(timezone_name)
                dt = datetime.now()
                offset = cet.utcoffset(dt, is_dst=True)
                shift = int(offset / timedelta(hours=1))
                self.__meteo_station_tz[k[0]] = shift
        shift = self.__meteo_station_tz[station_idname]
        return f"{date[:4]}-{date[4:6]}-{date[6:8]} {date[8:10]}:00:00{shift:+03d}"

    def __insert_historical_data(self):
        '''
        Filling the METEO_FEATURE_VALUE table with the historical SYNOP
        observations, one feature and one meteo station at a time, through
        a PostgreSQL COPY of a temporary csv file
        '''
        csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
        # Map each feature name to its database id (METFT_NAME -> METFT_ID)
        with PostgreSQLDBConnection.Instance() as db:
            db.cursor.execute('SELECT * from "METEO_FEATURE";')
            list_of_rows = db.cursor.fetchall()
            dico = {u[1]:u[0] for u in list_of_rows}

        # Map each SYNOP column abbreviation to the corresponding feature id
        with open(csv_file, "r") as f:
            reader = DictReader(f, delimiter=',')
            # Skip the first record after the header row
            next(reader)
            dico_features = {row["abbreviation"]:dico[row["METFT_NAME"]] for row in reader}

        # Map each station identifier (METST_IDNAME) to its database id
        with PostgreSQLDBConnection.Instance() as db:
            db.cursor.execute('SELECT * from "METEO_STATION";')
            list_of_rows = db.cursor.fetchall()
            dico_station = {u[2]:u[0] for u in list_of_rows}

        for feature in dico_features:
            logger.info(f'Integrating {[u for u in dico if dico[u]==dico_features[feature]][0]} feature')
            for station in dico_station:
                logger.info(f'  - Dealing with meteo station n°: {station}')
                csv_file = tempfile.NamedTemporaryFile('w')
                dir_data = Path.cwd() / 'data' / 'meteo_france' / 'historical'
                for csv_meteo in listdir(dir_data):
                    with open(dir_data / csv_meteo, "r") as f:
                        reader = DictReader(f, delimiter=';')
                        # One line per observation of this station, in copy_from order:
                        # value, timestamp with timezone, feature id, station id
                        csv_file.write(''.join([row[feature] + ",'"
                                                + self.__from_date_to_datetz(row["date"], station, dico_station[station])
                                                + "'," + str(dico_features[feature])
                                                + ',' + str(dico_station[station]) + '\n'
                                                for row in reader if row['numer_sta'] == station]))
                csv_file.flush()
                # 'mq' marks missing values in the SYNOP csv files
                with open(csv_file.name, 'r') as f:
                    with PostgreSQLDBConnection.Instance() as db:
                        db.cursor.copy_from(f, '"METEO_FEATURE_VALUE"', sep=',', null='mq',
                                            columns=['"METVA_VALUE"','"METVA_DATETIME"','"METFT_ID_METEO_FEATURE"','"METST_ID_METEO_STATION"'])

    def __generate(self):
        # Meteo stations must be collected first, if not already in the database
        with PostgreSQLDBConnection.Instance() as db:
            db.cursor.execute('SELECT count(*) FROM "METEO_STATION";')
            updated_meteo_station = db.cursor.fetchone()[0]
        if not updated_meteo_station:
            self.__collect_stations()

        # Features from config/features/meteofrance/meteofrance_features.csv
        # must be inserted in the database, if not already done
        with PostgreSQLDBConnection.Instance() as db:
            db.cursor.execute('SELECT count(*) FROM "METEO_FEATURE";')
            updated_meteo_features = db.cursor.fetchone()[0]
        if not updated_meteo_features:
            self.__insert_features()

        # Downloading MeteoFrance historical csv files
        logger.info('Downloading historical csv files from MeteoFrance, if needed')
        self.__collect_historical_data()

        self.__insert_historical_data()

    def update(self):
        '''
        Update the MeteoFrance features with the latest available data
        '''
        # We collect archive files from MeteoFrance, up to the current month,
        # by using the same method as for data generation: this is currently
        # based on the presence of a synop.<date>.csv file in the
        # data/meteo_france/historical directory. The file corresponding to the
        # current month is deleted first, so that its most recent version will
        # be downloaded again by calling self.__collect_historical_data
        # TODO: updates according to a PostgreSQL request?
        logger.info('Update historical csv files from MeteoFrance, if needed')
        today = datetime.now()
        todel = self.__data_directory / 'historical' / ('synop.' + today.strftime("%Y%m") + ".csv")
        if isfile(todel):
            remove(todel)
        self.__collect_historical_data()
        logger.info('Inserting csv files in database')
        self.__insert_historical_data()
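
# Minimal usage sketch, assuming config/logging.cfg and config/features.cfg are
# present under the current working directory and that the PostgreSQL database
# behind PostgreSQLDBConnection is reachable. The constructor regenerates the
# data directory and/or reinserts the database according to the [meteofrance]
# section of features.cfg; update() then refreshes the current month of data.
if __name__ == '__main__':
    meteofrance = MeteoFrance()
    meteofrance.update()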