@@ -61,21 +61,48 @@ def __iter__(self) -> pd.DataFrame:
61
61
yield contiguous_time_periods
62
62
63
63
64
- def get_contiguous_t0_time_periods (
65
- contiguous_time_periods : pd .DataFrame , history_duration : timedelta , forecast_duration : timedelta
66
- ) -> pd .DataFrame :
67
- """Get all time periods which contain valid t0 datetimes.
64
@functional_datapipe("get_contiguous_time_periods_nwp")
class GetContiguousT0TimePeriodsNWPIterDataPipe(IterDataPipe):
    """Datapipe which yields the contiguous NWP t0 time periods usable for training"""

    def __init__(
        self,
        source_datapipe: IterDataPipe,
        history_duration: timedelta,
        max_staleness: timedelta = timedelta(minutes=0),
        max_dropout: timedelta = timedelta(minutes=0),
        time_dim: str = "init_time_utc",
    ):
        """
        Store the settings used to work out contiguous t0 time periods for training

        Args:
            source_datapipe: Datapipe emitting a Xarray dataset
            history_duration: Length of the historical slice used for a sample
            max_staleness: Up to how long after an NWP forecast init_time are we willing to use
                the forecast. Each init time will only be used up to this t0 time regardless of
                the forecast valid time.
            max_dropout: What is the maximum amount of dropout that will be used. This must be
                <= max_staleness.
            time_dim: Name of the time dimension along which the contiguous time periods are
                found
        """
        self.source_datapipe = source_datapipe
        self.history_duration = history_duration
        self.max_staleness = max_staleness
        self.max_dropout = max_dropout
        self.time_dim = time_dim

    def __iter__(self) -> pd.DataFrame:
        """Yield one dataframe of contiguous t0 time periods per item from the source datapipe"""
        for dataset in self.source_datapipe:
            logger.debug("Getting contiguous NWP t0 time periods")
            # The values along `time_dim` are handed on as a pandas DatetimeIndex
            init_times = pd.DatetimeIndex(dataset[self.time_dim])
            yield get_contiguous_t0_periods_nwp(
                datetimes=init_times,
                history_duration=self.history_duration,
                max_staleness=self.max_staleness,
                max_dropout=self.max_dropout,
            )
79
106
80
107
81
108
def get_contiguous_time_periods (
@@ -132,3 +159,75 @@ def get_contiguous_time_periods(
132
159
)
133
160
134
161
return pd .DataFrame (periods )
162
+
163
+
164
def get_contiguous_t0_time_periods(
    contiguous_time_periods: pd.DataFrame, history_duration: timedelta, forecast_duration: timedelta
) -> pd.DataFrame:
    """Get all time periods which contain valid t0 datetimes.

    `t0` is the datetime of the most recent observation.

    Each period is shrunk so that a sample taken at any t0 inside it still has
    `history_duration` of data before it and `forecast_duration` of data after it:
    `start_dt` is moved later by `history_duration` and `end_dt` is moved earlier by
    `forecast_duration`.

    The input DataFrame is left unmodified; a new DataFrame is returned.

    Args:
        contiguous_time_periods: pd.DataFrame with the columns `start_dt` and `end_dt`, where
            each row represents a single contiguous time period.
        history_duration: Amount added to each `start_dt`.
        forecast_duration: Amount subtracted from each `end_dt`.

    Returns:
        pd.DataFrame where each row represents a single time period. The pd.DataFrame
        has two columns: `start_dt` and `end_dt` (where 'dt' is short for 'datetime').

    Raises:
        AssertionError: If any shrunk period does not satisfy `start_dt < end_dt`.
    """
    # Work on a copy so that the caller's DataFrame is never altered as a side effect - not
    # even when the sanity check below fails.
    t0_time_periods = contiguous_time_periods.copy()
    t0_time_periods["start_dt"] = t0_time_periods["start_dt"] + history_duration
    t0_time_periods["end_dt"] = t0_time_periods["end_dt"] - forecast_duration
    assert (t0_time_periods["start_dt"] < t0_time_periods["end_dt"]).all()
    return t0_time_periods
179
+
180
+
181
def get_contiguous_t0_periods_nwp(
    datetimes: pd.DatetimeIndex,
    history_duration: timedelta,
    max_staleness: timedelta,
    max_dropout: timedelta = timedelta(0),
) -> pd.DataFrame:
    """Find the time periods, derived from NWP init times, which are valid as t0 datetimes.

    An init time can serve t0 datetimes from `max(history_duration, max_dropout)` after it
    up to `max_staleness` after it. Consecutive init times are merged into a single period
    for as long as the previous init time has not gone stale before the next one becomes
    usable.

    Args:
        datetimes: Sorted pd.DatetimeIndex
        history_duration: Length of the historical slice used for a sample
        max_staleness: Up to how long after an NWP forecast init_time are we willing to use the
            forecast. Each init time will only be used up to this t0 time regardless of the
            forecast valid time.
        max_dropout: What is the maximum amount of dropout that will be used. This must be <=
            max_staleness.

    Returns:
        pd.DataFrame where each row represents a single time period. The pd.DataFrame
        has two columns: `start_dt` and `end_dt` (where 'dt' is short for 'datetime').
    """
    # Sanity checks.
    assert len(datetimes) > 0
    assert datetimes.is_monotonic_increasing
    assert datetimes.is_unique
    assert history_duration >= timedelta(0)
    assert max_staleness >= timedelta(0)
    assert max_dropout <= max_staleness

    # Delay between an init time and the first t0 it can serve: whichever is larger of the
    # history slice and the maximum dropout
    buffer = max(history_duration, max_dropout)

    first_init = datetimes[0]
    period_start = first_init + buffer
    # The first forecast is usable until it reaches the max staleness
    period_end = first_init + max_staleness

    periods = []
    for init_time in datetimes[1:]:
        earliest_t0 = init_time + buffer

        # The previous init time goes stale before this one becomes usable, so close the
        # current period and open a new one
        if period_end < earliest_t0:
            periods.append([period_start, period_end])
            period_start = earliest_t0

        # Whether or not a new period was opened, this init time sets the period's end
        period_end = init_time + max_staleness

    # The period which is still open when the init times run out
    periods.append([period_start, period_end])

    return pd.DataFrame(periods, columns=["start_dt", "end_dt"])
0 commit comments