import gzip
import logging
import tempfile
from configparser import ConfigParser
from csv import DictReader
from datetime import datetime, timedelta
from logging.config import fileConfig
from os import listdir, remove, system
from os.path import basename, isfile
from pathlib import Path
from shutil import rmtree
from urllib.request import urlretrieve

import pytz
from timezonefinder import TimezoneFinder

from extomeAI.lib.connector import PostgreSQLDBConnection
# Configure logging from the project-wide configuration file, then
# grab the root logger used throughout this module.
_logging_config = (Path.cwd() / 'config') / 'logging.cfg'
fileConfig(_logging_config)
logger = logging.getLogger()
27 See: https://donneespubliques.meteofrance.fr/?fond=produit&id_produit=90&id_rubrique=32
29 self.__data_directory = (Path.cwd() / 'data') / 'meteo_france'
30 # Re-creating data directory architecture for MeteoFrance, if asked
31 config = ConfigParser()
32 config.read((Path.cwd() / 'config') / 'features.cfg')
33 if eval(config['meteofrance']['regenerate']):
34 logger.info("Regenerating meteofrance data directory")
36 rmtree(self.__data_directory)
39 p = Path(self.__data_directory / 'historical')
40 p.mkdir(exist_ok=True, parents=True)
41 p = Path(self.__data_directory / 'config')
42 p.mkdir(exist_ok=True, parents=True)
43 if eval(config['meteofrance']['reinsert']):
44 logger.info("Reinserting meteofrance database")
45 with PostgreSQLDBConnection.Instance() as db:
46 db.cursor.execute(f'DELETE FROM "METEO_FEATURE";')
47 db.cursor.execute(f'ALTER SEQUENCE "METEO_FEATURE_METFT_ID_seq" RESTART WITH 1;')
48 db.cursor.execute(f'DELETE FROM "METEO_STATION";')
49 db.cursor.execute(f'ALTER SEQUENCE "METEO_STATION_METST_ID_seq" RESTART WITH 1;')
50 db.cursor.execute(f'DELETE FROM "METEO_FEATURE_VALUE";')
51 db.cursor.execute(f'ALTER SEQUENCE "METEO_FEATURE_VALUE_METVA_ID_seq" RESTART WITH 1;')
def __collect_stations(self):
    '''
    Fill the "METEO_STATION" table from the MeteoFrance synoptic
    station list (postesSynop.csv), downloading the file first if it
    is not already cached in the data/config directory.

    For each station an UTC offset (in whole hours) is also stored,
    derived from the station coordinates via timezonefinder + pytz.
    '''
    tf = TimezoneFinder(in_memory=True)
    with PostgreSQLDBConnection.Instance() as db:
        link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/postesSynop.csv'
        p = Path(self.__data_directory / 'config')
        csv_file = p / basename(link)
        if not isfile(csv_file):
            logger.info('Downloading location stations from MeteoFrance')
            urlretrieve(link, csv_file)
        with open(csv_file, "r") as f:
            reader = DictReader(f, delimiter=';')
            logger.info(f'Inserting location stations in {db.dbname} database')
            for row in reader:
                # float() rather than eval(): the CSV is fetched from an
                # external server, so its cells must never be executed.
                longitude = float(row['Longitude'])
                latitude = float(row['Latitude'])
                timezone_name = tf.timezone_at(lng=longitude, lat=latitude)
                if timezone_name is None:
                    # Offshore / border stations may miss a direct hit.
                    timezone_name = tf.closest_timezone_at(lng=longitude,
                                                           lat=latitude)
                cet = pytz.timezone(timezone_name)
                # Offset computed at insertion time — TODO(review):
                # confirm "now" is the intended reference instant.
                dt = datetime.now()
                offset = cet.utcoffset(dt, is_dst=True)
                shift = int(offset / timedelta(hours=1))
                # Parameterized INSERT: station names can contain quotes,
                # which previously required manual escaping inside an
                # f-string-built statement. The apostrophe substitution is
                # kept so stored names stay identical to earlier runs.
                request = ('INSERT INTO "METEO_STATION" '
                           '("METST_NAME", "METST_IDNAME", "METST_LOCATION", "METST_TIMEZONE") '
                           'VALUES (%s, %s, point(%s, %s), %s);')
                db.cursor.execute(request,
                                  (row['Nom'].replace("'", '’'), row['ID'],
                                   latitude, longitude, shift))
