-
-
Notifications
You must be signed in to change notification settings - Fork 304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat/batch creation #2665
base: main
Are you sure you want to change the base?
feat/batch creation #2665
Conversation
…/batch-creation
…into feat/batch-creation
…at/batch-creation
…to feat/batch-creation
…at/batch-creation
…into feat/batch-creation
this is now working, so I would appreciate some feedback on the design. The basic design is the same as what I outlined earlier in this PR: there are two new functions that take a approachbasically the same as concurrent group members listing, except we don't need any recursion. I'm scheduling writes and using new functions
Implicit groupsPartial hierarchies like streaming v2 vs v3 node creationcreating v3 arrays / groups requires writing 1 metadata document, but v2 requires 2. To get the most concurrency I await the write of each metadata document separately, which means that Overlap with metadata consolidation logicthere's a lot of similarity between the stuff in this PR and routines used for consolidated metadata. it would be great to find ways to factor out some of the overlap areas still to do:
|
@TomNicholas you should have a look at some of these new functions / methods. I'd be happy to change things if you have some datatree conventions you'd like to suggest |
This method takes a dictionary where the keys are the names of the arrays or groups | ||
to create and the values are the metadata or objects representing the arrays or groups. | ||
|
||
The method returns an asynchronous iterator over the created nodes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A note about when to use this would be great
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point, would something like "use this method to create an entire tree of sub-groups and / or sub-arrays efficiently." suffice?
@@ -1407,6 +1395,42 @@ async def _members( | |||
): | |||
yield member | |||
|
|||
# TODO: find a better name for this. create_tree could work. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hierarchy is fine to me.
""" | ||
ctx: asyncio.Semaphore | contextlib.nullcontext[None] | ||
|
||
if semaphore is None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need None
as an option here? And why is it "no semaphore" instead of "default concurrency semaphore"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
create_nodes
is low-level, kind of dangerous, and the expectation is that the user of this function knows what they are doing. So it doesn't default with any concurrency limit. It also doesn't check if nodes already exist, or if the user wants to nest arrays inside arrays. A higher level function (create_hierarchy
) does all that checking, and that's where the concurrency limit defaults to the value in the config.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so why not require the semaphore if it's an advanced user?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the semaphore is only necessary if you want some concurrency limiting mechanism, but I think that's a special case. A default of None
means that users of the function don't need to run from asyncio import Semaphore
before creating some nodes. E.g., there are a lot of places in the tests where I would want to use create_nodes
, and in basically none of those places would a semaphore be necessary.
# We will iterate over the dict again, but a full pass here ensures that the error message | ||
# is comprehensive, and I think the performance cost will be negligible. | ||
for k, v in data.items(): | ||
observed_zarr_formats[v.zarr_format].append(k) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we short-circuit inside the loop and raise on the first instance of zarr_format
that is not equal to next(data.items())[1].zarr_format
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we definitely could, but we get a much nicer error message if we can traverse the entire proposed hierarchy and identify all the problematic nodes.
document stored at store_path.path / (.zgroup | .zarray). If no such document is found, | ||
raise a KeyError. | ||
""" | ||
# TODO: consider first fetching array metadata, and only fetching group metadata when we don't |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think it's better to just grab all 3 at once honestly
""" | ||
|
||
|
||
def create_array_metadata( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like this would duplicate logic form elsewhere? Is there nowhere else this function could be used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be duplicated, but sadly these functions don't exist in the codebase yet! I add some functions like this in another PR: #2761
question: should we export a sync version of |
Yes, this would be used in Xarray. |
this PR adds a few functions that have async implementations and sync wrappers, like |
This PR adds a few routines for creating a collection of arrays and groups (i.e., a dict with path-like keys and
ArrayMetadata
/GroupMetadata
values) in storage concurrently.create_hierarchy
takes a dict representation of a hierarchy, parses that dict to ensure that there are no implicit groups (creating group metadata documents as needed), then invokescreate_nodes
and yields the resultscreate_nodes
concurrently writes metadata documents to storage, and yields the createdAsyncArray
/AsyncGroup
instances.I still need to wire up concurrency limits, and test them.
TODO: