Automatic Dask-Zarr chunk alignment on the to_zarr method #9914

Open
josephnowak opened this issue Dec 21, 2024 · 20 comments

@josephnowak
Contributor

josephnowak commented Dec 21, 2024

Is your feature request related to a problem?

In the time I have used Xarray, I have seen many issues related to the alignment between Dask and Zarr chunks, and most of them come from misunderstanding how to achieve a many-to-one relation between the two. For that reason, I would like to propose a new feature that aligns the chunks automatically. This should significantly reduce the number of issues related to this problem and simplify how Xarray, Dask, and Zarr interact from the user's perspective.

Describe the solution you'd like

Add a new align chunks parameter. (Option A)

Pros:

  • It would not break the current behavior of the method.
  • It gives the users control over whether they want the automatic alignment.

Cons:

  • It adds an extra parameter to a method that, from my perspective, is already complex.
  • We will have to add extra validations in the code and docs. For example, if automatic chunk alignment is enabled and a synchronizer is also present, does it make sense to realign the chunks? If safe_chunks is enabled together with automatic alignment, should we raise an error, or should we give priority to the chunk alignment? This also makes me wonder what happens if someone passes a synchronizer while safe_chunks is enabled; I have not seen this specified in the docs.

Drop the safe_chunks parameter and always align the chunks if it is necessary. (Option B)

Pros:

  • Reduces the number of parameters in the to_zarr method.
  • Simplifies the user experience: users could ignore the difference between Dask and Zarr chunks, which should reduce the number of issues reported on this topic. It is important to highlight that if the synchronizer is set, the automatic alignment of the chunks should be disabled. I think this is a better workflow and prevents data corruption in all scenarios; it means that setting a synchronizer would work like safe_chunks=False.
  • Eliminates the possibility of corrupting the data, which I think is more important than the effect on performance.

Cons:

  • It would be a breaking change.
  • The automatic chunk alignment could increase the number of tasks in Dask or unexpectedly affect performance. A possible mitigation for this is to raise a warning indicating that the chunks were unaligned.

I think option B is the best for a good part of the users: it prevents data corruption and simplifies the workflow. There are probably some use cases I'm not taking into consideration, though, that would explain why the safe_chunks parameter was added in the first place.

Any feedback or different ideas are welcome

Describe alternatives you've considered

No response

Additional context

No response

@josephnowak
Contributor Author

Hi @max-sixty, here is the proposal based on the discussion of this issue. If anyone has another approach, idea, or change, it would be good to discuss it so we can implement the best possible option.

@josephnowak josephnowak changed the title Automatic chunk alignment on the to_zarr method Automatic Dask-Zarr chunk alignment on the to_zarr method Dec 21, 2024
@pjpetersik

I really like this enhancement (both option A or B).

After an upgrade to xarray>=2024.10, I also got the ValueErrors that are discussed in #9767. Only after digging through this and other issues for a while did I find out that using a synchronizer together with safe_chunks=False solves my problem. Speaking from the user's perspective, either of the proposed enhancements would probably have saved me quite some time and trouble.

@josephnowak
Contributor Author

josephnowak commented Jan 29, 2025

Hi @max-sixty, sorry for bothering you. I will have some free time during the weekend to work on this, and it would be good to receive some feedback from the Xarray team to decide which alternative should be implemented. More people are interested in this feature, so it looks like it could be helpful.

@brendan-m-murphy

An option to specify an "offset" when chunking a DataArray would be useful. What I mean is: da.chunk(time=60, offset={"time": 30}) would lead to chunk sizes 30, 60, 60, 60, ... .

This would allow matching the first dask chunk to fill the last zarr chunk when appending data along an axis. (E.g. in this case, suppose the zarr chunk sizes are 60, but the last chunk written is only half full. If you chunk the dask array to 60, then you need to turn off safe chunks and use a synchroniser because of the half-empty zarr chunk.)

@max-sixty
Collaborator

@josephnowak I'm so sorry for missing your messages. I'm not sure how I missed them; I was really busy earlier this year and I guess I dismissed my notifications.

I'll look now. But again my apologies, me missing them is both rude and most importantly bad for us making progress on the project

@max-sixty
Collaborator

can I ask a quick question to ground myself (again apologies this is coming so late)

when do we really need chunks that don't align between dask & zarr?

I'm familiar with having multiple zarr chunks per dask chunk, but is there a time they need to be unaligned?

@josephnowak
Contributor Author

josephnowak commented Mar 28, 2025

Hi @max-sixty , don't worry, I understand that sometimes the amount of work becomes overwhelming, and that comes with a lot of notifications.

Related to your question of the unaligned chunks, I think the only scenario when they "need" to be unaligned is when the user does not want to modify their dask chunks (I suppose because of the increase on the number of tasks on the Dask side) and prefers to use a synchronizer. In that scenario, multiple Dask chunks can write to the same Zarr chunk without corrupting the data. The important point is that the only use case for unaligned chunks is one that requires the synchronizer, so I do not see a practical use case for the safe_chunks parameter. I think the best approach would be to make the synchronizer mandatory if you want to keep your Dask chunks unaligned (the Option B that I proposed).

@josephnowak
Contributor Author

An option to specify an "offset" when chunking a DataArray would be useful. What I mean is: da.chunk(time=60, offset={"time": 30}) would lead to chunk sizes 30, 60, 60, 60, ... .

This would allow matching the first dask chunk to fill the last zarr chunk when appending data along an axis. (E.g. in this case, suppose the zarr chunk sizes are 60, but the last chunk written is only half full. If you chunk the dask array to 60, then you need to turn off safe chunks and use a synchroniser because of the half-empty zarr chunk.)

Hi, I think that Dask (and Xarray) allow specifying the chunks using a tuple, so something like da.chunk(time=(30, 60, ...)) should be enough to satisfy your requirement. However, the idea of this issue is to spare the user from dealing with the chunk alignment between Dask and Zarr. Instead, Xarray would handle the alignment for you, simplifying the process and reducing the number of errors reported on this topic.
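
As a minimal sketch of the tuple-based workaround described above, the helper below builds the explicit chunk sizes that the proposed offset would produce. The name offset_chunks and its parameters are hypothetical and are not part of Xarray or Dask; the resulting tuple is what one would pass as da.chunk(time=...), assuming the axis length is known.

```python
def offset_chunks(size, chunk, offset=0):
    """Chunk sizes for an axis of length `size`: a first chunk of
    `offset` elements (if non-zero), then chunks of `chunk` elements,
    with a shorter final chunk when the remainder does not divide evenly.
    Hypothetical helper for illustration only."""
    if size < 0 or chunk <= 0 or not 0 <= offset < chunk:
        raise ValueError("expected size >= 0, chunk > 0 and 0 <= offset < chunk")
    first = min(offset, size)
    sizes = [first] if first else []
    full, last = divmod(size - first, chunk)
    sizes += [chunk] * full
    if last:
        sizes.append(last)
    return tuple(sizes)


# Zarr chunks of 60 whose last chunk is half full, so the first Dask
# chunk should hold only the 30 elements that complete it.
chunks = offset_chunks(size=210, chunk=60, offset=30)
print(chunks)  # (30, 60, 60, 60)
```

This only covers the manual route; the automatic alignment proposed in this issue would make such a helper unnecessary.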

@josephnowak
Contributor Author

josephnowak commented Mar 28, 2025

This issue could affect the to_zarr interface and usability, so it would probably also affect projects like icechunk that replicate the Xarray interface. @dcherian would probably also be interested in this.

@brendan-m-murphy

@josephnowak thanks! I knew dask could accept tuples as chunk sizes, but the xarray docs currently say that if you pass a dict to the chunks parameter of DataArray.chunk, then it must map Hashable to ints or pandas freq. strings. Anyway, passing a dict mapping dims to tuples works as you suggested (I should have just tried it).

If xarray could align the chunks automatically, that would be great. I asked my question because it didn't seem like I could even align them manually with .chunk, and that seemed like a place to start.

I can open an issue to update the docs.

@max-sixty
Collaborator

I think the only scenario when they "need" to be unaligned is when the user does not want to modify their dask chunks (I suppose because of the increase on the number of tasks on the Dask side)

I frequently need multiple dask chunks per zarr chunk, to reduce the number of dask chunks & tasks

but that's different in my mind from them being unaligned; i.e. 100 zarr chunks -> 10 dask chunks, each of size 10 is still aligned. is that right?


if so, I would support moving towards forcing alignment given the issues and code complications we're supporting. we could start by adding a deprecation warning and then eventually transition to always align

but often in these sorts of issues I find that there's a use case I'm missing!

@josephnowak
Contributor Author

josephnowak commented Mar 28, 2025

Even with a ratio of 100 Zarr chunks to 10 Dask chunks, the chunks are not necessarily aligned, because the last Zarr chunk may be a partial one. I think this is a very good case to illustrate the performance issue I'm referring to when an alignment process is carried out, and probably a justification for allowing the user to keep unaligned chunks (always with the synchronizer, of course, to avoid data corruption).

Imagine the following scenario:

1D Zarr Array:
Chunk shape: 3
Size: 5
Total number of chunks: 2

1D Dask Array:
Chunk shape: 6
Size: 12
Total number of chunks: 2

There can be multiple strategies to align the chunks, but all of them should guarantee that the Dask chunks never increase in size (unless they are smaller than the Zarr chunk size). Because of this restriction, we will end up with more chunks (for example, (4, 3, 5) is one possible configuration), which translates into more Dask tasks, and for some users that can be a problem.
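
A small check can make this scenario concrete. The sketch below assumes the 12-element Dask array is appended right after the 5 elements already stored in the Zarr array, and treats the chunks as aligned when every interior Dask chunk boundary falls on a Zarr chunk boundary. The function is_aligned is a hypothetical name used only for illustration.

```python
from itertools import accumulate


def is_aligned(dask_chunks, zarr_chunk, region_start=0):
    """True when no Zarr chunk would be written by two Dask chunks.

    That holds exactly when every interior Dask chunk boundary, in
    absolute array coordinates, is a multiple of the Zarr chunk size.
    Hypothetical helper; assumes every chunk size is positive.
    """
    ends = [region_start + end for end in accumulate(dask_chunks)]
    interior_boundaries = ends[:-1]  # the final end is not shared with anyone
    return all(b % zarr_chunk == 0 for b in interior_boundaries)


# Zarr chunk shape 3, 5 elements already written, 12 elements appended.
print(is_aligned((6, 6), zarr_chunk=3, region_start=5))     # False
print(is_aligned((4, 3, 5), zarr_chunk=3, region_start=5))  # True
```

The original (6, 6) chunks split at position 11, in the middle of a Zarr chunk, while (4, 3, 5) splits at 9 and 12, at the cost of one extra Dask chunk.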

Regarding the deprecation, I agree with you. If we go down the path of forcing the alignment, it would be ideal to add a deprecation message. What I'm not sure about is which plan would be good for the transition (Xarray probably already has a standard for this), but I have the following plans in mind in case there is no specific standard:

Plan A (less disruptive):

  1. Add a parameter that indicates whether or not you want the new behaviour (Option B proposed in this issue).
  2. Add a deprecation warning to the docs for the safe_chunks parameter.
  3. Add to the docs the new behaviour of the synchronizer (Option B) that will be available after the deprecation of the safe_chunks parameter.

Plan B (more disruptive):

  1. Force the alignment by default if and only if safe_chunks is True, and log a warning when the chunks are not aligned (with safe_chunks equal to True, of course).
  2. Add a deprecation warning for the safe_chunks parameter.
  3. Change the behaviour of the synchronizer, and indicate in the docs that if it is used, the automatic alignment process is not going to be executed even if safe_chunks=True is set.

@max-sixty
Collaborator

1D Zarr Array: Chunk shape: 3 Size: 8 Total number of chunks: 3

1D Dask Array: Chunk shape: 6 Size: 12 Total number of chunks: 2

but in this case the arrays are different sizes?

@josephnowak
Contributor Author

josephnowak commented Mar 29, 2025

1D Zarr Array: Chunk shape: 3 Size: 8 Total number of chunks: 3
1D Dask Array: Chunk shape: 6 Size: 12 Total number of chunks: 2

but in this case the arrays are different sizes?

Yes sorry, I have edited the example, the size should have been 5 instead of 8, to make both arrays equal in the number of chunks

@max-sixty
Collaborator

Yes sorry, I have edited the example, the size should have been 5 instead of 8, to make both arrays equal in the number of chunks

sorry if I'm being slow to understand! to confirm: these are not the same array, but two arrays on which we're doing some binary operation, such as adding them, and here we're evaluating whether the chunks would align?

I had thought the issue was when dask represented a zarr array — so not the result of a calculation

or is the issue that when we do a calculation, get a result, and then want to write that to zarr, we'd then have a potentially unaligned array?

@brendan-m-murphy

Perhaps another example with the same effect is writing the dask array with two chunks of length 6 to the region slice(1, 13) of a zarr array with 5 chunks of length 3:

zarr:     |* * *|* * *|* * *|* * *|* * *|
dask:       |- - - - - -|- - - - - -|
aligned:    |- -|- - - - - -|- - - -|

A possible interpretation of @josephnowak's example is (?)

zarr:     |* * *|* *  |     |     |     |     |
dask:               |- - - - - -|- - - - - -|
aligned:            |-|- - - - - -|- - - - -|
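
The first illustration can be checked with a short sketch that lists, for each Dask chunk, the indices of the Zarr chunks it would write to. The function name zarr_chunks_touched is hypothetical; it only does index arithmetic and does not call Zarr or Dask.

```python
def zarr_chunks_touched(dask_chunks, zarr_chunk, region_start):
    """For each Dask chunk, list the Zarr chunk indices it overlaps.

    Hypothetical helper; assumes every Dask chunk size is positive.
    """
    touched = []
    start = region_start
    for size in dask_chunks:
        stop = start + size  # exclusive end of this Dask chunk
        first = start // zarr_chunk
        last = (stop - 1) // zarr_chunk
        touched.append(list(range(first, last + 1)))
        start = stop
    return touched


# Region slice(1, 13) of a Zarr array with chunks of length 3.
print(zarr_chunks_touched((6, 6), 3, region_start=1))
# [[0, 1, 2], [2, 3, 4]] -> Zarr chunk 2 is shared by both Dask chunks
print(zarr_chunks_touched((2, 6, 4), 3, region_start=1))
# [[0], [1, 2], [3, 4]] -> the "aligned" row: no Zarr chunk is shared
```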

At least with the zarr v2 code, it looks like the zarr array computes the relevant region for each incoming chunk, so aligning in xarray might just be duplicating that effort. For zarr v3, it looks like the async array does this (I think... it looks like each write is an async task).

So, as far as I can tell, the only penalty for not aligning chunks and using a synchronizer is that the chunk manager needs to acquire the lock for a chunk before writing to it (the lock is requested when the chunk manager calls __setitem__). Maybe @dcherian can correct me?

Either way, I found needing a synchronizer and setting safe_chunks=False a little confusing when I first started using zarr with xarray, so a better default option would be nice.

@josephnowak
Contributor Author

@max-sixty I apologize if my previous explanation wasn’t clear.

The main objective of this issue is that Xarray automatically modifies the chunks of a Dask array to guarantee that two or more chunks of Dask are not written on the same chunk of an existing Zarr array (I call this alignment of chunks). Another way to understand it is that, instead of throwing an error as "safe_chunks=True" does, it would align the chunks automatically (and probably print a warning).
My previous examples were intended to highlight the cons of the alignment process, namely the increase in the number of Dask tasks in some specific scenarios, and why that would justify having the synchronizer disable the automatic alignment in Option B, or using a separate parameter to control the alignment as in Option A.

@brendan-m-murphy, thanks for the illustration of the chunks; that is exactly what I wanted to explain with my previous message. While Zarr can manage these writes correctly on its own, issues arise with Dask, which writes in parallel. This parallelism can lead to a situation where two Dask chunks write to the same Zarr chunk, resulting in corrupted data or unexpected results.

Regarding the behaviour of the synchronizer and "safe_chunks=False", I also agree that it is confusing, and I'm not sure in which scenarios it can be useful. Probably in one where multiple unsynchronized Dask clusters write to the same Zarr array and do not share the same synchronizer, so you would just have to guarantee that they write to different regions.

@brendan-m-murphy

brendan-m-murphy commented Apr 1, 2025

@josephnowak within one call to to_zarr I think zarr's thread synchronizer handles parallel writes from Dask without problem, at least with the default "threaded" Dask scheduler.

But you're right that separate processes that are not writing to the same zarr Array object can result in conflicts. I haven't used Dask distributed with xarray in a case where it would cause conflicts, but I could see that it might.
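
The per-chunk locking discussed here can be pictured with a toy model. This is not Zarr's implementation: the dictionary store, the lock table, and write_region are all hypothetical stand-ins that only mimic a read-modify-write of whole chunks guarded by one lock per chunk.

```python
import threading
from collections import defaultdict

ZARR_CHUNK = 3
store = {}                           # chunk index -> list of values (toy storage)
locks = defaultdict(threading.Lock)  # one lock per chunk index
locks_guard = threading.Lock()       # protects creation of the per-chunk locks


def write_region(start, values):
    """Write `values` at absolute position `start`, one chunk at a time."""
    pos, stop = start, start + len(values)
    while pos < stop:
        idx = pos // ZARR_CHUNK
        end = min((idx + 1) * ZARR_CHUNK, stop)
        with locks_guard:
            lock = locks[idx]
        with lock:
            # Read the whole chunk, modify part of it, write it back.
            chunk = list(store.get(idx, [None] * ZARR_CHUNK))
            for p in range(pos, end):
                chunk[p % ZARR_CHUNK] = values[p - start]
            store[idx] = chunk
        pos = end


# Two unaligned "Dask chunks" of length 6 written to slice(1, 13);
# both touch chunk index 2.
threads = [
    threading.Thread(target=write_region, args=(1, ["a"] * 6)),
    threading.Thread(target=write_region, args=(7, ["b"] * 6)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

flat = [v for i in sorted(store) for v in store[i]]
print(flat)
```

Because the read-modify-write of the shared chunk happens under its lock, neither writer can overwrite the other's half. Locks held in one process's memory, as here, would not protect writers running in separate processes, which is the situation raised above.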

@max-sixty
Collaborator

ok, I think I'm partially following!

I'm still trying to confirm the cases when it's necessary for dask & zarr to have unaligned chunks

using the definition of aligned:

The main objective of this issue is that Xarray automatically modifies the chunks of a Dask array to guarantee that two or more chunks of Dask are not written on the same chunk of an existing Zarr array (I call this alignment of chunks).

so, is this case a good one (thanks @brendan-m-murphy )?

zarr:     |* * *|* *  |     |     |     |     |
dask:               |- - - - - -|- - - - - -|
aligned:            |-|- - - - - -|- - - - -|

...since the slice is midway through the array, no dask chunksize can always land on a zarr chunk boundary. is that right?

if so, how would forcing alignment work in this case?

@josephnowak
Contributor Author

josephnowak commented Apr 5, 2025

Sorry it took me a while to reply. I think I did not understand your question. The forcing alignment option would give you, as a result, the "aligned" chunks of the examples, while if you don't force the alignment, you would get the "dask" chunks, which would corrupt the data unless the "safe_chunks=True" is enabled.

If your question is about the algorithm to rechunk the Dask array so that it aligns with the Zarr chunks, one option is to rechunk the Dask array with the exact chunks of the target region of the Zarr array (the naive solution). Another is to apply a kind of optimization whose objective is to modify the Dask chunks as little as possible while still satisfying the alignment restriction.
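
The naive solution mentioned above can be sketched in a few lines. The function naive_aligned_chunks is a hypothetical name, and the sketch assumes a 1D region described by its start position and length.

```python
def naive_aligned_chunks(region_start, region_size, zarr_chunk):
    """Chunk sizes that follow the Zarr chunk grid exactly over the region.

    Hypothetical helper illustrating the naive strategy: cut at every
    Zarr chunk boundary inside [region_start, region_start + region_size).
    """
    chunks = []
    pos = region_start
    stop = region_start + region_size
    while pos < stop:
        next_boundary = (pos // zarr_chunk + 1) * zarr_chunk
        end = min(next_boundary, stop)
        chunks.append(end - pos)
        pos = end
    return chunks


# 12 elements written after the first 5 of a Zarr array with chunk shape 3.
print(naive_aligned_chunks(region_start=5, region_size=12, zarr_chunk=3))
# [1, 3, 3, 3, 2]
```

For this region the naive strategy yields five Dask chunks instead of the original two, which is the task increase that a less aggressive strategy tries to limit.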

A greedy strategy should be good enough. Here is a possible solution; it is not very well tested, but I think it produces fewer tasks than the naive solution in many scenarios. (This is probably a well-known problem; I just need more time to search for it and see how to get the real optimum. There is probably some kind of DP solution.)

# v0.2
# This algorithm can be applied to every dimension of the Array
# Case 1
zarr_chunks = [1, 3, 3, 3, 2]
dask_chunks = [6, 6]
# Case 2
# zarr_chunks = [6, 6, 1]
# dask_chunks = [6, 7]

# Case 3
# zarr_chunks = [6, 6, 1]
# dask_chunks = [6, 6, 1]

# Case 4
# zarr_chunks = [1, 3, 3, 3, 2]
# dask_chunks = [1, 3, 2, 6]

# Case 5
# zarr_chunks = [1, 3, 3, 3, 2]
# dask_chunks = [2, 2, 2, 6]

# Case 6
# zarr_chunks = [1, 3, 3, 3, 2]
# dask_chunks = [3, 1, 3, 5]

# Size of the chunk of Zarr (the one that is inside the encoding),
# for now let's take it as the maximum of the Zarr chunks
fixed_chunk = max(zarr_chunks)

# Validate that both chunk lists cover the same number of elements
assert sum(zarr_chunks) == sum(dask_chunks)

# The ideal size of the chunks is the maximum of the two; this would avoid
# that we use more memory than expected
max_chunk = max(max(zarr_chunks), max(dask_chunks))

# The algorithm assumes that the chunks of this array, except the last one,
# are already aligned. For simplicity, let's modify the Zarr region in such
# a way that there are no partial chunks. To achieve that, we can add
# artificial data to the borders of the region to complete them
aligned_chunks = [fixed_chunk - zarr_chunks[-1]]
dask_chunks[0] += fixed_chunk - zarr_chunks[0]

print({
    "aligned_chunks": aligned_chunks,
    "max_chunk": max_chunk,
    "dask_chunks": dask_chunks
})

for dask_chunk in reversed(dask_chunks):
    new_chunk = dask_chunk + aligned_chunks[-1]
    print({
        "aligned_chunks": aligned_chunks,
        "new_chunk": new_chunk,
        "dask_chunk": dask_chunk
    })

    if new_chunk <= fixed_chunk:
        print("On first if")
        # This is an unavoidable scenario where the Dask chunks need to be merged
        # to be bigger than or equal to the Zarr chunk size
        aligned_chunks[-1] = new_chunk
    elif new_chunk > max_chunk:
        print("On second if")
        # This is a non-ideal scenario because we are exceeding the maximum chunk
        # We need to split the dask chunk

        # The chunk is split into two parts, the first part is the max_chunk
        # which is going to be reduced in size based on the remainder with the
        # fixed_chunks
        aligned_chunks[-1] = max_chunk - (max_chunk % fixed_chunk)

        # The second part is the missing element from the new_chunk
        aligned_chunks.append(new_chunk - aligned_chunks[-1])
    elif not aligned_chunks[-1] % fixed_chunk and not dask_chunk % fixed_chunk:
        print("On third if")
        # This is a perfect scenario where the chunks are aligned,
        # which means that we can preserve the original Dask chunk and not split it
        aligned_chunks.append(dask_chunk)

    elif new_chunk + fixed_chunk - new_chunk % fixed_chunk > max_chunk:
        print("On fourth if")
        # This is another non-ideal scenario where we have to split the chunk
        # because if not, we will end up with a chunk that is bigger than the max_chunk
        # on the next iteration
        aligned_chunks[-1] = new_chunk - new_chunk % fixed_chunk
        aligned_chunks.append(new_chunk - aligned_chunks[-1])
    else:
        print("On else")
        # There is no alternative other than merging the chunks
        aligned_chunks[-1] = new_chunk

# Revert the aligned chunks because we iterated in the reverse order
aligned_chunks = aligned_chunks[::-1]
print(f"Result of the loop {aligned_chunks}")
# Let's remove the additional artificial size
aligned_chunks[-1] -= fixed_chunk - zarr_chunks[-1]
aligned_chunks[0] -= fixed_chunk - zarr_chunks[0]
dask_chunks[0] -= fixed_chunk - zarr_chunks[0]

# The artificial data added to the border can introduce inefficient chunks
# in some scenarios, for that reason, we will check if we should merge the borders
# with their previous/next chunk
# Example:
# zarr_chunks = [6, 6, 1]
# dask_chunks = [6, 7]
# The ideal output should preserve the same dask_chunks, but the previous loop is going to produce
# aligned_chunks = [6, 6, 1] because we added artificial data to simplify the algorithm
for is_right_border in [False, True]:
    if is_right_border:
        aligned_chunks = aligned_chunks[::-1]
    if (
        len(aligned_chunks) >= 2 and
        aligned_chunks[0] + aligned_chunks[1] <= max_chunk and
        aligned_chunks[0] != dask_chunks[0]
    ):
        aligned_chunks[1] += aligned_chunks[0]
        aligned_chunks = aligned_chunks[1:]
    if is_right_border:
        aligned_chunks = aligned_chunks[::-1]

# Output should be [1, 6, 5], which matches with Brendan's illustration
print(aligned_chunks)
