diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h index 67b7ff64beee..9057aa88fe0b 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h @@ -164,6 +164,7 @@ class TInFlightComputes { private: YDB_READONLY_DEF(NActors::TActorId, ActorId); YDB_READONLY(ui64, FreeSpace, 0); + YDB_READONLY(TMonotonic, LastAckInstant, TMonotonic::Now()); std::deque> DataQueue; bool SendData() { @@ -190,6 +191,7 @@ class TInFlightComputes { } void OnAckReceived(const ui64 freeSpace) { + LastAckInstant = TMonotonic::Now(); AFL_ENSURE(!FreeSpace); AFL_ENSURE(freeSpace); FreeSpace = freeSpace; @@ -209,9 +211,11 @@ class TInFlightComputes { private: std::deque> UndefinedShardTaskData; - std::deque ComputeActors; + YDB_READONLY_DEF(std::deque, ComputeActors); + YDB_READONLY(TMonotonic, StartInstant, TMonotonic::Now()); THashMap ComputeActorsById; public: + ui32 GetPacksToSendCount() const { ui32 result = UndefinedShardTaskData.size(); for (auto&& i : ComputeActors) { @@ -220,6 +224,15 @@ class TInFlightComputes { return result; } + THashMap GetPacksToSend() const { + THashMap result; + result.emplace(NActors::TActorId(), UndefinedShardTaskData.size()); + for (auto&& i : ComputeActors) { + AFL_ENSURE(result.emplace(i.GetActorId(), i.GetPacksToSendCount()).second); + } + return result; + } + TInFlightComputes(const std::vector& computeActorIds) { for (auto&& i : computeActorIds) { ComputeActors.emplace_back(i); diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp index 0a383b154a6f..bc80f8703cb5 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp @@ -81,9 +81,39 @@ void TKqpScanFetcherActor::Bootstrap() { } AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "bootstrap")("compute", ComputeActorIds.size())("shards", PendingShards.size()); StartTableScan(); + Schedule(TDuration::Seconds(30), new NActors::TEvents::TEvWakeup); Become(&TKqpScanFetcherActor::StateFunc); } +void TKqpScanFetcherActor::HandleExecute(NActors::TEvents::TEvWakeup::TPtr& /*ev*/) { + const TMonotonic now = TMonotonic::Now(); + TMonotonic maxInstant = TMonotonic::Zero(); + for (auto&& i : InFlightComputes.GetComputeActors()) { + if (maxInstant < i.GetLastAckInstant()) { + maxInstant = i.GetLastAckInstant(); + } + } + TStringBuilder sb; + sb << "["; + for (auto&& i : InFlightComputes.GetPacksToSend()) { + sb << "{" << i.first << ":" << i.second << "}"; + } + sb << "];"; + sb << "["; + for (auto&& i : InFlightComputes.GetComputeActors()) { + sb << "{" << i.GetActorId() << ":" << now - i.GetLastAckInstant() << ":" << i.IsFree() << "}"; + } + sb << "]"; + if (TDuration::Seconds(120) < now - maxInstant) { + AFL_ENSURE(false)("in_flight", sb)("count", InFlightComputes.GetComputeActors().size()) + ("shards", InFlightShards.GetShardsCount())("scans", InFlightShards.GetScansCount())("duration", now - InFlightComputes.GetStartInstant()); + } else { + AFL_INFO(NKikimrServices::KQP_COMPUTE)("in_flight", sb)("count", InFlightComputes.GetComputeActors().size()) + ("shards", InFlightShards.GetShardsCount())("scans", InFlightShards.GetScansCount())("duration", now - InFlightComputes.GetStartInstant()); + } + Schedule(TDuration::Seconds(30), new NActors::TEvents::TEvWakeup); +} + void TKqpScanFetcherActor::HandleExecute(TEvScanExchange::TEvAckData::TPtr& ev) { AFL_ENSURE(ev->Get()->GetFreeSpace()); AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "AckDataFromCompute")("self_id", SelfId())("scan_id", ScanId) diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h index 73ead0a5ef0e..790211906148 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h @@ -82,6 +82,7 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped