Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion cloud/filestore/libs/vfs_fuse/fs_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ struct TReleaseRequest

////////////////////////////////////////////////////////////////////////////////

enum class EServerWriteBackCacheState
{
// Requests should bypass WriteBackCache and go directly to the session
// (even if WriteBackCache is initialized)
Disabled,

// Requests should go to the WriteBackCache
Enabled,

// WriteBackCache is in the transition from Enabled to Disabled state.
// Requests should wait until WriteBackCache is flushed and then go
// directly to the session
Draining
};

////////////////////////////////////////////////////////////////////////////////

class TFileSystem final
: public IFileSystem
, public std::enable_shared_from_this<TFileSystem>
Expand Down Expand Up @@ -395,7 +412,8 @@ class TFileSystem final
fuse_ino_t ino,
uint64_t fh);

bool ShouldUseServerWriteBackCache(const fuse_file_info* fi) const;
EServerWriteBackCacheState GetServerWriteBackCacheState(
const fuse_file_info* fi) const;

bool UpdateNodeCache(
const NProto::TNodeAttr& attrs,
Expand Down Expand Up @@ -450,6 +468,14 @@ class TFileSystem final
void ScheduleProcessHandleOpsQueue();
void ProcessHandleOpsQueue();

void DoWrite(
TCallContextPtr callContext,
fuse_req_t req,
fuse_ino_t ino,
std::shared_ptr<NProto::TWriteDataRequest> request,
ui64 size,
fuse_file_info* fi);

#define FILESYSTEM_REPLY_IMPL(name, ...) \
template<typename... TArgs> \
int Reply##name( \
Expand Down
148 changes: 71 additions & 77 deletions cloud/filestore/libs/vfs_fuse/fs_impl_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,48 +412,74 @@ void TFileSystem::Write(
request->SetBufferOffset(alignedBuffer.AlignedDataOffset());
request->SetBuffer(alignedBuffer.TakeBuffer());

const auto size = buffer.size();
DoWrite(callContext, req, ino, std::move(request), buffer.size(), fi);
}

void TFileSystem::DoWrite(
TCallContextPtr callContext,
fuse_req_t req,
fuse_ino_t ino,
std::shared_ptr<NProto::TWriteDataRequest> request,
ui64 size,
fuse_file_info* fi)
{
const auto wbcMode = GetServerWriteBackCacheState(fi);

const auto handle = fi->fh;
const auto reqId = callContext->RequestId;

auto callback = [=, ptr = weak_from_this()](const auto& future)
{
auto self = ptr.lock();
if (!self) {
return;
}

const auto& response = future.GetValue();
const auto& error = response.GetError();

if (ShouldUseServerWriteBackCache(fi)) {
if (wbcMode != EServerWriteBackCacheState::Enabled) {
self->FSyncQueue
->Dequeue(reqId, error, TNodeId{ino}, THandle{handle});
}

if (self->CheckResponse(self, *callContext, req, response)) {
self->ReplyWrite(*callContext, error, req, size);
}
};

if (wbcMode == EServerWriteBackCacheState::Enabled) {
WriteBackCache.WriteData(callContext, std::move(request))
.Subscribe(
[=,
ptr = weak_from_this()] (const auto& future)
{
auto self = ptr.lock();
if (!self) {
return;
}

const auto& response = future.GetValue();
const auto& error = response.GetError();

if (CheckResponse(self, *callContext, req, response)) {
self->ReplyWrite(*callContext, error, req, size);
}
});
.Subscribe(std::move(callback));
return;
}

const auto handle = fi->fh;
const auto reqId = callContext->RequestId;
FSyncQueue->Enqueue(reqId, TNodeId {ino}, THandle {handle});

Session->WriteData(callContext, std::move(request))
.Subscribe([=, ptr = weak_from_this()] (const auto& future) {
auto self = ptr.lock();
if (!self) {
return;
}
if (wbcMode == EServerWriteBackCacheState::Disabled) {
Session->WriteData(callContext, std::move(request))
.Subscribe(std::move(callback));
return;
}

const auto& response = future.GetValue();
const auto& error = response.GetError();
self->FSyncQueue->Dequeue(reqId, error, TNodeId {ino}, THandle {handle});
Y_ABORT_UNLESS(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's use STORAGE_VERIFY / STORAGE_VERIFY_C macros - they force the developer to provide the entity id (filesystem id / tablet id / client id / etc), otherwise we'll just see that some error happened without knowing anything about the entity that caused the problem (so the debugging process will take more time)

here we can use client id in the STORAGE_VERIFY message

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be done in a separate PR - there're many other ABORT_UNLESS usages in vfs_fuse which would be nice to replace with STORAGE_VERIFY(clientId)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say, that ABORT_UNLESS is the only usage for checks in vfs_fuse and filestore vhost for now. Wasn't able to find a single instance of Verify in those components

wbcMode == EServerWriteBackCacheState::Draining,
"Invalid EServerWriteBackCacheState value = %d",
wbcMode);

if (CheckResponse(self, *callContext, req, response)) {
self->ReplyWrite(*callContext, error, req, size);
}
});
WriteBackCache.FlushNodeData(request->GetNodeId())
.Subscribe(
[ptr = weak_from_this(),
callback = std::move(callback),
callContext = std::move(callContext),
request = std::move(request)](const auto& f) mutable
{
f.GetValue();
if (auto self = ptr.lock()) {
self->Session->WriteData(callContext, std::move(request))
.Subscribe(std::move(callback));
}
});
}

void TFileSystem::WriteBufLocal(
Expand Down Expand Up @@ -577,45 +603,7 @@ void TFileSystem::WriteBuf(
request->SetHandle(fi->fh);
request->SetOffset(offset);

if (ShouldUseServerWriteBackCache(fi)) {
WriteBackCache.WriteData(callContext, std::move(request))
.Subscribe(
[=, ptr = weak_from_this()] (const auto& future)
{
auto self = ptr.lock();
if (!self) {
return;
}

const auto& response = future.GetValue();
const auto& error = response.GetError();

if (CheckResponse(self, *callContext, req, response)) {
self->ReplyWrite(*callContext, error, req, size);
}
});
return;
}

const auto handle = fi->fh;
const auto reqId = callContext->RequestId;
FSyncQueue->Enqueue(reqId, TNodeId {ino}, THandle {handle});

Session->WriteData(callContext, std::move(request))
.Subscribe([=, ptr = weak_from_this()] (const auto& future) {
auto self = ptr.lock();
if (!self) {
return;
}

const auto& response = future.GetValue();
const auto& error = response.GetError();
self->FSyncQueue->Dequeue(reqId, error, TNodeId {ino}, THandle {handle});

if (CheckResponse(self, *callContext, req, response)) {
self->ReplyWrite(*callContext, error, req, size);
}
});
DoWrite(callContext, req, ino, std::move(request), size, fi);
}

void TFileSystem::FAllocate(
Expand Down Expand Up @@ -960,17 +948,23 @@ void TFileSystem::FSyncDir(

////////////////////////////////////////////////////////////////////////////////

bool TFileSystem::ShouldUseServerWriteBackCache(const fuse_file_info* fi) const
EServerWriteBackCacheState TFileSystem::GetServerWriteBackCacheState(
const fuse_file_info* fi) const
{
if (!WriteBackCache || !Config->GetServerWriteBackCacheEnabled()) {
return false;
if (!WriteBackCache) {
return EServerWriteBackCacheState::Disabled;
}

if (fi->flags & O_DIRECT) {
return false;
return EServerWriteBackCacheState::Disabled;
}

return true;
if (Config->GetServerWriteBackCacheEnabled()) {
return EServerWriteBackCacheState::Enabled;
}

return WriteBackCache.IsEmpty() ? EServerWriteBackCacheState::Disabled
: EServerWriteBackCacheState::Draining;
}

} // namespace NCloud::NFileStore::NFuse
Loading
Loading