From dd9fff23280040f1937cd8d21e92869a9e57aeb3 Mon Sep 17 00:00:00 2001 From: kiran morrison Date: Fri, 28 Apr 2023 16:51:23 +0100 Subject: [PATCH 01/18] first pass at ne-iso --- dweather_client/client.py | 38 ++++++++++++++++++++++++--------- dweather_client/ipfs_queries.py | 30 ++++++++++++++++++++------ 2 files changed, 51 insertions(+), 17 deletions(-) diff --git a/dweather_client/client.py b/dweather_client/client.py index c9d75bc..8b7447c 100644 --- a/dweather_client/client.py +++ b/dweather_client/client.py @@ -9,7 +9,8 @@ from dweather_client.struct_utils import tupleify, convert_nans_to_none import datetime import pytz -import csv, json +import csv +import json import inspect import numpy as np import pandas as pd @@ -485,6 +486,7 @@ def get_hourly_station_history(dataset, station_id, weather_variable, use_imperi v) for k, v in final_resp_series.to_dict().items()} return result + def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_units=True, desired_units=None, ipfs_timeout=None): """ This is almost an exact copy of get_hourly_station_history @@ -543,14 +545,24 @@ def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_ "Invalid weather variable for this station") try: - if dataset == "inmet_brazil-hourly": + if dataset in ["inmet_brazil-hourly"]: with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj: - csv_text = dataset_obj.get_data(station_id, weather_variable) + csv_text_list = dataset_obj.get_data( + station_id, weather_variable) + if dataset in ["ne_iso-hourly"]: + with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj: + csv_text_list = dataset_obj.get_data_recursive(station_id) else: raise DatasetError("No such dataset in dClimate") except ipfshttpclient.exceptions.ErrorResponse: raise StationNotFoundError("Invalid station ID for dataset") - df = pd.read_csv(StringIO(csv_text)) + + # Create df from all individual files + dfs = [] + for csv_text in csv_text_list: + dfs.append(pd.read_csv(StringIO(csv_text))) + + df = pd.concat(dfs, ignore_index=True) str_resp_series = df[column_name].astype(str) df = df.set_index("dt") if desired_units: @@ -579,8 +591,9 @@ def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_ v) for k, v in final_resp_series.to_dict().items()} return result + def get_station_forecast_history(dataset, station_id, forecast_date, desired_units=None, ipfs_timeout=None): - try: + try: with StationForecastDataset(dataset, ipfs_timeout=ipfs_timeout) as dataset_obj: csv_text = dataset_obj.get_data(station_id, forecast_date) history = {} @@ -588,9 +601,11 @@ def get_station_forecast_history(dataset, station_id, forecast_date, desired_uni headers = next(reader) date_col = headers.index('DATE') try: # Make sure weather variable is correct. 
- data_col = headers.index("SETT") #at the moment the only variable is "SETT" + # at the moment the only variable is "SETT" + data_col = headers.index("SETT") except ValueError: - raise WeatherVariableNotFoundError("Invalid weather variable for this station") + raise WeatherVariableNotFoundError( + "Invalid weather variable for this station") for row in reader: try: if not row: @@ -599,11 +614,12 @@ def get_station_forecast_history(dataset, station_id, forecast_date, desired_uni row[date_col], "%Y-%m-%d").date()] = float(row[data_col]) except ValueError: history[datetime.datetime.strptime( - row[date_col], "%Y-%m-%d").date()] = row[data_col] + row[date_col], "%Y-%m-%d").date()] = row[data_col] return history except ipfshttpclient.exceptions.ErrorResponse: raise StationNotFoundError("Invalid station ID for dataset") + def get_station_forecast_stations(dataset, forecast_date, desired_units=None, ipfs_timeout=None): with StationForecastDataset(dataset, ipfs_timeout=ipfs_timeout) as dataset_obj: csv_text = dataset_obj.get_stations(forecast_date) @@ -847,6 +863,7 @@ def has_dataset_updated(dataset, slices, as_of, ipfs_timeout=None): ranges = dataset_obj.get_data(as_of) return has_changed(slices, ranges) + def get_teleconnections_history(weather_variable, ipfs_timeout=None): with TeleconnectionsDataset(ipfs_timeout=ipfs_timeout) as dataset_obj: csv_text = dataset_obj.get_data() @@ -874,6 +891,7 @@ def get_teleconnections_history(weather_variable, ipfs_timeout=None): row[date_col], "%Y-%m-%d").date()] = row[data_col] return history + def get_eaufrance_history(station, weather_variable, use_imperial_units=False, desired_units=None, ipfs_timeout=None): try: with EauFranceDataset(ipfs_timeout=ipfs_timeout) as dataset_obj: @@ -892,7 +910,8 @@ def get_eaufrance_history(station, weather_variable, use_imperial_units=False, d converted_resp_series = pd.Series( converter(df[weather_variable].values*dweather_unit), index=df.index) except ValueError: - raise UnitError(f"Specified unit is incompatible with original, original units are {original_units} and requested units are {desired_units}") + raise UnitError( + f"Specified unit is incompatible with original, original units are {original_units} and requested units are {desired_units}") if desired_units is not None: rounded_resp_array = np.vectorize(rounding_formula_temperature)( str_resp_series, converted_resp_series) @@ -908,4 +927,3 @@ def get_eaufrance_history(station, weather_variable, use_imperial_units=False, d return result except ipfshttpclient.exceptions.ErrorResponse: raise StationNotFoundError("Invalid station ID for dataset") - \ No newline at end of file diff --git a/dweather_client/ipfs_queries.py b/dweather_client/ipfs_queries.py index ad97e58..80b8299 100644 --- a/dweather_client/ipfs_queries.py +++ b/dweather_client/ipfs_queries.py @@ -662,7 +662,16 @@ def get_data(self, station, weather_variable=None): # so this is an optional arg super().get_data() file_name = f"{self.head}/{station}.csv" - return self.get_file_object(file_name).read().decode("utf-8") + return [self.get_file_object(file_name).read().decode("utf-8")] + + def get_data_recursive(self, station, weather_variable=None): + hashes = self.traverse_ll(self.head) + ret_list = [] + for h in hashes: + file_name = f"{h}/{station}.csv" + ret_list.append(self.get_file_object( + file_name).read().decode("utf-8")) + return ret_list class YieldDatasets(IpfsDataset): @@ -1111,7 +1120,8 @@ def get_full_date_range_from_metadata(self, h): return: list of [start_time, end_time] """ metadata = 
self.get_metadata(h) - str_dates = (metadata["api documentation"]["full date range"][0], metadata["api documentation"]["full date range"][1]) + str_dates = (metadata["api documentation"]["full date range"] + [0], metadata["api documentation"]["full date range"][1]) return [datetime.datetime.fromisoformat(dt).date() for dt in str_dates] def get_relevant_hash(self, forecast_date): @@ -1142,10 +1152,13 @@ def get_relevant_hash(self, forecast_date): d) for d in prev_metadata["date range"]] if prev_date_range[0] <= forecast_date <= prev_date_range[1]: return prev_hash - prev_hash = prev_metadata['previous hash'] # iterate backwards in the link list one step + # iterate backwards in the link list one step + prev_hash = prev_metadata['previous hash'] # If this script runs to the end without returning anything or an error, the forecast date must fall in a hole in the data - raise DateOutOfRangeError("forecast date unavailable due to holes in data") # NOTE only returns if there are holes in the data + # NOTE only returns if there are holes in the data + raise DateOutOfRangeError( + "forecast date unavailable due to holes in data") def get_weather_dict(self, forecast_date, ipfs_hash, lat, lon): """ @@ -1190,6 +1203,7 @@ def get_data(self, lat, lon, forecast_date): return (float(ret_lat), float(ret_lon)), pd.Series(weather_dict) + class StationForecastDataset(ForecastDataset): """ Instantiable class for pulling in station data that is also forecast data. @@ -1198,11 +1212,11 @@ class StationForecastDataset(ForecastDataset): @property def dataset(self): return self._dataset - + def __init__(self, dataset, **kwargs): super().__init__(dataset, 1) self.head = get_heads()[self.dataset] - + def get_data(self, station, forecast_date): relevant_hash = self.get_relevant_hash(forecast_date) return self.get_file_object(f"{relevant_hash}/{station}.csv").read().decode("utf-8") @@ -1211,6 +1225,7 @@ def get_stations(self, forecast_date): relevant_hash = self.get_relevant_hash(forecast_date) return self.get_file_object(f"{relevant_hash}/stations.json").read().decode("utf-8") + class TeleconnectionsDataset(IpfsDataset): """ Instantiable class used for pulling in el nino teleconnections data @@ -1227,6 +1242,7 @@ def get_data(self): file_name = f"{self.head}/teleconnections_{year_month}.csv" return self.get_file_object(file_name).read().decode("utf-8") + class EauFranceDataset(IpfsDataset): """ Instantiable class used for pulling in el nino teleconnections data @@ -1240,4 +1256,4 @@ def get_data(self, station): super().get_data() metadata = self.get_metadata(self.head) file_name = f"{self.head}/{station}.csv" - return self.get_file_object(file_name).read().decode("utf-8") \ No newline at end of file + return self.get_file_object(file_name).read().decode("utf-8") From d8632faa38d165f6411be7d4863da1d5f9ad927b Mon Sep 17 00:00:00 2001 From: kiran morrison Date: Tue, 2 May 2023 12:35:52 +0100 Subject: [PATCH 02/18] solving heads issue --- dweather_client/client.py | 3 ++- dweather_client/ipfs_queries.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/dweather_client/client.py b/dweather_client/client.py index 8b7447c..eb4862a 100644 --- a/dweather_client/client.py +++ b/dweather_client/client.py @@ -495,6 +495,7 @@ def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_ instead of the others here in client. 
That list currently stands at: - inmet_brazil-hourly + - ne_iso-hourly """ # Get original units from metadata original_units = None @@ -522,7 +523,7 @@ def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_ # doesn't exist at all for this dataset if original_units == None: raise WeatherVariableNotFoundError( - "Invalid weather variable for this dataset, none of the stations contain it") + f"Invalid weather variable for this dataset {weather_variable}, none of the stations contain it") # Check each station to see if it has the same station name # if none do, then the station is invalid diff --git a/dweather_client/ipfs_queries.py b/dweather_client/ipfs_queries.py index 80b8299..d4dba01 100644 --- a/dweather_client/ipfs_queries.py +++ b/dweather_client/ipfs_queries.py @@ -665,7 +665,7 @@ def get_data(self, station, weather_variable=None): return [self.get_file_object(file_name).read().decode("utf-8")] def get_data_recursive(self, station, weather_variable=None): - hashes = self.traverse_ll(self.head) + hashes = super().traverse_ll(self.head) ret_list = [] for h in hashes: file_name = f"{h}/{station}.csv" From 03bb7036815fe9a65770e06546a9185b4be80420 Mon Sep 17 00:00:00 2001 From: kiran morrison Date: Tue, 2 May 2023 13:07:43 +0100 Subject: [PATCH 03/18] resetting to master --- dweather_client/client.py | 19 ++++--------------- dweather_client/ipfs_queries.py | 11 +---------- 2 files changed, 5 insertions(+), 25 deletions(-) diff --git a/dweather_client/client.py b/dweather_client/client.py index eb4862a..b76302c 100644 --- a/dweather_client/client.py +++ b/dweather_client/client.py @@ -495,7 +495,6 @@ def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_ instead of the others here in client. That list currently stands at: - inmet_brazil-hourly - - ne_iso-hourly """ # Get original units from metadata original_units = None @@ -523,7 +522,7 @@ def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_ # doesn't exist at all for this dataset if original_units == None: raise WeatherVariableNotFoundError( - f"Invalid weather variable for this dataset {weather_variable}, none of the stations contain it") + "Invalid weather variable for this dataset, none of the stations contain it") # Check each station to see if it has the same station name # if none do, then the station is invalid @@ -546,24 +545,14 @@ def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_ "Invalid weather variable for this station") try: - if dataset in ["inmet_brazil-hourly"]: + if dataset == "inmet_brazil-hourly": with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj: - csv_text_list = dataset_obj.get_data( - station_id, weather_variable) - if dataset in ["ne_iso-hourly"]: - with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj: - csv_text_list = dataset_obj.get_data_recursive(station_id) + csv_text = dataset_obj.get_data(station_id, weather_variable) else: raise DatasetError("No such dataset in dClimate") except ipfshttpclient.exceptions.ErrorResponse: raise StationNotFoundError("Invalid station ID for dataset") - - # Create df from all individual files - dfs = [] - for csv_text in csv_text_list: - dfs.append(pd.read_csv(StringIO(csv_text))) - - df = pd.concat(dfs, ignore_index=True) + df = pd.read_csv(StringIO(csv_text)) str_resp_series = df[column_name].astype(str) df = df.set_index("dt") if desired_units: diff --git a/dweather_client/ipfs_queries.py 
b/dweather_client/ipfs_queries.py index d4dba01..335c792 100644 --- a/dweather_client/ipfs_queries.py +++ b/dweather_client/ipfs_queries.py @@ -662,16 +662,7 @@ def get_data(self, station, weather_variable=None): # so this is an optional arg super().get_data() file_name = f"{self.head}/{station}.csv" - return [self.get_file_object(file_name).read().decode("utf-8")] - - def get_data_recursive(self, station, weather_variable=None): - hashes = super().traverse_ll(self.head) - ret_list = [] - for h in hashes: - file_name = f"{h}/{station}.csv" - ret_list.append(self.get_file_object( - file_name).read().decode("utf-8")) - return ret_list + return self.get_file_object(file_name).read().decode("utf-8") class YieldDatasets(IpfsDataset): From 141adbef2a0f6b56db9f874baffa5bbc6ebe8877 Mon Sep 17 00:00:00 2001 From: kiran morrison Date: Tue, 2 May 2023 13:10:23 +0100 Subject: [PATCH 04/18] using a list of csv text instead of just one --- dweather_client/client.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/dweather_client/client.py b/dweather_client/client.py index b76302c..612490e 100644 --- a/dweather_client/client.py +++ b/dweather_client/client.py @@ -547,12 +547,18 @@ def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_ try: if dataset == "inmet_brazil-hourly": with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj: - csv_text = dataset_obj.get_data(station_id, weather_variable) + csv_text_list = [dataset_obj.get_data( + station_id, weather_variable)] else: raise DatasetError("No such dataset in dClimate") except ipfshttpclient.exceptions.ErrorResponse: raise StationNotFoundError("Invalid station ID for dataset") - df = pd.read_csv(StringIO(csv_text)) + + # concat together all retrieved station csv texts + dfs = [] + for csv_text in csv_text_list: + dfs.append(pd.read_csv(StringIO(csv_text))) + df = pd.concat(dfs, ignore_index=True) str_resp_series = df[column_name].astype(str) df = df.set_index("dt") if desired_units: From 424595d2e6a67ca7d292397ee660716f3c68f1a2 Mon Sep 17 00:00:00 2001 From: kiran morrison Date: Tue, 2 May 2023 13:18:48 +0100 Subject: [PATCH 05/18] attempt at ne-iso with list system --- dweather_client/client.py | 6 +++++- dweather_client/ipfs_queries.py | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/dweather_client/client.py b/dweather_client/client.py index 612490e..3a324d3 100644 --- a/dweather_client/client.py +++ b/dweather_client/client.py @@ -545,10 +545,14 @@ def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_ "Invalid weather variable for this station") try: - if dataset == "inmet_brazil-hourly": + if dataset in ["inmet_brazil-hourly"]: with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj: csv_text_list = [dataset_obj.get_data( station_id, weather_variable)] + if dataset in ["ne_iso-hourly"]: + with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj: + csv_text_list = dataset_obj.get_data_recursive( + station_id, weather_variable) else: raise DatasetError("No such dataset in dClimate") except ipfshttpclient.exceptions.ErrorResponse: diff --git a/dweather_client/ipfs_queries.py b/dweather_client/ipfs_queries.py index 335c792..1689b73 100644 --- a/dweather_client/ipfs_queries.py +++ b/dweather_client/ipfs_queries.py @@ -657,6 +657,13 @@ def __init__(self, dataset, ipfs_timeout=None): super().__init__(ipfs_timeout=ipfs_timeout) self._dataset = dataset + def 
get_hashes(self): + """ + return: list of all hashes in dataset + """ + hashes = self.traverse_ll(self.head, self.as_of) + return list(hashes) + def get_data(self, station, weather_variable=None): # only some stations need weather variable # so this is an optional arg @@ -664,6 +671,18 @@ def get_data(self, station, weather_variable=None): file_name = f"{self.head}/{station}.csv" return self.get_file_object(file_name).read().decode("utf-8") + def get_data_recursive(self, station, weather_variable=None): + # only some stations need weather variable + # so this is an optional arg + super().get_data() + hashes = self.get_hashes() + csv_text_list = [] + for hash_ in hashes: + file_name = f"{hash_}/{station}.csv" + csv_text_list.append(self.get_file_object( + file_name).read().decode("utf-8")) + return csv_text_list + class YieldDatasets(IpfsDataset): """ From 5546fdd510f14ac09924f4d9a7a4a961bc26908e Mon Sep 17 00:00:00 2001 From: kiran morrison Date: Tue, 2 May 2023 13:20:21 +0100 Subject: [PATCH 06/18] removing list approach for dataset name matching --- dweather_client/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dweather_client/client.py b/dweather_client/client.py index 3a324d3..302e6a0 100644 --- a/dweather_client/client.py +++ b/dweather_client/client.py @@ -545,11 +545,11 @@ def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_ "Invalid weather variable for this station") try: - if dataset in ["inmet_brazil-hourly"]: + if dataset == "inmet_brazil-hourly": with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj: csv_text_list = [dataset_obj.get_data( station_id, weather_variable)] - if dataset in ["ne_iso-hourly"]: + if dataset == "ne_iso-hourly": with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj: csv_text_list = dataset_obj.get_data_recursive( station_id, weather_variable) From 320d1a204f0eed78552470042898818a4e5ed0f7 Mon Sep 17 00:00:00 2001 From: kiran morrison Date: Tue, 2 May 2023 13:27:17 +0100 Subject: [PATCH 07/18] attempting to fix with if/elif --- dweather_client/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dweather_client/client.py b/dweather_client/client.py index 302e6a0..6d94508 100644 --- a/dweather_client/client.py +++ b/dweather_client/client.py @@ -545,11 +545,11 @@ def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_ "Invalid weather variable for this station") try: - if dataset == "inmet_brazil-hourly": + if dataset in ["inmet_brazil-hourly"]: with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj: csv_text_list = [dataset_obj.get_data( station_id, weather_variable)] - if dataset == "ne_iso-hourly": + elif dataset in ["ne_iso-hourly"]: with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj: csv_text_list = dataset_obj.get_data_recursive( station_id, weather_variable) From 7b9e51477630237d5c92fcecf045631cbb8a588a Mon Sep 17 00:00:00 2001 From: kiran morrison Date: Tue, 2 May 2023 13:44:19 +0100 Subject: [PATCH 08/18] ne-iso cleaned comments --- dweather_client/client.py | 2 ++ dweather_client/ipfs_queries.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/dweather_client/client.py b/dweather_client/client.py index 6d94508..2b68c63 100644 --- a/dweather_client/client.py +++ b/dweather_client/client.py @@ -545,10 +545,12 @@ def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_ "Invalid weather variable for 
this station") try: + # RawSet style where we only want the most recent file if dataset in ["inmet_brazil-hourly"]: with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj: csv_text_list = [dataset_obj.get_data( station_id, weather_variable)] + # ClimateSet style where we need the entire linked list history elif dataset in ["ne_iso-hourly"]: with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj: csv_text_list = dataset_obj.get_data_recursive( diff --git a/dweather_client/ipfs_queries.py b/dweather_client/ipfs_queries.py index 1689b73..8bca5a2 100644 --- a/dweather_client/ipfs_queries.py +++ b/dweather_client/ipfs_queries.py @@ -675,6 +675,8 @@ def get_data_recursive(self, station, weather_variable=None): # only some stations need weather variable # so this is an optional arg super().get_data() + # get all hashes and then effectively just use get_data + # recursively to get a full list of csvs hashes = self.get_hashes() csv_text_list = [] for hash_ in hashes: From feb1dc1779ff5646b2ac70da2c95f849000de6ec Mon Sep 17 00:00:00 2001 From: PrettyHertz Date: Wed, 3 May 2023 08:55:13 -0500 Subject: [PATCH 09/18] Updated requirements to non-breaking pandas version Updated requirements to non-breaking pandas version --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 822e4ad..20f34db 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ ipfshttpclient==0.8.0a2 -pandas>=1.1.4 +pandas>=1.1.4, < 2.0.0 pytest>=6.1.2 pytest-mock==3.6.1 requests From d1fa1ded122ea8f70bcf968764162e546a44a724 Mon Sep 17 00:00:00 2001 From: PrettyHertz Date: Mon, 15 May 2023 12:01:15 -0500 Subject: [PATCH 10/18] Revert "Initial changes for teleconnections" This reverts commit 81368bd421c753267af75404ce3047920fa5fb2a. From 4e6b4e52bddc061983ccdaa039527a0840a2de26 Mon Sep 17 00:00:00 2001 From: PrettyHertz Date: Mon, 15 May 2023 12:03:39 -0500 Subject: [PATCH 11/18] changes for new telecon stations --- dweather_client/client.py | 11 +++++------ dweather_client/ipfs_queries.py | 6 +++--- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/dweather_client/client.py b/dweather_client/client.py index 2b68c63..8a01620 100644 --- a/dweather_client/client.py +++ b/dweather_client/client.py @@ -868,16 +868,15 @@ def has_dataset_updated(dataset, slices, as_of, ipfs_timeout=None): def get_teleconnections_history(weather_variable, ipfs_timeout=None): with TeleconnectionsDataset(ipfs_timeout=ipfs_timeout) as dataset_obj: - csv_text = dataset_obj.get_data() + try: + csv_text = dataset_obj.get_data(weather_variable) + except ValueError: + raise WeatherVariableNotFoundError( + "Invalid weather variable for this station") history = {} reader = csv.reader(csv_text.split('\n')) headers = next(reader) date_col = headers.index('DATE') - try: # Make sure weather variable is correct. 
- data_col = headers.index(weather_variable) - except ValueError: - raise WeatherVariableNotFoundError( - "Invalid weather variable for this station") for row in reader: try: if row[data_col] == '': diff --git a/dweather_client/ipfs_queries.py b/dweather_client/ipfs_queries.py index 8bca5a2..a08afd0 100644 --- a/dweather_client/ipfs_queries.py +++ b/dweather_client/ipfs_queries.py @@ -1247,11 +1247,11 @@ class TeleconnectionsDataset(IpfsDataset): def __init__(self, ipfs_timeout=None): super().__init__(ipfs_timeout=ipfs_timeout) - def get_data(self): + def get_data(self, station): super().get_data() metadata = self.get_metadata(self.head) - year_month = metadata["time generated"][:7] - file_name = f"{self.head}/teleconnections_{year_month}.csv" + + file_name = f"{self.head}/{station}.csv" return self.get_file_object(file_name).read().decode("utf-8") From cf44fcfc97a3a8174254596de60f17d54f82233d Mon Sep 17 00:00:00 2001 From: PrettyHertz Date: Mon, 15 May 2023 12:59:37 -0500 Subject: [PATCH 12/18] undid changes that broke data column --- dweather_client/client.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/dweather_client/client.py b/dweather_client/client.py index 8a01620..e349943 100644 --- a/dweather_client/client.py +++ b/dweather_client/client.py @@ -868,15 +868,16 @@ def has_dataset_updated(dataset, slices, as_of, ipfs_timeout=None): def get_teleconnections_history(weather_variable, ipfs_timeout=None): with TeleconnectionsDataset(ipfs_timeout=ipfs_timeout) as dataset_obj: - try: - csv_text = dataset_obj.get_data(weather_variable) - except ValueError: - raise WeatherVariableNotFoundError( - "Invalid weather variable for this station") + csv_text = dataset_obj.get_data(weather_variable) history = {} reader = csv.reader(csv_text.split('\n')) headers = next(reader) date_col = headers.index('DATE') + try: + data_col=headers.index(weather_variable) + except ValueError: + raise WeatherVariableNotFoundError( + "Invalid weather variable for this station") for row in reader: try: if row[data_col] == '': From 8d11fadbfb8f7d4a09d5d4689c2dbb22738b7aec Mon Sep 17 00:00:00 2001 From: PrettyHertz Date: Mon, 15 May 2023 13:02:31 -0500 Subject: [PATCH 13/18] adjusted client to call "value" --- dweather_client/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dweather_client/client.py b/dweather_client/client.py index e349943..bbbc8dd 100644 --- a/dweather_client/client.py +++ b/dweather_client/client.py @@ -874,7 +874,7 @@ def get_teleconnections_history(weather_variable, ipfs_timeout=None): headers = next(reader) date_col = headers.index('DATE') try: - data_col=headers.index(weather_variable) + data_col=headers.index("value") except ValueError: raise WeatherVariableNotFoundError( "Invalid weather variable for this station") From 07e99e917e1415fea6e9322405bf3d12998f2da2 Mon Sep 17 00:00:00 2001 From: PrettyHertz Date: Wed, 24 May 2023 10:59:45 -0500 Subject: [PATCH 14/18] Changes for as_of Added as_of as a kwarg for historical storms (IBTRaCS) --- dweather_client/client.py | 7 +++--- dweather_client/storms_datasets.py | 32 +++++++++++++++++++++++++++- dweather_client/tests/test_client.py | 5 +++++ 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/dweather_client/client.py b/dweather_client/client.py index bbbc8dd..6b54341 100644 --- a/dweather_client/client.py +++ b/dweather_client/client.py @@ -247,6 +247,7 @@ def get_tropical_storms( min_lon=None, max_lat=None, max_lon=None, + as_of=None, ipfs_timeout=None): """ return: @@ 
-289,11 +290,11 @@ def get_tropical_storms( with cm as storm_getter: if radius: - return storm_getter.get_data(basin, radius=radius, lat=lat, lon=lon) + return storm_getter.get_data(basin, radius=radius, lat=lat, lon=lon, as_of=as_of) elif min_lat: - return storm_getter.get_data(basin, min_lat=min_lat, min_lon=min_lon, max_lat=max_lat, max_lon=max_lon) + return storm_getter.get_data(basin, min_lat=min_lat, min_lon=min_lon, max_lat=max_lat, max_lon=max_lon, as_of=as_of) else: - return storm_getter.get_data(basin) + return storm_getter.get_data(basin, as_of=as_of) def get_station_history( diff --git a/dweather_client/storms_datasets.py b/dweather_client/storms_datasets.py index 86b122f..e579901 100644 --- a/dweather_client/storms_datasets.py +++ b/dweather_client/storms_datasets.py @@ -1,5 +1,6 @@ import gzip import json +import datetime from abc import abstractmethod import pandas as pd @@ -23,7 +24,8 @@ def get_data(self, basin, **kwargs): if basin not in {'NI', 'SI', 'NA', 'EP', 'WP', 'SP', 'SA'}: raise ValueError("Invalid basin ID") super().get_data() - file_obj = self.get_file_object(f"{self.head}/ibtracs-{basin}.csv.gz") + ipfs_hash = self.get_relevant_hash(kwargs["as_of"]) + file_obj = self.get_file_object(f"{ipfs_hash}/ibtracs-{basin}.csv.gz") df = pd.read_csv( file_obj, na_values=["", " "], keep_default_na=False, low_memory=False, compression="gzip" ) @@ -40,6 +42,34 @@ def get_data(self, basin, **kwargs): return processed_df + def get_relevant_hash(self, as_of_date): + """ + return the ipfs hash required to pull in data for a forecast date + """ + cur_hash = self.head + if as_of_date == None: + return cur_hash + cur_metadata = self.get_metadata(cur_hash) + # This routine is agnostic to the order of data contained in the hashes (at a cost of inefficiency) -- if the data contains the forecast date, it WILL be found, eventually + most_recent_date = datetime.datetime.fromisoformat(cur_metadata["time generated"]).date() + if as_of_date >= most_recent_date: + return cur_hash + prev_hash = cur_metadata['previous hash'] + while prev_hash is not None: + prev_metadata = self.get_metadata(prev_hash) + prev_date = datetime.datetime.fromisoformat(prev_metadata["time generated"]).date() + if prev_date <= as_of_date <= most_recent_date: + return prev_hash + # iterate backwards in the link list one step + try: + prev_hash = prev_metadata['previous hash'] + most_recent_date = prev_date + except KeyError: + # Because we added the as_of after a while to this ETL prev_hash won't be 'None' we'll just run into an exception + raise DateOutOfRangeError( + "as_of data is earlier than earliest available hash") + + class AtcfDataset(IpfsDataset): dataset = "atcf_btk-seasonal" diff --git a/dweather_client/tests/test_client.py b/dweather_client/tests/test_client.py index 61dd8c0..664bc05 100644 --- a/dweather_client/tests/test_client.py +++ b/dweather_client/tests/test_client.py @@ -360,6 +360,11 @@ def test_historical_storms(): assert len(df_subset_box_na) < len(df_all_na) +def test_hist_storm_as_of(): + df_all_na = get_tropical_storms( + 'historical', 'NA', as_of=datetime.date(2023, 5, 20), ipfs_timeout=IPFS_TIMEOUT) # This will throw an exception if there's ever a break in the chain + + def test_yields(): df = pd.read_csv(StringIO(get_yield_history( "0041", "12", "073", ipfs_timeout=IPFS_TIMEOUT))) From dfe1acf0b7c55a1bb7f448031beffe5cb580195a Mon Sep 17 00:00:00 2001 From: PrettyHertz Date: Wed, 24 May 2023 11:37:38 -0500 Subject: [PATCH 15/18] Included error --- dweather_client/storms_datasets.py | 2 ++ 1 
file changed, 2 insertions(+) diff --git a/dweather_client/storms_datasets.py b/dweather_client/storms_datasets.py index e579901..449ca92 100644 --- a/dweather_client/storms_datasets.py +++ b/dweather_client/storms_datasets.py @@ -7,6 +7,8 @@ from dweather_client.df_utils import boxed_storms, nearby_storms from dweather_client.ipfs_queries import IpfsDataset +from dweather_client.ipfs_errors import * + def process_df(input_df, **kwargs): if {'radius', 'lat', 'lon'}.issubset(kwargs.keys()): From 27d96247c06a31eebbdc76da6474ced4902d1e22 Mon Sep 17 00:00:00 2001 From: kiran morrison Date: Mon, 3 Jul 2023 14:06:23 +0100 Subject: [PATCH 16/18] national-grid retrieval code --- dweather_client/client.py | 17 ++++++++++-- dweather_client/ipfs_queries.py | 39 +++++++++++++++++++++++++++- dweather_client/tests/test_client.py | 20 ++++++++++---- 3 files changed, 68 insertions(+), 8 deletions(-) diff --git a/dweather_client/client.py b/dweather_client/client.py index 6b54341..3be1053 100644 --- a/dweather_client/client.py +++ b/dweather_client/client.py @@ -19,7 +19,7 @@ from dweather_client import gridded_datasets from dweather_client.storms_datasets import IbtracsDataset, AtcfDataset, SimulatedStormsDataset from dweather_client.ipfs_queries import AustraliaBomStations, CedaBiomass, CmeStationsDataset, DutchStationsDataset, DwdStationsDataset, DwdHourlyStationsDataset, GlobalHourlyStationsDataset, JapanStations, StationDataset, EauFranceDataset,\ - YieldDatasets, FsaIrrigationDataset, AemoPowerDataset, AemoGasDataset, AesoPowerDataset, ForecastDataset, AfrDataset, DroughtMonitor, CwvStations, SpeedwellStations, TeleconnectionsDataset, CsvStationDataset, StationForecastDataset + YieldDatasets, FsaIrrigationDataset, AemoPowerDataset, AemoGasDataset, AesoPowerDataset, ForecastDataset, AfrDataset, DroughtMonitor, CwvStations, SpeedwellStations, TeleconnectionsDataset, CsvStationDataset, StationForecastDataset, SapStations from dweather_client.slice_utils import DateRangeRetriever, has_changed from dweather_client.ipfs_errors import * from io import StringIO @@ -771,6 +771,19 @@ def get_cwv_station_history(station_name, as_of=None, ipfs_timeout=None): return (resp_series * u.dimensionless_unscaled).to_dict() +def get_sap_station_history(as_of=None, ipfs_timeout=None): + """ + return: + dict with datetime keys and sap Quantities as values + """ + metadata = get_metadata(get_heads()["sap-daily"]) + with SapStations(ipfs_timeout=ipfs_timeout, as_of=as_of) as dataset_obj: + str_resp_series = dataset_obj.get_data() + resp_series = str_resp_series.astype(float) + # SAP uses financial units, best to return unscaled + return (resp_series * u.dimensionless_unscaled).to_dict() + + def get_australia_station_history(station_name, weather_variable, desired_units=None, as_of=None, ipfs_timeout=None): """ return: @@ -875,7 +888,7 @@ def get_teleconnections_history(weather_variable, ipfs_timeout=None): headers = next(reader) date_col = headers.index('DATE') try: - data_col=headers.index("value") + data_col = headers.index("value") except ValueError: raise WeatherVariableNotFoundError( "Invalid weather variable for this station") diff --git a/dweather_client/ipfs_queries.py b/dweather_client/ipfs_queries.py index a08afd0..69808b6 100644 --- a/dweather_client/ipfs_queries.py +++ b/dweather_client/ipfs_queries.py @@ -926,6 +926,43 @@ def extract_data_from_text(self, date_range, ipfs_hash, station_name): return data_dict +class SapStations(GriddedDataset): + """ + Instantiable class for Composite Weather Variable Station 
Data + """ + @property + def dataset(self): + return "sap-daily" + + @property + def data_file_format(self): + """ + format string requires station name eg 'EM' + """ + return "sap_update_UK.txt" + + def get_data(self): + super().get_data() + hashes = self.get_hashes() + ret_dict = {} + for h in hashes: + date_range = self.get_date_range_from_metadata(h) + new_dict = self.extract_data_from_text(date_range, h) + ret_dict = {**ret_dict, **new_dict} + return pd.Series(ret_dict) + + def extract_data_from_text(self, date_range, ipfs_hash): + byte_obj = self.get_file_object( + f"{ipfs_hash}/{self.data_file_format}") + data = byte_obj.read().decode("utf-8").split(",") + day_itr = date_range[0].date() + data_dict = {} + for point in data: + data_dict[day_itr] = point + day_itr += datetime.timedelta(days=1) + return data_dict + + class AustraliaBomStations(GriddedDataset): """ Instantiable class for Australia BOM Data @@ -1250,7 +1287,7 @@ def __init__(self, ipfs_timeout=None): def get_data(self, station): super().get_data() metadata = self.get_metadata(self.head) - + file_name = f"{self.head}/{station}.csv" return self.get_file_object(file_name).read().decode("utf-8") diff --git a/dweather_client/tests/test_client.py b/dweather_client/tests/test_client.py index 664bc05..911b409 100644 --- a/dweather_client/tests/test_client.py +++ b/dweather_client/tests/test_client.py @@ -3,7 +3,7 @@ from dweather_client.client import get_australia_station_history, get_station_history, get_gridcell_history, get_tropical_storms,\ get_yield_history, get_irrigation_data, get_power_history, get_gas_history, get_alberta_power_history, GRIDDED_DATASETS, has_dataset_updated,\ get_forecast_datasets, get_forecast, get_cme_station_history, get_european_station_history, get_hourly_station_history, get_drought_monitor_history, get_japan_station_history,\ - get_afr_history, get_cwv_station_history, get_teleconnections_history, get_station_forecast_history, get_station_forecast_stations, get_eaufrance_history + get_afr_history, get_cwv_station_history, get_teleconnections_history, get_station_forecast_history, get_station_forecast_stations, get_eaufrance_history, get_sap_station_history from dweather_client.aliases_and_units import snotel_to_ghcnd import pandas as pd from io import StringIO @@ -362,7 +362,7 @@ def test_historical_storms(): def test_hist_storm_as_of(): df_all_na = get_tropical_storms( - 'historical', 'NA', as_of=datetime.date(2023, 5, 20), ipfs_timeout=IPFS_TIMEOUT) # This will throw an exception if there's ever a break in the chain + 'historical', 'NA', as_of=datetime.date(2023, 5, 20), ipfs_timeout=IPFS_TIMEOUT) # This will throw an exception if there's ever a break in the chain def test_yields(): @@ -398,6 +398,12 @@ def test_cwv(): assert data[sorted(data)[0]].unit == u.dimensionless_unscaled +def test_sap(): + data = get_sap_station_history(ipfs_timeout=IPFS_TIMEOUT) + assert len(data) == (sorted(data)[-1] - sorted(data)[0]).days + 1 + assert data[sorted(data)[0]].unit == u.dimensionless_unscaled + + def test_australia(): data = get_australia_station_history( "Adelaide Airport", weather_variable="TMAX", ipfs_timeout=IPFS_TIMEOUT) @@ -489,13 +495,17 @@ def test_has_dataset_updated_false(): def test_forecast_station_history(): - history = get_station_forecast_history("cme_futures-daily", "D2", datetime.date(2023, 1, 31)) + history = get_station_forecast_history( + "cme_futures-daily", "D2", datetime.date(2023, 1, 31)) assert history[datetime.date(2023, 2, 28)] == 369.0 + def test_forecast_station_stations(): 
- stations = get_station_forecast_stations("cme_futures-daily", datetime.date(2023, 1, 31)) + stations = get_station_forecast_stations( + "cme_futures-daily", datetime.date(2023, 1, 31)) assert stations["features"][0]["properties"]["station name"] == "D2" + def test_eaufrance_station(): history = get_eaufrance_history("V720001002", "FLOWRATE") - assert history[datetime.date(2022,4,2)].value == 749 \ No newline at end of file + assert history[datetime.date(2022, 4, 2)].value == 749 From 1a0e8623710fde0e5fefc8550b349ee08aa4c48f Mon Sep 17 00:00:00 2001 From: eschechter Date: Fri, 22 Sep 2023 14:47:21 -0400 Subject: [PATCH 17/18] set gfs start time to be an hour after forecast date --- dweather_client/ipfs_queries.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dweather_client/ipfs_queries.py b/dweather_client/ipfs_queries.py index 69808b6..edc0075 100644 --- a/dweather_client/ipfs_queries.py +++ b/dweather_client/ipfs_queries.py @@ -1219,8 +1219,9 @@ def get_weather_dict(self, forecast_date, ipfs_hash, lat, lon): file_name = f"{forecast_date.strftime('%Y%m%d')}_{lat:.2f}_{lon:.2f}" with zi.open(file_name) as f: vals = f.read().decode("utf-8").split(',') + start_hour = 1 if "gfs" in self._dataset else 0 start_datetime = datetime.datetime( - forecast_date.year, forecast_date.month, forecast_date.day) + forecast_date.year, forecast_date.month, forecast_date.day, hour=start_hour) for i, val in enumerate(vals): ret[start_datetime + datetime.timedelta(hours=i*self._interval)] = val From 7dcb6842485f3bc644627ede12c2443d45c9d9d9 Mon Sep 17 00:00:00 2001 From: PrettyHertz Date: Fri, 20 Oct 2023 12:08:00 -0500 Subject: [PATCH 18/18] modified storm retrieval to allow for units column --- dweather_client/storms_datasets.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/dweather_client/storms_datasets.py b/dweather_client/storms_datasets.py index 449ca92..d5d061a 100644 --- a/dweather_client/storms_datasets.py +++ b/dweather_client/storms_datasets.py @@ -31,14 +31,20 @@ def get_data(self, basin, **kwargs): df = pd.read_csv( file_obj, na_values=["", " "], keep_default_na=False, low_memory=False, compression="gzip" ) + direction_row = df[0:1] # We remove the direction row when doing manipulations of the data df = df[1:] + direction_row["lat"] = direction_row["LAT"] + direction_row["lon"] = direction_row["LON"] + del direction_row["LAT"] + del direction_row["LON"] + df["lat"] = df.LAT.astype(float) df["lon"] = df.LON.astype(float) del df["LAT"] del df["LON"] - + processed_df = process_df(df, **kwargs) - + processed_df = pd.concat([direction_row, processed_df]).reset_index(drop = True) processed_df["HOUR"] = pd.to_datetime(processed_df["ISO_TIME"]) del processed_df["ISO_TIME"]
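Taken together, the series above adds three user-facing behaviors: an as_of keyword for historical (IBTrACS) storm retrieval pinned to an earlier hash, recursive linked-list retrieval for the ne_iso-hourly CSV station dataset, and a daily SAP history endpoint. A minimal usage sketch follows, illustrative only and not part of the patches; it assumes a reachable IPFS node and reuses the dataset names, basin code, and date that appear in the tests added in patches 14 and 16 (no ne_iso-hourly station ID is given in the series, so that call is omitted).

import datetime
from dweather_client.client import get_tropical_storms, get_sap_station_history

# IBTrACS North Atlantic storm history as of a past head (patch 14).
# Returns a pandas DataFrame; as_of walks the "previous hash" chain until it
# finds a hash whose generation date covers the requested date.
storms = get_tropical_storms(
    "historical", "NA", as_of=datetime.date(2023, 5, 20))

# Daily SAP series (patch 16). Returns a dict keyed by date with
# dimensionless astropy Quantities as values.
sap = get_sap_station_history()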