Skip to content

Commit e69a90d

Browse files
authored
[KIKIMR-22131] Handle potential race in computation pattern cache (#16010)
1 parent e987bf6 commit e69a90d

File tree

3 files changed

+83
-111
lines changed

3 files changed

+83
-111
lines changed

ydb/library/yql/dq/runtime/dq_tasks_runner.cpp

+15-9
Original file line number | Diff line number | Diff line change
@@ -434,17 +434,23 @@ class TDqTaskRunner : public IDqTaskRunner {
434434
bool canBeCached;
435435
if (UseSeparatePatternAlloc(task) && Context.PatternCache) {
436436
auto& cache = Context.PatternCache;
437-
auto ticket = cache->FindOrSubscribe(program.GetRaw());
438-
if (!ticket.HasFuture()) {
439-
entry = CreateComputationPattern(task, program.GetRaw(), true, canBeCached);
440-
if (canBeCached && entry->Pattern->GetSuitableForCache()) {
441-
cache->EmplacePattern(task.GetProgram().GetRaw(), entry);
442-
ticket.Close();
443-
} else {
444-
cache->IncNotSuitablePattern();
437+
auto future = cache->FindOrSubscribe(program.GetRaw());
438+
if (!future.HasValue()) {
439+
try {
440+
entry = CreateComputationPattern(task, program.GetRaw(), true, canBeCached);
441+
if (canBeCached && entry->Pattern->GetSuitableForCache()) {
442+
cache->EmplacePattern(task.GetProgram().GetRaw(), entry);
443+
} else {
444+
cache->IncNotSuitablePattern();
445+
cache->NotifyPatternMissing(program.GetRaw());
446+
}
447+
} catch (...) {
448+
// TODO: not sure if there may be exceptions in the first place.
449+
cache->NotifyPatternMissing(program.GetRaw());
450+
throw;
445451
}
446452
} else {
447-
entry = ticket.GetValueSync();
453+
entry = future.GetValueSync();
448454
}
449455
}
450456

ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp

+54-51
Original file line number | Diff line number | Diff line change
@@ -33,28 +33,29 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
3333
return CurrentPatternsCompiledCodeSizeInBytes;
3434
}
3535

36-
std::shared_ptr<TPatternCacheEntry>* Find(const TString& serializedProgram) {
36+
TPatternCacheEntryPtr Find(const TString& serializedProgram) {
3737
auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram);
3838
if (it == SerializedProgramToPatternCacheHolder.end()) {
39-
return nullptr;
39+
return {};
4040
}
4141

4242
PromoteEntry(&it->second);
4343

44-
return &it->second.Entry;
44+
return it->second.Entry;
4545
}
4646

47-
void Insert(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
47+
void Insert(const TString& serializedProgram, TPatternCacheEntryPtr& entry) {
4848
auto [it, inserted] = SerializedProgramToPatternCacheHolder.emplace(std::piecewise_construct,
4949
std::forward_as_tuple(serializedProgram),
5050
std::forward_as_tuple(serializedProgram, entry));
5151

5252
if (!inserted) {
5353
RemoveEntryFromLists(&it->second);
54+
entry = it->second.Entry;
55+
} else {
56+
entry->UpdateSizeForCache();
5457
}
5558

56-
entry->UpdateSizeForCache();
57-
5859
/// New item is inserted, insert it in the back of both LRU lists and recalculate sizes
5960
CurrentPatternsSizeBytes += entry->SizeForCache;
6061
LRUPatternList.PushBack(&it->second);
@@ -69,15 +70,20 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
6970
ClearIfNeeded();
7071
}
7172

72-
void NotifyPatternCompiled(const TString & serializedProgram) {
73+
void NotifyPatternCompiled(const TString& serializedProgram) {
7374
auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram);
7475
if (it == SerializedProgramToPatternCacheHolder.end()) {
7576
return;
7677
}
7778

7879
const auto& entry = it->second.Entry;
7980

80-
Y_ASSERT(entry->Pattern->IsCompiled());
81+
if (!entry->Pattern->IsCompiled()) {
82+
// This is possible if the old entry got removed from cache while being compiled - and the new entry got in.
83+
// TODO: add metrics for this inefficient cache usage.
84+
// TODO: make this scenario more consistent - don't waste compilation result.
85+
return;
86+
}
8187

8288
if (it->second.LinkedInCompiledPatternLRUList()) {
8389
return;
@@ -113,7 +119,7 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
113119
* Most recently accessed items are in back of the lists, least recently accessed items are in front of the lists.
114120
*/
115121
struct TPatternCacheHolder : public TIntrusiveListItem<TPatternCacheHolder, TPatternLRUListTag>, TIntrusiveListItem<TPatternCacheHolder, TCompiledPatternLRUListTag> {
116-
TPatternCacheHolder(TString serializedProgram, std::shared_ptr<TPatternCacheEntry> entry)
122+
TPatternCacheHolder(TString serializedProgram, TPatternCacheEntryPtr entry)
117123
: SerializedProgram(std::move(serializedProgram))
118124
, Entry(std::move(entry))
119125
{}
@@ -126,8 +132,8 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
126132
return !TIntrusiveListItem<TPatternCacheHolder, TCompiledPatternLRUListTag>::Empty();
127133
}
128134

129-
TString SerializedProgram;
130-
std::shared_ptr<TPatternCacheEntry> Entry;
135+
const TString SerializedProgram;
136+
TPatternCacheEntryPtr Entry;
131137
};
132138

133139
void PromoteEntry(TPatternCacheHolder* holder) {
@@ -228,52 +234,51 @@ TComputationPatternLRUCache::~TComputationPatternLRUCache() {
228234
CleanCache();
229235
}
230236

231-
std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TString& serializedProgram) {
237+
TPatternCacheEntryPtr TComputationPatternLRUCache::Find(const TString& serializedProgram) {
232238
std::lock_guard<std::mutex> lock(Mutex);
233239
if (auto it = Cache->Find(serializedProgram)) {
234240
++*Hits;
235241

236-
if ((*it)->Pattern->IsCompiled())
242+
if (it->Pattern->IsCompiled())
237243
++*HitsCompiled;
238244

239-
return *it;
245+
return it;
240246
}
241247

242248
++*Misses;
243249
return {};
244250
}
245251

246-
TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscribe(const TString& serializedProgram) {
252+
TPatternCacheEntryFuture TComputationPatternLRUCache::FindOrSubscribe(const TString& serializedProgram) {
247253
std::lock_guard lock(Mutex);
248254
if (auto it = Cache->Find(serializedProgram)) {
249255
++*Hits;
250-
AccessPattern(serializedProgram, *it);
251-
return TTicket(serializedProgram, false, NThreading::MakeFuture<std::shared_ptr<TPatternCacheEntry>>(*it), nullptr);
256+
AccessPattern(serializedProgram, it);
257+
return NThreading::MakeFuture<TPatternCacheEntryPtr>(it);
252258
}
253259

254-
auto [notifyIt, isNew] = Notify.emplace(serializedProgram, Nothing());
260+
auto [notifyIt, isNew] = Notify.emplace(std::piecewise_construct, std::forward_as_tuple(serializedProgram), std::forward_as_tuple());
255261
if (isNew) {
256262
++*Misses;
257-
return TTicket(serializedProgram, true, {}, this);
263+
// First future is empty - so the subscriber can initiate the entry creation.
264+
return {};
258265
}
259266

260267
++*Waits;
261-
auto promise = NThreading::NewPromise<std::shared_ptr<TPatternCacheEntry>>();
268+
auto promise = NThreading::NewPromise<TPatternCacheEntryPtr>();
262269
auto& subscribers = notifyIt->second;
263-
if (!subscribers) {
264-
subscribers.ConstructInPlace();
265-
}
270+
subscribers.push_back(promise);
266271

267-
subscribers->push_back(promise);
268-
return TTicket(serializedProgram, false, promise, nullptr);
272+
// Second and next futures are not empty - so subscribers can wait while first one creates the entry.
273+
return promise;
269274
}
270275

271-
void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
276+
void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgram, TPatternCacheEntryPtr& patternWithEnv) {
272277
Y_DEBUG_ABORT_UNLESS(patternWithEnv && patternWithEnv->Pattern);
273-
TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
278+
TVector<NThreading::TPromise<TPatternCacheEntryPtr>> subscribers;
274279

275280
{
276-
std::lock_guard<std::mutex> lock(Mutex);
281+
std::lock_guard lock(Mutex);
277282
Cache->Insert(serializedProgram, patternWithEnv);
278283

279284
auto notifyIt = Notify.find(serializedProgram);
@@ -288,10 +293,8 @@ void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgra
288293
*SizeCompiledBytes = Cache->PatternsCompiledCodeSizeInBytes();
289294
}
290295

291-
if (subscribers) {
292-
for (auto& subscriber : *subscribers) {
293-
subscriber.SetValue(patternWithEnv);
294-
}
296+
for (auto& subscriber : subscribers) {
297+
subscriber.SetValue(patternWithEnv);
295298
}
296299
}
297300

@@ -300,6 +303,24 @@ void TComputationPatternLRUCache::NotifyPatternCompiled(const TString& serialize
300303
Cache->NotifyPatternCompiled(serializedProgram);
301304
}
302305

306+
void TComputationPatternLRUCache::NotifyPatternMissing(const TString& serializedProgram) {
307+
TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>> subscribers;
308+
{
309+
std::lock_guard lock(Mutex);
310+
311+
auto notifyIt = Notify.find(serializedProgram);
312+
if (notifyIt != Notify.end()) {
313+
subscribers.swap(notifyIt->second);
314+
Notify.erase(notifyIt);
315+
}
316+
}
317+
318+
for (auto& subscriber : subscribers) {
319+
// It's part of API - to set nullptr as broken promise.
320+
subscriber.SetValue(nullptr);
321+
}
322+
}
323+
303324
size_t TComputationPatternLRUCache::GetSize() const {
304325
std::lock_guard lock(Mutex);
305326
return Cache->PatternsSize();
@@ -314,7 +335,7 @@ void TComputationPatternLRUCache::CleanCache() {
314335
Cache->Clear();
315336
}
316337

317-
void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry) {
338+
void TComputationPatternLRUCache::AccessPattern(const TString& serializedProgram, TPatternCacheEntryPtr entry) {
318339
if (!Configuration.PatternAccessTimesBeforeTryToCompile || entry->Pattern->IsCompiled()) {
319340
return;
320341
}
@@ -326,22 +347,4 @@ void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgra
326347
}
327348
}
328349

329-
void TComputationPatternLRUCache::NotifyMissing(const TString& serialized) {
330-
TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
331-
{
332-
std::lock_guard<std::mutex> lock(Mutex);
333-
auto notifyIt = Notify.find(serialized);
334-
if (notifyIt != Notify.end()) {
335-
subscribers.swap(notifyIt->second);
336-
Notify.erase(notifyIt);
337-
}
338-
}
339-
340-
if (subscribers) {
341-
for (auto& subscriber : *subscribers) {
342-
subscriber.SetValue(nullptr);
343-
}
344-
}
345-
}
346-
347350
} // namespace NKikimr::NMiniKQL

ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h

+14-51
Original file line number | Diff line number | Diff line change
@@ -53,43 +53,11 @@ struct TPatternCacheEntry {
5353
}
5454
};
5555

56+
using TPatternCacheEntryPtr = std::shared_ptr<TPatternCacheEntry>;
57+
using TPatternCacheEntryFuture = NThreading::TFuture<TPatternCacheEntryPtr>;
58+
5659
class TComputationPatternLRUCache {
5760
public:
58-
class TTicket : private TNonCopyable {
59-
public:
60-
TTicket(const TString& serialized, bool isOwned, const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>>& future, TComputationPatternLRUCache* cache)
61-
: Serialized(serialized)
62-
, IsOwned(isOwned)
63-
, Future(future)
64-
, Cache(cache)
65-
{}
66-
67-
~TTicket() {
68-
if (Cache) {
69-
Cache->NotifyMissing(Serialized);
70-
}
71-
}
72-
73-
bool HasFuture() const {
74-
return !IsOwned;
75-
}
76-
77-
std::shared_ptr<TPatternCacheEntry> GetValueSync() const {
78-
Y_ABORT_UNLESS(HasFuture());
79-
return Future.GetValueSync();
80-
}
81-
82-
void Close() {
83-
Cache = nullptr;
84-
}
85-
86-
private:
87-
const TString Serialized;
88-
const bool IsOwned;
89-
const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>> Future;
90-
TComputationPatternLRUCache* Cache;
91-
};
92-
9361
struct Config {
9462
Config(size_t maxSizeBytes, size_t maxCompiledSizeBytes)
9563
: MaxSizeBytes(maxSizeBytes)
@@ -120,17 +88,17 @@ class TComputationPatternLRUCache {
12088

12189
~TComputationPatternLRUCache();
12290

123-
static std::shared_ptr<TPatternCacheEntry> CreateCacheEntry(bool useAlloc = true) {
91+
static TPatternCacheEntryPtr CreateCacheEntry(bool useAlloc = true) {
12492
return std::make_shared<TPatternCacheEntry>(useAlloc);
12593
}
12694

127-
std::shared_ptr<TPatternCacheEntry> Find(const TString& serializedProgram);
95+
TPatternCacheEntryPtr Find(const TString& serializedProgram);
96+
TPatternCacheEntryFuture FindOrSubscribe(const TString& serializedProgram);
12897

129-
TTicket FindOrSubscribe(const TString& serializedProgram);
130-
131-
void EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
98+
void EmplacePattern(const TString& serializedProgram, TPatternCacheEntryPtr& patternWithEnv);
13299

133100
void NotifyPatternCompiled(const TString& serializedProgram);
101+
void NotifyPatternMissing(const TString& serializedProgram);
134102

135103
size_t GetSize() const;
136104

@@ -159,27 +127,22 @@ class TComputationPatternLRUCache {
159127
return PatternsToCompile.size();
160128
}
161129

162-
void GetPatternsToCompile(THashMap<TString, std::shared_ptr<TPatternCacheEntry>> & result) {
130+
void GetPatternsToCompile(THashMap<TString, TPatternCacheEntryPtr> & result) {
163131
std::lock_guard lock(Mutex);
164132
result.swap(PatternsToCompile);
165133
}
166134

167135
private:
168-
void AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry);
169-
170-
void NotifyMissing(const TString& serialized);
136+
class TLRUPatternCacheImpl;
171137

172138
static constexpr size_t CacheMaxElementsSize = 10000;
173139

174-
friend class TTicket;
140+
void AccessPattern(const TString& serializedProgram, TPatternCacheEntryPtr entry);
175141

176142
mutable std::mutex Mutex;
177-
THashMap<TString, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify;
178-
179-
class TLRUPatternCacheImpl;
180-
std::unique_ptr<TLRUPatternCacheImpl> Cache;
181-
182-
THashMap<TString, std::shared_ptr<TPatternCacheEntry>> PatternsToCompile;
143+
THashMap<TString, TVector<NThreading::TPromise<TPatternCacheEntryPtr>>> Notify; // protected by Mutex
144+
std::unique_ptr<TLRUPatternCacheImpl> Cache; // protected by Mutex
145+
THashMap<TString, TPatternCacheEntryPtr> PatternsToCompile; // protected by Mutex
183146

184147
const Config Configuration;
185148

0 commit comments

Comments
 (0)