From 71f52e44a8033628e0539be50b6339d1ba1b9dae Mon Sep 17 00:00:00 2001 From: James Fulton Date: Wed, 13 Dec 2023 14:51:21 +0000 Subject: [PATCH] remove unused live time functions --- ocf_datapipes/select/__init__.py | 2 - ocf_datapipes/select/select_live_t0_time.py | 30 -------- .../select/select_live_time_slice.py | 49 ------------- .../select/test_select_live_t0_time_slice.py | 69 ------------------- tests/select/test_select_live_time_slice.py | 69 ------------------- 5 files changed, 219 deletions(-) delete mode 100644 ocf_datapipes/select/select_live_t0_time.py delete mode 100644 ocf_datapipes/select/select_live_time_slice.py delete mode 100644 tests/select/test_select_live_t0_time_slice.py delete mode 100644 tests/select/test_select_live_time_slice.py diff --git a/ocf_datapipes/select/__init__.py b/ocf_datapipes/select/__init__.py index e31fb29aa..f7f089d06 100644 --- a/ocf_datapipes/select/__init__.py +++ b/ocf_datapipes/select/__init__.py @@ -10,8 +10,6 @@ from .select_channels import SelectChannelsIterDataPipe as SelectChannels from .select_gsp_ids import SelectGSPIDsIterDataPipe as SelectGSPIDs from .select_id import SelectIDIterDataPipe as SelectID -from .select_live_t0_time import SelectLiveT0TimeIterDataPipe as SelectLiveT0Time -from .select_live_time_slice import SelectLiveTimeSliceIterDataPipe as SelectLiveTimeSlice from .select_loc_and_t0 import LocationT0PickerIterDataPipe as LocationT0Picker from .select_overlapping_time_slices import ( SelectOverlappingTimeSliceIterDataPipe as SelectOverlappingTimeSlice, diff --git a/ocf_datapipes/select/select_live_t0_time.py b/ocf_datapipes/select/select_live_t0_time.py deleted file mode 100644 index 9c4966964..000000000 --- a/ocf_datapipes/select/select_live_t0_time.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Select the history for the live data""" -import pandas as pd -from torch.utils.data import IterDataPipe, functional_datapipe - - -@functional_datapipe("select_live_t0_time") -class SelectLiveT0TimeIterDataPipe(IterDataPipe): - """Select the history for the live data""" - - def __init__(self, source_datapipe: IterDataPipe, dim_name: str = "time_utc"): - """ - Select history for the Xarray object - - Args: - source_datapipe: Datapipe emitting Xarray objects - dim_name: The time dimension name to use - """ - self.source_datapipe = source_datapipe - self.dim_name = dim_name - - def __iter__(self) -> pd.Timestamp: - """Get the latest timestamp and return it""" - for xr_data in self.source_datapipe: - # Get most recent time in data - # Select the history that goes back that far - latest_time_idx = pd.DatetimeIndex(xr_data[self.dim_name].values).get_indexer( - [pd.Timestamp.now(tz=None)], method="pad" - )[0] - latest_time = xr_data[self.dim_name].values[latest_time_idx] - yield latest_time diff --git a/ocf_datapipes/select/select_live_time_slice.py b/ocf_datapipes/select/select_live_time_slice.py deleted file mode 100644 index e95e63b8e..000000000 --- a/ocf_datapipes/select/select_live_time_slice.py +++ /dev/null @@ -1,49 +0,0 @@ -"""Select the history for the live data""" -import logging -from datetime import timedelta -from typing import Union - -import numpy as np -import xarray as xr -from torch.utils.data import IterDataPipe, functional_datapipe - -from ocf_datapipes.utils import Zipper - -logger = logging.getLogger(__name__) - - -@functional_datapipe("select_live_time_slice") -class SelectLiveTimeSliceIterDataPipe(IterDataPipe): - """Select the history for the live data""" - - def __init__( - self, - source_datapipe: IterDataPipe, - t0_datapipe: IterDataPipe, - history_duration: timedelta, - dim_name: str = "time_utc", - ): - """ - Select the history for the live time slice - - Args: - source_datapipe: Datapipe emitting Xarray object - t0_datapipe: Datapipe emitting t0 timestamps - history_duration: Amount of time for the history - dim_name: Time dimension name - """ - self.source_datapipe = source_datapipe - self.t0_datapipe = t0_datapipe - self.history_duration = np.timedelta64(history_duration) - self.dim_name = dim_name - - def __iter__(self) -> Union[xr.DataArray, xr.Dataset]: - """Select the recent live data""" - for xr_data, t0 in Zipper(self.source_datapipe, self.t0_datapipe): - logger.debug(f"Selecting time slice {t0} on dim {self.dim_name}") - - xr_data = xr_data.sel({self.dim_name: slice(t0 - self.history_duration, t0)}) - - logger.debug(f"Took slice of length {len(getattr(xr_data,self.dim_name))}") - - yield xr_data diff --git a/tests/select/test_select_live_t0_time_slice.py b/tests/select/test_select_live_t0_time_slice.py deleted file mode 100644 index dc0493167..000000000 --- a/tests/select/test_select_live_t0_time_slice.py +++ /dev/null @@ -1,69 +0,0 @@ -from datetime import timedelta - -from ocf_datapipes.select import SelectLiveT0Time, SelectLiveTimeSlice -from ocf_datapipes.transform.xarray import ConvertToNWPTargetTime - - -def test_select_hrv(sat_hrv_datapipe): - time_len = len(next(iter(sat_hrv_datapipe)).time_utc.values) - t0_datapipe = SelectLiveT0Time(sat_hrv_datapipe) - sat_hrv_datapipe = SelectLiveTimeSlice( - sat_hrv_datapipe, - t0_datapipe=t0_datapipe, - history_duration=timedelta(hours=1), - ) - data = next(iter(sat_hrv_datapipe)) - assert len(data.time_utc.values) == 13 - assert len(data.time_utc.values) < time_len - - -def test_select_gsp(gsp_datapipe): - time_len = len(next(iter(gsp_datapipe)).time_utc.values) - t0_datapipe = SelectLiveT0Time(gsp_datapipe) - gsp_datapipe = SelectLiveTimeSlice( - gsp_datapipe, - t0_datapipe=t0_datapipe, - history_duration=timedelta(hours=2), - ) - data = next(iter(gsp_datapipe)) - assert len(data.time_utc.values) == 5 - assert len(data.time_utc.values) < time_len - - -def test_select_nwp(nwp_datapipe): - t0_datapipe = SelectLiveT0Time(nwp_datapipe, dim_name="init_time_utc") - nwp_datapipe = ConvertToNWPTargetTime( - nwp_datapipe, - t0_datapipe=t0_datapipe, - sample_period_duration=timedelta(hours=1), - history_duration=timedelta(hours=2), - forecast_duration=timedelta(hours=3), - ) - data = next(iter(nwp_datapipe)) - assert len(data.target_time_utc.values) == 6 - - -def test_select_passiv(passiv_datapipe): - time_len = len(next(iter(passiv_datapipe)).time_utc.values) - t0_datapipe = SelectLiveT0Time(passiv_datapipe) - passiv_datapipe = SelectLiveTimeSlice( - passiv_datapipe, - t0_datapipe=t0_datapipe, - history_duration=timedelta(hours=1), - ) - data = next(iter(passiv_datapipe)) - assert len(data.time_utc.values) == 13 - assert len(data.time_utc.values) < time_len - - -def test_select_pvoutput(pvoutput_datapipe): - time_len = len(next(iter(pvoutput_datapipe)).time_utc.values) - t0_datapipe = SelectLiveT0Time(pvoutput_datapipe, dim_name="time_utc") - pvoutput_datapipe = SelectLiveTimeSlice( - pvoutput_datapipe, - t0_datapipe=t0_datapipe, - history_duration=timedelta(hours=1), - ) - data = next(iter(pvoutput_datapipe)) - assert len(data.time_utc.values) == 13 - assert len(data.time_utc.values) < time_len diff --git a/tests/select/test_select_live_time_slice.py b/tests/select/test_select_live_time_slice.py deleted file mode 100644 index 9ad4768ba..000000000 --- a/tests/select/test_select_live_time_slice.py +++ /dev/null @@ -1,69 +0,0 @@ -from datetime import timedelta - -from ocf_datapipes.select import SelectLiveT0Time, SelectLiveTimeSlice -from ocf_datapipes.transform.xarray import ConvertToNWPTargetTime - - -def test_select_hrv(sat_hrv_datapipe): - time_len = len(next(iter(sat_hrv_datapipe)).time_utc.values) - t0_datapipe = SelectLiveT0Time(sat_hrv_datapipe, dim_name="time_utc") - sat_hrv_datapipe = SelectLiveTimeSlice( - sat_hrv_datapipe, history_duration=timedelta(minutes=60), t0_datapipe=t0_datapipe - ) - data = next(iter(sat_hrv_datapipe)) - assert len(data.time_utc.values) == 13 - assert len(data.time_utc.values) < time_len - - -def test_select_gsp(gsp_datapipe): - time_len = len(next(iter(gsp_datapipe)).time_utc.values) - t0_datapipe = SelectLiveT0Time(gsp_datapipe, dim_name="time_utc") - gsp_datapipe = SelectLiveTimeSlice( - gsp_datapipe, history_duration=timedelta(minutes=120), t0_datapipe=t0_datapipe - ) - data = next(iter(gsp_datapipe)) - assert len(data.time_utc.values) == 5 - assert len(data.time_utc.values) < time_len - - -def test_select_nwp(nwp_datapipe): - t0_datapipe = SelectLiveT0Time(nwp_datapipe, dim_name="init_time_utc") - nwp_datapipe = ConvertToNWPTargetTime( - nwp_datapipe, - t0_datapipe, - sample_period_duration=timedelta(hours=1), - history_duration=timedelta(hours=2), - forecast_duration=timedelta(hours=4), - ) - time_len = len(next(iter(nwp_datapipe)).target_time_utc.values) - nwp_datapipe = SelectLiveTimeSlice( - nwp_datapipe, - t0_datapipe=t0_datapipe, - history_duration=timedelta(minutes=120), - dim_name="target_time_utc", - ) - data = next(iter(nwp_datapipe)) - assert len(data.target_time_utc.values) == 3 - assert len(data.target_time_utc.values) < time_len - - -def test_select_passiv(passiv_datapipe): - time_len = len(next(iter(passiv_datapipe)).time_utc.values) - t0_datapipe = SelectLiveT0Time(passiv_datapipe, dim_name="time_utc") - passiv_datapipe = SelectLiveTimeSlice( - passiv_datapipe, history_duration=timedelta(minutes=60), t0_datapipe=t0_datapipe - ) - data = next(iter(passiv_datapipe)) - assert len(data.time_utc.values) == 13 - assert len(data.time_utc.values) < time_len - - -def test_select_pvoutput(pvoutput_datapipe): - time_len = len(next(iter(pvoutput_datapipe)).time_utc.values) - t0_datapipe = SelectLiveT0Time(pvoutput_datapipe, dim_name="time_utc") - pvoutput_datapipe = SelectLiveTimeSlice( - pvoutput_datapipe, history_duration=timedelta(minutes=60), t0_datapipe=t0_datapipe - ) - data = next(iter(pvoutput_datapipe)) - assert len(data.time_utc.values) == 13 - assert len(data.time_utc.values) < time_len