diff --git a/dweather_client/client.py b/dweather_client/client.py
index c9d75bc..3be1053 100644
--- a/dweather_client/client.py
+++ b/dweather_client/client.py
@@ -9,7 +9,8 @@ from dweather_client.struct_utils import tupleify, convert_nans_to_none
 import datetime
 import pytz
-import csv, json
+import csv
+import json
 import inspect
 import numpy as np
 import pandas as pd
@@ -18,7 +19,7 @@ from dweather_client import gridded_datasets
 from dweather_client.storms_datasets import IbtracsDataset, AtcfDataset, SimulatedStormsDataset
 from dweather_client.ipfs_queries import AustraliaBomStations, CedaBiomass, CmeStationsDataset, DutchStationsDataset, DwdStationsDataset, DwdHourlyStationsDataset, GlobalHourlyStationsDataset, JapanStations, StationDataset, EauFranceDataset,\
-    YieldDatasets, FsaIrrigationDataset, AemoPowerDataset, AemoGasDataset, AesoPowerDataset, ForecastDataset, AfrDataset, DroughtMonitor, CwvStations, SpeedwellStations, TeleconnectionsDataset, CsvStationDataset, StationForecastDataset
+    YieldDatasets, FsaIrrigationDataset, AemoPowerDataset, AemoGasDataset, AesoPowerDataset, ForecastDataset, AfrDataset, DroughtMonitor, CwvStations, SpeedwellStations, TeleconnectionsDataset, CsvStationDataset, StationForecastDataset, SapStations
 from dweather_client.slice_utils import DateRangeRetriever, has_changed
 from dweather_client.ipfs_errors import *
 from io import StringIO
@@ -246,6 +247,7 @@ def get_tropical_storms(
         min_lon=None,
         max_lat=None,
         max_lon=None,
+        as_of=None,
         ipfs_timeout=None):
     """
     return:
@@ -288,11 +290,11 @@ def get_tropical_storms(
     with cm as storm_getter:
         if radius:
-            return storm_getter.get_data(basin, radius=radius, lat=lat, lon=lon)
+            return storm_getter.get_data(basin, radius=radius, lat=lat, lon=lon, as_of=as_of)
         elif min_lat:
-            return storm_getter.get_data(basin, min_lat=min_lat, min_lon=min_lon, max_lat=max_lat, max_lon=max_lon)
+            return storm_getter.get_data(basin, min_lat=min_lat, min_lon=min_lon, max_lat=max_lat, max_lon=max_lon, as_of=as_of)
         else:
-            return storm_getter.get_data(basin)
+            return storm_getter.get_data(basin, as_of=as_of)
 
 
 def get_station_history(
@@ -485,6 +487,7 @@ def get_hourly_station_history(dataset, station_id, weather_variable, use_imperi
             v) for k, v in final_resp_series.to_dict().items()}
     return result
 
+
 def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_units=True, desired_units=None, ipfs_timeout=None):
     """
     This is almost an exact copy of get_hourly_station_history
@@ -543,14 +546,26 @@ def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_
             "Invalid weather variable for this station")
     try:
-        if dataset == "inmet_brazil-hourly":
+        # RawSet style where we only want the most recent file
+        if dataset in ["inmet_brazil-hourly"]:
             with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj:
-                csv_text = dataset_obj.get_data(station_id, weather_variable)
+                csv_text_list = [dataset_obj.get_data(
+                    station_id, weather_variable)]
+        # ClimateSet style where we need the entire linked list history
+        elif dataset in ["ne_iso-hourly"]:
+            with CsvStationDataset(dataset=dataset, ipfs_timeout=ipfs_timeout) as dataset_obj:
+                csv_text_list = dataset_obj.get_data_recursive(
+                    station_id, weather_variable)
         else:
            raise DatasetError("No such dataset in dClimate")
    except ipfshttpclient.exceptions.ErrorResponse:
        raise StationNotFoundError("Invalid station ID for dataset")
-    df = pd.read_csv(StringIO(csv_text))
+
+    # concat together all retrieved station csv texts
+    dfs = []
+    for csv_text in csv_text_list:
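+        # each entry in csv_text_list is the full text of one station CSV
+        # file; parse each into its own DataFrame before concatenating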
+        dfs.append(pd.read_csv(StringIO(csv_text)))
+    df = pd.concat(dfs, ignore_index=True)
     str_resp_series = df[column_name].astype(str)
     df = df.set_index("dt")
     if desired_units:
@@ -579,8 +594,9 @@ def get_csv_station_history(dataset, station_id, weather_variable, use_imperial_
             v) for k, v in final_resp_series.to_dict().items()}
     return result
 
+
 def get_station_forecast_history(dataset, station_id, forecast_date, desired_units=None, ipfs_timeout=None):
-    try: 
+    try:
         with StationForecastDataset(dataset, ipfs_timeout=ipfs_timeout) as dataset_obj:
             csv_text = dataset_obj.get_data(station_id, forecast_date)
             history = {}
@@ -588,9 +604,11 @@ def get_station_forecast_history(dataset, station_id, forecast_date, desired_uni
             headers = next(reader)
             date_col = headers.index('DATE')
             try: # Make sure weather variable is correct.
-                data_col = headers.index("SETT") #at the moment the only variable is "SETT"
+                # at the moment the only variable is "SETT"
+                data_col = headers.index("SETT")
             except ValueError:
-                raise WeatherVariableNotFoundError("Invalid weather variable for this station")
+                raise WeatherVariableNotFoundError(
+                    "Invalid weather variable for this station")
             for row in reader:
                 try:
                     if not row:
@@ -599,11 +617,12 @@ def get_station_forecast_history(dataset, station_id, forecast_date, desired_uni
                         row[date_col], "%Y-%m-%d").date()] = float(row[data_col])
                 except ValueError:
                     history[datetime.datetime.strptime(
-                        row[date_col], "%Y-%m-%d").date()] = row[data_col]
+                        row[date_col], "%Y-%m-%d").date()] = row[data_col]
             return history
     except ipfshttpclient.exceptions.ErrorResponse:
         raise StationNotFoundError("Invalid station ID for dataset")
 
+
 def get_station_forecast_stations(dataset, forecast_date, desired_units=None, ipfs_timeout=None):
     with StationForecastDataset(dataset, ipfs_timeout=ipfs_timeout) as dataset_obj:
         csv_text = dataset_obj.get_stations(forecast_date)
@@ -752,6 +771,19 @@ def get_cwv_station_history(station_name, as_of=None, ipfs_timeout=None):
     return (resp_series * u.dimensionless_unscaled).to_dict()
 
 
+def get_sap_station_history(as_of=None, ipfs_timeout=None):
+    """
+    return:
+        dict with datetime keys and sap Quantities as values
+    """
+    metadata = get_metadata(get_heads()["sap-daily"])
+    with SapStations(ipfs_timeout=ipfs_timeout, as_of=as_of) as dataset_obj:
+        str_resp_series = dataset_obj.get_data()
+    resp_series = str_resp_series.astype(float)
+    # SAP uses financial units, best to return unscaled
+    return (resp_series * u.dimensionless_unscaled).to_dict()
+
+
 def get_australia_station_history(station_name, weather_variable, desired_units=None, as_of=None, ipfs_timeout=None):
     """
     return:
@@ -847,15 +879,16 @@ def has_dataset_updated(dataset, slices, as_of, ipfs_timeout=None):
         ranges = dataset_obj.get_data(as_of)
     return has_changed(slices, ranges)
 
+
 def get_teleconnections_history(weather_variable, ipfs_timeout=None):
     with TeleconnectionsDataset(ipfs_timeout=ipfs_timeout) as dataset_obj:
-        csv_text = dataset_obj.get_data()
+        csv_text = dataset_obj.get_data(weather_variable)
     history = {}
     reader = csv.reader(csv_text.split('\n'))
     headers = next(reader)
     date_col = headers.index('DATE')
-    try: # Make sure weather variable is correct.
-        data_col = headers.index(weather_variable)
+    try:
+        data_col = headers.index("value")
     except ValueError:
         raise WeatherVariableNotFoundError(
             "Invalid weather variable for this station")
@@ -874,6 +907,7 @@ def get_teleconnections_history(weather_variable, ipfs_timeout=None):
                     row[date_col], "%Y-%m-%d").date()] = row[data_col]
     return history
 
+
 def get_eaufrance_history(station, weather_variable, use_imperial_units=False, desired_units=None, ipfs_timeout=None):
     try:
         with EauFranceDataset(ipfs_timeout=ipfs_timeout) as dataset_obj:
@@ -892,7 +926,8 @@ def get_eaufrance_history(station, weather_variable, use_imperial_units=False, d
             converted_resp_series = pd.Series(
                 converter(df[weather_variable].values*dweather_unit), index=df.index)
         except ValueError:
-            raise UnitError(f"Specified unit is incompatible with original, original units are {original_units} and requested units are {desired_units}")
+            raise UnitError(
+                f"Specified unit is incompatible with original, original units are {original_units} and requested units are {desired_units}")
         if desired_units is not None:
             rounded_resp_array = np.vectorize(rounding_formula_temperature)(
                 str_resp_series, converted_resp_series)
@@ -908,4 +943,3 @@ def get_eaufrance_history(station, weather_variable, use_imperial_units=False, d
         return result
     except ipfshttpclient.exceptions.ErrorResponse:
         raise StationNotFoundError("Invalid station ID for dataset")
-
\ No newline at end of file
diff --git a/dweather_client/ipfs_queries.py b/dweather_client/ipfs_queries.py
index ad97e58..edc0075 100644
--- a/dweather_client/ipfs_queries.py
+++ b/dweather_client/ipfs_queries.py
@@ -657,6 +657,13 @@ def __init__(self, dataset, ipfs_timeout=None):
         super().__init__(ipfs_timeout=ipfs_timeout)
         self._dataset = dataset
 
+    def get_hashes(self):
+        """
+        return: list of all hashes in dataset
+        """
+        hashes = self.traverse_ll(self.head, self.as_of)
+        return list(hashes)
+
     def get_data(self, station, weather_variable=None):
         # only some stations need weather variable
         # so this is an optional arg
@@ -664,6 +671,20 @@ def get_data(self, station, weather_variable=None):
         file_name = f"{self.head}/{station}.csv"
         return self.get_file_object(file_name).read().decode("utf-8")
 
+    def get_data_recursive(self, station, weather_variable=None):
+        # only some stations need weather variable
+        # so this is an optional arg
+        super().get_data()
+        # get all hashes and then effectively just use get_data
+        # recursively to get a full list of csvs
+        hashes = self.get_hashes()
+        csv_text_list = []
+        for hash_ in hashes:
+            file_name = f"{hash_}/{station}.csv"
+            csv_text_list.append(self.get_file_object(
+                file_name).read().decode("utf-8"))
+        return csv_text_list
+
 
 class YieldDatasets(IpfsDataset):
     """
@@ -905,6 +926,43 @@ def extract_data_from_text(self, date_range, ipfs_hash, station_name):
         return data_dict
 
 
+class SapStations(GriddedDataset):
+    """
+    Instantiable class for SAP station data
+    """
+    @property
+    def dataset(self):
+        return "sap-daily"
+
+    @property
+    def data_file_format(self):
+        """
+        fixed file name -- SAP data ships as a single UK-wide file,
+        not one file per station
+        """
+        return "sap_update_UK.txt"
+
+    def get_data(self):
+        super().get_data()
+        hashes = self.get_hashes()
+        ret_dict = {}
+        for h in hashes:
+            date_range = self.get_date_range_from_metadata(h)
+            new_dict = self.extract_data_from_text(date_range, h)
+            ret_dict = {**ret_dict, **new_dict}
+        return pd.Series(ret_dict)
+
+    def extract_data_from_text(self, date_range, ipfs_hash):
+        byte_obj = self.get_file_object(
+            f"{ipfs_hash}/{self.data_file_format}")
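+        # the file holds one comma-separated value per day, starting at the
+        # first day of this hash's metadata date range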
f"{ipfs_hash}/{self.data_file_format}") + data = byte_obj.read().decode("utf-8").split(",") + day_itr = date_range[0].date() + data_dict = {} + for point in data: + data_dict[day_itr] = point + day_itr += datetime.timedelta(days=1) + return data_dict + + class AustraliaBomStations(GriddedDataset): """ Instantiable class for Australia BOM Data @@ -1111,7 +1169,8 @@ def get_full_date_range_from_metadata(self, h): return: list of [start_time, end_time] """ metadata = self.get_metadata(h) - str_dates = (metadata["api documentation"]["full date range"][0], metadata["api documentation"]["full date range"][1]) + str_dates = (metadata["api documentation"]["full date range"] + [0], metadata["api documentation"]["full date range"][1]) return [datetime.datetime.fromisoformat(dt).date() for dt in str_dates] def get_relevant_hash(self, forecast_date): @@ -1142,10 +1201,13 @@ def get_relevant_hash(self, forecast_date): d) for d in prev_metadata["date range"]] if prev_date_range[0] <= forecast_date <= prev_date_range[1]: return prev_hash - prev_hash = prev_metadata['previous hash'] # iterate backwards in the link list one step + # iterate backwards in the link list one step + prev_hash = prev_metadata['previous hash'] # If this script runs to the end without returning anything or an error, the forecast date must fall in a hole in the data - raise DateOutOfRangeError("forecast date unavailable due to holes in data") # NOTE only returns if there are holes in the data + # NOTE only returns if there are holes in the data + raise DateOutOfRangeError( + "forecast date unavailable due to holes in data") def get_weather_dict(self, forecast_date, ipfs_hash, lat, lon): """ @@ -1157,8 +1219,9 @@ def get_weather_dict(self, forecast_date, ipfs_hash, lat, lon): file_name = f"{forecast_date.strftime('%Y%m%d')}_{lat:.2f}_{lon:.2f}" with zi.open(file_name) as f: vals = f.read().decode("utf-8").split(',') + start_hour = 1 if "gfs" in self._dataset else 0 start_datetime = datetime.datetime( - forecast_date.year, forecast_date.month, forecast_date.day) + forecast_date.year, forecast_date.month, forecast_date.day, hour=start_hour) for i, val in enumerate(vals): ret[start_datetime + datetime.timedelta(hours=i*self._interval)] = val @@ -1190,6 +1253,7 @@ def get_data(self, lat, lon, forecast_date): return (float(ret_lat), float(ret_lon)), pd.Series(weather_dict) + class StationForecastDataset(ForecastDataset): """ Instantiable class for pulling in station data that is also forecast data. 
@@ -1198,11 +1262,11 @@ class StationForecastDataset(ForecastDataset):
     @property
     def dataset(self):
         return self._dataset
-    
+
     def __init__(self, dataset, **kwargs):
         super().__init__(dataset, 1)
         self.head = get_heads()[self.dataset]
-    
+
     def get_data(self, station, forecast_date):
         relevant_hash = self.get_relevant_hash(forecast_date)
         return self.get_file_object(f"{relevant_hash}/{station}.csv").read().decode("utf-8")
@@ -1211,6 +1275,7 @@ def get_stations(self, forecast_date):
         relevant_hash = self.get_relevant_hash(forecast_date)
         return self.get_file_object(f"{relevant_hash}/stations.json").read().decode("utf-8")
 
+
 class TeleconnectionsDataset(IpfsDataset):
     """
     Instantiable class used for pulling in el nino teleconnections data
@@ -1220,13 +1285,14 @@ class TeleconnectionsDataset(IpfsDataset):
     def __init__(self, ipfs_timeout=None):
         super().__init__(ipfs_timeout=ipfs_timeout)
 
-    def get_data(self):
+    def get_data(self, station):
         super().get_data()
         metadata = self.get_metadata(self.head)
-        year_month = metadata["time generated"][:7]
-        file_name = f"{self.head}/teleconnections_{year_month}.csv"
+
+        file_name = f"{self.head}/{station}.csv"
         return self.get_file_object(file_name).read().decode("utf-8")
 
+
 class EauFranceDataset(IpfsDataset):
     """
     Instantiable class used for pulling in el nino teleconnections data
@@ -1240,4 +1306,4 @@ def get_data(self, station):
         super().get_data()
         metadata = self.get_metadata(self.head)
         file_name = f"{self.head}/{station}.csv"
-        return self.get_file_object(file_name).read().decode("utf-8")
\ No newline at end of file
+        return self.get_file_object(file_name).read().decode("utf-8")
diff --git a/dweather_client/storms_datasets.py b/dweather_client/storms_datasets.py
index 86b122f..d5d061a 100644
--- a/dweather_client/storms_datasets.py
+++ b/dweather_client/storms_datasets.py
@@ -1,11 +1,14 @@
 import gzip
 import json
+import datetime
 from abc import abstractmethod
 
 import pandas as pd
 
 from dweather_client.df_utils import boxed_storms, nearby_storms
 from dweather_client.ipfs_queries import IpfsDataset
+from dweather_client.ipfs_errors import *
+
 
 def process_df(input_df, **kwargs):
     if {'radius', 'lat', 'lon'}.issubset(kwargs.keys()):
@@ -23,23 +26,58 @@ def get_data(self, basin, **kwargs):
         if basin not in {'NI', 'SI', 'NA', 'EP', 'WP', 'SP', 'SA'}:
             raise ValueError("Invalid basin ID")
         super().get_data()
-        file_obj = self.get_file_object(f"{self.head}/ibtracs-{basin}.csv.gz")
+        ipfs_hash = self.get_relevant_hash(kwargs.get("as_of"))
+        file_obj = self.get_file_object(f"{ipfs_hash}/ibtracs-{basin}.csv.gz")
         df = pd.read_csv(
             file_obj,
             na_values=["", " "],
             keep_default_na=False,
             low_memory=False,
             compression="gzip"
         )
+        direction_row = df[0:1].copy()
         # We remove the direction row when doing manipulations of the data
         df = df[1:]
+        direction_row["lat"] = direction_row["LAT"]
+        direction_row["lon"] = direction_row["LON"]
+        del direction_row["LAT"]
+        del direction_row["LON"]
+
         df["lat"] = df.LAT.astype(float)
         df["lon"] = df.LON.astype(float)
         del df["LAT"]
         del df["LON"]
-    
+
         processed_df = process_df(df, **kwargs)
-    
+        processed_df = pd.concat([direction_row, processed_df]).reset_index(drop=True)
         processed_df["HOUR"] = pd.to_datetime(processed_df["ISO_TIME"])
         del processed_df["ISO_TIME"]
 
         return processed_df
 
+    def get_relevant_hash(self, as_of_date):
+        """
+        return the ipfs hash whose data was current as of the given date
+        """
+        cur_hash = self.head
+        if as_of_date is None:
+            return cur_hash
+        cur_metadata = self.get_metadata(cur_hash)
+        # This routine is agnostic to the order of data contained in the hashes (at a cost of inefficiency) -- if the data contains the as_of date, it WILL be found, eventually
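+        # walk from the newest hash back through 'previous hash' links until
+        # as_of_date falls between two consecutive "time generated" dates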
+        most_recent_date = datetime.datetime.fromisoformat(cur_metadata["time generated"]).date()
+        if as_of_date >= most_recent_date:
+            return cur_hash
+        prev_hash = cur_metadata['previous hash']
+        while prev_hash is not None:
+            prev_metadata = self.get_metadata(prev_hash)
+            prev_date = datetime.datetime.fromisoformat(prev_metadata["time generated"]).date()
+            if prev_date <= as_of_date <= most_recent_date:
+                return prev_hash
+            # iterate backwards in the link list one step
+            try:
+                prev_hash = prev_metadata['previous hash']
+                most_recent_date = prev_date
+            except KeyError:
+                # as_of support was added to this ETL late, so the oldest hash has no 'previous hash' key (rather than one set to None) and we hit a KeyError here
+                raise DateOutOfRangeError(
+                    "as_of date is earlier than earliest available hash")
+
+
 class AtcfDataset(IpfsDataset):
     dataset = "atcf_btk-seasonal"
diff --git a/dweather_client/tests/test_client.py b/dweather_client/tests/test_client.py
index 61dd8c0..911b409 100644
--- a/dweather_client/tests/test_client.py
+++ b/dweather_client/tests/test_client.py
@@ -3,7 +3,7 @@
 from dweather_client.client import get_australia_station_history, get_station_history, get_gridcell_history, get_tropical_storms,\
     get_yield_history, get_irrigation_data, get_power_history, get_gas_history, get_alberta_power_history, GRIDDED_DATASETS, has_dataset_updated,\
     get_forecast_datasets, get_forecast, get_cme_station_history, get_european_station_history, get_hourly_station_history, get_drought_monitor_history, get_japan_station_history,\
-    get_afr_history, get_cwv_station_history, get_teleconnections_history, get_station_forecast_history, get_station_forecast_stations, get_eaufrance_history
+    get_afr_history, get_cwv_station_history, get_teleconnections_history, get_station_forecast_history, get_station_forecast_stations, get_eaufrance_history, get_sap_station_history
 from dweather_client.aliases_and_units import snotel_to_ghcnd
 import pandas as pd
 from io import StringIO
@@ -360,6 +360,11 @@ def test_historical_storms():
     assert len(df_subset_box_na) < len(df_all_na)
 
 
+def test_hist_storm_as_of():
+    df_all_na = get_tropical_storms(
+        'historical', 'NA', as_of=datetime.date(2023, 5, 20), ipfs_timeout=IPFS_TIMEOUT)  # This will throw an exception if there's ever a break in the chain
+
+
 def test_yields():
     df = pd.read_csv(StringIO(get_yield_history(
         "0041", "12", "073", ipfs_timeout=IPFS_TIMEOUT)))
@@ -393,6 +398,12 @@ def test_cwv():
     assert data[sorted(data)[0]].unit == u.dimensionless_unscaled
 
 
+def test_sap():
+    data = get_sap_station_history(ipfs_timeout=IPFS_TIMEOUT)
+    assert len(data) == (sorted(data)[-1] - sorted(data)[0]).days + 1
+    assert data[sorted(data)[0]].unit == u.dimensionless_unscaled
+
+
 def test_australia():
     data = get_australia_station_history(
         "Adelaide Airport", weather_variable="TMAX", ipfs_timeout=IPFS_TIMEOUT)
@@ -484,13 +495,17 @@ def test_has_dataset_updated_false():
 
 
 def test_forecast_station_history():
-    history = get_station_forecast_history("cme_futures-daily", "D2", datetime.date(2023, 1, 31))
+    history = get_station_forecast_history(
+        "cme_futures-daily", "D2", datetime.date(2023, 1, 31))
     assert history[datetime.date(2023, 2, 28)] == 369.0
 
+
 def test_forecast_station_stations():
-    stations = get_station_forecast_stations("cme_futures-daily", datetime.date(2023, 1, 31))
+    stations = get_station_forecast_stations(
+        "cme_futures-daily", datetime.date(2023, 1, 31))
     assert stations["features"][0]["properties"]["station name"] == "D2"
stations["features"][0]["properties"]["station name"] == "D2" + def test_eaufrance_station(): history = get_eaufrance_history("V720001002", "FLOWRATE") - assert history[datetime.date(2022,4,2)].value == 749 \ No newline at end of file + assert history[datetime.date(2022, 4, 2)].value == 749 diff --git a/requirements.txt b/requirements.txt index 822e4ad..20f34db 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ ipfshttpclient==0.8.0a2 -pandas>=1.1.4 +pandas>=1.1.4, < 2.0.0 pytest>=6.1.2 pytest-mock==3.6.1 requests