 __author__ = "Daniel Westwood"
 
-__copyright__ = "Copyright 2023 United Kingdom Research and Innovation"
-
-# Chunk wrapper is common to both CFAPyX and XarrayActive
-VERSION = '1.2.1'
+__copyright__ = "Copyright 2024 United Kingdom Research and Innovation"
 
 import numpy as np
 import netCDF4
 
 from itertools import product
-from copy import deepcopy
+import math
 from dask.utils import SerializableLock
+from dask.array.core import normalize_chunks
 
 try:
     from XarrayActive import ActiveChunk
@@ -57,15 +55,17 @@ def ndim(self):
     def copy(self, **kwargs):
         """
         Return a new basic ArrayLike instance. Ignores provided kwargs
-        this class does not require, but other inheritors may."""
+        this class does not require, but other inheritors may.
+        """
         return ArrayLike(
             self.shape,
             **self.get_kwargs()
         )
 
     def get_kwargs(self):
         """
-        Get the kwargs provided to this class initially - for creating a copy."""
+        Get the kwargs provided to this class initially - for creating a copy.
+        """
         return {
             'units':self.units,
             'dtype':self.dtype,
@@ -76,7 +76,8 @@ class SuperLazyArrayLike(ArrayLike):
7676 """
7777 Container class for SuperLazy Array-Like behaviour. ``SuperLazy`` behaviour is
7878 defined as Lazy-Slicing behaviour for objects that are below the 'Dask Surface',
79- i.e for object that serve as Dask Chunks."""
79+ i.e for object that serve as Dask Chunks.
80+ """
8081
8182 description = "Container class for SuperLazy Array-Like behaviour"
8283
@@ -127,7 +128,35 @@ def shape(self, value):
         self._shape = value
 
     def get_extent(self):
-        return tuple(self._extent)
+        """
+        Getter for the private `_extent`, with any None slice values smoothed first.
+        """
+        return tuple(self._smooth_extent(self._extent))
+
+    def set_extent(self, extent):
+        """
+        Directly set the private `_extent` for this class, once the provided
+        extent has been 'smoothed'."""
+        self._extent = self._smooth_extent(extent)
+
+    def _smooth_extent(self, extent):
+        """
+        Replace values of None within each provided slice of the extent with integer
+        values derived from the current shape.
+        """
+        if len(extent) != self.ndim:
+            raise ValueError(
+                'Direct assignment of truncated extent is not supported.'
+            )
+
+        smoothextent = []
+        for x, ext in enumerate(extent):
+            start = ext.start or 0
+            stop = ext.stop or self.shape[x]
+            step = ext.step or 1
+            smoothextent.append(slice(start, stop, step))
+
+        return smoothextent
 
     def copy(self, extent=None):
         """
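For reference, a minimal standalone sketch of the smoothing rule introduced by `_smooth_extent` above; the `shape` and `extent` values here are invented for illustration and are not part of the commit:

# Illustrative sketch only (not committed code): any None in a slice's
# (start, stop, step) is replaced with a value derived from the array shape.
shape = (10, 20)
extent = (slice(None, 5), slice(2, None))

smoothed = [
    slice(ext.start or 0, ext.stop or shape[i], ext.step or 1)
    for i, ext in enumerate(extent)
]
# smoothed == [slice(0, 5, 1), slice(2, 20, 1)]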
@@ -215,7 +244,7 @@ def __init__(self,
 
         if extent:
             # Apply a specific extent if given by the initiator
-            self._extent = extent
+            self.set_extent(extent)
 
     def __array__(self, *args, **kwargs):
         """
@@ -233,7 +262,7 @@ def __array__(self, *args, **kwargs):
         if args:
             dtype = args[0]
 
-        if dtype != self.dtype:
+        if dtype and dtype != self.dtype:
             raise ValueError(
                 'Requested datatype does not match this chunk'
             )
@@ -267,7 +296,7 @@ def __array__(self, *args, **kwargs):
         self._correct_slice(array.dimensions)
 
         try:
-            var = np.array(array[tuple(self._extent)])
+            var = np.array(array[tuple(self._extent)], dtype=self.dtype)
         except IndexError:
             raise ValueError(
                 f"Unable to select required 'extent' of {self.extent} "
@@ -307,9 +336,9 @@ def _correct_slice(self, array_dims):
 
     def _post_process_data(self, data):
         """
-        Perform any post-processing steps on the data here.
-         - unit correction
-         - calendar correction
+        Perform any post-processing steps on the data here. Method to be
+        overridden by inheriting classes (CFAPyX.CFAPartition and
+        XarrayActive.ActivePartition).
         """
         return data
 
@@ -429,11 +458,10 @@ def _identical_extents(old, new, dshape):
 
 def get_chunk_space(chunk_shape, shape):
     """
-    Derive the chunk space and shape given the user-provided ``chunks`` option.
-    Chunk space is the number of chunks in each dimension which presents like an array
-    shape, but is referred to as a ``space`` because it has a novel coordinate system.
-    Chunk shape is the shape of each chunk in ``array space``, which must be regular
-    even if lower-level objects used to define the chunk are not.
+    Derive the chunk space from the ratio between the chunk shape and the array shape
+    in each dimension. Chunk space is the number of chunks along each dimension; it is
+    referred to as a ``space`` because it represents the length of each dimension in
+    'chunk space' rather than any particular chunk coordinate.
 
     Example:
         50 chunks across the time dimension of 1000 values which is represented by 8
@@ -452,9 +480,29 @@ def get_chunk_space(chunk_shape, shape):
 
 
     """
-    return tuple([int(i/j) for i, j in zip(shape, chunk_shape)])
+    space = tuple([math.ceil(i/j) for i, j in zip(shape, chunk_shape)])
+    return space
 
 def get_chunk_shape(chunks, shape, dims, chunk_limits=True):
+    """
+    Calculate the chunk shape from the user-provided ``chunks`` parameter,
+    the array shape and named dimensions, and apply chunk limits if enabled.
+
+    :param chunks: (dict) The user-specified chunks, which match the usual dask
+        chunks from xr.open_dataset, except these come from the ``cfa_options``.
+
+    :param shape: (tuple) The array shape of the data array to be chunked.
+
+    :param dims: (tuple) The names of each dimension to match to the ``chunks``
+        provided.
+
+    :param chunk_limits: (bool) Option to disable chunk limits. Chunk limits prevent
+        chunking beyond a useful chunk size, which is likely to be much smaller than
+        the memory chunk size of the source files, in which case there would be a lot
+        of wasted data retrieval.
+
+    :returns: A tuple of the shape of each chunk in ``array space`` for each dimension.
+    """
     chunk_shape = [i for i in shape]
 
     for dim in chunks.keys():
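The switch from `int` to `math.ceil` in `get_chunk_space` above matters whenever the array shape is not an exact multiple of the chunk shape; a small worked example with invented sizes:

# Illustrative sketch only (not committed code).
import math

# A dimension of 1000 values with a chunk shape of 300 along that dimension:
int(1000 / 300)        # == 3, leaving the final 100 values outside any chunk
math.ceil(1000 / 300)  # == 4, a final partial chunk covers the remainder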
@@ -464,22 +512,28 @@ def get_chunk_shape(chunks, shape, dims, chunk_limits=True):
             if d == dim:
                 idim = x
 
-        if not idim:
+        if idim is None:
             raise ValueError(
                 f"Requested chunking across dimension '{dim}'"
                 f"but only '{dims}' present in the dataset"
             )
 
+        # Apply chunk limits unless disabled.
         min_size = int(shape[idim]/np.prod(shape))
         if chunk_limits:
-            min_size = int(min_size * 2e6)
+            min_size = int(min_size * 2e6)
+            # 2M data points is the smallest total size allowed.
 
         chunk_size = chunks[dim]
         chunk_shape[idim] = max(chunk_size, min_size)
 
     return tuple(chunk_shape)
 
 def get_chunk_positions(chunk_space):
+    """
+    Get the list of chunk positions in ``chunk space`` given the size
+    of the space.
+    """
     origin = [0 for i in chunk_space]
 
     positions = [
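The construction of `positions` continues beyond this hunk; for illustration, a sketch of the enumeration it presumably performs. The use of `itertools.product` here is an assumption based on the module's imports and is not confirmed by the visible lines:

# Illustrative sketch only (assumed equivalent, not the committed code).
from itertools import product

chunk_space = (2, 3)
positions = [coord for coord in product(*[range(r) for r in chunk_space])]
# positions == [(0, 0), (0, 1), (0, 2), (1, 0), (1, 1), (1, 2)]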
@@ -491,9 +545,13 @@ def get_chunk_positions(chunk_space):
     return positions
 
 def get_chunk_extent(position, shape, chunk_space):
+    """
+    Get the extent of a particular chunk within the space given its position,
+    the array shape and the extent of the chunk space.
+    """
     extent = []
     for dim in range(len(position)):
-        pos_index = position[dim]
+        pos_index = position[dim]
         shape_size = shape[dim]
         space_size = chunk_space[dim]
 
@@ -503,6 +561,7 @@ def get_chunk_extent(position, shape, chunk_space):
             int(pos_index * conversion), int((pos_index + 1) * conversion)
         )
         extent.append(ext)
+
     return extent
 
 def get_dask_chunks(
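A worked example of the per-chunk extent calculation in `get_chunk_extent` above, assuming `conversion` is `shape_size / space_size` (that assignment falls outside the visible hunks):

# Illustrative sketch only; `conversion = shape_size / space_size` is assumed.
pos_index, shape_size, space_size = 3, 1000, 8

conversion = shape_size / space_size                                         # 125.0
ext = slice(int(pos_index * conversion), int((pos_index + 1) * conversion))
# ext == slice(375, 500), i.e. indexes 375-499 of the 1000-value dimension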
@@ -519,11 +578,13 @@ def get_dask_chunks(
 
     :param fragment_space: (tuple) The shape of the array in ``fragment space``.
 
-    :param extent: (dict) The global extent of each fragment - where it fits into the total array for this variable (in array space).
+    :param extent: (dict) The global extent of each fragment - where it fits into
+        the total array for this variable (in array space).
 
-    :param dtype: (obj) The datatype for this variable.
+    :param dtype: (obj) The datatype for this variable.
 
-    :param explicit_shapes: (tuple) Set of shapes to apply to the fragments - currently not implemented outside this function.
+    :param explicit_shapes: (tuple) Set of shapes to apply to the fragments - currently
+        not implemented outside this function.
 
     :returns: A tuple of the chunk sizes along each dimension.
     """
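For context on the ``:returns:`` line above: dask expects the chunk sizes as one tuple of block sizes per dimension. A hypothetical return value for a (1000, 100) array fragmented into 8 equal pieces along the first dimension only:

# Hypothetical example of the returned structure (not taken from the commit).
dask_chunks = ((125, 125, 125, 125, 125, 125, 125, 125), (100,))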
@@ -556,7 +617,8 @@ def get_dask_chunks(
     ## Handle explicit shapes for the fragments.
 
     if isinstance(explicit_shapes, (str, Number)) or explicit_shapes is None:
-        fsizes_per_dim = [ # For each dimension, use fs or explicit_shapes if the dimension is fragmented or not respectively.
+        # For each dimension, use fs if the dimension is fragmented, otherwise use explicit_shapes.
+        fsizes_per_dim = [
             fs if i in fragmented_dim_indices else explicit_shapes for i, fs in enumerate(fsizes_per_dim)
         ]
     elif isinstance(explicit_shapes, dict):
@@ -621,11 +683,29 @@ def combine_sliced_dim(old, new, dim):
 
         return slice(start, stop, step)
 
-
     if not extent:
         return newslice
     else:
         for dim in range(len(newslice)):
             if not _identical_extents(extent[dim], newslice[dim], shape[dim]):
                 extent[dim] = combine_sliced_dim(extent[dim], newslice[dim], dim)
-        return extent
+        return extent
+
+def normalize_partition_chunks(chunks, shape, dtype, named_dims):
+
+    chunk_values = []
+
+    for nd in named_dims:
+        if nd not in chunks.keys():
+            chunk_values.append('auto')
+            continue
+        try:
+            chunk_values.append(int(chunks[nd]))
+        except ValueError:
+            chunk_values.append(chunks[nd])
+
+    return normalize_chunks(
+        chunk_values,
+        shape,
+        dtype=dtype
+    )
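The new `normalize_partition_chunks` helper carries no docstring in this commit; a hedged usage sketch, with dimension names and sizes invented for illustration:

# Illustrative usage sketch only (names and sizes are invented).
# Chunk the named 'time' dimension in blocks of 10; dimensions not listed in
# `chunks` fall back to dask's 'auto' chunking, which needs the dtype.
import numpy as np

dask_chunks = normalize_partition_chunks(
    chunks={'time': 10},
    shape=(100, 50),
    dtype=np.dtype('float64'),
    named_dims=('time', 'lat'),
)
# typically ((10,) * 10, (50,)) once normalized by dask.array.core.normalize_chunks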