Skip to content

Destroy lagging migration actor for each agent #3275

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ TLaggingAgentMigrationActor::TLaggingAgentMigrationActor(
TStorageConfigPtr config,
TDiagnosticsConfigPtr diagnosticsConfig,
TNonreplicatedPartitionConfigPtr partConfig,
TActorId parentActorId,
IProfileLogPtr profileLog,
IBlockDigestGeneratorPtr blockDigestGenerator,
TString rwClientId,
Expand All @@ -39,6 +40,7 @@ TLaggingAgentMigrationActor::TLaggingAgentMigrationActor(
config->GetMaxMigrationIoDepth())
, Config(std::move(config))
, PartConfig(std::move(partConfig))
, ParentActorId(parentActorId)
, TargetActorId(targetActorId)
, SourceActorId(sourceActorId)
, AgentId(std::move(agentId))
Expand Down Expand Up @@ -86,7 +88,7 @@ void TLaggingAgentMigrationActor::OnMigrationProgress(
void TLaggingAgentMigrationActor::OnMigrationFinished(const TActorContext& ctx)
{
ctx.Send(
PartConfig->GetParentActorId(),
ParentActorId,
std::make_unique<TEvVolumePrivate::TEvLaggingAgentMigrationFinished>(
AgentId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class TLaggingAgentMigrationActor final
private:
const TStorageConfigPtr Config;
const TNonreplicatedPartitionConfigPtr PartConfig;
const NActors::TActorId ParentActorId;
const NActors::TActorId TargetActorId;
const NActors::TActorId SourceActorId;
const TString AgentId;
Expand All @@ -30,6 +31,7 @@ class TLaggingAgentMigrationActor final
TStorageConfigPtr config,
TDiagnosticsConfigPtr diagnosticsConfig,
TNonreplicatedPartitionConfigPtr partConfig,
NActors::TActorId parentActorId,
IProfileLogPtr profileLog,
IBlockDigestGeneratorPtr blockDigestGenerator,
TString rwClientId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,10 +644,10 @@ void TLaggingAgentsReplicaProxyActor::HandleAgentIsUnavailable(
state.CleanBlocksMap->Set(0, PartConfig->GetBlockCount());
}

LOG_DEBUG(
LOG_INFO(
ctx,
TBlockStoreComponents::PARTITION_WORKER,
"[%s] Agent %s Block count: %lu, dirty block count: %lu",
"[%s] Agent %s block count: %lu, dirty block count: %lu",
PartConfig->GetName().c_str(),
agentId.c_str(),
PartConfig->GetBlockCount(),
Expand Down Expand Up @@ -678,7 +678,7 @@ void TLaggingAgentsReplicaProxyActor::HandleAgentIsBackOnline(
LOG_INFO(
ctx,
TBlockStoreComponents::PARTITION_WORKER,
"[%s] Agent %s is back online. Starting migration actor",
"[%s] Lagging agent %s is back online. Starting migration actor",
PartConfig->GetName().c_str(),
msg->AgentId.Quote().c_str());

Expand All @@ -701,7 +701,7 @@ void TLaggingAgentsReplicaProxyActor::HandleAgentIsBackOnline(
TCompressedBitmap cleanBlocksCopy(PartConfig->GetBlockCount());
cleanBlocksCopy.Update(*state.CleanBlocksMap, 0);

LOG_DEBUG(
LOG_INFO(
ctx,
TBlockStoreComponents::PARTITION_WORKER,
"[%s] Starting lagging agent %s migration. Block count: %lu, dirty "
Expand All @@ -717,6 +717,7 @@ void TLaggingAgentsReplicaProxyActor::HandleAgentIsBackOnline(
Config,
DiagnosticsConfig,
PartConfig,
SelfId(),
ProfileLog,
BlockDigestGenerator,
RwClientId,
Expand All @@ -727,6 +728,44 @@ void TLaggingAgentsReplicaProxyActor::HandleAgentIsBackOnline(
PoisonPillHelper.TakeOwnership(ctx, state.MigrationActorId);
}

void TLaggingAgentsReplicaProxyActor::HandleLaggingAgentMigrationFinished(
const TEvVolumePrivate::TEvLaggingAgentMigrationFinished::TPtr& ev,
const NActors::TActorContext& ctx)
{
const auto* msg = ev->Get();

auto it = AgentState.find(msg->AgentId);
Y_DEBUG_ABORT_UNLESS(it != AgentState.end());
if (it == AgentState.end()) {
return;
}

auto& state = it->second;
switch (state.State) {
case EAgentState::Unavailable:
return;
case EAgentState::Resyncing:
break;
}

LOG_INFO(
ctx,
TBlockStoreComponents::PARTITION_WORKER,
"[%s] Lagging agent %s migration has been finished",
PartConfig->GetName().c_str(),
msg->AgentId.Quote().c_str());

Y_DEBUG_ABORT_UNLESS(state.MigrationActorId);
Y_DEBUG_ABORT_UNLESS(!state.AvailabilityMonitoringActorId);
DestroyChildActor(ctx, &state.MigrationActorId);
AgentState.erase(it);

ctx.Send(std::make_unique<NActors::IEventHandle>(
PartConfig->GetParentActorId(),
SelfId(),
ev->ReleaseBase().Release()));
}

void TLaggingAgentsReplicaProxyActor::HandleWriteBlocks(
const TEvService::TEvWriteBlocksRequest::TPtr& ev,
const TActorContext& ctx)
Expand Down Expand Up @@ -815,6 +854,9 @@ STFUNC(TLaggingAgentsReplicaProxyActor::StateWork)
HFunc(
TEvNonreplPartitionPrivate::TEvAgentIsBackOnline,
HandleAgentIsBackOnline);
HFunc(
TEvVolumePrivate::TEvLaggingAgentMigrationFinished,
HandleLaggingAgentMigrationFinished);
HFunc(TEvVolume::TEvRWClientIdChanged, HandleRWClientIdChanged);

HFunc(TEvents::TEvPoisonPill, HandlePoisonPill);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <cloud/blockstore/libs/kikimr/helpers.h>
#include <cloud/blockstore/libs/storage/api/service.h>
#include <cloud/blockstore/libs/storage/api/volume.h>
#include <cloud/blockstore/libs/storage/volume/volume_events_private.h>

#include <cloud/storage/core/libs/actors/poison_pill_helper.h>
#include <cloud/storage/core/libs/common/compressed_bitmap.h>

Expand Down Expand Up @@ -129,6 +131,10 @@ class TLaggingAgentsReplicaProxyActor final
const TEvNonreplPartitionPrivate::TEvAgentIsBackOnline::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleLaggingAgentMigrationFinished(
const TEvVolumePrivate::TEvLaggingAgentMigrationFinished::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleWriteOrZeroCompleted(
const TEvNonreplPartitionPrivate::TEvWriteOrZeroCompleted::TPtr& ev,
const NActors::TActorContext& ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -693,35 +693,58 @@ Y_UNIT_TEST_SUITE(TLaggingAgentsReplicaProxyActorTest)

bool seenHealthCheck = false;
bool seenMigrationReads = false;
ui32 seenMigrationFinish = 0;

bool shouldDropMigrationFinishedEvent = true;
runtime.SetEventFilter(
[&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event)
{
switch (event->GetTypeRewrite()) {
case TEvService::EvReadBlocksRequest: {
if (event->Recipient != env.ReplicaActors[0]) {
break;
case TEvNonreplPartitionPrivate::EvChecksumBlocksRequest: {
auto* msg = event->Get<TEvNonreplPartitionPrivate::
TEvChecksumBlocksRequest>();
auto clientId = msg->Record.GetHeaders().GetClientId();
if (clientId == CheckHealthClientId) {
UNIT_ASSERT_VALUES_EQUAL(
env.ReplicaActors[0],
event->Recipient);
seenHealthCheck = true;
}
break;
}
case TEvService::EvReadBlocksRequest: {
auto* msg =
event->Get<TEvService::TEvReadBlocksRequest>();
auto clientId = msg->Record.GetHeaders().GetClientId();
if (clientId == CheckHealthClientId) {
seenHealthCheck = true;
} else if (clientId == BackgroundOpsClientId) {
if (clientId == BackgroundOpsClientId &&
event->Recipient != env.ReplicaActors[0])
{
UNIT_ASSERT(seenHealthCheck);
seenMigrationReads = true;
}
break;
}
case TEvVolumePrivate::EvLaggingAgentMigrationFinished: {
UNIT_ASSERT(seenMigrationReads);
seenMigrationFinish++;
return shouldDropMigrationFinishedEvent;
}
}
return false;
});

// Health check should happen and notify the controller that the replica
// is back online. Migration should start immediately after that.
env.WaitForMigrationFinishEvent();
runtime.DispatchEvents(
{.CustomFinalCondition = [&]()
{
return seenMigrationFinish == 1;
}});

// The agent is lagging again.
shouldDropMigrationFinishedEvent = false;
seenHealthCheck = seenMigrationReads = false;
seenMigrationFinish = 0;
env.AddLaggingAgent(runtime.GetNodeId(1), 0);

// Mark second half as dirty.
Expand All @@ -735,8 +758,14 @@ Y_UNIT_TEST_SUITE(TLaggingAgentsReplicaProxyActorTest)
TBlockRange64::WithLength(DeviceBlockCount, DeviceBlockCount);
env.WriteBlocksToReplica(1, secondDevice, 'C');

// Wait for migration finish.
// Wait for migration finish. Waiting for two events here: one for the
// proxy and one for the volume.
env.WaitForMigrationFinishEvent();
runtime.DispatchEvents(
{.CustomFinalCondition = [&]()
{
return seenMigrationFinish == 2;
}});

// Migration should migrate whole second device.
env.ReadFromReplicaAndCheckContents(0, secondDevice, 'C');
Expand Down
Loading