diff --git a/docs/documentation/additional_examples.rst b/docs/documentation/additional_examples.rst index e4c8eefd0c..19a3f8f665 100644 --- a/docs/documentation/additional_examples.rst +++ b/docs/documentation/additional_examples.rst @@ -8,13 +8,6 @@ example_brownian.py :language: python :linenos: -example_dask_chunk_OCMs.py --------------------------- - -.. literalinclude:: ../examples/example_dask_chunk_OCMs.py - :language: python - :linenos: - example_decaying_moving_eddy.py ------------------------------- diff --git a/docs/examples/documentation_MPI.ipynb b/docs/examples/documentation_MPI.ipynb index c16608d7d4..ee1f7e3dfb 100644 --- a/docs/examples/documentation_MPI.ipynb +++ b/docs/examples/documentation_MPI.ipynb @@ -5,7 +5,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# Parallelisation with MPI and Field chunking with dask\n" + "# Parallelisation with MPI\n" ] }, { @@ -159,237 +159,6 @@ "For small projects, the above instructions are sufficient. If your project is large, then it is helpful to combine the `proc*` directories into a single zarr dataset and to optimise the chunking for your analysis. What is \"large\"? If you find yourself running out of memory while doing your analysis, saving the results, or sorting the dataset, or if reading the data is taking longer than you can tolerate, your problem is \"large.\" Another rule of thumb is if the size of your output directory is 1/3 or more of the memory of your machine, your problem is large. 
Chunking and combining the `proc*` data in order to speed up analysis is discussed [in the documentation on runs with large output](https://docs.oceanparcels.org/en/latest/examples/documentation_LargeRunsOutput.html).\n" ] }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Chunking the FieldSet with dask\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The basic idea of field-chunking in Parcels is that we use the `dask` library to load only these regions of the `Field` that are occupied by `Particles`. The advantage is that if different MPI processors take care of Particles in different parts of the domain, each only needs to load a small section of the full `FieldSet` (although note that this load-balancing functionality is still in [development](#Future-developments:-load-balancing)). Furthermore, the field-chunking in principle makes the `indices` keyword superfluous, as Parcels will determine which part of the domain to load itself.\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The default behaviour is for `dask` to control the chunking, via a call to `da.from_array(data, chunks='auto')`. The chunksizes are then determined by the layout of the NetCDF files.\n", - "\n", - "However, in tests we have experienced that this may not necessarily be the most efficient chunking. Therefore, Parcels provides control over the chunksize via the `chunksize` keyword in `Field` creation, which requires a dictionary that sets the typical size of chunks for each dimension.\n", - "\n", - "It is strongly encouraged to explore what the best value for chunksize is for your experiment, which will depend on the `FieldSet`, the `ParticleSet` and the type of simulation (2D versus 3D). As a guidance, we have found that chunksizes in the zonal and meridional direction of approximately around 128 to 512 are typically most effective. 
The binning relates to the size of the model and its data size, so power-of-two values are advantageous but not required.\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The notebook below shows an example script to explore the scaling of the time taken for `pset.execute()` as a function of zonal and meridional `chunksize` for a dataset from the [Copernicus Marine Service](http://marine.copernicus.eu/) portal.\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "pycharm": { - "is_executing": true - } - }, - "outputs": [], - "source": [ - "%pylab inline\n", - "import os\n", - "import time\n", - "from datetime import timedelta\n", - "from glob import glob\n", - "\n", - "import matplotlib.pyplot as plt\n", - "import numpy as np\n", - "import psutil\n", - "\n", - "import parcels\n", - "\n", - "\n", - "def set_cms_fieldset(cs):\n", - " data_dir_head = \"/data/oceanparcels/input_data\"\n", - " data_dir = os.path.join(data_dir_head, \"CMS/GLOBAL_REANALYSIS_PHY_001_030/\")\n", - " files = sorted(glob(data_dir + \"mercatorglorys12v1_gl12_mean_201607*.nc\"))\n", - " variables = {\"U\": \"uo\", \"V\": \"vo\"}\n", - " dimensions = {\"lon\": \"longitude\", \"lat\": \"latitude\", \"time\": \"time\"}\n", - "\n", - " if cs not in [\"auto\", False]:\n", - " cs = {\"time\": (\"time\", 1), \"lat\": (\"latitude\", cs), \"lon\": (\"longitude\", cs)}\n", - " return parcels.FieldSet.from_netcdf(files, variables, dimensions, chunksize=cs)\n", - "\n", - "\n", - "func_time = []\n", - "mem_used_GB = []\n", - "chunksize = [128, 256, 512, 768, 1024, 1280, 1536, 1792, 2048, 2610, \"auto\", False]\n", - "for cs in chunksize:\n", - " fieldset = set_cms_fieldset(cs)\n", - " pset = parcels.ParticleSet(\n", - " fieldset=fieldset,\n", - " pclass=parcels.Particle,\n", - " lon=[0],\n", - " lat=[0],\n", - " repeatdt=timedelta(hours=1),\n", - " )\n", - "\n", - " tic = time.time()\n", - " pset.execute(parcels.AdvectionRK4, 
dt=timedelta(hours=1))\n", - " func_time.append(time.time() - tic)\n", - " process = psutil.Process(os.getpid())\n", - " mem_B_used = process.memory_info().rss\n", - " mem_used_GB.append(mem_B_used / (1024 * 1024))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "pycharm": { - "is_executing": true - } - }, - "outputs": [], - "source": [ - "fig, ax = plt.subplots(1, 1, figsize=(15, 7))\n", - "\n", - "ax.plot(chunksize[:-2], func_time[:-2], \"o-\")\n", - "ax.plot([0, 2800], [func_time[-2], func_time[-2]], \"--\", label=chunksize[-2])\n", - "ax.plot([0, 2800], [func_time[-1], func_time[-1]], \"--\", label=chunksize[-1])\n", - "plt.xlim([0, 2800])\n", - "plt.legend()\n", - "ax.set_xlabel(\"chunksize\")\n", - "ax.set_ylabel(\"Time spent in pset.execute() [s]\")\n", - "plt.show()" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The plot above shows that in this case, `chunksize='auto'` and `chunksize=False` (the two dashed lines) are roughly the same speed, but that the fastest run is for `chunksize=(1, 256, 256)`. But note that the actual thresholds and numbers depend on the `FieldSet` used and the specifics of your experiment.\n", - "\n", - "Furthermore, one of the major advantages of field chunking is the efficient utilization of memory. This permits the distribution of the particle advection to many cores, as otherwise the processing unit (e.g. a CPU core; a node in a cluster) would exhaust the memory rapidly. 
This is shown in the following plot of the memory behaviour.\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "pycharm": { - "is_executing": true - } - }, - "outputs": [], - "source": [ - "fig, ax = plt.subplots(1, 1, figsize=(15, 12))\n", - "ax.plot(chunksize[:-2], mem_used_GB[:-2], \"--\", label=\"memory_blocked [MB]\")\n", - "ax.plot([0, 2800], [mem_used_GB[-2], mem_used_GB[-2]], \"x-\", label=\"auto [MB]\")\n", - "ax.plot([0, 2800], [mem_used_GB[-1], mem_used_GB[-1]], \"--\", label=\"no chunking [MB]\")\n", - "plt.legend()\n", - "ax.set_xlabel(\"chunksize\")\n", - "ax.set_ylabel(\"Memory blocked in pset.execute() [MB]\")\n", - "plt.show()" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "It can clearly be seen that with `chunksize=False` (green line), all Field data are loaded in full directly into memory, which can lead to MPI-run simulations being aborted for memory reasons. Furthermore, one can see that even the automatic method is not optimal, and optimizing the chunksize for a specific hydrodynamic dataset can make a large difference to the memory used.\n", - "\n", - "It may - depending on your simulation goal - be necessary to tweak the chunksize to leave more memory space for additional particles that are being simulated. Since particles and fields share the same memory space, lower memory utilisation by the `FieldSet` means more memory available for a larger `ParticleSet`.\n", - "\n", - "Also note that the above example is for a 2D application. For 3D applications, the `chunksize=False` will almost always be slower than `chunksize='auto'` or any dictionary, and is likely to run into insufficient memory issues, raising a `MemoryError`. The plot below shows the same analysis as above, but this time for a set of simulations using the full 3D Copernicus Marine Service code. 
In this case, the `chunksize='auto'` is about two orders of magnitude faster than running without chunking, and about 7.5 times faster than with minimal chunk capacity (i.e. `chunksize=(1, 128, 128)`).\n", - "\n", - "Choosing too small chunksizes can make the code slower, again highlighting that it is wise to explore which chunksize is best for your experiment before you perform it.\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "pycharm": { - "is_executing": true - } - }, - "outputs": [], - "source": [ - "from parcels import AdvectionRK4_3D\n", - "\n", - "\n", - "def set_cms_fieldset_3D(cs):\n", - " data_dir_head = \"/data/oceanparcels/input_data\"\n", - " data_dir = os.path.join(data_dir_head, \"CMS/GLOBAL_REANALYSIS_PHY_001_030/\")\n", - " files = sorted(glob(data_dir + \"mercatorglorys12v1_gl12_mean_201607*.nc\"))\n", - " variables = {\"U\": \"uo\", \"V\": \"vo\"}\n", - " dimensions = {\n", - " \"lon\": \"longitude\",\n", - " \"lat\": \"latitude\",\n", - " \"depth\": \"depth\",\n", - " \"time\": \"time\",\n", - " }\n", - "\n", - " if cs not in [\"auto\", False]:\n", - " cs = {\n", - " \"time\": (\"time\", 1),\n", - " \"depth\": {\"depth\", 1},\n", - " \"lat\": (\"latitude\", cs),\n", - " \"lon\": (\"longitude\", cs),\n", - " }\n", - " return parcels.FieldSet.from_netcdf(files, variables, dimensions, chunksize=cs)\n", - "\n", - "\n", - "chunksize_3D = [128, 256, 512, 768, 1024, 1280, 1536, 1792, 2048, 2610, \"auto\", False]\n", - "func_time3D = []\n", - "for cs in chunksize_3D:\n", - " fieldset = set_cms_fieldset_3D(cs)\n", - " pset = parcels.ParticleSet(\n", - " fieldset=fieldset,\n", - " pclass=parcels.Particle,\n", - " lon=[0],\n", - " lat=[0],\n", - " repeatdt=timedelta(hours=1),\n", - " )\n", - "\n", - " tic = time.time()\n", - " pset.execute(AdvectionRK4_3D, dt=timedelta(hours=1))\n", - " func_time3D.append(time.time() - tic)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "pycharm": { 
- "is_executing": true - } - }, - "outputs": [], - "source": [ - "fig, ax = plt.subplots(1, 1, figsize=(15, 7))\n", - "\n", - "ax.plot(chunksize[:-2], func_time3D[:-2], \"o-\")\n", - "ax.plot([0, 2800], [func_time3D[-2], func_time3D[-2]], \"--\", label=chunksize_3D[-2])\n", - "plt.xlim([0, 2800])\n", - "plt.legend()\n", - "ax.set_xlabel(\"chunksize\")\n", - "ax.set_ylabel(\"Time spent in pset.execute() [s]\")\n", - "plt.show()" - ] - }, { "attachments": {}, "cell_type": "markdown", diff --git a/docs/examples/example_nemo_curvilinear.py b/docs/examples/example_nemo_curvilinear.py index a9a36fb1eb..9c81b938d7 100644 --- a/docs/examples/example_nemo_curvilinear.py +++ b/docs/examples/example_nemo_curvilinear.py @@ -28,11 +28,7 @@ def run_nemo_curvilinear(outfile, advtype="RK4"): } variables = {"U": "U", "V": "V"} dimensions = {"lon": "glamf", "lat": "gphif"} - chunksize = {"lat": ("y", 256), "lon": ("x", 512)} - fieldset = parcels.FieldSet.from_nemo( - filenames, variables, dimensions, chunksize=chunksize - ) - assert fieldset.U.chunksize == chunksize + fieldset = parcels.FieldSet.from_nemo(filenames, variables, dimensions) # Now run particles as normal npart = 20 diff --git a/docs/examples/example_ofam.py b/docs/examples/example_ofam.py index 175d5f40de..d0f887b8ea 100644 --- a/docs/examples/example_ofam.py +++ b/docs/examples/example_ofam.py @@ -33,7 +33,6 @@ def set_ofam_fieldset(deferred_load=True, use_xarray=False): dimensions, allow_time_extrapolation=True, deferred_load=deferred_load, - chunksize=False, ) diff --git a/docs/examples/tutorial_parcels_structure.ipynb b/docs/examples/tutorial_parcels_structure.ipynb index df802b3067..8784d70ae3 100644 --- a/docs/examples/tutorial_parcels_structure.ipynb +++ b/docs/examples/tutorial_parcels_structure.ipynb @@ -418,26 +418,6 @@ "- [Optimising the partitioning of the particles with a user-defined partition 
function](https://docs.oceanparcels.org/en/latest/examples/documentation_MPI.html#Optimising-the-partitioning-of-the-particles-with-a-user-defined-partition_function)\n", "- [Future developments: load balancing](https://docs.oceanparcels.org/en/latest/examples/documentation_MPI.html#Future-developments:-load-balancing)" ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Another good way to optimise Parcels and speed-up execution is to chunk the `FieldSet` with `dask`, using the `chunksize` argument in the `FieldSet` creation. This will allow Parcels to load the `FieldSet` in chunks. \n", - "\n", - "Using chunking can be especially useful when working with large datasets _and_ when the particles only occupy a small region of the domain.\n", - "\n", - "Note that the **default** is `chunksize=None`, which means that the `FieldSet` is loaded in its entirety. This is generally the most efficient way to load the `FieldSet` when the particles are spread out over the entire domain.\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### For more tutorials chunking and dask:\n", - "\n", - "- [Chunking the FieldSet with dask](https://docs.oceanparcels.org/en/latest/examples/documentation_MPI.html#Chunking-the-FieldSet-with-dask)" - ] } ], "metadata": { diff --git a/parcels/_typing.py b/parcels/_typing.py index 775defeae8..b2ee43580c 100644 --- a/parcels/_typing.py +++ b/parcels/_typing.py @@ -28,7 +28,6 @@ PathLike = str | os.PathLike Mesh = Literal["spherical", "flat"] # corresponds with `mesh` VectorType = Literal["3D", "3DSigma", "2D"] | None # corresponds with `vector_type` -ChunkMode = Literal["auto", "specific", "failsafe"] # corresponds with `chunk_mode` GridIndexingType = Literal["pop", "mom5", "mitgcm", "nemo", "croco"] # corresponds with `gridindexingtype` UpdateStatus = Literal["not_updated", "first_updated", "updated"] # corresponds with `_update_status` NetcdfEngine = Literal["netcdf4", "xarray", "scipy"] diff 
--git a/parcels/field.py b/parcels/field.py index 7b8b7cdf24..516da65b61 100644 --- a/parcels/field.py +++ b/parcels/field.py @@ -2,7 +2,6 @@ import math import warnings from collections.abc import Iterable -from ctypes import c_int from pathlib import Path from typing import TYPE_CHECKING, Literal @@ -44,8 +43,6 @@ from ._index_search import _search_indices_curvilinear, _search_indices_rectilinear from .fieldfilebuffer import ( - DaskFileBuffer, - DeferredDaskFileBuffer, DeferredNetcdfFileBuffer, NetcdfFileBuffer, ) @@ -154,9 +151,6 @@ class Field: allow_time_extrapolation : bool boolean whether to allow for extrapolation in time (i.e. beyond the last available time snapshot) - chunkdims_name_map : str, optional - Gives a name map to the FieldFileBuffer that declared a mapping between chunksize name, NetCDF dimension and Parcels dimension; - required only if currently incompatible OCM field is loaded and chunking is used by 'chunksize' (which is the default) to_write : bool Write the Field in NetCDF format at the same frequency as the ParticleFile outputdt, using a filenaming scheme based on the ParticleFile name @@ -276,8 +270,7 @@ def __init__( # Hack around the fact that NaN and ridiculously large values # propagate in SciPy's interpolators - lib = np if isinstance(self.data, np.ndarray) else da - self.data[lib.isnan(self.data)] = 0.0 + self.data[np.isnan(self.data)] = 0.0 if self.vmin is not None: self.data[self.data < self.vmin] = 0.0 if self.vmax is not None: @@ -291,8 +284,6 @@ def __init__( self._field_fb_class = kwargs.pop("FieldFileBuffer", None) self._netcdf_engine = kwargs.pop("netcdf_engine", "netcdf4") self._creation_log = kwargs.pop("creation_log", "") - self.chunksize = kwargs.pop("chunksize", None) - self.netcdf_chunkdims_name_map = kwargs.pop("chunkdims_name_map", None) self.grid.depth_field = kwargs.pop("depth_field", None) if self.grid.depth_field == "not_yet_set": @@ -304,9 +295,6 @@ def __init__( # (data_full_zdim = grid.zdim if no indices 
are used, for A- and C-grids and for some B-grids). It is used for the B-grid, # since some datasets do not provide the deeper level of data (which is ignored by the interpolation). self.data_full_zdim = kwargs.pop("data_full_zdim", None) - self._data_chunks = [] # type: ignore # the data buffer of the FileBuffer raw loaded data - shall be a list of C-contiguous arrays - self.nchunks: tuple[int, ...] = () - self._chunk_set: bool = False self.filebuffers = [None] * 2 if len(kwargs) > 0: raise SyntaxError(f'Field received an unexpected keyword argument "{list(kwargs.keys())[0]}"') @@ -460,8 +448,6 @@ def from_netcdf( gridindexingtype : str The type of gridindexing. Either 'nemo' (default), 'mitgcm', 'mom5', 'pop', or 'croco' are supported. See also the Grid indexing documentation on oceanparcels.org - chunksize : - size of the chunks in dask loading netcdf_decodewarning : bool (DEPRECATED - v3.1.0) Whether to show a warning if there is a problem decoding the netcdf files. Default is True, but in some cases where these warnings are expected, it may be useful to silence them @@ -623,9 +609,6 @@ def from_netcdf( ) kwargs["dataFiles"] = dataFiles - chunksize: bool | None = kwargs.get("chunksize", None) - grid.chunksize = chunksize - if "time" in indices: warnings.warn( "time dimension in indices is not necessary anymore. 
It is then ignored.", FieldSetWarning, stacklevel=2 @@ -634,13 +617,8 @@ def from_netcdf( if grid.time.size <= 2: deferred_load = False - _field_fb_class: type[DeferredDaskFileBuffer | DaskFileBuffer | DeferredNetcdfFileBuffer | NetcdfFileBuffer] - if chunksize not in [False, None]: - if deferred_load: - _field_fb_class = DeferredDaskFileBuffer - else: - _field_fb_class = DaskFileBuffer - elif deferred_load: + _field_fb_class: type[DeferredNetcdfFileBuffer | NetcdfFileBuffer] + if deferred_load: _field_fb_class = DeferredNetcdfFileBuffer else: _field_fb_class = NetcdfFileBuffer @@ -658,7 +636,6 @@ def from_netcdf( netcdf_engine, interp_method=interp_method, data_full_zdim=data_full_zdim, - chunksize=chunksize, ) as filebuffer: # If Field.from_netcdf is called directly, it may not have a 'data' dimension # In that case, assume that 'name' is the data dimension @@ -680,16 +657,15 @@ def from_netcdf( if len(filebuffer.indices["depth"]) > 1: data_list.append(buffer_data.reshape(sum(((1,), buffer_data.shape), ()))) else: - if type(tslice) not in [list, np.ndarray, da.Array, xr.DataArray]: + if type(tslice) not in [list, np.ndarray, xr.DataArray]: tslice = [tslice] data_list.append(buffer_data.reshape(sum(((len(tslice), 1), buffer_data.shape[1:]), ()))) else: data_list.append(buffer_data) - if type(tslice) not in [list, np.ndarray, da.Array, xr.DataArray]: + if type(tslice) not in [list, np.ndarray, xr.DataArray]: tslice = [tslice] ti += len(tslice) - lib = np if isinstance(data_list[0], np.ndarray) else da - data = lib.concatenate(data_list, axis=0) + data = np.concatenate(data_list, axis=0) else: grid._defer_load = True grid._ti = -1 @@ -770,32 +746,23 @@ def from_xarray( def _reshape(self, data, transpose=False): # Ensure that field data is the right data type - if not isinstance(data, (np.ndarray, da.core.Array)): + if not isinstance(data, (np.ndarray)): data = np.array(data) if (self.cast_data_dtype == np.float32) and (data.dtype != np.float32): data = 
data.astype(np.float32) elif (self.cast_data_dtype == np.float64) and (data.dtype != np.float64): data = data.astype(np.float64) - lib = np if isinstance(data, np.ndarray) else da if transpose: - data = lib.transpose(data) + data = np.transpose(data) if self.grid._lat_flipped: - data = lib.flip(data, axis=-2) + data = np.flip(data, axis=-2) if self.grid.xdim == 1 or self.grid.ydim == 1: - data = lib.squeeze(data) # First remove all length-1 dimensions in data, so that we can add them below + data = np.squeeze(data) # First remove all length-1 dimensions in data, so that we can add them below if self.grid.xdim == 1 and len(data.shape) < 4: - if lib == da: - raise NotImplementedError( - "Length-one dimensions with field chunking not implemented, as dask does not have an `expand_dims` method. Use chunksize=None" - ) - data = lib.expand_dims(data, axis=-1) + data = np.expand_dims(data, axis=-1) if self.grid.ydim == 1 and len(data.shape) < 4: - if lib == da: - raise NotImplementedError( - "Length-one dimensions with field chunking not implemented, as dask does not have an `expand_dims` method. 
Use chunksize=None" - ) - data = lib.expand_dims(data, axis=-2) + data = np.expand_dims(data, axis=-2) if self.grid.tdim == 1: if len(data.shape) < 4: data = data.reshape(sum(((1,), data.shape), ())) @@ -911,8 +878,6 @@ def _spatial_interpolation(self, time, z, y, x, ti, particle=None): # Detect Out-of-bounds sampling and raise exception _raise_field_out_of_bound_error(z, y, x) else: - if isinstance(val, da.core.Array): - val = val.compute() return val except (FieldSamplingError, FieldOutOfBoundError, FieldOutOfBoundSurfaceError) as e: @@ -985,67 +950,6 @@ def eval(self, time, z, y, x, particle=None, applyConversion=True): else: return value - def _get_block_id(self, block): - return np.ravel_multi_index(block, self.nchunks) - - def _get_block(self, bid): - return np.unravel_index(bid, self.nchunks[1:]) - - def _chunk_setup(self): - if isinstance(self.data, da.core.Array): - chunks = self.data.chunks - self.nchunks = self.data.numblocks - npartitions = 1 - for n in self.nchunks[1:]: - npartitions *= n - elif isinstance(self.data, np.ndarray): - chunks = tuple((t,) for t in self.data.shape) - self.nchunks = (1,) * len(self.data.shape) - npartitions = 1 - elif isinstance(self.data, DeferredArray): - self.nchunks = (1,) * len(self.data.data_shape) - return - else: - return - - self._data_chunks = [None] * npartitions - self.grid._load_chunk = np.zeros(npartitions, dtype=c_int, order="C") - # self.grid.chunk_info format: number of dimensions (without tdim); number of chunks per dimensions; - # chunksizes (the 0th dim sizes for all chunk of dim[0], then so on for next dims - self.grid.chunk_info = [ - [len(self.nchunks) - 1], - list(self.nchunks[1:]), - sum(list(list(ci) for ci in chunks[1:]), []), # noqa: RUF017 # TODO: Perhaps avoid quadratic list summation here - ] - self.grid.chunk_info = sum(self.grid.chunk_info, []) # noqa: RUF017 - self._chunk_set = True - - def _chunk_data(self): - if not self._chunk_set: - self._chunk_setup() - g = self.grid - if 
isinstance(self.data, da.core.Array): - for block_id in range(len(self.grid._load_chunk)): - if g._load_chunk[block_id] == g._chunk_loading_requested or ( - g._load_chunk[block_id] in g._chunk_loaded and self._data_chunks[block_id] is None - ): - block = self._get_block(block_id) - self._data_chunks[block_id] = np.array( - self.data.blocks[(slice(self.grid.tdim),) + block], order="C" - ) - elif g._load_chunk[block_id] == g._chunk_not_loaded: - if isinstance(self._data_chunks, list): - self._data_chunks[block_id] = None - else: - self._data_chunks[block_id, :] = None - else: - if isinstance(self._data_chunks, list): - self._data_chunks[0] = None - else: - self._data_chunks[0, :] = None - self.grid._load_chunk[0] = g._chunk_loaded_touched - self._data_chunks[0] = np.array(self.data, order="C") - def add_periodic_halo(self, zonal, meridional, halosize=5, data=None): """Add a 'halo' to all Fields in a FieldSet. @@ -1067,26 +971,25 @@ def add_periodic_halo(self, zonal, meridional, halosize=5, data=None): data : if data is not None, the periodic halo will be achieved on data instead of self.data and data will be returned (Default value = None) """ - dataNone = not isinstance(data, (np.ndarray, da.core.Array)) + dataNone = not isinstance(data, np.ndarray) if self.grid.defer_load and dataNone: return data = self.data if dataNone else data - lib = np if isinstance(data, np.ndarray) else da if zonal: if len(data.shape) == 3: - data = lib.concatenate((data[:, :, -halosize:], data, data[:, :, 0:halosize]), axis=len(data.shape) - 1) + data = np.concatenate((data[:, :, -halosize:], data, data[:, :, 0:halosize]), axis=len(data.shape) - 1) assert data.shape[2] == self.grid.xdim, "Third dim must be x." else: - data = lib.concatenate( + data = np.concatenate( (data[:, :, :, -halosize:], data, data[:, :, :, 0:halosize]), axis=len(data.shape) - 1 ) assert data.shape[3] == self.grid.xdim, "Fourth dim must be x." 
if meridional: if len(data.shape) == 3: - data = lib.concatenate((data[:, -halosize:, :], data, data[:, 0:halosize, :]), axis=len(data.shape) - 2) + data = np.concatenate((data[:, -halosize:, :], data, data[:, 0:halosize, :]), axis=len(data.shape) - 2) assert data.shape[1] == self.grid.ydim, "Second dim must be y." else: - data = lib.concatenate( + data = np.concatenate( (data[:, :, -halosize:, :], data, data[:, :, 0:halosize, :]), axis=len(data.shape) - 2 ) assert data.shape[2] == self.grid.ydim, "Third dim must be y." @@ -1158,11 +1061,10 @@ def _data_concatenate(self, data, data_to_concat, tindex): data[tindex] = None elif isinstance(data, list): del data[tindex] - lib = np if isinstance(data, np.ndarray) else da if tindex == 0: - data = lib.concatenate([data_to_concat, data[tindex + 1 :, :]], axis=0) + data = np.concatenate([data_to_concat, data[tindex + 1 :, :]], axis=0) elif tindex == 1: - data = lib.concatenate([data[:tindex, :], data_to_concat], axis=0) + data = np.concatenate([data[:tindex, :], data_to_concat], axis=0) else: raise ValueError("data_concatenate is used for computeTimeChunk, with tindex in [0, 1]") return data @@ -1178,7 +1080,6 @@ def computeTimeChunk(self, data, tindex): ti = g._ti + tindex timestamp = self.timestamps[np.where(ti < summedlen)[0][0]] - rechunk_callback_fields = self._chunk_setup if isinstance(tindex, list) else None filebuffer = self._field_fb_class( self._dataFiles[g._ti + tindex], self.dimensions, @@ -1187,10 +1088,7 @@ def computeTimeChunk(self, data, tindex): timestamp=timestamp, interp_method=self.interp_method, data_full_zdim=self.data_full_zdim, - chunksize=self.chunksize, cast_data_dtype=self.cast_data_dtype, - rechunk_callback_fields=rechunk_callback_fields, - chunkdims_name_map=self.netcdf_chunkdims_name_map, ) filebuffer.__enter__() time_data = filebuffer.time @@ -1199,13 +1097,12 @@ def computeTimeChunk(self, data, tindex): if self.netcdf_engine != "xarray": filebuffer.name = 
filebuffer.parse_name(self.filebuffername) buffer_data = filebuffer.data - lib = np if isinstance(buffer_data, np.ndarray) else da if len(buffer_data.shape) == 2: - buffer_data = lib.reshape(buffer_data, sum(((1, 1), buffer_data.shape), ())) + buffer_data = np.reshape(buffer_data, sum(((1, 1), buffer_data.shape), ())) elif len(buffer_data.shape) == 3 and g.zdim > 1: - buffer_data = lib.reshape(buffer_data, sum(((1,), buffer_data.shape), ())) + buffer_data = np.reshape(buffer_data, sum(((1,), buffer_data.shape), ())) elif len(buffer_data.shape) == 3: - buffer_data = lib.reshape( + buffer_data = np.reshape( buffer_data, sum( ( diff --git a/parcels/fieldfilebuffer.py b/parcels/fieldfilebuffer.py index 8c3a55495f..9c47890294 100644 --- a/parcels/fieldfilebuffer.py +++ b/parcels/fieldfilebuffer.py @@ -1,18 +1,11 @@ import datetime -import math import warnings -import dask.array as da import numpy as np -import psutil import xarray as xr -from dask import config as da_conf -from dask import utils as da_utils -from netCDF4 import Dataset as ncDataset from parcels._typing import InterpMethodOption from parcels.tools.converters import convert_xarray_time_units -from parcels.tools.statuscodes import DaskChunkingError from parcels.tools.warnings import FileWarning @@ -263,600 +256,3 @@ def time_access(self): class DeferredNetcdfFileBuffer(NetcdfFileBuffer): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - - -class DaskFileBuffer(NetcdfFileBuffer): - _static_name_maps = { - "time": ["time", "time_count", "time_counter", "timer_count", "t"], - "depth": [ - "depth", - "depthu", - "depthv", - "depthw", - "depths", - "deptht", - "depthx", - "depthy", - "depthz", - "z", - "z_u", - "z_v", - "z_w", - "d", - "k", - "w_dep", - "w_deps", - "Z", - "Zp1", - "Zl", - "Zu", - "level", - ], - "lat": ["lat", "nav_lat", "y", "latitude", "la", "lt", "j", "YC", "YG"], - "lon": ["lon", "nav_lon", "x", "longitude", "lo", "ln", "i", "XC", "XG"], - } - _min_dim_chunksize = 16 
- - """ Class that encapsulates and manages deferred access to file data. """ - - def __init__(self, *args, **kwargs): - """ - Initializes this specific filebuffer type. As a result of using dask, the internal library is set to 'da'. - The chunksize parameter is popped from the argument list, as well as the locking-parameter and the - rechunk callback function. Also chunking-related variables are initialized. - """ - self.lib = da - self.chunksize = kwargs.pop("chunksize", "auto") - self.lock_file = kwargs.pop("lock_file", True) - self.chunk_mapping = None - self.rechunk_callback_fields = kwargs.pop("rechunk_callback_fields", None) - self.chunking_finalized = False - self.autochunkingfailed = False - super().__init__(*args, **kwargs) - - def __enter__(self): - """ - This function enters the physical file (equivalent to a 'with open(...)' statement) and returns a file object. - In Dask, with dynamic loading, this is the point where we have access to the header-information of the file. - Hence, this function initializes the dynamic loading by parsing the chunksize-argument and maps the requested - chunksizes onto the variables found in the file. For auto-chunking, educated guesses are made (e.g. with the - dask configuration file in the background) to determine the ideal chunk sizes. This is also the point - where - due to the chunking, the file is 'locked', meaning that it cannot be simultaneously accessed by - another process. This is significant in a cluster setup. - """ - if self.chunksize not in [False, None, "auto"] and type(self.chunksize) is not dict: - raise AttributeError( - "'chunksize' is of wrong type. Parameter is expected to be a dict per data dimension, or be False, None or 'auto'." 
- ) - if isinstance(self.chunksize, list): - self.chunksize = tuple(self.chunksize) - - init_chunk_dict = None - if self.chunksize not in [False, None]: - init_chunk_dict = self._get_initial_chunk_dictionary() - try: - # Unfortunately we need to do if-else here, cause the lock-parameter is either False or a Lock-object - # (which we would rather want to have being auto-managed). - # If 'lock' is not specified, the Lock-object is auto-created and managed by xarray internally. - if self.lock_file: - self.dataset = xr.open_dataset( - str(self.filename), decode_cf=True, engine=self.netcdf_engine, chunks=init_chunk_dict - ) - else: - self.dataset = xr.open_dataset( - str(self.filename), decode_cf=True, engine=self.netcdf_engine, chunks=init_chunk_dict, lock=False - ) - self.dataset["decoded"] = True - except: - warnings.warn( - f"File {self.filename} could not be decoded properly by xarray (version {xr.__version__}). " - "It will be opened with no decoding. Filling values might be wrongly parsed.", - FileWarning, - stacklevel=2, - ) - if self.lock_file: - self.dataset = xr.open_dataset( - str(self.filename), decode_cf=False, engine=self.netcdf_engine, chunks=init_chunk_dict - ) - else: - self.dataset = xr.open_dataset( - str(self.filename), decode_cf=False, engine=self.netcdf_engine, chunks=init_chunk_dict, lock=False - ) - self.dataset["decoded"] = False - - for inds in self.indices.values(): - if type(inds) not in [list, range]: - raise RuntimeError("Indices for field subsetting need to be a list") - return self - - def __exit__(self, type, value, traceback): - """Function releases the file handle. - - This function releases the file handle. Hence access to the dataset and its header-information is lost. The - previously executed chunking is lost. Furthermore, if the file access required file locking, the lock-handle - is freed so other processes can now access the file again. - """ - self.close() - - def close(self): - """Teardown FileBuffer object with dask. 
- - This function can be called to initialise an orderly teardown of a FileBuffer object with dask, meaning - to release the file handle, disposing of the dataset, and releasing the file lock (if required). - """ - if self.dataset is not None: - self.dataset.close() - self.dataset = None - self.chunking_finalized = False - self.chunk_mapping = None - - @classmethod - def add_to_dimension_name_map_global(cls, name_map): - """ - [externally callable] - This function adds entries to the name map from parcels_dim -> netcdf_dim. This is required if you want to - use auto-chunking on large fields whose map parameters are not defined. This function must be called before - entering the filebuffer object. Example: - DaskFileBuffer.add_to_dimension_name_map_global({'lat': 'nydim', - 'lon': 'nxdim', - 'time': 'ntdim', - 'depth': 'nddim'}) - fieldset = FieldSet(..., chunksize='auto') - [...] - Note that not all parcels dimensions need to be present in 'name_map'. - """ - assert isinstance(name_map, dict) - for pcls_dim_name in name_map.keys(): - if isinstance(name_map[pcls_dim_name], list): - for nc_dim_name in name_map[pcls_dim_name]: - cls._static_name_maps[pcls_dim_name].append(nc_dim_name) - elif isinstance(name_map[pcls_dim_name], str): - cls._static_name_maps[pcls_dim_name].append(name_map[pcls_dim_name]) - - def add_to_dimension_name_map(self, name_map): - """ - [externally callable] - This function adds entries to the name map from parcels_dim -> netcdf_dim. This is required if you want to - use auto-chunking on large fields whose map parameters are not defined. This function must be called after - constructing a filebuffer object and before entering the filebuffer. Example: - fb = DaskFileBuffer(...) - fb.add_to_dimension_name_map({'lat': 'nydim', 'lon': 'nxdim', 'time': 'ntdim', 'depth': 'nddim'}) - with fb: - [do_stuff] - Note that not all parcels dimensions need to be present in 'name_map'. 
- """ - assert isinstance(name_map, dict) - for pcls_dim_name in name_map.keys(): - self._static_name_maps[pcls_dim_name].append(name_map[pcls_dim_name]) - - def _get_available_dims_indices_by_request(self): - """Returns a dict mapping 'parcels_dimname' -> [None, int32_index_data_array]. - - This dictionary is based on the information provided by the requested dimensions. - Example: {'time': 0, 'depth': None, 'lat': 1, 'lon': 2} - """ - result = {} - neg_offset = 0 - tpl_offset = 0 - for name in ["time", "depth", "lat", "lon"]: - i = list(self._static_name_maps.keys()).index(name) - if name not in self.dimensions: - result[name] = None - tpl_offset += 1 - neg_offset += 1 - elif ( - (type(self.chunksize) is dict) - and ( - name not in self.chunksize - or ( - type(self.chunksize[name]) is tuple - and len(self.chunksize[name]) == 2 - and self.chunksize[name][1] <= 1 - ) - ) - ) or ( - (type(self.chunksize) is tuple) and name in self.dimensions and (self.chunksize[i - tpl_offset] <= 1) - ): - result[name] = None - neg_offset += 1 - else: - result[name] = i - neg_offset - return result - - def _get_available_dims_indices_by_namemap(self): - """ - Returns a dict mapping 'parcels_dimname' -> [None, int32_index_data_array]. - This dictionary is based on the information provided by the requested dimensions. - Example: {'time': 0, 'depth': 1, 'lat': 2, 'lon': 3} - """ - result = {} - for name in ["time", "depth", "lat", "lon"]: - result[name] = list(self._static_name_maps.keys()).index(name) - return result - - def _get_available_dims_indices_by_netcdf_file(self): - """ - [File needs to be open (i.e. self.dataset is not None) for this to work - otherwise generating an error] - Returns a dict mapping 'parcels_dimname' -> [None, int32_index_data_array]. - This dictionary is based on the information provided by the requested dimensions. 
- Example: {'time': 0, 'depth': 5, 'lat': 3, 'lon': 1} - for NetCDF with dimensions: - timer: 1 - x: [0 4000] - xr: [0 3999] - y: [0 2140] - yr: [0 2139] - z: [0 75] - """ - if self.dataset is None: - raise OSError("Trying to parse NetCDF header information before opening the file.") - result = {} - for pcls_dimname in ["time", "depth", "lat", "lon"]: - for nc_dimname in self._static_name_maps[pcls_dimname]: - if nc_dimname not in self.dataset.sizes.keys(): - continue - result[pcls_dimname] = list(self.dataset.sizes.keys()).index(nc_dimname) - return result - - def _is_dimension_available(self, dimension_name): - """ - This function returns a boolean value indicating if a certain variable (name) is available in the - requested dimensions as well as in the actual dataset of the file. If either of the two conditions is not met, - it returns 'False'. - """ - if self.dimensions is None or self.dataset is None: - return False - return dimension_name in self.dimensions - - def _is_dimension_chunked(self, dimension_name): - """ - This function returns a boolean value indicating if a certain variable is available in the requested - dimensions, the NetCDF file dataset, and is also required to be chunked according to the requested - chunksize dictionary. If any of these conditions is not met, it returns 'False'. - """ - if self.dimensions is None or self.dataset is None or self.chunksize in [None, False, "auto"]: - return False - dim_chunked = False - dim_chunked = ( - True - if (not dim_chunked and type(self.chunksize) is dict and dimension_name in self.chunksize.keys()) - else False - ) - dim_chunked = True if (not dim_chunked and type(self.chunksize) in [None, False]) else False - return (dimension_name in self.dimensions) and dim_chunked - - def _is_dimension_in_dataset(self, parcels_dimension_name, netcdf_dimension_name=None): - """ - [File needs to be open (i.e. 
self.dataset is not None) for this to work - otherwise generating an error] - This function returns the index, the name and the size of a NetCDF dimension in the file (in order: index, name, size). - It requires as input the name of the related parcels dimension (i.e. one of ['time', 'depth', 'lat', 'lon']). If - no hint on its mapping to a NetCDF dimension is provided, a heuristic based on the pre-defined name dictionary - is used. If a hint is provided, a connection is made between the designated parcels-dimension and NetCDF dimension. - """ - if self.dataset is None: - raise OSError("Trying to parse NetCDF header information before opening the file.") - k, dname, dvalue = (-1, "", 0) - dimension_name = parcels_dimension_name.lower() - dim_indices = self._get_available_dims_indices_by_request() - i = dim_indices[dimension_name] - if netcdf_dimension_name is not None and netcdf_dimension_name in self.dataset.sizes.keys(): - value = self.dataset.sizes[netcdf_dimension_name] - k, dname, dvalue = i, netcdf_dimension_name, value - elif self.dimensions is None or self.dataset is None: - return k, dname, dvalue - else: - for name in self._static_name_maps[dimension_name]: - if name in self.dataset.sizes: - value = self.dataset.sizes[name] - k, dname, dvalue = i, name, value - break - return k, dname, dvalue - - def _is_dimension_in_chunksize_request(self, parcels_dimension_name): - """ - This function returns the dense-array index, the NetCDF dimension name and the requested chunksize of a requested - parcels dimension (in order: index, name, size). This only works if the chunksize is provided as a dictionary - of tuples of parcels dimensions and their chunk mapping (i.e. dict(parcels_dim_name => (netcdf_dim_name, chunksize))). - It requires as input the name of the related parcels dimension (i.e. one of ['time', 'depth', 'lat', 'lon']). 
- """ - k, dname, dvalue = (-1, "", 0) - if self.dimensions is None or self.dataset is None: - return k, dname, dvalue - parcels_dimension_name = parcels_dimension_name.lower() - dim_indices = self._get_available_dims_indices_by_request() - i = dim_indices[parcels_dimension_name] - name = self.chunksize[parcels_dimension_name][0] - value = self.chunksize[parcels_dimension_name][1] - k, dname, dvalue = i, name, value - return k, dname, dvalue - - def _netcdf_DimNotFound_warning_message(self, dimension_name): - """Helper function that issues a warning message if a certain requested NetCDF dimension is not found in the file.""" - display_name = dimension_name if (dimension_name not in self.dimensions) else self.dimensions[dimension_name] - return f"Did not find {display_name} in NetCDF dims. Please specifiy chunksize as dictionary for NetCDF dimension names, e.g.\n chunksize={{ '{display_name}': , ... }}." - - def _chunkmap_to_chunksize(self): - """ - [File needs to be open via the '__enter__'-method for this to work - otherwise generating an error] - This functions translates the array-index-to-chunksize chunk map into a proper fieldsize dictionary that - can later be used for re-chunking, if a previously-opened file is re-opened again. 
- """ - if self.chunksize in [False, None]: - return - self.chunksize = {} - chunk_map = self.chunk_mapping - timei, timename, timevalue = self._is_dimension_in_dataset("time") - depthi, depthname, depthvalue = self._is_dimension_in_dataset("depth") - lati, latname, latvalue = self._is_dimension_in_dataset("lat") - loni, lonname, lonvalue = self._is_dimension_in_dataset("lon") - if len(chunk_map) == 2: - self.chunksize["lon"] = (latname, chunk_map[0]) - self.chunksize["lat"] = (lonname, chunk_map[1]) - elif len(chunk_map) == 3: - chunk_dim_index = 0 - if depthi is not None and depthi >= 0 and depthvalue > 1 and self._is_dimension_available("depth"): - self.chunksize["depth"] = (depthname, chunk_map[chunk_dim_index]) - chunk_dim_index += 1 - elif timei is not None and timei >= 0 and timevalue > 1 and self._is_dimension_available("time"): - self.chunksize["time"] = (timename, chunk_map[chunk_dim_index]) - chunk_dim_index += 1 - self.chunksize["lat"] = (latname, chunk_map[chunk_dim_index]) - chunk_dim_index += 1 - self.chunksize["lon"] = (lonname, chunk_map[chunk_dim_index]) - elif len(chunk_map) >= 4: - self.chunksize["time"] = (timename, chunk_map[0]) - self.chunksize["depth"] = (depthname, chunk_map[1]) - self.chunksize["lat"] = (latname, chunk_map[2]) - self.chunksize["lon"] = (lonname, chunk_map[3]) - dim_index = 4 - for dim_name in self.dimensions: - if dim_name not in ["time", "depth", "lat", "lon"]: - self.chunksize[dim_name] = (self.dimensions[dim_name], chunk_map[dim_index]) - dim_index += 1 - - def _get_initial_chunk_dictionary_by_dict_(self): - """ - [File needs to be open (i.e. self.dataset is not None) for this to work - otherwise generating an error] - Maps and correlates the requested dictionary-style chunksize with the requested parcels dimensions, variables - and the NetCDF-available dimensions. 
Thus, it takes care to remove chunksize arguments that are not in the - Parcels- or NetCDF dimensions, or whose chunking would be omitted due to an empty chunk dimension. - The function returns a pair of two things: corrected_chunk_dict, chunk_map - The corrected chunk_dict is the corrected version of the requested chunksize. The chunk map maps the array index - dimension to the requested chunksize. - """ - chunk_dict = {} - chunk_index_map = {} - neg_offset = 0 - if "time" in self.chunksize.keys(): - timei, timename, timesize = self._is_dimension_in_dataset( - parcels_dimension_name="time", netcdf_dimension_name=self.chunksize["time"][0] - ) - timevalue = self.chunksize["time"][1] - if timei is not None and timei >= 0 and timevalue > 1: - timevalue = min(timesize, timevalue) - chunk_dict[timename] = timevalue - chunk_index_map[timei - neg_offset] = timevalue - else: - self.chunksize.pop("time") - if "depth" in self.chunksize.keys(): - depthi, depthname, depthsize = self._is_dimension_in_dataset( - parcels_dimension_name="depth", netcdf_dimension_name=self.chunksize["depth"][0] - ) - depthvalue = self.chunksize["depth"][1] - if depthi is not None and depthi >= 0 and depthvalue > 1: - depthvalue = min(depthsize, depthvalue) - chunk_dict[depthname] = depthvalue - chunk_index_map[depthi - neg_offset] = depthvalue - else: - self.chunksize.pop("depth") - if "lat" in self.chunksize.keys(): - lati, latname, latsize = self._is_dimension_in_dataset( - parcels_dimension_name="lat", netcdf_dimension_name=self.chunksize["lat"][0] - ) - latvalue = self.chunksize["lat"][1] - if lati is not None and lati >= 0 and latvalue > 1: - latvalue = min(latsize, latvalue) - chunk_dict[latname] = latvalue - chunk_index_map[lati - neg_offset] = latvalue - else: - self.chunksize.pop("lat") - if "lon" in self.chunksize.keys(): - loni, lonname, lonsize = self._is_dimension_in_dataset( - parcels_dimension_name="lon", netcdf_dimension_name=self.chunksize["lon"][0] - ) - lonvalue = 
self.chunksize["lon"][1] - if loni is not None and loni >= 0 and lonvalue > 1: - lonvalue = min(lonsize, lonvalue) - chunk_dict[lonname] = lonvalue - chunk_index_map[loni - neg_offset] = lonvalue - else: - self.chunksize.pop("lon") - return chunk_dict, chunk_index_map - - def _failsafe_parse_(self): - """['name' need to be initialised]""" - # ==== fail - open it as a normal array and deduce the dimensions from the variable-function names ==== # - # ==== done by parsing ALL variables in the NetCDF, and comparing their call-parameters with the ==== # - # ==== name map available here. ==== # - init_chunk_dict = {} - self.dataset = ncDataset(str(self.filename)) - refdims = self.dataset.dimensions.keys() - max_field = "" - max_dim_names = () - max_coincide_dims = 0 - for vname in self.dataset.variables: - var = self.dataset.variables[vname] - coincide_dims = [] - for vdname in var.dimensions: - if vdname in refdims: - coincide_dims.append(vdname) - n_coincide_dims = len(coincide_dims) - if n_coincide_dims > max_coincide_dims: - max_field = vname - max_dim_names = tuple(coincide_dims) - max_coincide_dims = n_coincide_dims - self.name = max_field - for nc_dname in max_dim_names: - pcls_dname = None - for dname in self._static_name_maps.keys(): - if nc_dname in self._static_name_maps[dname]: - pcls_dname = dname - break - nc_dimsize = None - pcls_dim_chunksize = None - if pcls_dname is not None and pcls_dname in self.dimensions: - pcls_dim_chunksize = self._min_dim_chunksize - if isinstance(self.chunksize, dict) and pcls_dname is not None: - nc_dimsize = self.dataset.dimensions[nc_dname].size - if pcls_dname in self.chunksize.keys(): - pcls_dim_chunksize = self.chunksize[pcls_dname][1] - if ( - pcls_dname is not None - and nc_dname is not None - and nc_dimsize is not None - and pcls_dim_chunksize is not None - ): - init_chunk_dict[nc_dname] = pcls_dim_chunksize - - # ==== because in this case it has shown that the requested chunksize setup cannot be used, ==== # - # ==== 
replace the requested chunksize with this auto-derived version. ==== # - return init_chunk_dict - - def _get_initial_chunk_dictionary(self): - """ - Super-function that maps and correlates the requested chunksize with the requested parcels dimensions, variables - and the NetCDF-available dimensions. Thus, it takes care to remove chunksize arguments that are not in the - Parcels- or NetCDF dimensions, or whose chunking would be omitted due to an empty chunk dimension. - The function returns the corrected chunksize dictionary. The function also initializes the chunk_map. - The chunk map maps the array index dimension to the requested chunksize. - Apart from resolving the different requested version of the chunksize, the function also test-executes the - chunk request. If this initial test fails, as a last resort, we execute a heuristic to map the requested - parcels dimensions to the dimension signature of the most-parameterized NetCDF variable, and heuristically - try to map its parameters to the parcels dimensions with the class-wide name-map. 
- """ - # ==== check-opening requested dataset to access metadata ==== # - # ==== file-opening and dimension-reading does not require a decode or lock ==== # - self.dataset = xr.open_dataset( - str(self.filename), decode_cf=False, engine=self.netcdf_engine, chunks={}, lock=False - ) - self.dataset["decoded"] = False - # ==== self.dataset temporarily available ==== # - init_chunk_dict = {} - init_chunk_map = {} - if isinstance(self.chunksize, dict): - init_chunk_dict, init_chunk_map = self._get_initial_chunk_dictionary_by_dict_() - elif self.chunksize == "auto": - av_mem = psutil.virtual_memory().available - chunk_cap = av_mem * (1 / 8) * (1 / 3) - if "array.chunk-size" in da_conf.config.keys(): - chunk_cap = da_utils.parse_bytes(da_conf.config.get("array.chunk-size")) - else: - predefined_cap = da_conf.get("array.chunk-size") - if predefined_cap is not None: - chunk_cap = da_utils.parse_bytes(predefined_cap) - else: - warnings.warn( - "Unable to locate chunking hints from dask, thus estimating the max. chunk size heuristically. 
" - "Please consider defining the 'chunk-size' for 'array' in your local dask configuration file (see https://docs.oceanparcels.org/en/latest/examples/documentation_MPI.html#Chunking-the-FieldSet-with-dask and https://docs.dask.org).", - FileWarning, - stacklevel=2, - ) - loni, lonname, lonvalue = self._is_dimension_in_dataset("lon") - lati, latname, latvalue = self._is_dimension_in_dataset("lat") - if lati is not None and loni is not None and lati >= 0 and loni >= 0: - pDim = int(math.floor(math.sqrt(chunk_cap / np.dtype(np.float64).itemsize))) - init_chunk_dict[latname] = min(latvalue, pDim) - init_chunk_map[lati] = min(latvalue, pDim) - init_chunk_dict[lonname] = min(lonvalue, pDim) - init_chunk_map[loni] = min(lonvalue, pDim) - timei, timename, timevalue = self._is_dimension_in_dataset("time") - if timei is not None and timei >= 0: - init_chunk_dict[timename] = min(1, timevalue) - init_chunk_map[timei] = min(1, timevalue) - depthi, depthname, depthvalue = self._is_dimension_in_dataset("depth") - if depthi is not None and depthi >= 0: - init_chunk_dict[depthname] = max(1, depthvalue) - init_chunk_map[depthi] = max(1, depthvalue) - # ==== closing check-opened requested dataset ==== # - self.dataset.close() - # ==== check if the chunksize reading is successful. if not, load the file ONCE really into memory and ==== # - # ==== deduce the chunking from the array dims. ==== # - if len(init_chunk_dict) == 0 and self.chunksize not in [False, None, "auto"]: - self.autochunkingfailed = True - raise DaskChunkingError( - f"[{self.__class__.__name__}]: No correct mapping found between Parcels- and NetCDF dimensions! 
Please correct the 'FieldSet(..., chunksize=...)' parameter and try again.", - ) - else: - self.autochunkingfailed = False - try: - self.dataset = xr.open_dataset( - str(self.filename), decode_cf=True, engine=self.netcdf_engine, chunks=init_chunk_dict, lock=False - ) - if isinstance(self.chunksize, dict): - self.chunksize = init_chunk_dict - except: - warnings.warn( - f"Chunking with init_chunk_dict = {init_chunk_dict} failed - Executing Dask chunking 'failsafe'...", - FileWarning, - stacklevel=2, - ) - self.autochunkingfailed = True - if not self.autochunkingfailed: - init_chunk_dict = self._failsafe_parse_() - if isinstance(self.chunksize, dict): - self.chunksize = init_chunk_dict - finally: - self.dataset.close() - self.chunk_mapping = init_chunk_map - self.dataset = None - # ==== self.dataset not available ==== # - return init_chunk_dict - - @property - def data(self): - return self.data_access() - - def data_access(self): - data = self.dataset[self.name] - - ti = range(data.shape[0]) if self.ti is None else self.ti - data = self._apply_indices(data, ti) - if isinstance(data, xr.DataArray): - data = data.data - - if isinstance(data, da.core.Array): - if not self.chunking_finalized: - if self.chunksize == "auto": - # ==== as the chunksize is not initiated, the data is chunked automatically by Dask. ==== # - # ==== the resulting chunk dictionary is stored, to be re-used later. This prevents ==== # - # ==== the expensive re-calculation and PHYSICAL FILE RECHUNKING on each data access. 
==== # - if data.shape[-2:] != data.chunksize[-2:]: - data = data.rechunk(self.chunksize) - self.chunk_mapping = {} - chunkIndex = 0 - startblock = 0 - for chunkDim in data.chunksize[startblock:]: - self.chunk_mapping[chunkIndex] = chunkDim - chunkIndex += 1 - self._chunkmap_to_chunksize() - if self.rechunk_callback_fields is not None: - self.rechunk_callback_fields() - self.chunking_finalized = True - else: - self.chunking_finalized = True - else: - da_data = da.from_array(data, chunks=self.chunksize) - if self.chunksize == "auto" and da_data.shape[-2:] == da_data.chunksize[-2:]: - data = np.array(data) - else: - data = da_data - if not self.chunking_finalized and self.rechunk_callback_fields is not None: - self.rechunk_callback_fields() - self.chunking_finalized = True - - return data.astype(self.cast_data_dtype) - - -class DeferredDaskFileBuffer(DaskFileBuffer): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) diff --git a/parcels/fieldset.py b/parcels/fieldset.py index 4f6458d5ad..6385f3fb25 100644 --- a/parcels/fieldset.py +++ b/parcels/fieldset.py @@ -5,7 +5,6 @@ from copy import deepcopy from glob import glob -import dask.array as da import numpy as np from parcels._compat import MPI @@ -341,7 +340,6 @@ def from_netcdf( timestamps=None, allow_time_extrapolation: bool | None = None, deferred_load=True, - chunksize=None, **kwargs, ): """Initialises FieldSet object from NetCDF files. @@ -400,10 +398,6 @@ def from_netcdf( gridindexingtype : str The type of gridindexing. Either 'nemo' (default), 'mitgcm', 'mom5', 'pop', or 'croco' are supported. See also the Grid indexing documentation on oceanparcels.org - chunksize : - size of the chunks in dask loading. Default is None (no chunking). 
Can be None or False (no chunking), - 'auto' (chunking is done in the background, but results in one grid per field individually), or a dict in the format - ``{parcels_varname: {netcdf_dimname : (parcels_dimname, chunksize_as_int)}, ...}``, where ``parcels_dimname`` is one of ('time', 'depth', 'lat', 'lon') netcdf_engine : engine to use for netcdf reading in xarray. Default is 'netcdf', but in cases where this doesn't work, setting netcdf_engine='scipy' could help. Accepted options are the same as the ``engine`` parameter in ``xarray.open_dataset()``. @@ -450,9 +444,6 @@ def from_netcdf( cls.checkvaliddimensionsdict(dims) inds = indices[var] if (indices and var in indices) else indices fieldtype = fieldtype[var] if (fieldtype and var in fieldtype) else fieldtype - varchunksize = ( - chunksize[var] if (chunksize and var in chunksize) else chunksize - ) # -> {: (, ) } grid = None dFiles = None @@ -461,18 +452,11 @@ def from_netcdf( procdims = dimensions[procvar] if procvar in dimensions else dimensions procinds = indices[procvar] if (indices and procvar in indices) else indices procpaths = filenames[procvar] if isinstance(filenames, dict) and procvar in filenames else filenames - procchunk = chunksize[procvar] if (chunksize and procvar in chunksize) else chunksize nowpaths = filenames[var] if isinstance(filenames, dict) and var in filenames else filenames if procdims == dims and procinds == inds: possibly_samegrid = True - if procchunk != varchunksize: - for dim in varchunksize: - if varchunksize[dim][1] != procchunk[dim][1]: - possibly_samegrid &= False if not possibly_samegrid: break - if varchunksize == "auto": - break if "depth" in dims and dims["depth"] == "not_yet_set": break processedGrid = False @@ -499,7 +483,6 @@ def from_netcdf( allow_time_extrapolation=allow_time_extrapolation, deferred_load=deferred_load, fieldtype=fieldtype, - chunksize=varchunksize, dataFiles=dFiles, **kwargs, ) @@ -518,7 +501,6 @@ def from_nemo( mesh: Mesh = "spherical", 
allow_time_extrapolation: bool | None = None, tracer_interp_method: InterpMethodOption = "cgrid_tracer", - chunksize=None, **kwargs, ): """Initialises FieldSet object from NetCDF files of Curvilinear NEMO fields. @@ -586,8 +568,6 @@ def from_nemo( tracer_interp_method : str Method for interpolation of tracer fields. It is recommended to use 'cgrid_tracer' (default) Note that in the case of from_nemo() and from_c_grid_dataset(), the velocity fields are default to 'cgrid_velocity' - chunksize : - size of the chunks in dask loading. Default is None (no chunking) **kwargs : Keyword arguments passed to the :func:`Fieldset.from_c_grid_dataset` constructor. @@ -606,7 +586,6 @@ def from_nemo( indices=indices, allow_time_extrapolation=allow_time_extrapolation, tracer_interp_method=tracer_interp_method, - chunksize=chunksize, gridindexingtype="nemo", **kwargs, ) @@ -624,7 +603,6 @@ def from_mitgcm( mesh: Mesh = "spherical", allow_time_extrapolation: bool | None = None, tracer_interp_method: InterpMethodOption = "cgrid_tracer", - chunksize=None, **kwargs, ): """Initialises FieldSet object from NetCDF files of MITgcm fields. @@ -656,7 +634,6 @@ def from_mitgcm( indices=indices, allow_time_extrapolation=allow_time_extrapolation, tracer_interp_method=tracer_interp_method, - chunksize=chunksize, gridindexingtype="mitgcm", **kwargs, ) @@ -673,7 +650,6 @@ def from_croco( mesh="spherical", allow_time_extrapolation=None, tracer_interp_method="cgrid_tracer", - chunksize=None, **kwargs, ): """Initialises FieldSet object from NetCDF files of CROCO fields. 
@@ -738,7 +714,6 @@ def from_croco( indices=indices, allow_time_extrapolation=allow_time_extrapolation, interp_method=interp_method, - chunksize=chunksize, gridindexingtype="croco", **kwargs, ) @@ -759,7 +734,6 @@ def from_c_grid_dataset( allow_time_extrapolation: bool | None = None, tracer_interp_method: InterpMethodOption = "cgrid_tracer", gridindexingtype: GridIndexingType = "nemo", - chunksize=None, **kwargs, ): """Initialises FieldSet object from NetCDF files of Curvilinear NEMO fields. @@ -825,8 +799,6 @@ def from_c_grid_dataset( gridindexingtype : str The type of gridindexing. Set to 'nemo' in FieldSet.from_nemo(), 'mitgcm' in FieldSet.from_mitgcm() or 'croco' in FieldSet.from_croco(). See also the Grid indexing documentation on oceanparcels.org (Default value = 'nemo') - chunksize : - size of the chunks in dask loading. (Default value = None) **kwargs : Keyword arguments passed to the :func:`Fieldset.from_netcdf` constructor. """ @@ -860,7 +832,6 @@ def from_c_grid_dataset( indices=indices, allow_time_extrapolation=allow_time_extrapolation, interp_method=interp_method, - chunksize=chunksize, gridindexingtype=gridindexingtype, **kwargs, ) @@ -875,7 +846,6 @@ def from_pop( mesh: Mesh = "spherical", allow_time_extrapolation: bool | None = None, tracer_interp_method: InterpMethodOption = "bgrid_tracer", - chunksize=None, depth_units="m", **kwargs, ): @@ -942,8 +912,6 @@ def from_pop( tracer_interp_method : str Method for interpolation of tracer fields. It is recommended to use 'bgrid_tracer' (default) Note that in the case of from_pop() and from_b_grid_dataset(), the velocity fields are default to 'bgrid_velocity' - chunksize : - size of the chunks in dask loading (Default value = None) depth_units : The units of the vertical dimension. 
Default in Parcels is 'm', but many POP outputs are in 'cm' @@ -961,7 +929,6 @@ def from_pop( indices=indices, allow_time_extrapolation=allow_time_extrapolation, tracer_interp_method=tracer_interp_method, - chunksize=chunksize, gridindexingtype="pop", **kwargs, ) @@ -993,7 +960,6 @@ def from_mom5( mesh: Mesh = "spherical", allow_time_extrapolation: bool | None = None, tracer_interp_method: InterpMethodOption = "bgrid_tracer", - chunksize=None, **kwargs, ): """Initialises FieldSet object from NetCDF files of MOM5 fields. @@ -1056,8 +1022,6 @@ def from_mom5( tracer_interp_method : str Method for interpolation of tracer fields. It is recommended to use 'bgrid_tracer' (default) Note that in the case of from_mom5() and from_b_grid_dataset(), the velocity fields are default to 'bgrid_velocity' - chunksize : - size of the chunks in dask loading (Default value = None) **kwargs : Keyword arguments passed to the :func:`Fieldset.from_b_grid_dataset` constructor. """ @@ -1071,7 +1035,6 @@ def from_mom5( indices=indices, allow_time_extrapolation=allow_time_extrapolation, tracer_interp_method=tracer_interp_method, - chunksize=chunksize, gridindexingtype="mom5", **kwargs, ) @@ -1112,7 +1075,6 @@ def from_b_grid_dataset( mesh: Mesh = "spherical", allow_time_extrapolation: bool | None = None, tracer_interp_method: InterpMethodOption = "bgrid_tracer", - chunksize=None, **kwargs, ): """Initialises FieldSet object from NetCDF files of Bgrid fields. @@ -1174,8 +1136,6 @@ def from_b_grid_dataset( tracer_interp_method : str Method for interpolation of tracer fields. It is recommended to use 'bgrid_tracer' (default) Note that in the case of from_pop() and from_b_grid_dataset(), the velocity fields are default to 'bgrid_velocity' - chunksize : - size of the chunks in dask loading (Default value = None) **kwargs : Keyword arguments passed to the :func:`Fieldset.from_netcdf` constructor. 
""" @@ -1209,7 +1169,6 @@ def from_b_grid_dataset( indices=indices, allow_time_extrapolation=allow_time_extrapolation, interp_method=interp_method, - chunksize=chunksize, **kwargs, ) @@ -1223,7 +1182,6 @@ def from_parcels( extra_fields=None, allow_time_extrapolation: bool | None = None, deferred_load=True, - chunksize=None, **kwargs, ): """Initialises FieldSet data from NetCDF files using the Parcels FieldSet.write() conventions. @@ -1252,8 +1210,6 @@ def from_parcels( fully load them (default: True). It is advised to deferred load the data, since in that case Parcels deals with a better memory management during particle set execution. deferred_load=False is however sometimes necessary for plotting the fields. - chunksize : - size of the chunks in dask loading (Default value = None) uvar : (Default value = 'vozocrtx') vvar : @@ -1280,7 +1236,6 @@ def from_parcels( dimensions=dimensions, allow_time_extrapolation=allow_time_extrapolation, deferred_load=deferred_load, - chunksize=chunksize, **kwargs, ) @@ -1455,7 +1410,7 @@ def computeTimeChunk(self, time=0.0, dt=1): Parameters ---------- time : - Time around which the FieldSet chunks are to be loaded. + Time around which the FieldSet data are to be loaded. Time is provided as a double, relatively to Fieldset.time_origin. Default is 0. 
dt : @@ -1489,12 +1444,11 @@ def computeTimeChunk(self, time=0.0, dt=1): for i in range(len(f.data)): del f.data[i, :] - lib = np if f.chunksize in [False, None] else da if f.gridindexingtype == "pop" and g.zdim > 1: zd = g.zdim - 1 else: zd = g.zdim - data = lib.empty( + data = np.empty( (g.tdim, zd, g.ydim - 2 * g.meridional_halo, g.xdim - 2 * g.zonal_halo), dtype=np.float32 ) f._loaded_time_indices = range(2) @@ -1509,21 +1463,13 @@ def computeTimeChunk(self, time=0.0, dt=1): if isinstance(f.data, DeferredArray): f.data = DeferredArray() f.data = f._reshape(data) - if not f._chunk_set: - f._chunk_setup() - if len(g._load_chunk) > g._chunk_not_loaded: - g._load_chunk = np.where( - g._load_chunk == g._chunk_loaded_touched, g._chunk_loading_requested, g._load_chunk - ) - g._load_chunk = np.where(g._load_chunk == g._chunk_deprecated, g._chunk_not_loaded, g._load_chunk) elif g._update_status == "updated": - lib = np if isinstance(f.data, np.ndarray) else da if f.gridindexingtype == "pop" and g.zdim > 1: zd = g.zdim - 1 else: zd = g.zdim - data = lib.empty( + data = np.empty( (g.tdim, zd, g.ydim - 2 * g.meridional_halo, g.xdim - 2 * g.zonal_halo), dtype=np.float32 ) if signdt >= 0: @@ -1543,53 +1489,22 @@ def computeTimeChunk(self, time=0.0, dt=1): data = f._rescale_and_set_minmax(data) if signdt >= 0: data = f._reshape(data)[1, :] - if lib is da: - f.data = lib.stack([f.data[1, :], data], axis=0) - else: - if not isinstance(f.data, DeferredArray): - if isinstance(f.data, list): - del f.data[0, :] - else: - f.data[0, :] = None - f.data[0, :] = f.data[1, :] - f.data[1, :] = data + if not isinstance(f.data, DeferredArray): + if isinstance(f.data, list): + del f.data[0, :] + else: + f.data[0, :] = None + f.data[0, :] = f.data[1, :] + f.data[1, :] = data else: data = f._reshape(data)[0, :] - if lib is da: - f.data = lib.stack([data, f.data[0, :]], axis=0) - else: - if not isinstance(f.data, DeferredArray): - if isinstance(f.data, list): - del f.data[1, :] - else: - 
f.data[1, :] = None - f.data[1, :] = f.data[0, :] - f.data[0, :] = data - g._load_chunk = np.where( - g._load_chunk == g._chunk_loaded_touched, g._chunk_loading_requested, g._load_chunk - ) - g._load_chunk = np.where(g._load_chunk == g._chunk_deprecated, g._chunk_not_loaded, g._load_chunk) - if isinstance(f.data, da.core.Array) and len(g._load_chunk) > 0: - if signdt >= 0: - for block_id in range(len(g._load_chunk)): - if g._load_chunk[block_id] == g._chunk_loaded_touched: - if f._data_chunks[block_id] is None: - # file chunks were never loaded. - # happens when field not called by kernel, but shares a grid with another field called by kernel - break - block = f.get_block(block_id) - f._data_chunks[block_id][0] = None - f._data_chunks[block_id][1] = np.array(f.data.blocks[(slice(2),) + block][1]) - else: - for block_id in range(len(g._load_chunk)): - if g._load_chunk[block_id] == g._chunk_loaded_touched: - if f._data_chunks[block_id] is None: - # file chunks were never loaded. - # happens when field not called by kernel, but shares a grid with another field called by kernel - break - block = f.get_block(block_id) - f._data_chunks[block_id][1] = None - f._data_chunks[block_id][0] = np.array(f.data.blocks[(slice(2),) + block][0]) + if not isinstance(f.data, DeferredArray): + if isinstance(f.data, list): + del f.data[1, :] + else: + f.data[1, :] = None + f.data[1, :] = f.data[0, :] + f.data[0, :] = data # do user-defined computations on fieldset data if self.compute_on_defer: self.compute_on_defer(self) diff --git a/parcels/grid.py b/parcels/grid.py index a87cdec792..903dce4a52 100644 --- a/parcels/grid.py +++ b/parcels/grid.py @@ -75,9 +75,6 @@ def __init__( self._lonlat_minmax = np.array( [np.nanmin(lon), np.nanmax(lon), np.nanmin(lat), np.nanmax(lat)], dtype=np.float32 ) - self._load_chunk: npt.NDArray = np.array([]) - self.chunk_info = None - self.chunksize = None self.depth_field = None def __repr__(self): @@ -246,26 +243,6 @@ def _computeTimeChunk(self, f, time, 
signdt): nextTime_loc = self.time[0] return nextTime_loc - @property - def _chunk_not_loaded(self): - return 0 - - @property - def _chunk_loading_requested(self): - return 1 - - @property - def _chunk_loaded_touched(self): - return 2 - - @property - def _chunk_deprecated(self): - return 3 - - @property - def _chunk_loaded(self): - return [2, 3] - class RectilinearGrid(Grid): """Rectilinear Grid class diff --git a/parcels/gridset.py b/parcels/gridset.py index e3b6191103..3609ba795d 100644 --- a/parcels/gridset.py +++ b/parcels/gridset.py @@ -13,8 +13,6 @@ def add_grid(self, field): grid = field.grid existing_grid = False for g in self.grids: - if field.chunksize == "auto": - break if g == grid: existing_grid = True break @@ -28,12 +26,6 @@ def add_grid(self, field): sameGrid = False break - if (g.chunksize != grid.chunksize) and (grid.chunksize not in [False, None]): - for dim in grid.chunksize: - if grid.chunksize[dim][1] != g.chunksize[dim][1]: - sameGrid &= False - break - if sameGrid: existing_grid = True field._grid = g # TODO: Is this even necessary? 
diff --git a/parcels/interaction/interactionkernel.py b/parcels/interaction/interactionkernel.py index ef01dbd5c0..f2192c18f6 100644 --- a/parcels/interaction/interactionkernel.py +++ b/parcels/interaction/interactionkernel.py @@ -185,13 +185,6 @@ def execute(self, pset, endtime, dt, output_file=None): stacklevel=2, ) - if pset.fieldset is not None: - for g in pset.fieldset.gridset.grids: - if len(g._load_chunk) > g._chunk_not_loaded: # not the case if a field in not called in the kernel - g._load_chunk = np.where( - g._load_chunk == g._chunk_loaded_touched, g._chunk_deprecated, g._load_chunk - ) - self.execute_python(pset, endtime, dt) # Remove all particles that signalled deletion diff --git a/parcels/kernel.py b/parcels/kernel.py index 08224bb0c6..2bfcd41dd3 100644 --- a/parcels/kernel.py +++ b/parcels/kernel.py @@ -336,12 +336,6 @@ def execute(self, pset, endtime, dt): ) if pset.fieldset is not None: - for g in pset.fieldset.gridset.grids: - if len(g._load_chunk) > g._chunk_not_loaded: # not the case if a field in not called in the kernel - g._load_chunk = np.where( - g._load_chunk == g._chunk_loaded_touched, g._chunk_deprecated, g._load_chunk - ) - for f in self.fieldset.get_fields(): if isinstance(f, (VectorField, NestedField)): continue diff --git a/parcels/tools/statuscodes.py b/parcels/tools/statuscodes.py index 437c83e0d6..7a9799dc7c 100644 --- a/parcels/tools/statuscodes.py +++ b/parcels/tools/statuscodes.py @@ -29,12 +29,6 @@ class StatusCode: ErrorTimeExtrapolation = 70 -class DaskChunkingError(RuntimeError): - """Error indicating to the user that something with setting up Dask and chunked fieldsets went wrong.""" - - pass - - class FieldSamplingError(RuntimeError): """Utility error class to propagate erroneous field sampling.""" diff --git a/tests/test_fieldset.py b/tests/test_fieldset.py index cb63ee9af9..79963307d6 100644 --- a/tests/test_fieldset.py +++ b/tests/test_fieldset.py @@ -13,7 +13,6 @@ Variable, ) from parcels.field import Field, 
VectorField -from parcels.fieldfilebuffer import DaskFileBuffer from parcels.tools.converters import ( GeographicPolar, UnitConverter, @@ -235,7 +234,7 @@ def test_fieldset_from_file_subsets(indslon, indslat, tmpdir): fieldsetfull.write(filepath) indices = {"lon": indslon, "lat": indslat} indices_back = indices.copy() - fieldsetsub = FieldSet.from_parcels(filepath, indices=indices, chunksize=None) + fieldsetsub = FieldSet.from_parcels(filepath, indices=indices) assert indices == indices_back assert np.allclose(fieldsetsub.U.lon, fieldsetfull.U.grid.lon[indices["lon"]]) assert np.allclose(fieldsetsub.U.lat, fieldsetfull.U.grid.lat[indices["lat"]]) @@ -311,8 +310,7 @@ def test_add_field_after_pset(fieldtype): fieldset.add_vector_field(vfield) -@pytest.mark.parametrize("chunksize", ["auto", None]) -def test_fieldset_samegrids_from_file(tmpdir, chunksize): +def test_fieldset_samegrids_from_file(tmpdir): """Test for subsetting fieldset from file using indices dict.""" data, dimensions = generate_fieldset_data(100, 100) filepath1 = tmpdir.join("test_subsets_1") @@ -326,17 +324,10 @@ def test_fieldset_samegrids_from_file(tmpdir, chunksize): files = {"U": ufiles, "V": vfiles} variables = {"U": "vozocrtx", "V": "vomecrty"} dimensions = {"lon": "nav_lon", "lat": "nav_lat"} - fieldset = FieldSet.from_netcdf( - files, variables, dimensions, timestamps=timestamps, allow_time_extrapolation=True, chunksize=chunksize - ) + fieldset = FieldSet.from_netcdf(files, variables, dimensions, timestamps=timestamps, allow_time_extrapolation=True) - if chunksize == "auto": - assert fieldset.gridset.size == 2 - assert fieldset.U.grid != fieldset.V.grid - else: - assert fieldset.gridset.size == 1 - assert fieldset.U.grid == fieldset.V.grid - assert fieldset.U.chunksize == fieldset.V.chunksize + assert fieldset.gridset.size == 1 + assert fieldset.U.grid == fieldset.V.grid @pytest.mark.parametrize("gridtype", ["A", "C"]) @@ -353,8 +344,7 @@ def test_fieldset_dimlength1_cgrid(gridtype): assert 
success -@pytest.mark.parametrize("chunksize", ["auto", None]) -def test_fieldset_diffgrids_from_file(tmpdir, chunksize): +def test_fieldset_diffgrids_from_file(tmpdir): """Test for subsetting fieldset from file using indices dict.""" filename = "test_subsets" data, dimensions = generate_fieldset_data(100, 100) @@ -374,15 +364,12 @@ def test_fieldset_diffgrids_from_file(tmpdir, chunksize): variables = {"U": "vozocrtx", "V": "vomecrty"} dimensions = {"lon": "nav_lon", "lat": "nav_lat"} - fieldset = FieldSet.from_netcdf( - files, variables, dimensions, timestamps=timestamps, allow_time_extrapolation=True, chunksize=chunksize - ) + fieldset = FieldSet.from_netcdf(files, variables, dimensions, timestamps=timestamps, allow_time_extrapolation=True) assert fieldset.gridset.size == 2 assert fieldset.U.grid != fieldset.V.grid -@pytest.mark.parametrize("chunksize", ["auto", None]) -def test_fieldset_diffgrids_from_file_data(tmpdir, chunksize): +def test_fieldset_diffgrids_from_file_data(tmpdir): """Test for subsetting fieldset from file using indices dict.""" data, dimensions = generate_fieldset_data(100, 100) filepath = tmpdir.join("test_subsets") @@ -399,16 +386,13 @@ def test_fieldset_diffgrids_from_file_data(tmpdir, chunksize): variables = {"U": "vozocrtx", "V": "vomecrty"} dimensions = {"lon": "nav_lon", "lat": "nav_lat"} fieldset_file = FieldSet.from_netcdf( - files, variables, dimensions, timestamps=timestamps, allow_time_extrapolation=True, chunksize=chunksize + files, variables, dimensions, timestamps=timestamps, allow_time_extrapolation=True ) fieldset_file.add_field(field_data, "B") fields = [f for f in fieldset_file.get_fields() if isinstance(f, Field)] assert len(fields) == 3 - if chunksize == "auto": - assert fieldset_file.gridset.size == 3 - else: - assert fieldset_file.gridset.size == 2 + assert fieldset_file.gridset.size == 2 assert fieldset_file.U.grid != fieldset_file.B.grid @@ -554,40 +538,6 @@ def UpdateU(particle, fieldset, time): # pragma: no cover 
assert np.allclose(fieldset.U.data, da["U"].values, atol=1.0) -@pytest.mark.parametrize( - "chunksize", - [ - False, - "auto", - {"lat": ("y", 32), "lon": ("x", 32)}, - {"time": ("time_counter", 1), "lat": ("y", 32), "lon": ("x", 32)}, - ], -) -@pytest.mark.parametrize("deferLoad", [True, False]) -def test_from_netcdf_chunking(chunksize, deferLoad): - fnameU = str(TEST_DATA / "perlinfieldsU.nc") - fnameV = str(TEST_DATA / "perlinfieldsV.nc") - ufiles = [fnameU] * 4 - vfiles = [fnameV] * 4 - timestamps = np.arange(0, 4, 1) * 86400.0 - timestamps = np.expand_dims(timestamps, 1) - files = {"U": ufiles, "V": vfiles} - variables = {"U": "vozocrtx", "V": "vomecrty"} - dimensions = {"lon": "nav_lon", "lat": "nav_lat"} - - fieldset = FieldSet.from_netcdf( - files, - variables, - dimensions, - timestamps=timestamps, - deferred_load=deferLoad, - allow_time_extrapolation=True, - chunksize=chunksize, - ) - pset = ParticleSet.from_line(fieldset, size=1, pclass=Particle, start=(0.5, 0.5), finish=(0.5, 0.5)) - pset.execute(AdvectionRK4, dt=1, runtime=1) - - @pytest.mark.parametrize("datetype", ["float", "datetime64"]) def test_timestamps(datetype, tmpdir): data1, dims1 = generate_fieldset_data(10, 10, 1, 10) @@ -849,17 +799,3 @@ def SampleU(particle, fieldset, time): # pragma: no cover runtime = tdim * 2 if time_extrapolation else None pset.execute(SampleU, dt=direction, runtime=runtime) assert pset.p == tdim - 1 if time_extrapolation else tdim - 2 - - -def test_daskfieldfilebuffer_dimnames(): - DaskFileBuffer.add_to_dimension_name_map_global({"lat": "nydim", "lon": "nxdim"}) - fnameU = str(TEST_DATA / "perlinfieldsU.nc") - dimensions = {"lon": "nav_lon", "lat": "nav_lat"} - fb = DaskFileBuffer(fnameU, dimensions, indices={}) - assert ("nxdim" in fb._static_name_maps["lon"]) and ("ntdim" not in fb._static_name_maps["time"]) - fb.add_to_dimension_name_map({"time": "ntdim", "depth": "nddim"}) - assert ("nxdim" in fb._static_name_maps["lon"]) and ("ntdim" in 
fb._static_name_maps["time"]) - assert fb._get_available_dims_indices_by_request() == {"time": None, "depth": None, "lat": 0, "lon": 1} - assert fb._get_available_dims_indices_by_namemap() == {"time": 0, "depth": 1, "lat": 2, "lon": 3} - assert fb._is_dimension_chunked("lon") is False - assert fb._is_dimension_in_chunksize_request("lon") == (-1, "", 0) diff --git a/tests/test_fieldset_sampling.py b/tests/test_fieldset_sampling.py index 940ca4b763..fe7baca2e6 100644 --- a/tests/test_fieldset_sampling.py +++ b/tests/test_fieldset_sampling.py @@ -606,8 +606,7 @@ def SampleU(particle, fieldset, time): # pragma: no cover @pytest.mark.parametrize("npart", [1, 10]) -@pytest.mark.parametrize("chs", [False, "auto", {"lat": ("y", 10), "lon": ("x", 10)}]) -def test_sampling_multigrids_non_vectorfield_from_file(npart, tmpdir, chs): +def test_sampling_multigrids_non_vectorfield_from_file(npart, tmpdir): xdim, ydim = 100, 200 filepath = tmpdir.join("test_subsets") U = Field( @@ -641,15 +640,10 @@ def test_sampling_multigrids_non_vectorfield_from_file(npart, tmpdir, chs): files = {"U": ufiles, "V": vfiles, "B": bfiles} variables = {"U": "vozocrtx", "V": "vomecrty", "B": "B"} dimensions = {"lon": "nav_lon", "lat": "nav_lat"} - fieldset = FieldSet.from_netcdf( - files, variables, dimensions, timestamps=timestamps, allow_time_extrapolation=True, chunksize=chs - ) + fieldset = FieldSet.from_netcdf(files, variables, dimensions, timestamps=timestamps, allow_time_extrapolation=True) fieldset.add_constant("sample_depth", 2.5) - if chs == "auto": - assert fieldset.U.grid != fieldset.V.grid - else: - assert fieldset.U.grid is fieldset.V.grid + assert fieldset.U.grid is fieldset.V.grid assert fieldset.U.grid is not fieldset.B.grid TestParticle = Particle.add_variable("sample_var", initial=0.0)