Use a concurrent buffer deque in FragmentConsolidation. #5700
bekadavis9 wants to merge 18 commits into main
Conversation
Force-pushed from c3a3e3a to 358d1b3
Force-pushed from e70da9b to 47ea0fc
rroelke left a comment
You need to add new tests for some of the edge cases of the new implementation before we can merge this.
Force-pushed from 77c355d to 2a28600
Force-pushed from d4c4f49 to 0e6611a
rroelke left a comment
Still more to do. Aside from the comments -
Performance
As mentioned in the sync, we do need to observe that this is not worse for representative customer data. You've clearly seen that it is much faster for the unit tests, which is excellent; but that may not be reflected in production.
I would like to see some kind of repeatable program we can run, so that we can re-use this benchmark later to compare this result against the new consolidation we will implement.
Add a way to configure the initial buffer size so that we can play with it a bit to see how it affects performance.
Initial buffer size
We had discussed prototyping with a fixed number like 10M. I didn't catch that in this review. But this probably is not what we want to use anyway. The consolidator knows the average var cell size, and can compute the fixed part size of the cells. This should be used to inform the initial buffer size in some fashion. For example, if 10M only fits 1 average cell, then it is probably not a good choice for buffer size.
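To make that suggestion concrete, here is a minimal sketch of deriving the initial buffer size from the cell-size information the consolidator already has. All names (`compute_initial_buffer_size`, `min_cells_per_buffer`, the parameters) are hypothetical and illustrative, not TileDB's actual API; the heuristic is only one possible policy.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical sketch: size the initial buffer so it holds a reasonable
// number of average cells, instead of a fixed constant like 10M that
// might fit only a single cell. Names are illustrative only.
uint64_t compute_initial_buffer_size(
    uint64_t avg_var_cell_size,   // known to the consolidator
    uint64_t fixed_cell_size,     // computable from the schema
    uint64_t buffer_budget) {
  // Aim to hold at least this many average cells per buffer so a
  // buffer is never degenerate.
  constexpr uint64_t min_cells_per_buffer = 1024;
  const uint64_t per_cell = avg_var_cell_size + fixed_cell_size;
  const uint64_t size = per_cell * min_cells_per_buffer;
  // Never exceed the overall budget.
  return std::min(size, buffer_budget);
}
```

The constant and the min-cells policy are placeholders; the point is only that the schema-derived per-cell size, not an arbitrary fixed number, drives the choice.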
Testing
Testing is mostly using the existing tests, and then we'll see the performance testing from above. The new tests intend to force the reader to wait for memory to become available. We need tests (new or existing) which in some way assert that this is actually happening. See the review comments about waiting - I have some doubt about its correctness. If we merge #5725 that will help, but that's not the only thing you can do.
6fa8594 to
9419e0d
Compare
9419e0d to
89ab72d
Compare
// Allow use of deprecated param `config_.buffer_size_`
// Allow the buffer to grow 3 times
uint64_t initial_buffer_size = config_.buffer_size_ != 0 ?
    buffer_budget :
I spent some time analyzing this and it's hard to know if it is correct. First you have to recognize that FragmentConsolidationWorkspace also uses this config parameter to override its constructor argument; then you have to follow through whether the memory accounting uses the expected or actual buffer size; and so on. This deprecated parameter makes things messy. Plus, the line immediately following this might mess it up.
Even if it is deprecated we do have to acknowledge that someone out there might be depending on it, so my current line of thinking is that copy_array should probably just do the old thing whenever this parameter is nonzero.
That's the intent here. Use the old behavior if buffer_size_ is set (non-zero) and the new behavior otherwise.
On main, the initial consolidation workspace is set to size buffer_budget by the fragment consolidator (ref). The workspace itself still checks the config_.buffer_size_ internally, as the behavior of resize_buffers has not changed, only migrated to the constructor.
I vote for removing the config parameters that were marked as deprecated here 2(!) years ago. Especially if this makes our code simpler, safer and more readable.
Please do it in a separate PR though and rebase this branch once merged.
uint64_t initial_buffer_size = config_.buffer_size_ != 0 ?
    buffer_budget :
    config_.initial_buffer_size_;
initial_buffer_size = std::min(initial_buffer_size, buffer_budget / 8);
It's probably a good idea to use the new "what level was this config set at?" facility that Agis added here. If initial_buffer_size is the configuration default, then the user did not set it, and we have some freedom to adjust it down, and/or determine a size using properties of the array schema as I had suggested. But if they did specify it in some way then we should use that (just taking the min with the budget). This is useful as an override for performance testing, for example.
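A sketch of that policy follows, assuming a boolean obtained from the config-provenance facility mentioned above (the real API and its name may differ; `choose_initial_buffer_size` and its parameters are hypothetical):

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical sketch: respect an explicitly user-set initial buffer
// size (min'd with the budget); if the value is just the config default,
// we are free to shrink it, e.g. cap at 1/8 of the budget as in the
// diff above. Names are illustrative only.
uint64_t choose_initial_buffer_size(
    bool user_set,             // from the "what level was this set at?" facility
    uint64_t configured_size,  // config_.initial_buffer_size_
    uint64_t buffer_budget) {
  if (user_set)
    return std::min(configured_size, buffer_budget);
  // Default was used: free to adjust down heuristically.
  return std::min(configured_size, buffer_budget / 8);
}
```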
// Deque which stores the buffers passed between the reader and writer.
// Total size of enqueued buffers may not exceed `max_queue_size`.
// The reader will enqueue until that limit, so adjust `buffer_size`
// via `Config::initial_buffer_size` to allow concurrrent in-flight buffers.
- // via `Config::initial_buffer_size` to allow concurrrent in-flight buffers.
+ // via `Config::initial_buffer_size` to allow concurrent in-flight buffers.
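The contract described in that comment, a size-capped queue that the reader blocks on until the writer drains a buffer, can be sketched with a standard mutex/condition-variable bounded queue. This is illustrative only, not TileDB's actual `ProducerConsumerQueue`; the class and member names are made up for the sketch.

```cpp
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

// Illustrative bounded buffer queue: the producer (reader) enqueues until
// `max_queue_size` entries are in flight, then blocks until the consumer
// (writer) pops one. Not TileDB's implementation.
class BoundedBufferQueue {
 public:
  explicit BoundedBufferQueue(std::size_t max_queue_size)
      : max_queue_size_(max_queue_size) {}

  void push(std::vector<uint8_t> buf) {
    std::unique_lock<std::mutex> lk(m_);
    // Block while the queue is at capacity.
    not_full_.wait(lk, [&] { return q_.size() < max_queue_size_; });
    q_.push_back(std::move(buf));
    not_empty_.notify_one();
  }

  std::vector<uint8_t> pop() {
    std::unique_lock<std::mutex> lk(m_);
    // Block while the queue is empty.
    not_empty_.wait(lk, [&] { return !q_.empty(); });
    auto buf = std::move(q_.front());
    q_.pop_front();
    not_full_.notify_one();
    return buf;
  }

 private:
  std::size_t max_queue_size_;
  std::deque<std::vector<uint8_t>> q_;
  std::mutex m_;
  std::condition_variable not_empty_, not_full_;
};
```

Note that in this shape the cap is a count of buffers, whereas the comment above speaks of total enqueued bytes; a byte-based cap would track a running size sum in the same predicates.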
Query* query_r,
Query* query_w,
const ArraySchema& reader_array_schema_latest,
std::unordered_map<std::string, uint64_t> average_var_cell_sizes,
- std::unordered_map<std::string, uint64_t> average_var_cell_sizes,
+ const std::unordered_map<std::string, uint64_t>& average_var_cell_sizes,
I think we can avoid copying the entire map by value on every call.
The current implementation of `FragmentConsolidation` always sets up a very large buffer space (~10GB) and performs consolidation within that workspace. This work aims to reduce the memory footprint and latency of fragment consolidation through use of a `ProducerConsumerQueue` for concurrent reads/writes. At present, the buffers are (somewhat arbitrarily) sized at 10MB, and the queue is capped at size 10. As such, there are quantitatively more allocations, but the overall size (and runtime) is drastically reduced, as small operations need not construct/destruct the full workspace.

The following test saw an improvement in runtime of 16.500s -> 0.130s:

./test/tiledb_unit --durations yes --vfs=native "C++ API: Test consolidation that respects the current domain"

TYPE: IMPROVEMENT
DESC: Use a concurrent buffer deque in `FragmentConsolidation`.

Resolves CORE-411.