def __insert_features(self):
    '''
    Fill the "METEO_FEATURE" table from the project-level
    meteofrance_features.csv configuration file (one row per feature,
    columns METFT_NAME and PARAM_ID_PARAMETER).
    '''
    logger.info('Inserting MeteoFrance list of features from meteo_features.csv')
    csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
    with PostgreSQLDBConnection.Instance() as db:
        with open(csv_file, "r") as f:
            reader = DictReader(f, delimiter=',')
            for row in reader:
                # Parameterized INSERT instead of f-string interpolation:
                # robust against quotes in feature names.
                request = ('INSERT INTO "METEO_FEATURE" '
                           '("METFT_NAME", "PARAM_ID_PARAMETER") '
                           'VALUES (%s, %s);')
                db.cursor.execute(request,
                                  (row['METFT_NAME'], row['PARAM_ID_PARAMETER']))
def __collect_historical_data(self):
    '''
    Download the MeteoFrance monthly synoptic archives
    (synop.YYYYMM.csv.gz) from January 1996 onward, gunzip each one
    into data/meteo_france/historical, and delete the archive
    afterwards. Months whose csv file is already present are skipped.
    '''
    # List of year-months to consider, as YYYYMM url patterns
    historical = []
    date_end = datetime.now()
    for year in range(1996, date_end.year + 1):
        for month in range(1, 13):
            date = datetime(year, month, 1)
            # Guard against months beyond "now" in the current year.
            # NOTE(review): the docstring of the original said "until the
            # month before now", but update() expects the current month's
            # file to exist, so the current month is included — confirm.
            if date <= date_end:
                historical.append(date.strftime("%Y%m"))

    # We download all csv files from meteofrance that are not in
    # the data repository
    meteo_data = self.__data_directory / 'historical'
    p = Path(meteo_data)
    p.mkdir(exist_ok=True, parents=True)
    for date in historical:
        if not isfile(meteo_data / ('synop.' + date + '.csv')):
            link = 'https://donneespubliques.meteofrance.fr/donnees_libres/Txt/Synop/Archive/synop.'
            link += date + '.csv.gz'
            download_path = meteo_data / basename(link)
            urlretrieve(link, download_path)
            with gzip.open(download_path, 'rb') as f:
                # Strip the trailing '.gz' to get the csv file name.
                csv_file = meteo_data / basename(link[:-3])
                with open(csv_file, 'w') as g:
                    g.write(f.read().decode())
            # Reuse the already-computed archive path instead of
            # rebuilding it from the link.
            remove(download_path)
def __from_date_to_datetz(self, date, a, b):
    '''
    Convert a raw MeteoFrance timestamp string 'YYYYMMDDHH...' into a
    PostgreSQL-friendly 'YYYY-MM-DD HH:00:00+01' literal.

    :param date: raw timestamp, at least 10 characters ('YYYYMMDDHH')
    :param a:    station id name — currently unused by the conversion
    :param b:    station db id   — currently unused by the conversion

    A per-station timezone-shift cache is built on the first call, but
    the returned offset is still hard-coded to +01.
    TODO(review): apply self.__meteo_station_tz instead of the fixed '+01'.
    '''
    # try/except instead of hasattr(self, '__meteo_station_tz'): inside
    # a class body the attribute is stored under its name-mangled form,
    # so the hasattr() test against the raw string never matched and the
    # cache (DB query + timezone lookups) was rebuilt on EVERY call.
    # Debug print() calls and a dead commented-out block were removed.
    try:
        self.__meteo_station_tz
    except AttributeError:
        self.__meteo_station_tz = {}
        tf = TimezoneFinder(in_memory=True)
        with PostgreSQLDBConnection.Instance() as db:
            db.cursor.execute('select "METST_IDNAME", "METST_LOCATION" from "METEO_STATION";')
            list_of_rows = db.cursor.fetchall()
        for k in list_of_rows:
            # METST_LOCATION is stored as point(lat, lon), yet it is
            # unpacked here as (longitude, latitude) — kept as in the
            # original, but this looks swapped; NOTE(review): confirm
            # before the cache is used for real offsets. eval() is kept
            # because the value comes from our own database, not from
            # untrusted input.
            longitude, latitude = eval(k[1])
            timezone_name = tf.timezone_at(lng=longitude, lat=latitude)
            if timezone_name is None:
                timezone_name = tf.closest_timezone_at(lng=longitude,
                                                       lat=latitude,
                                                       exact_computation=True,
                                                       force_evaluation=True)
            cet = pytz.timezone(timezone_name)
            # TODO(review): confirm "now" is the intended reference instant.
            dt = datetime.now()
            offset = cet.utcoffset(dt, is_dst=True)
            shift = int(offset / timedelta(hours=1))
            self.__meteo_station_tz[k[0]] = shift
    return date[:4] + '-' + date[4:6] + '-' + date[6:8] + ' ' + date[8:10] + ':00:00+01'
