Skip to content

Issue-3: reject requests to unavailable agents in RDMA partition #3260

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 35 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b7debd1
issue-3: add device timeout notify to rdma partition
vladstepanyuk Mar 14, 2025
be705e7
issue-3: correct issues
vladstepanyuk Mar 14, 2025
d2f131e
issue-3: correct issues
vladstepanyuk Mar 17, 2025
3eb347f
issue-3: fix timestamp resetting
vladstepanyuk Mar 17, 2025
66289d8
issue-3: correct small issues
vladstepanyuk Mar 17, 2025
030cdb7
issue-3: rmv request cancellation
vladstepanyuk Mar 18, 2025
146caff
issue-3: correct issues
vladstepanyuk Mar 18, 2025
fa9abf8
issue-3: rename ParentWasNotified to VolumeWasNotified and remove Dev…
vladstepanyuk Mar 18, 2025
f2d9d35
issue-3: fix build issue
vladstepanyuk Mar 19, 2025
e4628d0
issue-3: reduce boilerplate
vladstepanyuk Mar 20, 2025
df05605
issue-3: fix test
vladstepanyuk Mar 20, 2025
3a73d29
issue-3: reduce copy paste code
vladstepanyuk Mar 24, 2025
efb8ebc
issue-3: fix test
vladstepanyuk Mar 25, 2025
1580e7f
issue-3: correct issues
vladstepanyuk Mar 25, 2025
3903229
issue-3: fix build
vladstepanyuk Mar 25, 2025
b0a667e
issue-2869: set disconnect result into StopResult promise (#3174)
antonmyagkov Mar 14, 2025
ac20313
issue-1146: slight follow-up refactor: get rid of `IIndexTabletDataba…
debnatkh Mar 14, 2025
041cf43
Allow nonreplicated partition to timeout devices (#3171)
komarevtsev-d Mar 14, 2025
8d895fc
Add `cloud/storage/core/tools/testing/qemu/{bin,image}` to `filestore…
debnatkh Mar 14, 2025
3db151a
issue-176: generate root kms config file (#3188)
sharpeye Mar 14, 2025
e667b3f
vhost bug fix (#3192)
Sazonov99 Mar 17, 2025
e2ba3d3
Fix -Wc++11-narrowing-const-reference (#3196)
drbasic Mar 17, 2025
cbd2b82
avoid reconnecting live nbd sockets; resolves #3138 (#3172)
tpashkin Mar 17, 2025
835749a
issue-3212: added request handle to be able to cancel it
vladstepanyuk Mar 19, 2025
2e33e37
issue-3212: correct fmt issue
vladstepanyuk Mar 19, 2025
1ad952c
issue-3212: fix build issue
vladstepanyuk Mar 19, 2025
0f6f7ac
issue-3212: add handle cancel requests to handle events function
vladstepanyuk Mar 20, 2025
a26fcc1
issue-3212: add cancel status
vladstepanyuk Mar 21, 2025
afa60dd
issue-3212: rename canceled -> cancelled
vladstepanyuk Mar 21, 2025
f4ed878
issue-3212: fix test, change shared endpoint ptr in request handle to…
vladstepanyuk Mar 24, 2025
d02fee9
issue-3212: fix test
vladstepanyuk Mar 24, 2025
acc1b85
issue-3212: remove feature and replace handle with reqId
vladstepanyuk Mar 25, 2025
5b93690
FMT
vladstepanyuk Mar 25, 2025
bba02ce
issue-3: add cancellation of requests sent to unavailable agent
vladstepanyuk Mar 25, 2025
32c7e67
issue-3: add test for rejecting requests to unavailable agent
vladstepanyuk Mar 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cloud/blockstore/libs/endpoint_proxy/server/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ void TBootstrap::Init()
Options.StoredEndpointsPath,
Options.NbdRequestTimeout,
Options.NbdReconnectDelay,
Options.DebugRestartEventsCount,
},
CreateWallClockTimer(),
Scheduler,
Expand All @@ -44,8 +45,12 @@ void TBootstrap::Start()

void TBootstrap::Stop()
{
Server->Stop();
Scheduler->Stop();
if (Server) {
Server->Stop();
}
if (Scheduler) {
Scheduler->Stop();
}
}

} // namespace NCloud::NBlockStore::NServer
11 changes: 9 additions & 2 deletions cloud/blockstore/libs/endpoint_proxy/server/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ TOptions::TOptions()
.StoreResult(&StoredEndpointsPath);

Opts.AddLongOption("nbd-request-timeout")
.OptionalArgument("NUM")
.RequiredArgument("NUM")
.Handler1T<TString>([this] (const auto& s) {
NbdRequestTimeout = TDuration::Parse(s);
});

Opts.AddLongOption("nbd-reconnect-delay")
.OptionalArgument("NUM")
.RequiredArgument("NUM")
.Handler1T<TString>([this] (const auto& s) {
NbdReconnectDelay = TDuration::Parse(s);
});
Expand All @@ -48,6 +48,13 @@ TOptions::TOptions()
Opts.AddLongOption("without-libnl")
.NoArgument()
.SetFlag(&WithoutLibnl);

Opts.AddLongOption(
"debug-restart-events-count",
"issue multiple restart events for each io error. this option is "
"used to debug nbd server reconnect")
.RequiredArgument("NUM")
.StoreResult(&DebugRestartEventsCount);
}

void TOptions::Parse(int argc, char** argv)
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/endpoint_proxy/server/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ struct TOptions final: TOptionsBase
TDuration NbdRequestTimeout = TDuration::Minutes(10);
TDuration NbdReconnectDelay = TDuration::MilliSeconds(100);
bool WithoutLibnl = false;
std::optional<ui32> DebugRestartEventsCount;

TOptions();

Expand Down
60 changes: 45 additions & 15 deletions cloud/blockstore/libs/endpoint_proxy/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,11 @@ struct TEndpoint: std::enable_shared_from_this<TEndpoint>
NBD::IDevicePtr NbdDevice;
std::unique_ptr<TNetworkAddress> ListenAddress;
IProxyRequestStatsPtr RequestStats;

TString UnixSocketPath;
TString InternalUnixSocketPath;
TString NbdDevicePath;

NBD::TStorageOptions NbdOptions;
std::atomic<ui64> Generation = 0;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -191,16 +190,19 @@ struct TRestartAlarmContext: TRequestContextBase
{
std::weak_ptr<TEndpoint> Endpoint;
grpc::ServerCompletionQueue& CQ;
const ui64 Generation;
TBackoffDelayProvider Backoff;
grpc::Alarm Alarm;

TRestartAlarmContext(
std::weak_ptr<TEndpoint> endpoint,
grpc::ServerCompletionQueue& cq,
ui64 generation,
TDuration minReconnectDelay,
TDuration maxReconnectDelay)
: Endpoint(std::move(endpoint))
, CQ(cq)
, Generation(generation)
, Backoff{minReconnectDelay, maxReconnectDelay}
{
SetAlarm();
Expand Down Expand Up @@ -275,27 +277,36 @@ struct TServer: IEndpointProxyServer
{
std::weak_ptr<TEndpoint> Endpoint;
grpc::ServerCompletionQueue& CQ;
const ui64 Generation;
TDuration ReconnectDelay;
std::optional<ui32> DebugRestartEventsCount;

TErrorHandler(
std::weak_ptr<TEndpoint> endpoint,
grpc::ServerCompletionQueue& cq,
TDuration reconnectDelay)
ui64 generation,
TDuration reconnectDelay,
std::optional<ui32> debugRestartEventsCount)
: Endpoint(std::move(endpoint))
, CQ(cq)
, Generation(generation)
, ReconnectDelay(reconnectDelay)
, DebugRestartEventsCount(debugRestartEventsCount)
{}

void ProcessException(std::exception_ptr e) override
{
Y_UNUSED(e);

// context will be held by grpc until CQ->Next returns it
new TRestartAlarmContext(
Endpoint,
CQ,
ReconnectDelay,
MAX_RECONNECT_DELAY);
for (ui32 i = 0; i < DebugRestartEventsCount.value_or(1); i++) {
// context will be held by grpc until CQ->Next returns it
new TRestartAlarmContext(
Endpoint,
CQ,
Generation,
ReconnectDelay,
MAX_RECONNECT_DELAY);
}
}
};

Expand Down Expand Up @@ -755,9 +766,16 @@ struct TServer: IEndpointProxyServer
response.SetNbdDevice(ep.NbdDevicePath);

if (ep.NbdDevice) {
ep.NbdDevice->Stop(true);
STORAGE_INFO(request.ShortDebugString().Quote()
<< " - Stopped NBD device");
auto err = ep.NbdDevice->Stop(true).GetValueSync();
if (HasError(err)) {
STORAGE_ERROR(
request.ShortDebugString().Quote()
<< " - Failed to stop NBD device: " << err.GetCode());
} else {
STORAGE_INFO(
request.ShortDebugString().Quote()
<< " - Stopped NBD device");
}
}

if (ep.ListenAddress) {
Expand Down Expand Up @@ -945,7 +963,9 @@ struct TServer: IEndpointProxyServer
std::make_shared<TErrorHandler>(
ep.shared_from_this(),
*CQ,
Config.NbdReconnectDelay),
ep.Generation,
Config.NbdReconnectDelay,
Config.DebugRestartEventsCount),
ep.NbdOptions);

// TODO fix StartEndpoint signature - it's actually synchronous
Expand Down Expand Up @@ -1006,6 +1026,11 @@ struct TServer: IEndpointProxyServer
bool ProcessAlarm(TRestartAlarmContext* context)
{
if (auto ep = context->Endpoint.lock()) {
if (context->Generation != ep->Generation) {
return false;
}
ep->Generation++;

const auto tag = TStringBuilder()
<< "UnixSocketPath: " << ep->UnixSocketPath.Quote() << " - ";

Expand All @@ -1025,8 +1050,13 @@ struct TServer: IEndpointProxyServer
bool DoProcessAlarm(TEndpoint& ep, const TString& tag)
{
if (ep.NbdDevice) {
ep.NbdDevice->Stop(NBD_DELETE_DEVICE).GetValueSync();
STORAGE_INFO(tag << "Stopped NBD device");
auto err = ep.NbdDevice->Stop(NBD_DELETE_DEVICE).GetValueSync();
if (HasError(err)) {
STORAGE_ERROR(
tag << "Failed to stop NBD device: " << err.GetCode());
} else {
STORAGE_INFO(tag << "Stopped NBD device");
}
}

if (ep.ListenAddress) {
Expand Down
6 changes: 5 additions & 1 deletion cloud/blockstore/libs/endpoint_proxy/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct TEndpointProxyServerConfig
TString StoredEndpointsPath;
TDuration NbdRequestTimeout;
TDuration NbdReconnectDelay;
std::optional<ui32> DebugRestartEventsCount;

TEndpointProxyServerConfig(
ui16 port,
Expand All @@ -40,7 +41,8 @@ struct TEndpointProxyServerConfig
bool netlink,
TString storedEndpointsPath,
TDuration nbdRequestTimeout,
TDuration nbdReconnectDelay)
TDuration nbdReconnectDelay,
std::optional<ui32> debugRestartEventsCount)
: Port(port)
, SecurePort(securePort)
, RootCertsFile(std::move(rootCertsFile))
Expand All @@ -51,7 +53,9 @@ struct TEndpointProxyServerConfig
, StoredEndpointsPath(std::move(storedEndpointsPath))
, NbdRequestTimeout(nbdRequestTimeout)
, NbdReconnectDelay(nbdReconnectDelay)
, DebugRestartEventsCount(debugRestartEventsCount)
{
Y_ENSURE(debugRestartEventsCount.value_or(1) > 0);
}
};

Expand Down
3 changes: 2 additions & 1 deletion cloud/blockstore/libs/endpoint_proxy/server/server_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ Y_UNIT_TEST_SUITE(TServerTest)
false,
"",
TDuration::Seconds(1),
TDuration::MilliSeconds(100));
TDuration::MilliSeconds(100),
1);

auto server = CreateServer(
serverConfig,
Expand Down
2 changes: 1 addition & 1 deletion cloud/blockstore/libs/nbd/netlink_device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ void TNetlinkDevice::Disconnect()
socket.Receive(response);
StopResult.SetValue(MakeError(S_OK));
} catch (const TServiceError& e) {
StartResult.SetValue(MakeError(
StopResult.SetValue(MakeError(
e.GetCode(),
TStringBuilder()
<< "unable to disconnect " << DeviceName << ": " << e.what()));
Expand Down
5 changes: 4 additions & 1 deletion cloud/blockstore/libs/rdma/iface/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,13 @@ struct IClientEndpoint
size_t requestBytes,
size_t responseBytes) = 0;

virtual void SendRequest(
// Returns the id of the sent request. Pass it to CancelRequest to cancel the request.
virtual ui64 SendRequest(
TClientRequestPtr req,
TCallContextPtr callContext) = 0;

virtual void CancelRequest(ui64 reqId) = 0;

virtual NThreading::TFuture<void> Stop() = 0;
};

Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/rdma/iface/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ enum {
RDMA_PROTO_CONFIG_MISMATCH = 2,
RDMA_PROTO_THROTTLED = 3,
RDMA_PROTO_FAIL = 4,
RDMA_PROTO_CANCELLED = 5,
};

enum {
Expand Down
Loading
Loading