Commit 6306c5e

shoyer and Xarray-Beam authors authored and committed
[xarray_beam] Add docs on interfacing with Beam transforms
PiperOrigin-RevId: 814797000
1 parent e60b6ad commit 6306c5e

File tree

1 file changed (+63 −15)


docs/high-level.ipynb

Lines changed: 63 additions & 15 deletions
@@ -63,7 +63,7 @@
     "xbeam_ds"
    ],
    "outputs": [],
-   "execution_count": 2
+   "execution_count": 1
   },
   {
    "metadata": {
@@ -83,7 +83,7 @@
     "xarray_ds.chunk(chunks).to_zarr('example_data.zarr', mode='w')"
    ],
    "outputs": [],
-   "execution_count": 3
+   "execution_count": 2
   },
   {
    "metadata": {
@@ -186,7 +186,7 @@
     "xarray.open_zarr('example_climatology.zarr')"
    ],
    "outputs": [],
-   "execution_count": 6
+   "execution_count": 3
   },
   {
    "metadata": {
@@ -215,7 +215,7 @@
     "xarray.open_zarr('example_regrid.zarr')"
    ],
    "outputs": [],
-   "execution_count": 7
+   "execution_count": 4
   },
   {
    "metadata": {
@@ -245,7 +245,7 @@
     "  print(f'{type(e).__name__}: {e}')"
    ],
    "outputs": [],
-   "execution_count": 8
+   "execution_count": 5
   },
   {
    "metadata": {
@@ -262,19 +262,71 @@
    },
    "cell_type": "code",
    "source": [
-    "ds_beam = xbeam.Dataset.from_zarr('example_data.zarr')\n",
-    "ds_beam.map_blocks(lambda ds: ds.compute(), template=ds_beam.template)"
+    "(\n",
+    "    xbeam.Dataset.from_zarr('example_data.zarr')\n",
+    "    .map_blocks(lambda ds: ds.compute(), template=ds_beam.template)\n",
+    ")"
    ],
    "outputs": [],
-   "execution_count": 9
+   "execution_count": 6
+  },
+  {
+   "metadata": {
+    "id": "-U4t0kKIkDvb"
+   },
+   "cell_type": "markdown",
+   "source": [
+    "## Interfacing with Beam transforms"
+   ]
   },
   {
    "metadata": {
     "id": "75IG-22cKcuE"
    },
    "cell_type": "markdown",
    "source": [
-    "Sometimes, your computation doesn't fit into the ``map_blocks`` paradigm because you don't want to create `xarray.Dataset` objects. For these cases, you can switch to the lower-level Xarray-Beam [data model](data-model), and use raw Beam operations:"
+    "`Dataset` is a thin wrapper around Xarray-Beam transformations, so you can always drop into the lower-level Xarray-Beam [data model](data-model) and use raw Beam operations. This is especially useful for reading or writing data.\n",
+    "\n",
+    "For example, here's how you could manually recreate a `Dataset`, using the common pattern of evaluating a single example in memory to create a template with {py:func}`~xarray_beam.make_template` and {py:func}`~xarray_beam.replace_template_dims`:"
+   ]
+  },
+  {
+   "metadata": {
+    "id": "l9pHS1QDlMd-"
+   },
+   "cell_type": "code",
+   "source": [
+    "all_times = pd.date_range('2025-01-01', freq='1D', periods=365)\n",
+    "source_dataset = xarray.open_zarr('example_data.zarr', chunks=None)\n",
+    "\n",
+    "def load_chunk(time: pd.Timestamp) -\u003e tuple[xbeam.Key, xarray.Dataset]:\n",
+    "  key = xbeam.Key({'time': (time - all_times[0]).days})\n",
+    "  dataset = source_dataset.sel(time=[time])\n",
+    "  return key, dataset\n",
+    "\n",
+    "_, example = load_chunk(all_times[0])\n",
+    "\n",
+    "template = xbeam.make_template(example)\n",
+    "template = xbeam.replace_template_dims(template, time=all_times)\n",
+    "\n",
+    "ds_beam = xbeam.Dataset(\n",
+    "    template=template,\n",
+    "    chunks=xbeam.normalize_chunks({'time': 1}, template),\n",
+    "    split_vars=False,\n",
+    "    ptransform=(beam.Create(all_times) | beam.Map(load_chunk)),\n",
+    ")\n",
+    "ds_beam"
+   ],
+   "outputs": [],
+   "execution_count": 12
+  },
+  {
+   "metadata": {
+    "id": "1qjeY5mwlLGJ"
+   },
+   "cell_type": "markdown",
+   "source": [
+    "You can also pull out the underlying Beam `ptransform` from a dataset to append new transformations, e.g., to write each element of the pipeline to disk as a separate file:"
+   ]
   },
   {
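The `load_chunk` helper added in this hunk keys each chunk by its integer offset along the `time` dimension. A minimal pure-pandas sketch of just that keying logic, with no xarray_beam dependency (the `time_offset` name is illustrative, not part of the xarray_beam API):

```python
import pandas as pd

# One chunk per day, as in the notebook's example.
all_times = pd.date_range('2025-01-01', freq='1D', periods=365)

def time_offset(time: pd.Timestamp) -> int:
    # Integer position of this chunk along the 'time' dimension,
    # mirroring xbeam.Key({'time': (time - all_times[0]).days}).
    return (time - all_times[0]).days

offsets = [time_offset(t) for t in all_times[:3]]
print(offsets)  # [0, 1, 2]
```

With daily spacing the offset is simply the day index, so the key for the final timestamp is 364; with a different chunk size you would divide by the number of timestamps per chunk.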
@@ -288,16 +340,12 @@
     "  chunk.to_netcdf(path)\n",
     "\n",
     "with beam.Pipeline() as p:\n",
-    "  p | (\n",
-    "      xbeam.Dataset.from_zarr('example_data.zarr')\n",
-    "      .rechunk({'latitude': -1, 'longitude': -1})\n",
-    "      .ptransform\n",
-    "  ) | beam.MapTuple(to_netcdf)\n",
+    "  p | ds_beam.rechunk('50MB').ptransform | beam.MapTuple(to_netcdf)\n",
     "\n",
     "%ls *.nc"
    ],
    "outputs": [],
-   "execution_count": 10
+   "execution_count": 13
   }
  ],
  "metadata": {
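The final cell of the diff appends a write stage to the dataset's underlying `ptransform` (`p | ds_beam.rechunk('50MB').ptransform | beam.MapTuple(to_netcdf)`), writing one file per `(key, chunk)` element. A dependency-free sketch of that pattern, using plain dicts and strings as stand-ins for `xbeam.Key` and dataset chunks (`offsets_to_path` is a hypothetical helper, not part of the xarray_beam API):

```python
def offsets_to_path(key: dict) -> str:
    # Build a per-chunk filename from the key's dimension offsets,
    # e.g. {'time': 0} -> 'time0.nc'. Sorting keeps names deterministic.
    parts = [f"{dim}{offset}" for dim, offset in sorted(key.items())]
    return "-".join(parts) + ".nc"

def to_netcdf(key, chunk):
    # A real pipeline would call chunk.to_netcdf(path) here; this sketch
    # just returns the path it would write to.
    return offsets_to_path(key)

# Stand-in for the (key, chunk) elements flowing through the ptransform.
elements = [({"time": 0}, "chunk0"), ({"time": 1}, "chunk1")]

paths = [to_netcdf(k, c) for k, c in elements]
print(paths)  # ['time0.nc', 'time1.nc']
```

The same shape is what `beam.MapTuple(to_netcdf)` applies in parallel: each pipeline element is a two-tuple, and `MapTuple` unpacks it into the function's two arguments.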
