Skip to content

Commit 9851a4e

Browse files
xiaoxmeng authored and facebook-github-bot committed
Avoid deadlock when root memory pool creation fails (facebookincubator#11042)
Summary: Pull Request resolved: facebookincubator#11042 A recent code change put root memory pool creation under the memory manager's lock, which could cause a deadlock. This PR fixes the problem for the cases where either the arbitrator fails to add the pool or a duplicate root pool name is detected. Setting the destruction callback is never expected to fail. Reviewed By: tanjialiang, oerling Differential Revision: D63039106 fbshipit-source-id: 95faed6f93854a8854a9121cb9aa914ad1ad5350
1 parent 19d184e commit 9851a4e

File tree

6 files changed

+75
-33
lines changed

6 files changed

+75
-33
lines changed

velox/common/memory/Memory.cpp

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
146146
MemoryPool::Kind::kAggregate,
147147
nullptr,
148148
nullptr,
149-
nullptr,
150149
// NOTE: the default root memory pool has no capacity limit, and it is
151150
// used for system usage in production such as disk spilling.
152151
MemoryPool::Options{
@@ -247,7 +246,7 @@ uint16_t MemoryManager::alignment() const {
247246
return alignment_;
248247
}
249248

250-
std::shared_ptr<MemoryPool> MemoryManager::createRootPool(
249+
std::shared_ptr<MemoryPoolImpl> MemoryManager::createRootPool(
251250
std::string poolName,
252251
std::unique_ptr<MemoryReclaimer>& reclaimer,
253252
MemoryPool::Options& options) {
@@ -257,7 +256,6 @@ std::shared_ptr<MemoryPool> MemoryManager::createRootPool(
257256
MemoryPool::Kind::kAggregate,
258257
nullptr,
259258
std::move(reclaimer),
260-
poolDestructionCb_,
261259
options);
262260
VELOX_CHECK_EQ(pool->capacity(), 0);
263261
arbitrator_->addPool(pool);
@@ -283,16 +281,23 @@ std::shared_ptr<MemoryPool> MemoryManager::addRootPool(
283281
options.debugEnabled = debugEnabled_;
284282
options.coreOnAllocationFailureEnabled = coreOnAllocationFailureEnabled_;
285283

286-
if (disableMemoryPoolTracking_) {
287-
return createRootPool(poolName, reclaimer, options);
288-
}
289-
290-
std::unique_lock guard{mutex_};
291-
if (pools_.find(poolName) != pools_.end()) {
292-
VELOX_FAIL("Duplicate root pool name found: {}", poolName);
293-
}
294284
auto pool = createRootPool(poolName, reclaimer, options);
295-
pools_.emplace(poolName, pool);
285+
if (!disableMemoryPoolTracking_) {
286+
try {
287+
std::unique_lock guard{mutex_};
288+
if (pools_.find(poolName) != pools_.end()) {
289+
VELOX_FAIL("Duplicate root pool name found: {}", poolName);
290+
}
291+
pools_.emplace(poolName, pool);
292+
} catch (const VeloxRuntimeError& ex) {
293+
arbitrator_->removePool(pool.get());
294+
throw;
295+
}
296+
}
297+
// NOTE: we need to set destruction callback at the end to avoid potential
298+
// deadlock or failure because of duplicate memory pool name or unexpected
299+
// failure to add memory pool to the arbitrator.
300+
pool->setDestructionCallback(poolDestructionCb_);
296301
return pool;
297302
}
298303

@@ -315,12 +320,12 @@ uint64_t MemoryManager::shrinkPools(
315320
}
316321

317322
void MemoryManager::dropPool(MemoryPool* pool) {
323+
VELOX_CHECK_NOT_NULL(pool);
318324
VELOX_DCHECK_EQ(pool->reservedBytes(), 0);
319325
arbitrator_->removePool(pool);
320326
if (disableMemoryPoolTracking_) {
321327
return;
322328
}
323-
VELOX_CHECK_NOT_NULL(pool);
324329
std::unique_lock guard{mutex_};
325330
auto it = pools_.find(pool->name());
326331
if (it == pools_.end()) {

velox/common/memory/Memory.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ class MemoryManager {
352352
}
353353

354354
private:
355-
std::shared_ptr<MemoryPool> createRootPool(
355+
std::shared_ptr<MemoryPoolImpl> createRootPool(
356356
std::string poolName,
357357
std::unique_ptr<MemoryReclaimer>& reclaimer,
358358
MemoryPool::Options& options);

velox/common/memory/MemoryPool.cpp

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -418,23 +418,17 @@ MemoryPoolImpl::MemoryPoolImpl(
418418
Kind kind,
419419
std::shared_ptr<MemoryPool> parent,
420420
std::unique_ptr<MemoryReclaimer> reclaimer,
421-
DestructionCallback destructionCb,
422421
const Options& options)
423422
: MemoryPool{name, kind, parent, options},
424423
manager_{memoryManager},
425424
allocator_{manager_->allocator()},
426425
arbitrator_{manager_->arbitrator()},
427-
destructionCb_(std::move(destructionCb)),
428426
debugPoolNameRegex_(debugEnabled_ ? *(debugPoolNameRegex().rlock()) : ""),
429427
reclaimer_(std::move(reclaimer)),
430428
// The memory manager sets the capacity through grow() according to the
431429
// actually used memory arbitration policy.
432430
capacity_(parent_ != nullptr ? kMaxMemory : 0) {
433431
VELOX_CHECK(options.threadSafe || isLeaf());
434-
VELOX_CHECK(
435-
isRoot() || destructionCb_ == nullptr,
436-
"Only root memory pool allows to set destruction callbacks: {}",
437-
name_);
438432
}
439433

440434
MemoryPoolImpl::~MemoryPoolImpl() {
@@ -736,7 +730,6 @@ std::shared_ptr<MemoryPool> MemoryPoolImpl::genChild(
736730
kind,
737731
parent,
738732
std::move(reclaimer),
739-
nullptr,
740733
Options{
741734
.alignment = alignment_,
742735
.trackUsage = trackUsage_,
@@ -1148,6 +1141,18 @@ void MemoryPoolImpl::checkIfAborted() const {
11481141
}
11491142
}
11501143

1144+
void MemoryPoolImpl::setDestructionCallback(
1145+
const DestructionCallback& callback) {
1146+
VELOX_CHECK_NOT_NULL(callback);
1147+
VELOX_CHECK(
1148+
isRoot(),
1149+
"Only root memory pool allows to set destruction callbacks: {}",
1150+
name_);
1151+
std::lock_guard<std::mutex> l(mutex_);
1152+
VELOX_CHECK_NULL(destructionCb_);
1153+
destructionCb_ = callback;
1154+
}
1155+
11511156
void MemoryPoolImpl::testingSetCapacity(int64_t bytes) {
11521157
if (parent_ != nullptr) {
11531158
return toImpl(parent_)->testingSetCapacity(bytes);

velox/common/memory/MemoryPool.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,6 @@ class MemoryPoolImpl : public MemoryPool {
582582
Kind kind,
583583
std::shared_ptr<MemoryPool> parent,
584584
std::unique_ptr<MemoryReclaimer> reclaimer,
585-
DestructionCallback destructionCb,
586585
const Options& options = Options{});
587586

588587
~MemoryPoolImpl() override;
@@ -656,6 +655,8 @@ class MemoryPoolImpl : public MemoryPool {
656655

657656
void abort(const std::exception_ptr& error) override;
658657

658+
void setDestructionCallback(const DestructionCallback& callback);
659+
659660
std::string toString() const override {
660661
std::lock_guard<std::mutex> l(mutex_);
661662
return toStringLocked();
@@ -1001,7 +1002,6 @@ class MemoryPoolImpl : public MemoryPool {
10011002
MemoryManager* const manager_;
10021003
MemoryAllocator* const allocator_;
10031004
MemoryArbitrator* const arbitrator_;
1004-
const DestructionCallback destructionCb_;
10051005

10061006
// Regex for filtering on 'name_' when debug mode is enabled. This allows us
10071007
// to only track the callsites of memory allocations for memory pools whose
@@ -1015,6 +1015,8 @@ class MemoryPoolImpl : public MemoryPool {
10151015
// the same parent do not have to be serialized.
10161016
mutable std::mutex mutex_;
10171017

1018+
DestructionCallback destructionCb_;
1019+
10181020
// Used by memory arbitration to reclaim memory from the associated query
10191021
// object if not null. For example, a memory pool can reclaim the used memory
10201022
// from a spillable operator through disk spilling. If null, we can't reclaim

velox/common/memory/tests/MemoryManagerTest.cpp

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,18 @@ TEST_F(MemoryManagerTest, ctor) {
117117
namespace {
118118
class FakeTestArbitrator : public MemoryArbitrator {
119119
public:
120-
explicit FakeTestArbitrator(const Config& config)
120+
explicit FakeTestArbitrator(
121+
const Config& config,
122+
bool injectAddPoolFailure = false)
121123
: MemoryArbitrator(
122124
{.kind = config.kind,
123125
.capacity = config.capacity,
124-
.extraConfigs = config.extraConfigs}) {}
126+
.extraConfigs = config.extraConfigs}),
127+
injectAddPoolFailure_(injectAddPoolFailure) {}
125128

126-
void addPool(const std::shared_ptr<MemoryPool>& /*unused*/) override {}
129+
void addPool(const std::shared_ptr<MemoryPool>& /*unused*/) override {
130+
VELOX_CHECK(!injectAddPoolFailure_, "Failed to add pool");
131+
}
127132

128133
void removePool(MemoryPool* /*unused*/) override {}
129134

@@ -152,6 +157,9 @@ class FakeTestArbitrator : public MemoryArbitrator {
152157
std::string kind() const override {
153158
return "FAKE";
154159
}
160+
161+
private:
162+
const bool injectAddPoolFailure_{false};
155163
};
156164
} // namespace
157165

@@ -173,6 +181,22 @@ TEST_F(MemoryManagerTest, createWithCustomArbitrator) {
173181
ASSERT_EQ(manager.allocator()->capacity(), options.allocatorCapacity);
174182
}
175183

184+
TEST_F(MemoryManagerTest, addPoolFailure) {
185+
const std::string kindString = "FAKE";
186+
MemoryArbitrator::Factory factory =
187+
[](const MemoryArbitrator::Config& config) {
188+
return std::make_unique<FakeTestArbitrator>(
189+
config, /*injectAddPoolFailure*/ true);
190+
};
191+
MemoryArbitrator::registerFactory(kindString, factory);
192+
auto guard = folly::makeGuard(
193+
[&] { MemoryArbitrator::unregisterFactory(kindString); });
194+
MemoryManagerOptions options;
195+
options.arbitratorKind = kindString;
196+
MemoryManager manager{options};
197+
VELOX_ASSERT_THROW(manager.addRootPool(), "Failed to add pool");
198+
}
199+
176200
TEST_F(MemoryManagerTest, addPool) {
177201
MemoryManager manager{};
178202

velox/common/memory/tests/MemoryPoolTest.cpp

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -152,23 +152,21 @@ TEST_P(MemoryPoolTest, ctor) {
152152
ASSERT_EQ(root->parent(), nullptr);
153153
ASSERT_EQ(root->root(), root.get());
154154
ASSERT_EQ(root->capacity(), capacity);
155+
VELOX_ASSERT_THROW(
156+
static_cast<MemoryPoolImpl*>(root.get())
157+
->setDestructionCallback([](MemoryPool*) {}),
158+
"");
155159

156160
{
157161
auto fakeRoot = std::make_shared<MemoryPoolImpl>(
158-
&manager,
159-
"fake_root",
160-
MemoryPool::Kind::kAggregate,
161-
nullptr,
162-
nullptr,
163-
nullptr);
162+
&manager, "fake_root", MemoryPool::Kind::kAggregate, nullptr, nullptr);
164163
// We can't construct an aggregate memory pool with non-thread safe.
165164
ASSERT_ANY_THROW(std::make_shared<MemoryPoolImpl>(
166165
&manager,
167166
"fake_root",
168167
MemoryPool::Kind::kAggregate,
169168
nullptr,
170169
nullptr,
171-
nullptr,
172170
MemoryPool::Options{.threadSafe = false}));
173171
ASSERT_EQ("fake_root", fakeRoot->name());
174172
ASSERT_EQ(
@@ -182,6 +180,10 @@ TEST_P(MemoryPoolTest, ctor) {
182180
ASSERT_EQ(child->parent(), root.get());
183181
ASSERT_EQ(child->root(), root.get());
184182
ASSERT_EQ(child->capacity(), capacity);
183+
VELOX_ASSERT_THROW(
184+
static_cast<MemoryPoolImpl*>(child.get())
185+
->setDestructionCallback([](MemoryPool*) {}),
186+
"");
185187
auto& favoriteChild = dynamic_cast<MemoryPoolImpl&>(*child);
186188
ASSERT_EQ("child", favoriteChild.name());
187189
ASSERT_EQ(
@@ -194,6 +196,10 @@ TEST_P(MemoryPoolTest, ctor) {
194196
ASSERT_EQ(aggregateChild->parent(), root.get());
195197
ASSERT_EQ(aggregateChild->root(), root.get());
196198
ASSERT_EQ(aggregateChild->capacity(), capacity);
199+
VELOX_ASSERT_THROW(
200+
static_cast<MemoryPoolImpl*>(aggregateChild.get())
201+
->setDestructionCallback([](MemoryPool*) {}),
202+
"");
197203
auto grandChild = aggregateChild->addLeafChild("child", isLeafThreadSafe_);
198204
ASSERT_EQ(grandChild->parent(), aggregateChild.get());
199205
ASSERT_EQ(grandChild->root(), root.get());

0 commit comments

Comments
 (0)