Skip to content

Commit 91f6686

Browse files
author
Andrei Nasonov
committed
issue-1751: Restore and drain WriteBackCache at session recreation
1 parent d93550d commit 91f6686

File tree

4 files changed

+268
-79
lines changed

4 files changed

+268
-79
lines changed

cloud/filestore/libs/vfs_fuse/fs_impl.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,20 @@ struct TReleaseRequest
5757

5858
////////////////////////////////////////////////////////////////////////////////
5959

60+
enum class EServerWriteBackCacheMode
61+
{
62+
// The request should go to the session
63+
Disabled,
64+
65+
// The request should go to the WriteBackCache
66+
Enabled,
67+
68+
// The cache is not empty: the request should wait until the cached data of its node is flushed, then go to the session
69+
Draining
70+
};
71+
72+
////////////////////////////////////////////////////////////////////////////////
73+
6074
class TFileSystem final
6175
: public IFileSystem
6276
, public std::enable_shared_from_this<TFileSystem>
@@ -395,7 +409,8 @@ class TFileSystem final
395409
fuse_ino_t ino,
396410
uint64_t fh);
397411

398-
bool ShouldUseServerWriteBackCache(const fuse_file_info* fi) const;
412+
EServerWriteBackCacheMode GetServerWriteBackCacheMode(
413+
const fuse_file_info* fi) const;
399414

