Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion cloud/filestore/libs/vfs_fuse/fs_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ struct TReleaseRequest

////////////////////////////////////////////////////////////////////////////////

enum class EServerWriteBackCacheMode
{
// The request should go to the session
Disabled,

// The request should go to the WriteBackCache
Enabled,

// The request should wait until the cache is empty then go to the session
Draining
};

////////////////////////////////////////////////////////////////////////////////

class TFileSystem final
: public IFileSystem
, public std::enable_shared_from_this<TFileSystem>
Expand Down Expand Up @@ -395,7 +409,8 @@ class TFileSystem final
fuse_ino_t ino,
uint64_t fh);

bool ShouldUseServerWriteBackCache(const fuse_file_info* fi) const;
EServerWriteBackCacheMode GetServerWriteBackCacheMode(
const fuse_file_info* fi) const;

bool UpdateNodeCache(
const NProto::TNodeAttr& attrs,
Expand Down
181 changes: 116 additions & 65 deletions cloud/filestore/libs/vfs_fuse/fs_impl_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,45 +413,67 @@ void TFileSystem::Write(
request->SetBuffer(alignedBuffer.TakeBuffer());

const auto size = buffer.size();
const auto wbcMode = GetServerWriteBackCacheMode(fi);

if (ShouldUseServerWriteBackCache(fi)) {
const auto handle = fi->fh;
const auto reqId = callContext->RequestId;

auto callback = [=, ptr = weak_from_this()](const auto& future)
{
auto self = ptr.lock();
if (!self) {
return;
}

const auto& response = future.GetValue();
const auto& error = response.GetError();

if (wbcMode != EServerWriteBackCacheMode::Enabled) {
self->FSyncQueue
->Dequeue(reqId, error, TNodeId{ino}, THandle{handle});
}

if (self->CheckResponse(self, *callContext, req, response)) {
self->ReplyWrite(*callContext, error, req, size);
}
};

if (wbcMode == EServerWriteBackCacheMode::Enabled) {
WriteBackCache.WriteData(callContext, std::move(request))
.Subscribe(
[=,
ptr = weak_from_this()] (const auto& future)
{
auto self = ptr.lock();
if (!self) {
return;
}

const auto& response = future.GetValue();
const auto& error = response.GetError();

if (CheckResponse(self, *callContext, req, response)) {
self->ReplyWrite(*callContext, error, req, size);
}
});
.Subscribe(std::move(callback));
return;
}

const auto handle = fi->fh;
const auto reqId = callContext->RequestId;
FSyncQueue->Enqueue(reqId, TNodeId {ino}, THandle {handle});

Session->WriteData(callContext, std::move(request))
.Subscribe([=, ptr = weak_from_this()] (const auto& future) {
auto self = ptr.lock();
if (!self) {
return;
}
if (wbcMode == EServerWriteBackCacheMode::Disabled) {
Session->WriteData(callContext, std::move(request))
.Subscribe(std::move(callback));
return;
}

const auto& response = future.GetValue();
const auto& error = response.GetError();
self->FSyncQueue->Dequeue(reqId, error, TNodeId {ino}, THandle {handle});
Y_ABORT_UNLESS(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's use STORAGE_VERIFY / STORAGE_VERIFY_C macros - they force the developer to provide the entity id (filesystem id / tablet id / client id / etc), otherwise we'll just see that some error happened without knowing anything about the entity that caused the problem (so the debugging process will take more time)

here we can use client id in the STORAGE_VERIFY message

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be done in a separate PR - there're many other ABORT_UNLESS usages in vfs_fuse which would be nice to replace with STORAGE_VERIFY(clientId)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say, that ABORT_UNLESS is the only usage for checks in vfs_fuse and filestore vhost for now. Wasn't able to find a single instance of Verify in those components

wbcMode == EServerWriteBackCacheMode::Draining,
"Invalid EServerWriteBackCacheMode value = %d",
wbcMode);

if (CheckResponse(self, *callContext, req, response)) {
self->ReplyWrite(*callContext, error, req, size);
auto flushFuture = WriteBackCache.FlushNodeData(request->GetNodeId());
if (flushFuture.HasValue()) {
Session->WriteData(callContext, std::move(request))
.Subscribe(std::move(callback));
return;
}

flushFuture.Subscribe(
[ptr = weak_from_this(),
callback = std::move(callback),
callContext = std::move(callContext),
request = std::move(request)](const auto& f) mutable
{
f.GetValue();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (can??) flush fails, we just proceed to write normally

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FlushAllData is not expected to throw an exception, we can safely make this call.
I don't like this code and I'd prefer Y_UNUSED(f) but this pattern is commonly used in other places.

if (auto self = ptr.lock()) {
self->Session->WriteData(callContext, std::move(request))
.Subscribe(std::move(callback));
}
});
}
Expand Down Expand Up @@ -577,43 +599,66 @@ void TFileSystem::WriteBuf(
request->SetHandle(fi->fh);
request->SetOffset(offset);

if (ShouldUseServerWriteBackCache(fi)) {
const auto wbcMode = GetServerWriteBackCacheMode(fi);
const auto handle = fi->fh;
const auto reqId = callContext->RequestId;

auto callback = [=, ptr = weak_from_this()](const auto& future)
{
auto self = ptr.lock();
if (!self) {
return;
}

const auto& response = future.GetValue();
const auto& error = response.GetError();

if (wbcMode != EServerWriteBackCacheMode::Enabled) {
self->FSyncQueue
->Dequeue(reqId, error, TNodeId{ino}, THandle{handle});
}

if (self->CheckResponse(self, *callContext, req, response)) {
self->ReplyWrite(*callContext, error, req, size);
}
};

if (wbcMode == EServerWriteBackCacheMode::Enabled) {
WriteBackCache.WriteData(callContext, std::move(request))
.Subscribe(
[=, ptr = weak_from_this()] (const auto& future)
{
auto self = ptr.lock();
if (!self) {
return;
}

const auto& response = future.GetValue();
const auto& error = response.GetError();

if (CheckResponse(self, *callContext, req, response)) {
self->ReplyWrite(*callContext, error, req, size);
}
});
.Subscribe(std::move(callback));
return;
}

const auto handle = fi->fh;
const auto reqId = callContext->RequestId;
FSyncQueue->Enqueue(reqId, TNodeId {ino}, THandle {handle});
FSyncQueue->Enqueue(reqId, TNodeId{ino}, THandle{handle});

Session->WriteData(callContext, std::move(request))
.Subscribe([=, ptr = weak_from_this()] (const auto& future) {
auto self = ptr.lock();
if (!self) {
return;
}
if (wbcMode == EServerWriteBackCacheMode::Disabled) {
Session->WriteData(callContext, std::move(request))
.Subscribe(std::move(callback));
return;
}

const auto& response = future.GetValue();
const auto& error = response.GetError();
self->FSyncQueue->Dequeue(reqId, error, TNodeId {ino}, THandle {handle});
Y_ABORT_UNLESS(
wbcMode == EServerWriteBackCacheMode::Draining,
"Invalid EServerWriteBackCacheMode value = %d",
wbcMode);

if (CheckResponse(self, *callContext, req, response)) {
self->ReplyWrite(*callContext, error, req, size);
auto flushFuture = WriteBackCache.FlushNodeData(request->GetNodeId());
if (flushFuture.HasValue()) {
Session->WriteData(callContext, std::move(request))
.Subscribe(std::move(callback));
return;
}

flushFuture.Subscribe(
[ptr = weak_from_this(),
callback = std::move(callback),
callContext = std::move(callContext),
request = std::move(request)](const auto& f) mutable
{
f.GetValue();
if (auto self = ptr.lock()) {
self->Session->WriteData(callContext, std::move(request))
.Subscribe(std::move(callback));
}
});
}
Expand Down Expand Up @@ -960,17 +1005,23 @@ void TFileSystem::FSyncDir(

////////////////////////////////////////////////////////////////////////////////

bool TFileSystem::ShouldUseServerWriteBackCache(const fuse_file_info* fi) const
EServerWriteBackCacheMode TFileSystem::GetServerWriteBackCacheMode(
const fuse_file_info* fi) const
{
if (!WriteBackCache || !Config->GetServerWriteBackCacheEnabled()) {
return false;
if (!WriteBackCache) {
return EServerWriteBackCacheMode::Disabled;
}

if (fi->flags & O_DIRECT) {
return false;
return EServerWriteBackCacheMode::Disabled;
}

return true;
if (Config->GetServerWriteBackCacheEnabled()) {
return EServerWriteBackCacheMode::Enabled;
}

return WriteBackCache.IsEmpty() ? EServerWriteBackCacheMode::Disabled
: EServerWriteBackCacheMode::Draining;
}

} // namespace NCloud::NFileStore::NFuse
Loading
Loading