-
-
- def __from_date_to_datetz(self, date, a, b):
- if not hasattr(self, '__meteo_station_tz'):
- self.__meteo_station_tz = {}
- tf = TimezoneFinder(in_memory=True)
- with PostgreSQLDBConnection.Instance() as db:
- db.cursor.execute('select "METST_IDNAME", "METST_LOCATION" from "METEO_STATION";')
- list_of_rows = db.cursor.fetchall()
- for k in list_of_rows:
- print('\n',k)
- longitude, latitude = eval(k[1])
- print(longitude, latitude)
- print(type(longitude))
- timezone_name = tf.timezone_at(lng = longitude, lat = latitude)
- if timezone_name is None:
- timezone_name = tf.closest_timezone_at(lng = longitude,
- lat = latitude,
- delta_degree = 13,
- exact_computation=True,
- #return_distances=True,
- force_evaluation=True)
- cet = pytz.timezone(timezone_name)
- dt = datetime.now()
- offset = cet.utcoffset(dt, is_dst = True)
- shift = int(offset / timedelta(hours=1))
- self.__meteo_station_tz[k[0]] = shift
-
- print(self.__meteo_station_tz)
- exit()
- '''longitude, latitude = eval(row['Longitude']), eval(row['Latitude'])
- point = (longitude, latitude)
- timezone_name = tf.timezone_at(lng = eval(row['Longitude']), lat = eval(row['Latitude']))
- if timezone_name is None:
- timezone_name = tf.closest_timezone_at(lng = eval(row['Longitude']),
- lat = eval(row['Latitude']),
- delta_degree = 5)
- cet = pytz.timezone(timezone_name)
- dt = datetime.now()
- offset = cet.utcoffset(dt, is_dst = True)
- shift = int(offset / timedelta(hours=1))
-
- self.__meteo_station_tz'''
- exit()
- return date[:4]+'-'+date[4:6]+'-'+date[6:8]+' '+date[8:10]+':00:00+01'
-
-
- def __insert_historical_data(self):
- csv_file = Path.cwd() / 'config' / 'features' / 'meteofrance' / 'meteofrance_features.csv'
- with PostgreSQLDBConnection.Instance() as db:
- db.cursor.execute('SELECT * from "METEO_FEATURE";')
- list_of_rows = db.cursor.fetchall()
- dico = {u[1]:u[0] for u in list_of_rows}
-
- with open(csv_file, "r") as f:
- reader = DictReader(f, delimiter=',')
- next(reader)
- dico_features = {row["abbreviation"]:dico[row["METFT_NAME"]] for row in reader}
-
- with PostgreSQLDBConnection.Instance() as db:
- db.cursor.execute('SELECT * from "METEO_STATION";')
- list_of_rows = db.cursor.fetchall()
- dico_station = {u[2]:u[0] for u in list_of_rows}
-
- for feature in dico_features:
- logger.info(f'Integrating {[u for u in dico if dico[u]==dico_features[feature]][0]} feature')
- for station in dico_station:
- logger.info(f' - Dealing with meteo station n°: {station}')
- csv_file = tempfile.NamedTemporaryFile('w')
- dir_data = Path.cwd() / 'data' / 'meteo_france' / 'historical'
- for csv_meteo in listdir(dir_data):
- with open(dir_data / csv_meteo, "r") as f:
- reader = DictReader(f, delimiter=';')
- csv_file.write(''.join([row[feature]+",'"+self.__from_date_to_datetz(row["date"], station, dico_station[station])+"',"+str(dico_features[feature])+','+str(dico_station[station])+'\n' for row in reader if row['numer_sta'] == station]))
- csv_file.flush()
- with open(csv_file.name, 'r') as f:
- with PostgreSQLDBConnection.Instance() as db:
- db.cursor.copy_from(f, '"METEO_FEATURE_VALUE"', sep=',', null='mq',
- columns=['"METVA_VALUE"','"METVA_DATETIME"','"METFT_ID_METEO_FEATURE"','"METST_ID_METEO_STATION"'])
-
-
-
-
-
-
-
- def __generate(self):
- # Meteo stations must be collected first, if not in the database
- with PostgreSQLDBConnection.Instance() as db:
- db.cursor.execute('SELECT count(*) FROM "METEO_STATION";')
- updated_meteo_station = db.cursor.fetchone()[0]
- if not updated_meteo_station:
- self.__collect_stations()
-
- # Features from data/meteo_france/config/meteo_features.csv
- # must be inserted in the database, if not already done
- with PostgreSQLDBConnection.Instance() as db:
- db.cursor.execute('SELECT count(*) FROM "METEO_FEATURE";')
- updated_meteo_features = db.cursor.fetchone()[0]
- if not updated_meteo_features:
- self.__insert_features()
-
- # Downloading meteofrance historical csv files
- logger.info('Downloading historical csv files from MeteoFrance, if needed')
- self.__collect_historical_data()
-
- self.__insert_historical_data()