400415
bool UpdateNodeCache(
401416
const NProto::TNodeAttr& attrs,

cloud/filestore/libs/vfs_fuse/fs_impl_data.cpp

Lines changed: 116 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -413,45 +413,67 @@ void TFileSystem::Write(
413413
request->SetBuffer(alignedBuffer.TakeBuffer());
414414

415415
const auto size = buffer.size();
416+
const auto wbcMode = GetServerWriteBackCacheMode(fi);
416417

417-
if (ShouldUseServerWriteBackCache(fi)) {
418+
const auto handle = fi->fh;
419+
const auto reqId = callContext->RequestId;
420+
421+
auto callback = [=, ptr = weak_from_this()](const auto& future)
422+
{
423+
auto self = ptr.lock();
424+
if (!self) {
425+
return;
426+
}
427+
428+
const auto& response = future.GetValue();
429+
const auto& error = response.GetError();
430+
431+
if (wbcMode != EServerWriteBackCacheMode::Enabled) {
432+
self->FSyncQueue
433+
->Dequeue(reqId, error, TNodeId{ino}, THandle{handle});
434+
}
435+
436+
if (self->CheckResponse(self, *callContext, req, response)) {
437+
self->ReplyWrite(*callContext, error, req, size);
438+
}
439+
};
440+
441+
if (wbcMode == EServerWriteBackCacheMode::Enabled) {
418442
WriteBackCache.WriteData(callContext, std::move(request))
419-
.Subscribe(
420-
[=,
421-
ptr = weak_from_this()] (const auto& future)
422-
{
423-
auto self = ptr.lock();
424-
if (!self) {
425-
return;
426-
}
427-
428-
const auto& response = future.GetValue();
429-
const auto& error = response.GetError();
430-
431-
if (CheckResponse(self, *callContext, req, response)) {
432-
self->ReplyWrite(*callContext, error, req, size);
433-
}
434-
});
443+
.Subscribe(std::move(callback));
435444
return;
436445
}
437446

438-
const auto handle = fi->fh;
439-
const auto reqId = callContext->RequestId;
440447
FSyncQueue->Enqueue(reqId, TNodeId {ino}, THandle {handle});
441448

442-
Session->WriteData(callContext, std::move(request))
443-
.Subscribe([=, ptr = weak_from_this()] (const auto& future) {
444-
auto self = ptr.lock();
445-
if (!self) {
446-
return;
447-
}
449+
if (wbcMode == EServerWriteBackCacheMode::Disabled) {
450+
Session->WriteData(callContext, std::move(request))
451+
.Subscribe(std::move(callback));
452+
return;
453+
}
448454

449-
const auto& response = future.GetValue();
450-
const auto& error = response.GetError();
451-
self->FSyncQueue->Dequeue(reqId, error, TNodeId {ino}, THandle {handle});
455+
Y_ABORT_UNLESS(
456+
wbcMode == EServerWriteBackCacheMode::Draining,
457+
"Invalid EServerWriteBackCacheMode value = %d",
458+
wbcMode);
452459

453-
if (CheckResponse(self, *callContext, req, response)) {
454-
self->ReplyWrite(*callContext, error, req, size);
460+
auto flushFuture = WriteBackCache.FlushNodeData(request->GetNodeId());
461+
if (flushFuture.HasValue()) {
462+
Session->WriteData(callContext, std::move(request))
463+
.Subscribe(std::move(callback));
464+
return;
465+
}
466+
467+
flushFuture.Subscribe(
468+
[ptr = weak_from_this(),
469+
callback = std::move(callback),
470+
callContext = std::move(callContext),
471+
request = std::move(request)](const auto& f) mutable
472+
{
473+
f.GetValue();
474+
if (auto self = ptr.lock()) {
475+
self->Session->WriteData(callContext, std::move(request))
476+
.Subscribe(std::move(callback));
455477
}
456478
});
457479
}
@@ -577,43 +599,66 @@ void TFileSystem::WriteBuf(
577599
request->SetHandle(fi->fh);
578600
request->SetOffset(offset);
579601

580-
if (ShouldUseServerWriteBackCache(fi)) {
602+
const auto wbcMode = GetServerWriteBackCacheMode(fi);
603+
const auto handle = fi->fh;
604+
const auto reqId = callContext->RequestId;
605+
606+
auto callback = [=, ptr = weak_from_this()](const auto& future)
607+
{
608+
auto self = ptr.lock();
609+
if (!self) {
610+
return;
611+
}
612+
613+
const auto& response = future.GetValue();
614+
const auto& error = response.GetError();
615+
616+
if (wbcMode != EServerWriteBackCacheMode::Enabled) {
617+
self->FSyncQueue
618+
->Dequeue(reqId, error, TNodeId{ino}, THandle{handle});
619+
}
620+
621+
if (self->CheckResponse(self, *callContext, req, response)) {
622+
self->ReplyWrite(*callContext, error, req, size);
623+
}
624+
};
625+
626+
if (wbcMode == EServerWriteBackCacheMode::Enabled) {
581627
WriteBackCache.WriteData(callContext, std::move(request))
582-
.Subscribe(
583-
[=, ptr = weak_from_this()] (const auto& future)
584-
{
585-
auto self = ptr.lock();
586-
if (!self) {
587-
return;
588-
}
589-
590-
const auto& response = future.GetValue();
591-
const auto& error = response.GetError();
592-
593-
if (CheckResponse(self, *callContext, req, response)) {
594-
self->ReplyWrite(*callContext, error, req, size);
595-
}
596-
});
628+
.Subscribe(std::move(callback));
597629
return;
598630
}
599631

600-
const auto handle = fi->fh;
601-
const auto reqId = callContext->RequestId;
602-
FSyncQueue->Enqueue(reqId, TNodeId {ino}, THandle {handle});
632+
FSyncQueue->Enqueue(reqId, TNodeId{ino}, THandle{handle});
603633

604-
Session->WriteData(callContext, std::move(request))
605-
.Subscribe([=, ptr = weak_from_this()] (const auto& future) {
606-
auto self = ptr.lock();
607-
if (!self) {
608-
return;
609-
}
634+
if (wbcMode == EServerWriteBackCacheMode::Disabled) {
635+
Session->WriteData(callContext, std::move(request))
636+
.Subscribe(std::move(callback));
637+
return;
638+
}
610639

611-
const auto& response = future.GetValue();
612-
const auto& error = response.GetError();
613-
self->FSyncQueue->Dequeue(reqId, error, TNodeId {ino}, THandle {handle});
640+
Y_ABORT_UNLESS(
641+
wbcMode == EServerWriteBackCacheMode::Draining,
642+
"Invalid EServerWriteBackCacheMode value = %d",
643+
wbcMode);
614644

615-
if (CheckResponse(self, *callContext, req, response)) {
616-
self->ReplyWrite(*callContext, error, req, size);
645+
auto flushFuture = WriteBackCache.FlushNodeData(request->GetNodeId());
646+
if (flushFuture.HasValue()) {
647+
Session->WriteData(callContext, std::move(request))
648+
.Subscribe(std::move(callback));
649+
return;
650+
}
651+
652+
flushFuture.Subscribe(
653+
[ptr = weak_from_this(),
654+
callback = std::move(callback),
655+
callContext = std::move(callContext),
656+
request = std::move(request)](const auto& f) mutable
657+
{
658+
f.GetValue();
659+
if (auto self = ptr.lock()) {
660+
self->Session->WriteData(callContext, std::move(request))
661+
.Subscribe(std::move(callback));
617662
}
618663
});
619664
}
@@ -960,17 +1005,23 @@ void TFileSystem::FSyncDir(
9601005

9611006
////////////////////////////////////////////////////////////////////////////////
9621007

963-
bool TFileSystem::ShouldUseServerWriteBackCache(const fuse_file_info* fi) const
1008+
EServerWriteBackCacheMode TFileSystem::GetServerWriteBackCacheMode(
1009+
const fuse_file_info* fi) const
9641010
{
965-
if (!WriteBackCache || !Config->GetServerWriteBackCacheEnabled()) {
966-
return false;
1011+
if (!WriteBackCache) {
1012+
return EServerWriteBackCacheMode::Disabled;
9671013
}
9681014

9691015
if (fi->flags & O_DIRECT) {
970-
return false;
1016+
return EServerWriteBackCacheMode::Disabled;
9711017
}
9721018

973-
return true;
1019+
if (Config->GetServerWriteBackCacheEnabled()) {
1020+
return EServerWriteBackCacheMode::Enabled;
1021+
}
1022+
1023+
return WriteBackCache.IsEmpty() ? EServerWriteBackCacheMode::Disabled
1024+
: EServerWriteBackCacheMode::Draining;
9741025
}
9751026

9761027
} // namespace NCloud::NFileStore::NFuse

cloud/filestore/libs/vfs_fuse/fs_ut.cpp

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2808,6 +2808,129 @@ Y_UNIT_TEST_SUITE(TFileSystemTest)
28082808
TestShouldSupportZeroCopyWriteByWriteBackCache(false);
28092809
TestShouldSupportZeroCopyWriteByWriteBackCache(true);
28102810
}
2811+
2812+
Y_UNIT_TEST(ShouldRestoreAndDrainCacheAfterSessionRestart)
2813+
{
2814+
const TString sessionId = CreateGuidAsString();
2815+
2816+
std::atomic<int> writeDataCalled = 0;
2817+
std::atomic<int> writeDataCalled2 = 0;
2818+
2819+
const ui64 nodeId = 123;
2820+
const ui64 handleId = 456;
2821+
2822+
{
2823+
NProto::TFileStoreFeatures features;
2824+
features.SetServerWriteBackCacheEnabled(true);
2825+
2826+
TBootstrap bootstrap(
2827+
CreateWallClockTimer(),
2828+
CreateScheduler(),
2829+
features);
2830+
2831+
bootstrap.Service->CreateSessionHandler = [&](auto, auto)
2832+
{
2833+
NProto::TCreateSessionResponse result;
2834+
result.MutableSession()->SetSessionId(sessionId);
2835+
result.MutableFileStore()->SetBlockSize(4096);
2836+
result.MutableFileStore()->MutableFeatures()->CopyFrom(
2837+
features);
2838+
result.MutableFileStore()->SetFileSystemId(FileSystemId);
2839+
return MakeFuture(result);
2840+
};
2841+
2842+
bootstrap.Service->WriteDataHandler = [&](auto, const auto&)
2843+
{
2844+
writeDataCalled++;
2845+
NProto::TWriteDataResponse result;
2846+
return MakeFuture(result);
2847+
};
2848+
2849+
bootstrap.Start();
2850+
Y_DEFER
2851+
{
2852+
bootstrap.Stop();
2853+
};
2854+
2855+
auto reqWrite = std::make_shared<TWriteRequest>(
2856+
nodeId,
2857+
handleId,
2858+
0,
2859+
CreateBuffer(4096, 'a'));
2860+
reqWrite->In->Body.flags |= O_WRONLY;
2861+
auto write = bootstrap.Fuse->SendRequest<TWriteRequest>(reqWrite);
2862+
UNIT_ASSERT_NO_EXCEPTION(write.GetValue(WaitTimeout));
2863+
2864+
auto suspend = bootstrap.Loop->SuspendAsync();
2865+
UNIT_ASSERT(suspend.Wait(WaitTimeout));
2866+
}
2867+
2868+
// Since write-back cache was enabled, the actual write didn't happen
2869+
// and the request is stored in the persistent queue
2870+
UNIT_ASSERT_VALUES_EQUAL(0, writeDataCalled.load());
2871+
2872+
{
2873+
NProto::TFileStoreFeatures features;
2874+
features.SetServerWriteBackCacheEnabled(true);
2875+
2876+
TBootstrap bootstrap(
2877+
CreateWallClockTimer(),
2878+
CreateScheduler(),
2879+
features);
2880+
2881+
bootstrap.Service->CreateSessionHandler = [&](auto, auto)
2882+
{
2883+
// It is expected that the unwritten requests are restored from
2884+
// the persistent queue even if write-back cache is disabled now
2885+
NProto::TFileStoreFeatures features;
2886+
features.SetServerWriteBackCacheEnabled(false);
2887+
2888+
NProto::TCreateSessionResponse result;
2889+
result.MutableSession()->SetSessionId(sessionId);
2890+
result.MutableFileStore()->SetBlockSize(4096);
2891+
result.MutableFileStore()->MutableFeatures()->CopyFrom(
2892+
features);
2893+
result.MutableFileStore()->SetFileSystemId(FileSystemId);
2894+
return MakeFuture(result);
2895+
};
2896+
2897+
bootstrap.Service->WriteDataHandler = [&](auto, const auto&)
2898+
{
2899+
writeDataCalled2++;
2900+
NProto::TWriteDataResponse result;
2901+
return MakeFuture(result);
2902+
};
2903+
2904+
bootstrap.Start();
2905+
Y_DEFER
2906+
{
2907+
bootstrap.Stop();
2908+
};
2909+
2910+
UNIT_ASSERT_VALUES_EQUAL(0, writeDataCalled2.load());
2911+
2912+
auto flush =
2913+
bootstrap.Fuse->SendRequest<TFlushRequest>(nodeId, handleId);
2914+
UNIT_ASSERT_NO_EXCEPTION(flush.GetValue(WaitTimeout));
2915+
2916+
// The restored cache should be flushed through the new session only
2917+
UNIT_ASSERT_VALUES_EQUAL(0, writeDataCalled.load());
2918+
UNIT_ASSERT_VALUES_EQUAL(1, writeDataCalled2.load());
2919+
2920+
auto reqWrite = std::make_shared<TWriteRequest>(
2921+
nodeId,
2922+
handleId,
2923+
0,
2924+
CreateBuffer(4096, 'a'));
2925+
reqWrite->In->Body.flags |= O_WRONLY;
2926+
auto write = bootstrap.Fuse->SendRequest<TWriteRequest>(reqWrite);
2927+
UNIT_ASSERT_NO_EXCEPTION(write.GetValue(WaitTimeout));
2928+
2929+
// Cache is drained and disabled - new requests go directly
2930+
// to the session
2931+
UNIT_ASSERT_VALUES_EQUAL(2, writeDataCalled2.load());
2932+
}
2933+
}
28112934
}
28122935

28132936
} // namespace NCloud::NFileStore::NFuse

0 commit comments

Comments
 (0)