diff --git a/velox/common/base/tests/StatsReporterTest.cpp b/velox/common/base/tests/StatsReporterTest.cpp index 974efd271e1..daddca74b25 100644 --- a/velox/common/base/tests/StatsReporterTest.cpp +++ b/velox/common/base/tests/StatsReporterTest.cpp @@ -344,6 +344,10 @@ class TestMemoryPool : public memory::MemoryPool { return nullptr; } + memory::MemoryArbitrator* arbitrator() const override { + return nullptr; + } + void enterArbitration() override {} void leaveArbitration() noexcept override {} diff --git a/velox/common/memory/CMakeLists.txt b/velox/common/memory/CMakeLists.txt index d6353c696cc..cf0cce89d32 100644 --- a/velox/common/memory/CMakeLists.txt +++ b/velox/common/memory/CMakeLists.txt @@ -22,6 +22,7 @@ velox_add_library( ArbitrationOperation.cpp ArbitrationParticipant.cpp ByteStream.cpp + CustomMemoryResource.cpp HashStringAllocator.cpp MallocAllocator.cpp Memory.cpp @@ -40,6 +41,7 @@ velox_add_library( ArbitrationParticipant.h ByteStream.h CompactDoubleList.h + CustomMemoryResource.h HashStringAllocator.h MallocAllocator.h Memory.h @@ -69,3 +71,12 @@ velox_link_libraries( glog::glog PRIVATE velox_test_util re2::re2 ) + +velox_add_library( + velox_custom_memory_resource_registry + CustomMemoryResourceRegistry.cpp + HEADERS + CustomMemoryResourceRegistry.h +) + +velox_link_libraries(velox_custom_memory_resource_registry velox_memory velox_scoped_registry) diff --git a/velox/common/memory/CustomMemoryResource.cpp b/velox/common/memory/CustomMemoryResource.cpp new file mode 100644 index 00000000000..0341903469c --- /dev/null +++ b/velox/common/memory/CustomMemoryResource.cpp @@ -0,0 +1,52 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/memory/CustomMemoryResource.h" + +#include + +#include "velox/common/base/Exceptions.h" +#include "velox/common/memory/MemoryArbitrator.h" + +namespace facebook::velox::memory { + +CustomMemoryResource::CustomMemoryResource( + std::string tag, + std::shared_ptr allocator, + std::shared_ptr arbitrator, + ReclaimerFactory reclaimerFactory, + int64_t maxCapacity) + : tag_(std::move(tag)), + maxCapacity_(maxCapacity), + allocator_(std::move(allocator)), + arbitrator_(std::move(arbitrator)), + reclaimerFactory_(std::move(reclaimerFactory)) { + VELOX_USER_CHECK(!tag_.empty(), "CustomMemoryResource tag is empty"); + VELOX_USER_CHECK_NOT_NULL( + allocator_, "CustomMemoryResource allocator is null for tag: {}", tag_); + VELOX_USER_CHECK_NOT_NULL( + arbitrator_, "CustomMemoryResource arbitrator is null for tag: {}", tag_); + VELOX_USER_CHECK( + reclaimerFactory_ != nullptr, + "CustomMemoryResource reclaimerFactory is null for tag: {}", + tag_); +} + +std::unique_ptr CustomMemoryResource::newReclaimer() const { + return reclaimerFactory_(); +} + +} // namespace facebook::velox::memory diff --git a/velox/common/memory/CustomMemoryResource.h b/velox/common/memory/CustomMemoryResource.h new file mode 100644 index 00000000000..c4053e43183 --- /dev/null +++ b/velox/common/memory/CustomMemoryResource.h @@ -0,0 +1,80 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +namespace facebook::velox::memory { + +class MemoryAllocator; +class MemoryArbitrator; +class MemoryReclaimer; + +/// Describes an externally-provided memory resource (e.g. a GPU or tiered +/// memory backend) registered with the memory subsystem and referenced by +/// 'tag' when building per-query memory pools. The constructor enforces +/// non-empty tag and non-null allocator, arbitrator, and reclaimerFactory; +/// once constructed, the resource is immutable. +class CustomMemoryResource { + public: + using ReclaimerFactory = std::function()>; + + CustomMemoryResource( + std::string tag, + std::shared_ptr allocator, + std::shared_ptr arbitrator, + ReclaimerFactory reclaimerFactory, + int64_t maxCapacity = std::numeric_limits::max()); + + /// Unique identifier for this resource. + const std::string& tag() const { + return tag_; + } + + /// Capacity of the per-query root pool created from this resource. + int64_t maxCapacity() const { + return maxCapacity_; + } + + /// Allocator backing pools tagged with this resource. + MemoryAllocator* allocator() const { + return allocator_.get(); + } + + /// Arbitrator routing capacity decisions for pools tagged with this + /// resource. + MemoryArbitrator* arbitrator() const { + return arbitrator_.get(); + } + + /// Returns a fresh reclaimer for a new pool by invoking the factory + /// supplied at construction. + std::unique_ptr newReclaimer() const; + + private: + const std::string tag_; + const int64_t maxCapacity_; + const std::shared_ptr allocator_; + const std::shared_ptr arbitrator_; + const ReclaimerFactory reclaimerFactory_; +}; + +} // namespace facebook::velox::memory diff --git a/velox/common/memory/CustomMemoryResourceRegistry.cpp b/velox/common/memory/CustomMemoryResourceRegistry.cpp new file mode 100644 index 00000000000..b327eeb6f58 --- /dev/null +++ b/velox/common/memory/CustomMemoryResourceRegistry.cpp @@ -0,0 +1,33 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/memory/CustomMemoryResourceRegistry.h" + +namespace facebook::velox::memory { + +// static +CustomMemoryResourceRegistry::Registry& CustomMemoryResourceRegistry::global() { + static Registry instance; + return instance; +} + +// static +std::shared_ptr +CustomMemoryResourceRegistry::createRegistry(Registry* parent) { + return std::make_shared(parent); +} + +} // namespace facebook::velox::memory diff --git a/velox/common/memory/CustomMemoryResourceRegistry.h b/velox/common/memory/CustomMemoryResourceRegistry.h new file mode 100644 index 00000000000..210ec8908c2 --- /dev/null +++ b/velox/common/memory/CustomMemoryResourceRegistry.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "velox/common/ScopedRegistry.h" +#include "velox/common/memory/CustomMemoryResource.h" + +namespace facebook::velox::memory { + +/// Key under which a per-QueryCtx scoped CustomMemoryResourceRegistry is +/// stored on QueryCtx via QueryCtx::setRegistry / QueryCtx::registry. Tasks +/// use this key to look up resources when building the custom memory pool +/// hierarchy. +inline constexpr std::string_view kCustomMemoryResourceRegistryKey{ + "customMemoryResource"}; + +/// Entry point for the CustomMemoryResource registry. Provides the +/// process-global root and a factory for scoped registries. Callers +/// register, look up, and clear entries directly on the Registry instance +/// (Registry::insert, Registry::find, Registry::clear). Registry methods +/// are thread-safe. +class CustomMemoryResourceRegistry { + public: + using Registry = ScopedRegistry; + + /// Process-global root registry. The reference is stable for the + /// lifetime of the process. + static Registry& global(); + + /// Creates a scoped registry. Defaults to inheriting from the global + /// registry; pass nullptr for isolation mode (no fallback). + static std::shared_ptr createRegistry(Registry* parent = &global()); +}; + +} // namespace facebook::velox::memory diff --git a/velox/common/memory/Memory.cpp b/velox/common/memory/Memory.cpp index 45a185cae7e..21951a575ea 100644 --- a/velox/common/memory/Memory.cpp +++ b/velox/common/memory/Memory.cpp @@ -274,7 +274,7 @@ std::shared_ptr MemoryManager::createRootPool( std::move(reclaimer), options); VELOX_CHECK_EQ(pool->capacity(), 0); - arbitrator_->addPool(pool); + pool->arbitrator()->addPool(pool); RECORD_HISTOGRAM_METRIC_VALUE( kMetricMemoryPoolInitialCapacityBytes, pool->capacity()); return pool; @@ -285,6 +285,36 @@ std::shared_ptr MemoryManager::addRootPool( int64_t maxCapacity, std::unique_ptr reclaimer, const std::optional& poolDebugOpts) { + return addRootPoolImpl( + name, + maxCapacity, + std::move(reclaimer), + poolDebugOpts, + /*customAllocator=*/nullptr, + /*customArbitrator=*/nullptr); +} + +std::shared_ptr MemoryManager::addCustomRootPool( + const std::string& name, + std::shared_ptr resource, + const std::optional& poolDebugOpts) { + VELOX_USER_CHECK_NOT_NULL(resource); + return addRootPoolImpl( + name, + resource->maxCapacity(), + resource->newReclaimer(), + poolDebugOpts, + resource->allocator(), + resource->arbitrator()); +} + +std::shared_ptr MemoryManager::addRootPoolImpl( + const std::string& name, + int64_t maxCapacity, + std::unique_ptr reclaimer, + const std::optional& poolDebugOpts, + MemoryAllocator* customAllocator, + MemoryArbitrator* customArbitrator) { std::string poolName = name; if (poolName.empty()) { static std::atomic poolId{0}; @@ -298,6 +328,8 @@ std::shared_ptr MemoryManager::addRootPool( options.coreOnAllocationFailureEnabled = coreOnAllocationFailureEnabled_; options.getPreferredSize = getPreferredSize_; options.debugOptions = poolDebugOpts; + options.customAllocator = customAllocator; + options.customArbitrator = customArbitrator; auto pool = createRootPool(poolName, reclaimer, options); if (!disableMemoryPoolTracking_) { @@ -308,7 +340,7 @@ std::shared_ptr MemoryManager::addRootPool( } pools_.emplace(poolName, pool); } catch (const VeloxRuntimeError&) { - arbitrator_->removePool(pool.get()); + pool->arbitrator()->removePool(pool.get()); throw; } } @@ -340,7 +372,7 @@ uint64_t MemoryManager::shrinkPools( void MemoryManager::dropPool(MemoryPool* pool) { VELOX_CHECK_NOT_NULL(pool); VELOX_DCHECK_EQ(pool->reservedBytes(), 0); - arbitrator_->removePool(pool); + pool->arbitrator()->removePool(pool); if (disableMemoryPoolTracking_) { return; } diff --git a/velox/common/memory/Memory.h b/velox/common/memory/Memory.h index c321f204611..a973fb6fa87 100644 --- a/velox/common/memory/Memory.h +++ b/velox/common/memory/Memory.h @@ -37,6 +37,7 @@ #include "velox/common/base/CheckedArithmetic.h" #include "velox/common/base/SuccinctPrinter.h" #include "velox/common/memory/Allocation.h" +#include "velox/common/memory/CustomMemoryResource.h" #include "velox/common/memory/MemoryAllocator.h" #include "velox/common/memory/MemoryPool.h" @@ -223,6 +224,18 @@ class MemoryManager { const std::optional& poolDebugOpts = std::nullopt); + /// Creates a root memory pool backed by 'resource'. The pool's capacity + /// comes from 'resource->maxCapacity'; its reclaimer comes from + /// 'resource->reclaimerFactory()'; its allocator and arbitrator are + /// borrowed from 'resource->allocator' and 'resource->arbitrator'. The + /// caller (typically via CustomMemoryResourceRegistry) is responsible + /// for keeping 'resource' alive while the pool exists. + std::shared_ptr addCustomRootPool( + const std::string& name, + std::shared_ptr resource, + const std::optional& poolDebugOpts = + std::nullopt); + /// Creates a leaf memory pool for direct memory allocation use with specified /// 'name'. If 'name' is missing, the memory manager generates a default name /// internally to ensure uniqueness. The leaf memory pool is created as the @@ -303,6 +316,16 @@ class MemoryManager { std::unique_ptr& reclaimer, MemoryPool::Options& options); + // 'customAllocator' and 'customArbitrator' are borrowed pointers; if both + // are null, the manager's default tier is used. + std::shared_ptr addRootPoolImpl( + const std::string& name, + int64_t maxCapacity, + std::unique_ptr reclaimer, + const std::optional& poolDebugOpts, + MemoryAllocator* customAllocator, + MemoryArbitrator* customArbitrator); + void dropPool(MemoryPool* pool); // Returns the shared references to all the alive memory pools in 'pools_'. diff --git a/velox/common/memory/MemoryPool.cpp b/velox/common/memory/MemoryPool.cpp index 8ce43b2bd34..df9461992e2 100644 --- a/velox/common/memory/MemoryPool.cpp +++ b/velox/common/memory/MemoryPool.cpp @@ -457,8 +457,12 @@ MemoryPoolImpl::MemoryPoolImpl( const Options& options) : MemoryPool{name, kind, parent, options}, manager_{memoryManager}, - allocator_{manager_->allocator()}, - arbitrator_{manager_->arbitrator()}, + allocator_{ + options.customAllocator != nullptr ? options.customAllocator + : manager_->allocator()}, + arbitrator_{ + options.customArbitrator != nullptr ? options.customArbitrator + : manager_->arbitrator()}, reclaimer_(std::move(reclaimer)), // The memory manager sets the capacity through grow() according to the // actually used memory arbitration policy. @@ -903,7 +907,9 @@ std::shared_ptr MemoryPoolImpl::genChild( .threadSafe = threadSafe, .coreOnAllocationFailureEnabled = coreOnAllocationFailureEnabled_, .getPreferredSize = getPreferredSize, - .debugOptions = debugOptions_}); + .debugOptions = debugOptions_, + .customAllocator = allocator_, + .customArbitrator = arbitrator_}); } bool MemoryPoolImpl::maybeReserve(uint64_t increment) { diff --git a/velox/common/memory/MemoryPool.h b/velox/common/memory/MemoryPool.h index 4fb4216ca5b..9715dda847b 100644 --- a/velox/common/memory/MemoryPool.h +++ b/velox/common/memory/MemoryPool.h @@ -164,6 +164,17 @@ class MemoryPool : public std::enable_shared_from_this { /// If non-empty, enables debug mode for the created memory pool. std::optional debugOptions{std::nullopt}; + + /// Allocator override. Non-null routes allocations through this allocator + /// instead of the MemoryManager's default. Borrowed; the owner (typically + /// a CustomMemoryResource held by the MemoryManager) must outlive the + /// pool. + MemoryAllocator* customAllocator{nullptr}; + + /// Arbitrator override. Non-null routes capacity decisions through this + /// arbitrator instead of the MemoryManager's default. Same ownership + /// contract as 'customAllocator'. + MemoryArbitrator* customArbitrator{nullptr}; }; /// Constructs a named memory pool with specified 'name', 'parent' and 'kind'. @@ -415,6 +426,10 @@ class MemoryPool : public std::enable_shared_from_this { /// Returns the memory reclaimer of this memory pool if not null. virtual MemoryReclaimer* reclaimer() const = 0; + /// Returns the arbitrator that governs this pool's capacity. May be the + /// MemoryManager's default arbitrator or a custom resource's arbitrator. + virtual MemoryArbitrator* arbitrator() const = 0; + /// Function estimates the number of reclaimable bytes and returns in /// 'reclaimableBytes'. If the 'reclaimer' is not set, the function returns /// std::nullopt. Otherwise, it will invoke the corresponding method of the @@ -720,6 +735,10 @@ class MemoryPoolImpl : public MemoryPool { MemoryReclaimer* reclaimer() const override; + MemoryArbitrator* arbitrator() const override { + return arbitrator_; + } + std::optional reclaimableBytes() const override; uint64_t reclaim( diff --git a/velox/common/memory/tests/CMakeLists.txt b/velox/common/memory/tests/CMakeLists.txt index 77ce4489f9d..d797e39e5ed 100644 --- a/velox/common/memory/tests/CMakeLists.txt +++ b/velox/common/memory/tests/CMakeLists.txt @@ -20,6 +20,10 @@ add_executable( ArbitrationParticipantTest.cpp ByteStreamTest.cpp CompactDoubleListTest.cpp + CustomMemoryEmulationTest.cpp + CustomMemoryHierarchyTest.cpp + CustomMemoryPoolTest.cpp + CustomMemoryRegistrationTest.cpp HashStringAllocatorTest.cpp MemoryAllocatorTest.cpp MemoryArbitratorTest.cpp @@ -38,6 +42,7 @@ target_link_libraries( PRIVATE velox_caching velox_common_base + velox_custom_memory_resource_registry velox_exception velox_exec velox_exec_test_lib diff --git a/velox/common/memory/tests/CustomMemoryEmulationTest.cpp b/velox/common/memory/tests/CustomMemoryEmulationTest.cpp new file mode 100644 index 00000000000..139ee05b0d1 --- /dev/null +++ b/velox/common/memory/tests/CustomMemoryEmulationTest.cpp @@ -0,0 +1,329 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "velox/common/memory/CustomMemoryResource.h" +#include "velox/common/memory/CustomMemoryResourceRegistry.h" +#include "velox/common/memory/MallocAllocator.h" +#include "velox/common/memory/Memory.h" +#include "velox/common/memory/MemoryArbitrator.h" +#include "velox/core/QueryCtx.h" + +namespace facebook::velox::memory::test { +namespace { + +struct EmulatedRow { + int64_t key; + int64_t sum; +}; + +class EmulatedCxlHashAggregation; + +// Reclaimer installed on the operator's DRAM leaf row pool. Forwards +// reclaim() to the operator's DRAM -> CXL spill method. +class DramReclaimer : public MemoryReclaimer { + public: + explicit DramReclaimer(EmulatedCxlHashAggregation* op) + : MemoryReclaimer(0), op_(op) {} + + uint64_t reclaim( + MemoryPool* pool, + uint64_t targetBytes, + uint64_t maxWaitMs, + Stats& stats) override; + + private: + EmulatedCxlHashAggregation* const op_; +}; + +// Mini hash-aggregation operator emulating the CXL-aware HashAggregation +// described in spilling.rst (`Reclaim Across Memory Resources`). Allocates +// row bodies through MemoryPool to keep used-byte counters honest, +// partitions rows by key, and exposes the DRAM -> CXL spill method that +// the DRAM-leaf reclaimer invokes. +class EmulatedCxlHashAggregation { + public: + static constexpr int kNumPartitions = 8; + + explicit EmulatedCxlHashAggregation(core::QueryCtx* ctx) + : ctx_(ctx), + dramRowPool_(ctx->pool()->addLeafChild("emulated-dram-rows")), + cxlRootPool_(ctx->customPool("cxl")), + cxlRowPool_(cxlRootPool_->addLeafChild("emulated-cxl-rows")) { + VELOX_CHECK_NOT_NULL(cxlRootPool_); + // MemoryPool::setReclaimer requires the parent to have a reclaimer + // first. The default root pool typically has none, so install a noop + // one before attaching the DRAM-leaf reclaimer. + if (ctx_->pool()->reclaimer() == nullptr) { + ctx_->pool()->setReclaimer(MemoryReclaimer::create(0)); + } + dramRowPool_->setReclaimer(std::make_unique(this)); + } + + ~EmulatedCxlHashAggregation() { + // Free every row still resident in DRAM or CXL before the pool + // shared_ptrs go out of scope; otherwise the pool destructors abort + // on outstanding usage. + for (auto& [_, entry] : hashTable_) { + auto* pool = entry.location == Location::kDram ? dramRowPool_.get() + : cxlRowPool_.get(); + pool->free(entry.row, sizeof(EmulatedRow)); + } + hashTable_.clear(); + } + + EmulatedCxlHashAggregation(const EmulatedCxlHashAggregation&) = delete; + EmulatedCxlHashAggregation& operator=(const EmulatedCxlHashAggregation&) = + delete; + + void addInput(int64_t key, int64_t value) { + auto it = hashTable_.find(key); + if (it != hashTable_.end()) { + it->second.row->sum += value; + return; + } + auto* row = + static_cast(dramRowPool_->allocate(sizeof(EmulatedRow))); + row->key = key; + row->sum = value; + hashTable_.emplace(key, Entry{row, Location::kDram}); + } + + // Reads every row still resident in the hash table (DRAM and CXL both + // CPU-addressable in this emulation) to produce the final per-key + // aggregate. + std::unordered_map finalize() const { + std::unordered_map result; + for (const auto& [_, entry] : hashTable_) { + result[entry.row->key] += entry.row->sum; + } + return result; + } + + // Returns the DRAM leaf pool — has the operator's DramReclaimer + // installed, so the test can trigger DRAM -> CXL spill via + // dramPool()->reclaim(). + MemoryPool* dramPool() const { + return dramRowPool_.get(); + } + + size_t hashTableSize() const { + return hashTable_.size(); + } + + size_t dramRowCount() const { + return countByLocation(Location::kDram); + } + + size_t cxlRowCount() const { + return countByLocation(Location::kCxl); + } + + // Moves the partition with the most DRAM-resident rows into CXL. The + // hash-table bucket for each row is swizzled to its new CXL address; + // the entry is not removed. + uint64_t spillTopPartitionToCxl(uint64_t /*targetBytes*/) { + const int partition = pickPartition(Location::kDram); + if (partition < 0) { + return 0; + } + uint64_t freed = 0; + for (auto& [key, entry] : hashTable_) { + if (entry.location != Location::kDram || partitionOf(key) != partition) { + continue; + } + auto* cxlRow = + static_cast(cxlRowPool_->allocate(sizeof(EmulatedRow))); + *cxlRow = *entry.row; + dramRowPool_->free(entry.row, sizeof(EmulatedRow)); + entry.row = cxlRow; + entry.location = Location::kCxl; + freed += sizeof(EmulatedRow); + } + return freed; + } + + private: + enum class Location { kDram, kCxl }; + struct Entry { + EmulatedRow* row; + Location location; + }; + + static int partitionOf(int64_t key) { + return static_cast(static_cast(key) % kNumPartitions); + } + + size_t countByLocation(Location target) const { + size_t count = 0; + for (const auto& [_, entry] : hashTable_) { + if (entry.location == target) { + ++count; + } + } + return count; + } + + // Picks the partition with the most rows of the given location, or + // -1 if no rows match. + int pickPartition(Location target) const { + std::array counts{}; + for (const auto& [key, entry] : hashTable_) { + if (entry.location == target) { + ++counts[partitionOf(key)]; + } + } + int best = -1; + int bestCount = 0; + for (int i = 0; i < kNumPartitions; ++i) { + if (counts[i] > bestCount) { + best = i; + bestCount = counts[i]; + } + } + return best; + } + + core::QueryCtx* const ctx_; + std::shared_ptr dramRowPool_; + // Root of the CXL custom resource. Reserved for child-pool creation; + // allocations go through 'cxlRowPool_' below. + std::shared_ptr cxlRootPool_; + // Leaf child of 'cxlRootPool_'. Row bodies are allocated and freed + // here. + std::shared_ptr cxlRowPool_; + std::unordered_map hashTable_; +}; + +uint64_t DramReclaimer::reclaim( + MemoryPool* /*pool*/, + uint64_t targetBytes, + uint64_t /*maxWaitMs*/, + Stats& /*stats*/) { + return op_->spillTopPartitionToCxl(targetBytes); +} + +std::shared_ptr makeCxlResource() { + MemoryAllocator::Options allocatorOptions; + allocatorOptions.capacity = 1L << 30; + return std::make_shared( + "cxl", + std::make_shared(allocatorOptions), + MemoryArbitrator::create({}), + []() { return MemoryReclaimer::create(0); }); +} + +// Materializes the CXL custom pool through the registered resource and +// attaches it to a fresh QueryCtx keyed by 'tag'. +std::shared_ptr buildQueryCtxWithCxl( + std::shared_ptr registry, + std::shared_ptr resource, + const std::string& queryId) { + auto* manager = memoryManager(); + registry->insert("cxl", resource); + auto registered = registry->find("cxl"); + VELOX_CHECK_NOT_NULL(registered); + auto pool = + manager->addCustomRootPool(fmt::format("{}.cxl", queryId), registered); + return core::QueryCtx::Builder() + .customPool("cxl", std::move(pool)) + .queryId(queryId) + .build(); +} + +} // namespace + +// End-to-end coverage for the CXL-backed HashAggregation flow documented in +// spilling.rst (`Reclaim Across Memory Resources` -> `Example: CXL-Backed +// Hash Aggregation`), restricted to the DRAM -> CXL hop. The CXL custom +// memory resource is backed by MallocAllocator so the test runs on +// hardware without real CXL devices. +class CustomMemoryEmulationTest : public testing::Test { + protected: + void SetUp() override { + MemoryManager::testingSetInstance(MemoryManager::Options{}); + registry_ = CustomMemoryResourceRegistry::createRegistry(nullptr); + } + + std::shared_ptr registry_; +}; + +TEST_F(CustomMemoryEmulationTest, baselineAggregationWithoutSpill) { + auto queryCtx = + buildQueryCtxWithCxl(registry_, makeCxlResource(), "cxl-baseline"); + EmulatedCxlHashAggregation op(queryCtx.get()); + + std::unordered_map expected; + for (int i = 0; i < 64; ++i) { + const int64_t key = i % 16; + const int64_t value = i; + op.addInput(key, value); + expected[key] += value; + } + + EXPECT_EQ(op.hashTableSize(), expected.size()); + EXPECT_EQ(op.dramRowCount(), expected.size()); + EXPECT_EQ(op.cxlRowCount(), 0); + EXPECT_EQ(op.finalize(), expected); +} + +TEST_F(CustomMemoryEmulationTest, dramToCxlPreservesIntegrity) { + auto queryCtx = + buildQueryCtxWithCxl(registry_, makeCxlResource(), "cxl-chain"); + EmulatedCxlHashAggregation op(queryCtx.get()); + + std::unordered_map expected; + for (int i = 0; i < 64; ++i) { + const int64_t key = i % 16; + const int64_t value = i; + op.addInput(key, value); + expected[key] += value; + } + const size_t totalKeys = expected.size(); + ASSERT_EQ(op.dramRowCount(), totalKeys); + ASSERT_EQ(op.cxlRowCount(), 0); + + MemoryReclaimer::Stats stats; + + // DRAM -> CXL. The operator's DramReclaimer moves the top partition's + // rows into the CXL pool and swizzles the hash-table bucket pointers to + // the new addresses. The entries themselves remain; probe and finalize + // logic continue to read them directly. + const uint64_t freed1 = op.dramPool()->reclaim( + /*targetBytes=*/64, /*maxWaitMs=*/0, stats); + EXPECT_GT(freed1, 0); + const size_t cxlAfterFirst = op.cxlRowCount(); + EXPECT_GT(cxlAfterFirst, 0); + EXPECT_LT(op.dramRowCount(), totalKeys); + EXPECT_EQ(op.hashTableSize(), totalKeys) + << "DRAM -> CXL swizzles bucket pointers; size must not change."; + + // Second reclaim moves another partition to CXL. + const uint64_t freed2 = op.dramPool()->reclaim( + /*targetBytes=*/64, /*maxWaitMs=*/0, stats); + EXPECT_GT(freed2, 0); + EXPECT_GT(op.cxlRowCount(), cxlAfterFirst); + EXPECT_EQ(op.hashTableSize(), totalKeys); + + // After two DRAM -> CXL hops, the CXL-resident rows are still directly + // readable: finalize() returns the same per-key sums it would in the + // baseline run. + EXPECT_EQ(op.finalize(), expected); +} + +} // namespace facebook::velox::memory::test diff --git a/velox/common/memory/tests/CustomMemoryHierarchyTest.cpp b/velox/common/memory/tests/CustomMemoryHierarchyTest.cpp new file mode 100644 index 00000000000..db725da1e9a --- /dev/null +++ b/velox/common/memory/tests/CustomMemoryHierarchyTest.cpp @@ -0,0 +1,258 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "velox/common/memory/CustomMemoryResource.h" +#include "velox/common/memory/CustomMemoryResourceRegistry.h" +#include "velox/common/memory/MallocAllocator.h" +#include "velox/common/memory/Memory.h" +#include "velox/common/memory/MemoryArbitrator.h" +#include "velox/core/QueryCtx.h" +#include "velox/exec/Driver.h" +#include "velox/exec/OperatorType.h" +#include "velox/exec/Task.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/vector/BaseVector.h" + +namespace facebook::velox::memory::test { +namespace { + +class CustomMemoryHierarchyTest : public testing::Test { + protected: + static void SetUpTestSuite() { + MemoryManager::testingSetInstance(MemoryManager::Options{}); + } + + void SetUp() override { + vectorPool_ = memoryManager()->addLeafPool("test-vec"); + } + + void TearDown() override { + for (const auto& task : tasks_) { + task->requestCancel(); + } + tasks_.clear(); + } + + std::shared_ptr makeResource( + const std::string& tag, + int64_t capacity = 1L << 30) { + MemoryAllocator::Options options; + options.capacity = capacity; + return std::make_shared( + tag, + std::make_shared(options), + MemoryArbitrator::create({}), + []() { return MemoryReclaimer::create(0); }, + capacity); + } + + // Builds a QueryCtx with a custom root pool per tag, installs an isolated + // per-query CustomMemoryResourceRegistry on the QueryCtx so tests do not + // contend on the global registry, and inserts each backing resource into + // it so Task can resolve it at construction time. + std::shared_ptr buildQueryCtx( + const std::vector& tags, + const std::string& queryId) { + auto* manager = memoryManager(); + auto builder = core::QueryCtx::Builder().queryId(queryId); + std::vector> resources; + for (const auto& tag : tags) { + auto resource = makeResource(tag); + builder.customPool( + tag, + manager->addCustomRootPool( + fmt::format("{}.{}", queryId, tag), resource)); + resources.push_back(resource); + } + auto queryCtx = builder.build(); + auto registry = CustomMemoryResourceRegistry::createRegistry(nullptr); + queryCtx->setRegistry( + kCustomMemoryResourceRegistryKey, registry); + for (size_t i = 0; i < tags.size(); ++i) { + registry->insert(tags[i], resources[i]); + } + return queryCtx; + } + + // Returns a single-row Values plan. Vector data is allocated from + // 'vectorPool_', which outlives every task built off this plan because + // it is a fixture member destroyed after the test body. + core::PlanFragment makePlan() { + auto rowType = ROW({"a"}, {BIGINT()}); + auto rowVector = + BaseVector::create(rowType, /*size=*/1, vectorPool_.get()); + return exec::test::PlanBuilder().values({rowVector}).planFragment(); + } + + std::shared_ptr makeTask( + const std::string& taskId, + const std::shared_ptr& queryCtx) { + auto task = exec::Task::create( + taskId, + makePlan(), + /*destination=*/0, + queryCtx, + exec::Task::ExecutionMode::kSerial, + exec::Consumer{}); + // Track so TearDown can cancel it and release the Task<->Driver cycle. + tasks_.push_back(task); + return task; + } + + // Returns the first child of 'pool' whose name matches 'name', or nullptr. + static MemoryPool* findChild(MemoryPool* pool, const std::string& name) { + MemoryPool* found = nullptr; + pool->visitChildren([&](MemoryPool* child) { + if (child->name() == name) { + found = child; + return false; + } + return true; + }); + return found; + } + + std::shared_ptr vectorPool_; + + // Tasks created via makeTask, cancelled in TearDown to break the + // Task<->Driver cycle and release their pools. + std::vector> tasks_; +}; + +// Task construction creates 'task..' aggregate under each +// registered custom root. +TEST_F(CustomMemoryHierarchyTest, taskCreationMirrorsTaskPool) { + auto queryCtx = buildQueryCtx({"gpu"}, "q1"); + auto task = makeTask("t1", queryCtx); + + auto gpuRoot = queryCtx->customPool("gpu"); + ASSERT_NE(gpuRoot, nullptr); + auto* taskMirror = findChild(gpuRoot.get(), "task.t1.gpu"); + ASSERT_NE(taskMirror, nullptr); + EXPECT_EQ(taskMirror->kind(), MemoryPool::Kind::kAggregate); +} + +// Multiple tags produce independent mirror subtrees that do not bleed +// into one another. +TEST_F(CustomMemoryHierarchyTest, multipleTagsMirrorIndependently) { + auto queryCtx = buildQueryCtx({"gpu", "cxl"}, "q2"); + auto task = makeTask("t2", queryCtx); + + EXPECT_NE( + findChild(queryCtx->customPool("gpu").get(), "task.t2.gpu"), nullptr); + EXPECT_NE( + findChild(queryCtx->customPool("cxl").get(), "task.t2.cxl"), nullptr); + EXPECT_EQ( + findChild(queryCtx->customPool("gpu").get(), "task.t2.cxl"), nullptr); + EXPECT_EQ( + findChild(queryCtx->customPool("cxl").get(), "task.t2.gpu"), nullptr); +} + +// With no custom pools registered, Task creation runs the default path +// only — no exceptions and the default subtree is unaffected. +TEST_F(CustomMemoryHierarchyTest, noCustomPoolsRegisteredIsHarmless) { + auto queryCtx = buildQueryCtx({}, "q3"); + ASSERT_EQ(queryCtx->customPools().size(), 0); + EXPECT_NO_THROW(makeTask("t3", queryCtx)); +} + +// getOrAddCustomNodePool creates the aggregate under the task mirror +// and is idempotent for repeated calls. +TEST_F(CustomMemoryHierarchyTest, getOrAddCustomNodePoolIsIdempotent) { + auto queryCtx = buildQueryCtx({"gpu"}, "q4"); + auto task = makeTask("t4", queryCtx); + + auto* nodePool = task->getOrAddCustomNodePool("gpu", "n0"); + ASSERT_NE(nodePool, nullptr); + EXPECT_EQ(nodePool->name(), "node.n0.gpu"); + EXPECT_EQ(nodePool->kind(), MemoryPool::Kind::kAggregate); + EXPECT_EQ(nodePool->parent()->name(), "task.t4.gpu"); + + EXPECT_EQ(task->getOrAddCustomNodePool("gpu", "n0"), nodePool); +} + +// addCustomOperatorPool returns a fresh leaf parented to the node mirror +// for non-join operator types. +TEST_F(CustomMemoryHierarchyTest, addCustomOperatorPoolReturnsLeaf) { + auto queryCtx = buildQueryCtx({"gpu"}, "q5"); + auto task = makeTask("t5", queryCtx); + + auto* leaf = task->addCustomOperatorPool( + "gpu", + "n0", + exec::kUngroupedGroupId, + /*pipelineId=*/0, + /*driverId=*/0, + "Project"); + ASSERT_NE(leaf, nullptr); + EXPECT_EQ(leaf->name(), "op.n0.0.0.Project.gpu"); + EXPECT_EQ(leaf->kind(), MemoryPool::Kind::kLeaf); + EXPECT_EQ(leaf->parent()->name(), "node.n0.gpu"); +} + +// HashBuild / HashProbe operator types route through the join-keyed +// node pool, mirroring the default getOrAddJoinNodePool path. +TEST_F(CustomMemoryHierarchyTest, hashJoinOperatorUsesJoinNodeKey) { + auto queryCtx = buildQueryCtx({"gpu"}, "q6"); + auto task = makeTask("t6", queryCtx); + + auto* leaf = task->addCustomOperatorPool( + "gpu", + "n0", + /*splitGroupId=*/7, + /*pipelineId=*/0, + /*driverId=*/0, + std::string(exec::OperatorType::kHashBuild)); + ASSERT_NE(leaf, nullptr); + EXPECT_EQ(leaf->parent()->name(), "node.n0[7].gpu"); +} + +// customNodePool returns the cached pool after creation, and nullptr for +// unknown tags or node ids. +TEST_F(CustomMemoryHierarchyTest, customNodePoolAccessor) { + auto queryCtx = buildQueryCtx({"gpu"}, "q7"); + auto task = makeTask("t7", queryCtx); + + EXPECT_EQ(task->customNodePool("gpu", "n0"), nullptr); + auto* nodePool = task->getOrAddCustomNodePool("gpu", "n0"); + EXPECT_EQ(task->customNodePool("gpu", "n0"), nodePool); + EXPECT_EQ(task->customNodePool("missing-tag", "n0"), nullptr); + EXPECT_EQ(task->customNodePool("gpu", "missing-node"), nullptr); +} + +// Looking up a tag with no registered resource throws clearly during +// task creation. An isolated empty per-query registry is installed so the +// lookup never falls back to the process-global registry (which other +// tests in this process may have populated). +TEST_F(CustomMemoryHierarchyTest, taskCreationFailsWhenResourceMissing) { + auto* manager = memoryManager(); + auto resource = makeResource("gpu"); + auto pool = manager->addCustomRootPool("q-missing.gpu", resource); + auto queryCtx = core::QueryCtx::Builder() + .customPool("gpu", std::move(pool)) + .queryId("q-missing") + .build(); + queryCtx->setRegistry( + kCustomMemoryResourceRegistryKey, + CustomMemoryResourceRegistry::createRegistry(nullptr)); + EXPECT_THROW(makeTask("t-missing", queryCtx), VeloxRuntimeError); +} + +} // namespace +} // namespace facebook::velox::memory::test diff --git a/velox/common/memory/tests/CustomMemoryPoolTest.cpp b/velox/common/memory/tests/CustomMemoryPoolTest.cpp new file mode 100644 index 00000000000..8627033898f --- /dev/null +++ b/velox/common/memory/tests/CustomMemoryPoolTest.cpp @@ -0,0 +1,267 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "velox/common/memory/CustomMemoryResource.h" +#include "velox/common/memory/CustomMemoryResourceRegistry.h" +#include "velox/common/memory/MallocAllocator.h" +#include "velox/common/memory/Memory.h" +#include "velox/common/memory/MemoryArbitrator.h" +#include "velox/core/QueryCtx.h" + +namespace facebook::velox::memory::test { +namespace { + +std::shared_ptr makeResource( + const std::string& tag, + int64_t maxCapacity = 1L << 30) { + MemoryAllocator::Options allocatorOptions; + allocatorOptions.capacity = maxCapacity; + return std::make_shared( + tag, + std::make_shared(allocatorOptions), + MemoryArbitrator::create({}), + []() { return MemoryReclaimer::create(0); }, + maxCapacity); +} + +} // namespace + +// Each test gets a fresh MemoryManager and its own isolated registry so +// concurrent tests cannot collide on tag registration in the global scope. +class CustomMemoryPoolTest : public testing::Test { + protected: + void SetUp() override { + MemoryManager::testingSetInstance(MemoryManager::Options{}); + registry_ = CustomMemoryResourceRegistry::createRegistry(nullptr); + } + + std::shared_ptr registry_; +}; + +TEST_F(CustomMemoryPoolTest, customPoolCreation) { + auto* manager = memoryManager(); + registry_->insert("gpu", makeResource("gpu", /*maxCapacity=*/1L << 28)); + auto resource = registry_->find("gpu"); + ASSERT_NE(resource, nullptr); + + auto pool = manager->addCustomRootPool("query.q0.gpu", resource); + auto queryCtx = + core::QueryCtx::Builder().customPool("gpu", pool).queryId("q0").build(); + + auto looked = queryCtx->customPool("gpu"); + ASSERT_NE(looked, nullptr); + EXPECT_EQ(looked.get(), pool.get()); + EXPECT_EQ(looked->name(), "query.q0.gpu"); + EXPECT_EQ(looked->maxCapacity(), 1L << 28); + + EXPECT_EQ(queryCtx->customPool("missing"), nullptr); + EXPECT_EQ(queryCtx->customPools().size(), 1); +} + +TEST_F(CustomMemoryPoolTest, customPoolsKeyedByTag) { + auto* manager = memoryManager(); + for (const auto* tag : {"a", "b", "c"}) { + registry_->insert(tag, makeResource(tag)); + } + + auto builder = core::QueryCtx::Builder().queryId("q-keyed"); + for (const auto* tag : {"a", "b", "c"}) { + auto resource = registry_->find(tag); + ASSERT_NE(resource, nullptr); + builder.customPool( + tag, + manager->addCustomRootPool(fmt::format("q-keyed.{}", tag), resource)); + } + auto queryCtx = builder.build(); + + ASSERT_EQ(queryCtx->customPools().size(), 3); + EXPECT_NE(queryCtx->customPool("a"), nullptr); + EXPECT_NE(queryCtx->customPool("b"), nullptr); + EXPECT_NE(queryCtx->customPool("c"), nullptr); +} + +// A custom-resource root pool and its children must allocate through the +// resource's allocator, not the MemoryManager default. +TEST_F(CustomMemoryPoolTest, customPoolDispatchesToResourceAllocator) { + auto* manager = memoryManager(); + registry_->insert("gpu", makeResource("gpu")); + auto resource = registry_->find("gpu"); + ASSERT_NE(resource, nullptr); + auto* expectedAllocator = resource->allocator(); + + auto pool = manager->addCustomRootPool("q-dispatch.gpu", resource); + auto queryCtx = core::QueryCtx::Builder() + .customPool("gpu", pool) + .queryId("q-dispatch") + .build(); + auto root = queryCtx->customPool("gpu"); + ASSERT_NE(root, nullptr); + EXPECT_EQ( + static_cast(root.get())->testingAllocator(), + expectedAllocator); + + auto aggregate = root->addAggregateChild("agg"); + auto leaf = aggregate->addLeafChild("leaf"); + EXPECT_EQ( + static_cast(aggregate.get())->testingAllocator(), + expectedAllocator); + EXPECT_EQ( + static_cast(leaf.get())->testingAllocator(), + expectedAllocator); +} + +TEST_F(CustomMemoryPoolTest, builderRejectsBadInputs) { + auto* manager = memoryManager(); + registry_->insert("gpu", makeResource("gpu")); + auto resource = registry_->find("gpu"); + ASSERT_NE(resource, nullptr); + auto pool = manager->addCustomRootPool("q-bad.gpu", resource); + + auto builder = core::QueryCtx::Builder().queryId("q-bad"); + builder.customPool("gpu", pool); + EXPECT_THROW(builder.customPool("gpu", pool), VeloxRuntimeError); + EXPECT_THROW(builder.customPool("other", nullptr), VeloxRuntimeError); + EXPECT_THROW(builder.customPool("", pool), VeloxRuntimeError); +} + +// QueryCtx::addCustomPool is part of the public surface so a query can +// attach a custom pool after the Builder has already produced the QueryCtx. +// Verifies the same validation as Builder::customPool applies. +TEST_F(CustomMemoryPoolTest, addCustomPoolDirectly) { + auto* manager = memoryManager(); + registry_->insert("gpu", makeResource("gpu")); + auto resource = registry_->find("gpu"); + ASSERT_NE(resource, nullptr); + + auto queryCtx = core::QueryCtx::Builder().queryId("q-direct").build(); + ASSERT_EQ(queryCtx->customPools().size(), 0); + + auto pool = manager->addCustomRootPool("q-direct.gpu", resource); + queryCtx->addCustomPool("gpu", pool); + EXPECT_EQ(queryCtx->customPool("gpu").get(), pool.get()); + EXPECT_EQ(queryCtx->customPools().size(), 1); + + EXPECT_THROW(queryCtx->addCustomPool("gpu", pool), VeloxRuntimeError); + EXPECT_THROW(queryCtx->addCustomPool("other", nullptr), VeloxRuntimeError); + EXPECT_THROW(queryCtx->addCustomPool("", pool), VeloxRuntimeError); +} + +TEST_F(CustomMemoryPoolTest, addCustomRootPoolRejectsNullResource) { + auto* manager = memoryManager(); + EXPECT_THROW(manager->addCustomRootPool("q.null", nullptr), VeloxUserError); +} + +namespace { + +// Test reclaimer that "spills" by allocating from a sibling resource's pool. +// Models the chained-reclaimer pattern (e.g. device-memory pool spills into a +// pinned-host pool). The sibling pool is captured via shared_ptr. +class SpillToSiblingReclaimer : public MemoryReclaimer { + public: + static std::unique_ptr create( + std::shared_ptr sibling) { + return std::unique_ptr( + new SpillToSiblingReclaimer(std::move(sibling))); + } + + ~SpillToSiblingReclaimer() override { + // Release any buffers acquired during reclaim before the leaf pools + // they came from get destroyed; otherwise the leaf destructor aborts + // on outstanding usage. + for (auto& spill : spills_) { + spill.leaf->free(spill.buffer, static_cast(spill.bytes)); + } + } + + uint64_t reclaim( + MemoryPool* /*pool*/, + uint64_t targetBytes, + uint64_t /*maxWaitMs*/, + Stats& /*stats*/) override { + ++numReclaimCalls_; + auto leaf = + sibling_->addLeafChild(fmt::format("spill-{}", numReclaimCalls_)); + void* buffer = leaf->allocate(static_cast(targetBytes)); + spills_.push_back({std::move(leaf), buffer, targetBytes}); + return targetBytes; + } + + int numReclaimCalls() const { + return numReclaimCalls_; + } + + private: + explicit SpillToSiblingReclaimer(std::shared_ptr sibling) + : MemoryReclaimer(0), sibling_(std::move(sibling)) {} + + struct Spill { + std::shared_ptr leaf; + void* buffer; + uint64_t bytes; + }; + + std::shared_ptr sibling_; + int numReclaimCalls_{0}; + std::vector spills_; +}; + +} // namespace + +// The device resource's reclaimerFactory closes over the previously-built +// host pool, so reclaim on the device pool routes allocations into the host. +TEST_F(CustomMemoryPoolTest, deviceReclaimerSpillsToHostSibling) { + auto* manager = memoryManager(); + + registry_->insert("host", makeResource("host")); + auto hostResource = registry_->find("host"); + ASSERT_NE(hostResource, nullptr); + auto hostPool = manager->addCustomRootPool("q-spill.host", hostResource); + + MemoryAllocator::Options deviceAllocatorOptions; + deviceAllocatorOptions.capacity = 1L << 30; + auto deviceResource = std::make_shared( + "device", + std::make_shared(deviceAllocatorOptions), + MemoryArbitrator::create({}), + [hostPool]() { return SpillToSiblingReclaimer::create(hostPool); }); + + auto devicePool = + manager->addCustomRootPool("q-spill.device", deviceResource); + auto* mockReclaimer = + static_cast(devicePool->reclaimer()); + ASSERT_NE(mockReclaimer, nullptr); + + auto queryCtx = core::QueryCtx::Builder() + .customPool("host", hostPool) + .customPool("device", devicePool) + .queryId("q-spill") + .build(); + ASSERT_EQ(queryCtx->customPool("host").get(), hostPool.get()); + ASSERT_EQ(queryCtx->customPool("device").get(), devicePool.get()); + + const uint64_t target = 4 * 1024; + MemoryReclaimer::Stats stats; + const auto reclaimed = devicePool->reclaim(target, 0, stats); + + EXPECT_EQ(mockReclaimer->numReclaimCalls(), 1); + EXPECT_EQ(reclaimed, target); + EXPECT_GE(hostPool->usedBytes(), static_cast(target)); +} + +} // namespace facebook::velox::memory::test diff --git a/velox/common/memory/tests/CustomMemoryRegistrationTest.cpp b/velox/common/memory/tests/CustomMemoryRegistrationTest.cpp new file mode 100644 index 00000000000..640b2ead355 --- /dev/null +++ b/velox/common/memory/tests/CustomMemoryRegistrationTest.cpp @@ -0,0 +1,138 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/memory/CustomMemoryResource.h" +#include "velox/common/memory/CustomMemoryResourceRegistry.h" +#include "velox/common/memory/MallocAllocator.h" +#include "velox/common/memory/Memory.h" +#include "velox/common/memory/MemoryArbitrator.h" + +namespace facebook::velox::memory::test { +namespace { + +std::shared_ptr makeAllocator() { + MemoryAllocator::Options options; + options.capacity = 1L << 30; + return std::make_shared(options); +} + +CustomMemoryResource::ReclaimerFactory makeReclaimerFactory() { + return []() { return MemoryReclaimer::create(0); }; +} + +std::shared_ptr makeResource(const std::string& tag) { + return std::make_shared( + tag, + makeAllocator(), + MemoryArbitrator::create({}), + makeReclaimerFactory()); +} + +} // namespace + +TEST(CustomMemoryRegistration, constructorRejectsInvalidFields) { + auto allocator = makeAllocator(); + std::shared_ptr arbitrator = MemoryArbitrator::create({}); + auto factory = makeReclaimerFactory(); + + // Valid construction succeeds. + CustomMemoryResource("ok", allocator, arbitrator, factory); + + VELOX_ASSERT_THROW( + CustomMemoryResource("", allocator, arbitrator, factory), + "CustomMemoryResource tag is empty"); + VELOX_ASSERT_THROW( + CustomMemoryResource("tag", nullptr, arbitrator, factory), + "CustomMemoryResource allocator is null for tag: tag"); + VELOX_ASSERT_THROW( + CustomMemoryResource("tag", allocator, nullptr, factory), + "CustomMemoryResource arbitrator is null for tag: tag"); + VELOX_ASSERT_THROW( + CustomMemoryResource("tag", allocator, arbitrator, nullptr), + "CustomMemoryResource reclaimerFactory is null for tag: tag"); +} + +TEST(CustomMemoryRegistration, insertAndFindOnIsolatedRegistry) { + auto registry = CustomMemoryResourceRegistry::createRegistry(nullptr); + for (const auto* tag : {"a", "b", "c"}) { + registry->insert(tag, makeResource(tag)); + } + EXPECT_NE(registry->find("a"), nullptr); + EXPECT_NE(registry->find("b"), nullptr); + EXPECT_NE(registry->find("c"), nullptr); + EXPECT_EQ(registry->find("missing"), nullptr); +} + +TEST(CustomMemoryRegistration, insertRejectsDuplicateTag) { + auto registry = CustomMemoryResourceRegistry::createRegistry(nullptr); + registry->insert("device", makeResource("device")); + VELOX_ASSERT_THROW( + registry->insert("device", makeResource("device")), + "Key already registered: device"); +} + +TEST(CustomMemoryRegistration, childRegistryShadowsParent) { + auto parent = CustomMemoryResourceRegistry::createRegistry(nullptr); + auto defaultResource = makeResource("device"); + auto* defaultAllocator = defaultResource->allocator(); + parent->insert("device", std::move(defaultResource)); + + auto child = CustomMemoryResourceRegistry::createRegistry(parent.get()); + auto tenantResource = makeResource("device"); + auto* tenantAllocator = tenantResource->allocator(); + child->insert("device", std::move(tenantResource)); + + EXPECT_EQ(child->find("device")->allocator(), tenantAllocator); + EXPECT_EQ(parent->find("device")->allocator(), defaultAllocator) + << "Parent must remain untouched by scoped registration."; +} + +TEST(CustomMemoryRegistration, childRegistryFallsBackToParent) { + auto parent = CustomMemoryResourceRegistry::createRegistry(nullptr); + parent->insert("only-parent", makeResource("only-parent")); + auto child = CustomMemoryResourceRegistry::createRegistry(parent.get()); + EXPECT_NE(child->find("only-parent"), nullptr); +} + +TEST(CustomMemoryRegistration, isolationRegistryHasNoFallback) { + auto parent = CustomMemoryResourceRegistry::createRegistry(nullptr); + parent->insert("only-parent", makeResource("only-parent")); + + auto isolated = CustomMemoryResourceRegistry::createRegistry(nullptr); + EXPECT_EQ(isolated->find("only-parent"), nullptr) + << "Isolation mode must not fall back to any parent."; + + isolated->insert("only-local", makeResource("only-local")); + EXPECT_NE(isolated->find("only-local"), nullptr); + EXPECT_EQ(parent->find("only-local"), nullptr); +} + +TEST(CustomMemoryRegistration, addCustomRootPoolWithResource) { + MemoryManager manager{}; + auto registry = CustomMemoryResourceRegistry::createRegistry(nullptr); + registry->insert("gpu", makeResource("gpu")); + + auto resource = registry->find("gpu"); + ASSERT_NE(resource, nullptr); + + auto tagged = manager.addCustomRootPool("tagged-root", resource); + ASSERT_NE(tagged, nullptr); +} + +} // namespace facebook::velox::memory::test diff --git a/velox/core/QueryCtx.cpp b/velox/core/QueryCtx.cpp index 6cfcb3950ba..c6362f62d32 100644 --- a/velox/core/QueryCtx.cpp +++ b/velox/core/QueryCtx.cpp @@ -59,6 +59,9 @@ std::shared_ptr QueryCtx::Builder::build() { for (auto& cb : releaseCallbacks_) { queryCtx->addReleaseCallback(std::move(cb)); } + for (auto& [tag, pool] : customPools_) { + queryCtx->addCustomPool(tag, std::move(pool)); + } return queryCtx; } @@ -105,6 +108,21 @@ QueryCtx::~QueryCtx() { return fmt::format("query.{}.{}", queryId, seqNum++); } +void QueryCtx::addCustomPool( + std::string tag, + std::shared_ptr pool) { + VELOX_CHECK(!tag.empty(), "Custom pool tag is empty"); + VELOX_CHECK_NOT_NULL(pool, "Custom pool is null for tag: {}", tag); + auto [_, inserted] = customPools_.emplace(tag, std::move(pool)); + VELOX_CHECK(inserted, "Duplicate custom pool tag: {}", tag); +} + +std::shared_ptr QueryCtx::customPool( + const std::string& tag) const { + auto it = customPools_.find(tag); + return it == customPools_.end() ? nullptr : it->second; +} + void QueryCtx::maybeSetReclaimer() { VELOX_CHECK_NOT_NULL(pool_); VELOX_CHECK(!underArbitration_); diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h index 749c6d0be3f..12d8f5daa83 100644 --- a/velox/core/QueryCtx.h +++ b/velox/core/QueryCtx.h @@ -180,6 +180,19 @@ class QueryCtx : public std::enable_shared_from_this { return *this; } + /// Registers a caller-built root pool under 'tag' on the resulting + /// QueryCtx. Throws if 'tag' is already present or 'pool' is null. The + /// pool is typically built through MemoryManager::addCustomRootPool. + Builder& customPool( + std::string tag, + std::shared_ptr pool) { + VELOX_CHECK(!tag.empty(), "Custom pool tag is empty"); + VELOX_CHECK_NOT_NULL(pool, "Custom pool is null for tag: {}", tag); + auto [_, inserted] = customPools_.emplace(tag, std::move(pool)); + VELOX_CHECK(inserted, "Duplicate custom pool tag: {}", tag); + return *this; + } + /// Adds a callback to be invoked when the QueryCtx is destroyed. /// Multiple callbacks can be added by calling this method multiple times. Builder& releaseCallback(ReleaseCallback callback) { @@ -209,6 +222,8 @@ class QueryCtx : public std::enable_shared_from_this { std::shared_ptr tokenProvider_; std::deque releaseCallbacks_; TraceCtxProvider traceCtxProvider_; + std::unordered_map> + customPools_; }; /// Generates a unique memory pool name for a query. @@ -379,6 +394,23 @@ class QueryCtx : public std::enable_shared_from_this { pool_ = std::move(pool); } + /// Tracks an additional root pool keyed by 'tag'. The pool's allocator + /// and arbitrator are borrowed from the CustomMemoryResource the caller + /// passed to MemoryManager::addCustomRootPool; the resource's lifetime + /// is governed externally. Throws if 'tag' is already present or 'pool' + /// is null. + void addCustomPool(std::string tag, std::shared_ptr pool); + + /// Returns the custom root pool for the given resource tag, or nullptr if + /// none is registered under that tag for this query. + std::shared_ptr customPool(const std::string& tag) const; + + /// Returns all custom root pools for this query, keyed by resource tag. + const std::unordered_map>& + customPools() const { + return customPools_; + } + /// Indicates if the query is under memory arbitration or not. bool testingUnderArbitration() const { std::lock_guard l(mutex_); @@ -471,6 +503,7 @@ class QueryCtx : public std::enable_shared_from_this { std::unordered_map> connectorSessionProperties_; std::shared_ptr pool_; + QueryConfig queryConfig_; std::atomic numSpilledBytes_{0}; std::atomic numTracedBytes_{0}; @@ -499,6 +532,10 @@ class QueryCtx : public std::enable_shared_from_this { // Per-query registry overrides keyed by subsystem name. folly::Synchronized> registries_; + + // Custom root memory pools keyed by tag. + std::unordered_map> + customPools_; }; // Represents the state of one thread of query execution. diff --git a/velox/docs/develop/memory.rst b/velox/docs/develop/memory.rst index 3675f821816..94437cb90c9 100644 --- a/velox/docs/develop/memory.rst +++ b/velox/docs/develop/memory.rst @@ -869,6 +869,103 @@ we don’t cap the memory allocations delegated to std::malloc in the reserve a small amount of memory capacity in *MmapAllocator* to compensate for these ad-hoc small allocations in practice. +Custom Memory Resources +----------------------- + +*CustomMemoryResource* lets an extension expose memory tiers other than +host DRAM (GPU device memory, CXL-attached memory, pinned host memory, +NUMA-bound pools) side-by-side with the default CPU tier. A resource +bundles a tag, an allocator, an arbitrator, and a factory that builds a +per-query reclaimer. The constructor requires a non-empty tag and +non-null allocator, arbitrator, and reclaimerFactory; the resource is +immutable once constructed: + +.. code-block:: c++ + + class CustomMemoryResource { + public: + CustomMemoryResource( + std::string tag, + std::shared_ptr allocator, + std::shared_ptr arbitrator, + std::function()> reclaimerFactory, + int64_t maxCapacity = std::numeric_limits::max()); + + const std::string& tag() const; + int64_t maxCapacity() const; + MemoryAllocator* allocator() const; + MemoryArbitrator* arbitrator() const; + std::unique_ptr newReclaimer() const; + }; + +Each resource carries its own allocator and arbitrator; the default CPU +allocator and arbitrator on *MemoryManager* are not shared with custom +resources. Per-tier accounting and capacity decisions stay self-contained, +which matters when tiers differ in size, latency, alignment, or allocation +failure modes. + +Registration +^^^^^^^^^^^^ + +Resources are tracked by *CustomMemoryResourceRegistry*. The class exposes +a process-global root via *global()* and creates child scopes via +*createRegistry(parent)*. Registration and lookup go directly through the +*Registry* instance (its *insert*, *find*, and *clear* methods); each +scope is protected by its own lock. Extensions register resources in the +global scope at process startup, after *initializeMemoryManager*: + +.. code-block:: c++ + + auto resource = std::make_shared( + "device", + std::make_shared(...), + MemoryArbitrator::create(...), + []() { return std::make_unique(); }, + deviceCapacity); + memory::CustomMemoryResourceRegistry::global().insert( + resource->tag(), resource); + +For each query that wants to use a registered resource, the caller looks +the resource up by tag, materializes the per-resource root pool through +*MemoryManager::addCustomRootPool*, and hands the pool to the *QueryCtx* +via *Builder::customPool*: + +.. code-block:: c++ + + auto* manager = memory::memoryManager(); + auto resource = + memory::CustomMemoryResourceRegistry::global().find("device"); + VELOX_USER_CHECK_NOT_NULL(resource, "Unknown resource tag: device"); + auto devicePool = manager->addCustomRootPool("query.q0.device", resource); + auto queryCtx = core::QueryCtx::Builder() + .customPool("device", std::move(devicePool)) + .queryId("q0") + .build(); + +*addCustomRootPool* invokes *resource->newReclaimer()* to build the +pool's reclaimer, uses *resource->maxCapacity()* as the pool capacity, and +backs the pool with *resource->allocator()* and *resource->arbitrator()*. +The root pool is exposed on *QueryCtx* keyed by tag: + +.. code-block:: c++ + + // Returns the custom root pool for the given resource tag, or nullptr. + std::shared_ptr QueryCtx::customPool( + const std::string& tag) const; + +Per-Query Pool Hierarchy +^^^^^^^^^^^^^^^^^^^^^^^^ + +For every custom root pool registered on *QueryCtx* via +*Builder::customPool*, *Task* builds a parallel ``task → node → operator`` +aggregate/leaf subtree beneath it that mirrors the default hierarchy. +Aggregate children under a custom root are created at the same moment as +their default counterparts. Reclaimers for these aggregates come +from each resource's ``reclaimerFactory`` via +*CustomMemoryResource::newReclaimer*, so capacity decisions and reclaim +on a custom subtree are governed end-to-end by the resource's own +arbitrator and reclaimer — separate from the default DRAM tier. + Server OOM Prevention --------------------- diff --git a/velox/docs/develop/spilling.rst b/velox/docs/develop/spilling.rst index d6cd4113ba5..5723e8be852 100644 --- a/velox/docs/develop/spilling.rst +++ b/velox/docs/develop/spilling.rst @@ -530,6 +530,126 @@ spilling: bridge will split the spill partition files among the hash build operators with each one having an equally-sized shard to restore. +Reclaim Across Memory Resources +------------------------------- + +The spilling algorithms above describe how a spillable operator releases +memory by serializing in-memory state to disk. With +`custom memory resources `_, the +reclaim path is not restricted to disk. The per-query reclaimer attached +to a custom root pool can move data into a sibling tier — for example, +copying buffers out of a device memory pool into a pinned-host pool, or +evicting cold pages from a CXL-attached pool into a local DRAM pool. +*MemoryPool::reclaim* on a custom pool dispatches to the reclaimer the +*reclaimerFactory* produced; what the reclaimer does is up to the +extension. + +A chain of reclaimers — for instance, hash table on CPU DRAM → CXL on +first reclaim → disk on second reclaim — is built by composing reclaimers +across resources. Each link is an independent reclaim decision made by +the arbitrator of the pool that ran short. The framework does not +coordinate the chain; the extension picks the topology by deciding which +sibling each reclaimer allocates into. A reclaimer can reach a sibling pool +by closure-capturing the sibling at the resource's +*reclaimerFactory*. + +Example: CXL-Backed Hash Aggregation +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The reclaim path above is meaningful when the custom resource exposes a +tier with different access properties from CPU DRAM. CXL-attached memory +is one such tier: it is slower than local DRAM but exposed to the CPU as +part of the coherent virtual address space — host code can dereference a +CXL pointer directly, with no copy back to DRAM required. + +A CXL extension takes advantage of this by registering a CXL custom +resource and installing a CXL-aware *HashAggregation* through +*DriverAdapter*. The adapter is the standard mechanism Velox uses to swap +in alternate operator implementations (also used by the cuDF and Wave +extensions); see *DriverAdapter* in *velox/exec/Driver.h*. The CXL-backed +operator preserves the public *HashAggregation* contract while changing +the reclaim policy: + +#. **First reclaim trigger (DRAM is tight).** Like the default operator, + the CXL-backed *HashAggregation* selects a spill target — the set of + partitions with the most data — and processes the corresponding rows + in its DRAM row container. Unlike the default operator, it copies each + row into the CXL custom pool and pointer-swizzles the corresponding + hash-table bucket entries to the new CXL addresses, rather than clearing + them. The swizzle has the same cost shape as the default operator's + hash-table entry removal — a *ProbeState* walk per row to find each bucket + slot — but with greater benefit: because CXL memory is coherent with the + CPU address space, probe and finalize logic continue to read the relocated + entries directly with no restore step. + +#. **CXL pool fills.** Subsequent reclaim triggers continue moving + partitions from DRAM into CXL until the CXL custom pool's arbitrator + decides it is out of capacity. At that point arbitration on the CXL + pool calls *MemoryPool::reclaim* on the CXL root pool, which dispatches + to the reclaimer the resource's *reclaimerFactory* produced. + +#. **Second reclaim — CXL into disk.** The reclaimer behaves like the + default *HashAggregation* spill path, just sourced from CXL instead of + DRAM. It scans the partitions stored in CXL, serializes them through + the standard *Spiller* onto disk, and notifies the operator to remove + the corresponding entries from the hash table. Removal happens here, + not in step 1, because this is the first point at which the entries + are no longer directly addressable. + +#. **Finalize.** After receiving all input the operator produces results + by merging three sources for each partition: rows still in the DRAM row + container, rows that live in CXL (read directly with no restore), and + rows on disk (read back through the standard *Spiller* restore path). + For partitions that never made it past step 1 the merge degenerates to + "DRAM + CXL"; for partitions reclaimed in step 3 it degenerates to + "DRAM + disk" — the same shape as the default *HashAggregation* + algorithm. + +The CXL pool is registered like any other custom resource on the +*CustomMemoryResourceRegistry*. The resource carries the reclaimer factory +the caller uses to materialize per-query reclaimers for pools tagged +``"cxl"``: + +.. code-block:: c++ + auto cxlResource = std::make_shared( + "cxl", + std::make_shared(...), + MemoryArbitrator::create(...), + []() { return std::make_unique(); }, + cxlCapacityBytes); + memory::CustomMemoryResourceRegistry::global().insert( + cxlResource->tag(), cxlResource); + + // Per query: resolve the resource and materialize the CXL root pool via + // addCustomRootPool, then hand it to the QueryCtx. addCustomRootPool + // invokes the resource's reclaimerFactory and uses the resource's + // maxCapacity, allocator, and arbitrator under the hood. + auto* manager = memory::memoryManager(); + auto resolved = memory::CustomMemoryResourceRegistry::global().find("cxl"); + VELOX_USER_CHECK_NOT_NULL(resolved); + auto cxlPool = manager->addCustomRootPool("query.q0.cxl", resolved); + auto queryCtx = core::QueryCtx::Builder() + .customPool("cxl", std::move(cxlPool)) + .queryId("q0") + .build(); + +The arbitrator passed here governs only the CXL pool's capacity. The +framework never makes the *decision* to move bytes from DRAM to CXL on +its own; the trigger and the reclaim policy come from the extension. +Step 1 can be driven as follows: the operator calls *MemoryPool::setReclaimer* +on its DRAM-side row pool with a custom reclaimer that moves rows to +CXL. The default *MemoryArbitrator* on *MemoryManager* triggers +reclaim normally when the query is under capacity pressure; the +propagation reaches the installed reclaimer, which redirects the move +to CXL instead of disk. See *DramReclaimer* in +*velox/common/memory/tests/CustomMemoryEmulationTest.cpp* for a +worked example. + +The CXL → disk hop is analogous: install a reclaimer on the CXL root +pool through the resource's *reclaimerFactory*, and let the CXL +pool's arbitrator trigger it when the CXL budget is exhausted. This way, +we can achieve cross-resource arbitration. + Future Work ----------- diff --git a/velox/dwio/dwrf/test/WriterFlushTest.cpp b/velox/dwio/dwrf/test/WriterFlushTest.cpp index 4612e5e46b1..3d308cfe3ed 100644 --- a/velox/dwio/dwrf/test/WriterFlushTest.cpp +++ b/velox/dwio/dwrf/test/WriterFlushTest.cpp @@ -198,6 +198,10 @@ class MockMemoryPool : public velox::memory::MemoryPool { return nullptr; } + memory::MemoryArbitrator* arbitrator() const override { + return nullptr; + } + void enterArbitration() override { VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__); } diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 1514f9a8516..92c5ea76e39 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -122,6 +122,25 @@ velox::memory::MemoryPool* DriverCtx::addOperatorPool( planNodeId, splitGroupId, pipelineId, driverId, operatorType); } +std::unordered_map +DriverCtx::addCustomOperatorPools( + const core::PlanNodeId& planNodeId, + const std::string& operatorType) { + std::unordered_map result; + const auto& customRoots = task->queryCtx()->customPools(); + if (customRoots.empty()) { + return result; + } + result.reserve(customRoots.size()); + for (const auto& [tag, _] : customRoots) { + result.emplace( + tag, + task->addCustomOperatorPool( + tag, planNodeId, splitGroupId, pipelineId, driverId, operatorType)); + } + return result; +} + namespace { bool isHashJoinSpillOperator(std::string_view operatorType) { return operatorType == OperatorType::kHashBuild || diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 2407e7240cc..e791d6ee7d7 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -264,6 +265,15 @@ struct DriverCtx { const core::PlanNodeId& planNodeId, const std::string& operatorType); + /// Creates one leaf operator pool per registered custom root pool on the + /// owning task's QueryCtx, mirroring 'addOperatorPool' but under each + /// custom root. Returns a map keyed by resource tag. Empty when no custom + /// pools are registered. + std::unordered_map + addCustomOperatorPools( + const core::PlanNodeId& planNodeId, + const std::string& operatorType); + /// Builds the spill config for the operator with specified 'operatorId' and /// 'operatorType'. std::optional makeSpillConfig( diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 9e6e6a13fd3..aa804287233 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -37,7 +37,14 @@ OperatorCtx::OperatorCtx( planNodeId_(planNodeId), operatorId_(operatorId), operatorType_(operatorType), - pool_(driverCtx_->addOperatorPool(planNodeId, operatorType_)) {} + pool_(driverCtx_->addOperatorPool(planNodeId, operatorType_)), + customPools_( + driverCtx_->addCustomOperatorPools(planNodeId, operatorType_)) {} + +memory::MemoryPool* OperatorCtx::customPool(std::string_view tag) const { + auto it = customPools_.find(std::string(tag)); + return it == customPools_.end() ? nullptr : it->second; +} core::ExecCtx* OperatorCtx::execCtx() const { if (!execCtx_) { diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 30f109b48a8..a93dac3d978 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -17,6 +17,7 @@ #include #include +#include #include "velox/core/PlanNode.h" #include "velox/core/QueryCtx.h" #include "velox/exec/Driver.h" @@ -65,6 +66,10 @@ class OperatorCtx { return pool_; } + /// Returns the leaf operator pool under the custom root registered with + /// 'tag', or nullptr if no such custom root is registered on this query. + velox::memory::MemoryPool* customPool(std::string_view tag) const; + const core::PlanNodeId& planNodeId() const { return planNodeId_; } @@ -102,6 +107,11 @@ class OperatorCtx { const std::string operatorType_; velox::memory::MemoryPool* const pool_; + // Per-resource-tag leaf pools mirroring 'pool_' under each registered + // custom root pool. Empty when no custom pools are registered. + const std::unordered_map + customPools_; + // These members are created on demand. mutable std::unique_ptr execCtx_; }; @@ -372,6 +382,12 @@ class Operator : public BaseRuntimeStatWriter { return operatorCtx_->pool(); } + /// Returns this operator's leaf pool under the custom root registered with + /// 'tag', or nullptr if no such custom root is registered. + velox::memory::MemoryPool* customPool(std::string_view tag) const { + return operatorCtx_->customPool(tag); + } + /// Returns true if the operator is reclaimable. Currently, we only support /// to reclaim memory from a spillable operator. FOLLY_ALWAYS_INLINE virtual bool canReclaim() const { diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 38efc1c4aa3..270c52f9c31 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -23,6 +23,8 @@ #include "velox/common/base/Counters.h" #include "velox/common/base/StatsReporter.h" #include "velox/common/file/FileSystems.h" +#include "velox/common/memory/CustomMemoryResource.h" +#include "velox/common/memory/CustomMemoryResourceRegistry.h" #include "velox/common/testutil/TestValue.h" #include "velox/common/time/Timer.h" #include "velox/exec/Exchange.h" @@ -212,6 +214,18 @@ bool isHashJoinOperator(const std::string& operatorType) { (operatorType == OperatorType::kHashProbe); } +// Returns the CustomMemoryResourceRegistry visible to 'queryCtx': the +// per-query scoped registry if one has been installed under +// 'kCustomMemoryResourceRegistryKey', otherwise the process-global +// registry. Mirrors connector::ConnectorRegistry's per-query lookup. +memory::CustomMemoryResourceRegistry::Registry& customMemoryResourceRegistryFor( + const core::QueryCtx& queryCtx) { + auto perQuery = + queryCtx.registry( + memory::kCustomMemoryResourceRegistryKey); + return perQuery ? *perQuery : memory::CustomMemoryResourceRegistry::global(); +} + class QueueSplitsStore : public SplitsStore { public: using SplitsStore::SplitsStore; @@ -487,6 +501,9 @@ Task::~Task() { CLEAR(exchangeClients_.clear()); CLEAR(exception_ = nullptr); CLEAR(nodePools_.clear()); + CLEAR(customNodePools_.clear()); + CLEAR(customTaskPools_.clear()); + CLEAR(customChildPools_.clear()); CLEAR(childPools_.clear()); CLEAR(planFragment_ = core::PlanFragment()); CLEAR(pool_.reset()); @@ -711,6 +728,25 @@ void Task::initTaskPool() { VELOX_CHECK_NULL(pool_); pool_ = queryCtx_->pool()->addAggregateChild( fmt::format("task.{}", taskId_.c_str()), createTaskReclaimer()); + initCustomTaskPools(); +} + +void Task::initCustomTaskPools() { + VELOX_CHECK_NOT_NULL(pool_); + const auto& customRoots = queryCtx_->customPools(); + if (customRoots.empty()) { + return; + } + auto& registry = customMemoryResourceRegistryFor(*queryCtx_); + for (const auto& [tag, root] : customRoots) { + auto resource = registry.find(tag); + VELOX_CHECK_NOT_NULL( + resource, "No CustomMemoryResource registered for tag: {}", tag); + customChildPools_.push_back(root->addAggregateChild( + fmt::format("task.{}.{}", taskId_.c_str(), tag), + resource->newReclaimer())); + customTaskPools_[tag] = customChildPools_.back().get(); + } } velox::memory::MemoryPool* Task::getOrAddNodePool( @@ -725,6 +761,31 @@ velox::memory::MemoryPool* Task::getOrAddNodePool( }))); auto* nodePool = childPools_.back().get(); nodePools_[planNodeId] = nodePool; + for (const auto& [tag, _] : queryCtx_->customPools()) { + getOrAddCustomNodePool(tag, planNodeId); + } + return nodePool; +} + +memory::MemoryPool* Task::getOrAddCustomNodePool( + const std::string& tag, + const core::PlanNodeId& planNodeId) { + auto& nodeMap = customNodePools_[tag]; + if (auto it = nodeMap.find(planNodeId); it != nodeMap.end()) { + return it->second; + } + auto taskIt = customTaskPools_.find(tag); + VELOX_CHECK( + taskIt != customTaskPools_.end(), + "Custom task pool missing for tag: {}", + tag); + auto resource = customMemoryResourceRegistryFor(*queryCtx_).find(tag); + VELOX_CHECK_NOT_NULL( + resource, "No CustomMemoryResource registered for tag: {}", tag); + customChildPools_.push_back(taskIt->second->addAggregateChild( + fmt::format("node.{}.{}", planNodeId, tag), resource->newReclaimer())); + auto* nodePool = customChildPools_.back().get(); + nodeMap[planNodeId] = nodePool; return nodePool; } @@ -747,9 +808,73 @@ memory::MemoryPool* Task::getOrAddJoinNodePool( }))); auto* nodePool = childPools_.back().get(); nodePools_[nodeId] = nodePool; + for (const auto& [tag, _] : queryCtx_->customPools()) { + getOrAddCustomJoinNodePool(tag, planNodeId, splitGroupId); + } + return nodePool; +} + +memory::MemoryPool* Task::getOrAddCustomJoinNodePool( + const std::string& tag, + const core::PlanNodeId& planNodeId, + uint32_t splitGroupId) { + const std::string nodeId = splitGroupId == kUngroupedGroupId + ? planNodeId + : fmt::format("{}[{}]", planNodeId, splitGroupId); + auto& nodeMap = customNodePools_[tag]; + if (auto it = nodeMap.find(nodeId); it != nodeMap.end()) { + return it->second; + } + auto taskIt = customTaskPools_.find(tag); + VELOX_CHECK( + taskIt != customTaskPools_.end(), + "Custom task pool missing for tag: {}", + tag); + auto resource = customMemoryResourceRegistryFor(*queryCtx_).find(tag); + VELOX_CHECK_NOT_NULL( + resource, "No CustomMemoryResource registered for tag: {}", tag); + customChildPools_.push_back(taskIt->second->addAggregateChild( + fmt::format("node.{}.{}", nodeId, tag), resource->newReclaimer())); + auto* nodePool = customChildPools_.back().get(); + nodeMap[nodeId] = nodePool; return nodePool; } +memory::MemoryPool* Task::addCustomOperatorPool( + const std::string& tag, + const core::PlanNodeId& planNodeId, + uint32_t splitGroupId, + int pipelineId, + uint32_t driverId, + const std::string& operatorType) { + memory::MemoryPool* nodePool; + if (isHashJoinOperator(operatorType)) { + nodePool = getOrAddCustomJoinNodePool(tag, planNodeId, splitGroupId); + } else { + nodePool = getOrAddCustomNodePool(tag, planNodeId); + } + customChildPools_.push_back(nodePool->addLeafChild( + fmt::format( + "op.{}.{}.{}.{}.{}", + planNodeId, + pipelineId, + driverId, + operatorType, + tag))); + return customChildPools_.back().get(); +} + +memory::MemoryPool* Task::customNodePool( + const std::string& tag, + const core::PlanNodeId& planNodeId) const { + auto tagIt = customNodePools_.find(tag); + if (tagIt == customNodePools_.end()) { + return nullptr; + } + auto nodeIt = tagIt->second.find(planNodeId); + return nodeIt == tagIt->second.end() ? nullptr : nodeIt->second; +} + std::unique_ptr Task::createNodeReclaimer( const std::function()>& reclaimerFactory) const { diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 2b483b806a9..2c7ba1c7132 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -407,6 +407,38 @@ class Task : public std::enable_shared_from_this { uint32_t driverId, const std::string& operatorType); + /// Returns the node-level aggregate pool under the custom root for 'tag' + /// and 'planNodeId', creating it if needed. Throws if 'tag' has no + /// registered custom root on this query's QueryCtx. + memory::MemoryPool* getOrAddCustomNodePool( + const std::string& tag, + const core::PlanNodeId& planNodeId); + + /// Hash-join variant of getOrAddCustomNodePool. Mirrors + /// getOrAddJoinNodePool: keys the node pool by '[]' + /// for grouped execution. + memory::MemoryPool* getOrAddCustomJoinNodePool( + const std::string& tag, + const core::PlanNodeId& planNodeId, + uint32_t splitGroupId); + + /// Returns a new leaf operator pool under the custom root for 'tag', + /// mirroring addOperatorPool. Selects the join or non-join node parent + /// based on 'operatorType'. + memory::MemoryPool* addCustomOperatorPool( + const std::string& tag, + const core::PlanNodeId& planNodeId, + uint32_t splitGroupId, + int pipelineId, + uint32_t driverId, + const std::string& operatorType); + + /// Read-only accessor for the cached custom node pool under 'tag' and + /// 'planNodeId', or nullptr if absent. + memory::MemoryPool* customNodePool( + const std::string& tag, + const core::PlanNodeId& planNodeId) const; + /// Creates new instance of MemoryPool with aggregate kind for the connector /// use, stores it in the task to ensure lifetime and returns a raw pointer. /// Not thread safe, e.g. must be called from the Operator's constructor. @@ -891,6 +923,11 @@ class Task : public std::enable_shared_from_this { // Invoked to initialize the memory pool for this task on creation. void initTaskPool(); + // Creates the per-tag 'task..' aggregate child under every + // custom root pool registered on the QueryCtx. Called from initTaskPool + // after the default task pool is created. + void initCustomTaskPools(); + // Creates a scaled scan controller for a given table scan node. void addScaledScanControllerLocked( uint32_t splitGroupId, @@ -1214,6 +1251,23 @@ class Task : public std::enable_shared_from_this { // NOTE: 'childPools_' holds the ownerships of node memory pools. std::unordered_map nodePools_; + // Aggregate child of each registered custom root pool. Keyed by resource + // tag. Lifetime is held in 'customChildPools_'. + std::unordered_map customTaskPools_; + + // Node-level aggregate pools under each custom root, keyed first by + // resource tag, then by plan node id. Lifetime is held in + // 'customChildPools_'. + std::unordered_map< + std::string, + std::unordered_map> + customNodePools_; + + // Owns the shared_ptrs to all custom task/node/operator pools mirrored + // under registered custom roots. Kept separate from 'childPools_' so the + // default hierarchy is unchanged. + std::vector> customChildPools_; + // Set to true by OutputBufferManager when all output is // acknowledged. If this happens before Drivers are at end, the last // Driver to finish will set state_ to kFinished. If Drivers have