def __insert_historical_data(self):
    '''
    Bulk-load every (feature, station) series from the historical csv
    files into "METEO_FEATURE_VALUE" through PostgreSQL COPY.

    For each feature abbreviation and each station, the matching rows
    of all monthly files are staged into a temporary csv file, then
    copied in one shot. MeteoFrance encodes missing measures as 'mq',
    mapped to NULL by copy_from.
    '''
    csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
    # Map feature name -> feature id from the database…
    with PostgreSQLDBConnection.Instance() as db:
        db.cursor.execute('SELECT * from "METEO_FEATURE";')
        list_of_rows = db.cursor.fetchall()
    dico = {u[1]: u[0] for u in list_of_rows}

    # …then map MeteoFrance column abbreviation -> feature id.
    with open(csv_file, "r") as f:
        reader = DictReader(f, delimiter=',')
        dico_features = {row["abbreviation"]: dico[row["METFT_NAME"]] for row in reader}

    # Map station id name -> station db id.
    with PostgreSQLDBConnection.Instance() as db:
        db.cursor.execute('SELECT * from "METEO_STATION";')
        list_of_rows = db.cursor.fetchall()
    dico_station = {u[2]: u[0] for u in list_of_rows}

    # Loop-invariant: the historical directory never changes.
    dir_data = Path.cwd() / 'data' / 'meteo_france' / 'historical'
    for feature in dico_features:
        logger.info(f'Integrating {[u for u in dico if dico[u]==dico_features[feature]][0]} feature')
        for station in dico_station:
            logger.info(f' - Dealing with meteo station n°: {station}')
            csv_file = tempfile.NamedTemporaryFile('w')
            for csv_meteo in listdir(dir_data):
                with open(dir_data / csv_meteo, "r") as f:
                    reader = DictReader(f, delimiter=';')
                    lines = [row[feature] + ",'"
                             + self.__from_date_to_datetz(row["date"], station, dico_station[station])
                             + "'," + str(dico_features[feature])
                             + ',' + str(dico_station[station]) + '\n'
                             for row in reader if row['numer_sta'] == station]
                    csv_file.write(''.join(lines))
            # Flush the buffered rows: copy_from re-opens the file by
            # name, and unflushed data would be silently lost.
            csv_file.flush()
            with open(csv_file.name, 'r') as f:
                with PostgreSQLDBConnection.Instance() as db:
                    db.cursor.copy_from(f, '"METEO_FEATURE_VALUE"', sep=',', null='mq',
                                        columns=['"METVA_VALUE"', '"METVA_DATETIME"',
                                                 '"METFT_ID_METEO_FEATURE"', '"METST_ID_METEO_STATION"'])
            # Close (and thereby delete) the temporary staging file —
            # previously leaked one file descriptor per (feature, station).
            csv_file.close()
def __generate(self):
    '''
    Build the MeteoFrance dataset from scratch: stations and feature
    definitions are inserted only when their tables are empty, then
    the historical csv files are fetched and loaded into the database.
    '''
    # Stations first: everything else references them.
    with PostgreSQLDBConnection.Instance() as db:
        db.cursor.execute('SELECT count(*) FROM "METEO_STATION";')
        nb_stations = db.cursor.fetchone()[0]
    if not nb_stations:
        self.__collect_stations()

    # Feature definitions, taken from the project csv configuration,
    # inserted only if not already present.
    with PostgreSQLDBConnection.Instance() as db:
        db.cursor.execute('SELECT count(*) FROM "METEO_FEATURE";')
        nb_features = db.cursor.fetchone()[0]
    if not nb_features:
        self.__insert_features()

    # Historical data: download what is missing, then insert it.
    logger.info('Downloading historical csv files from MeteoFrance, if needed')
    self.__collect_historical_data()
    self.__insert_historical_data()
245 Update the MeteoFrance features with the last available data
247 # We collect archive files from MeteoFrance, until the current month
248 # by using the same method than for data generation : this is currently
249 # based on the presence of a synop.+date+.csv' file in the
250 # data/meteo_france/historical directory. The file corresponding to the
251 # current month is deleted first, so that its most recent version will be downloaded
252 # by colling self.__collect_historical_data
253 # TODO: updates according to a PostgreSQL request ?
254 logger.info('Update historical csv files from MeteoFrance, if needed')
255 today = datetime.now()
256 todel = 'synop.'+today.strftime("%Y%m")+".csv"
257 remove(self.__data_directory / 'historical' / todel)
258 system("touch "+todel)
259 self.__collect_historical_data()
260 logger.info('Inserting csv files in database')
261 self.__insert_historical_data()