
Commit baffe07

[Core] Adjust NodeDeathInfo in raylet (#45533)
In #45357, in the case of preemption after the draining deadline, we expect the autoscaler to SIGKILL the node directly, without sending SIGTERM first. In practice, however, the autoscaler may send SIGTERM first and then SIGKILL. This PR therefore does the following:
- At the raylet, distinguish whether a SIGTERM comes from preemption draining or from normal termination, based on the existing drain request.
- Construct the node death info accordingly and send the UnregisterNode() RPC to GCS.

Signed-off-by: Rui Qiao <[email protected]>
1 parent bbbf51e commit baffe07

11 files changed: +105 −63 lines changed
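
Before the per-file diffs, here is a condensed, self-contained sketch of the SIGTERM classification that src/ray/raylet/main.cc now performs. It is not the raylet code itself: DrainRequest, NodeDeathInfo, ClassifySigterm, and CurrentTimeMs are illustrative stand-ins for rpc::DrainRayletRequest, rpc::NodeDeathInfo, the signal handler lambda, and current_time_ms(); only the branch condition mirrors the diff below.

// Condensed sketch of the new SIGTERM handling (not the actual raylet code).
#include <chrono>
#include <cstdint>
#include <optional>
#include <string>

struct DrainRequest {
  bool is_preemption = false;         // stands in for DRAIN_NODE_REASON_PREEMPTION
  int64_t deadline_timestamp_ms = 0;  // 0 means "no deadline"
  std::string reason_message;
};

struct NodeDeathInfo {
  enum Reason { EXPECTED_TERMINATION, AUTOSCALER_DRAIN_PREEMPTED };
  Reason reason;
  std::string reason_message;
};

static int64_t CurrentTimeMs() {
  using namespace std::chrono;
  return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
}

// A SIGTERM that arrives after an expired preemption drain is reported as
// AUTOSCALER_DRAIN_PREEMPTED; any other SIGTERM stays EXPECTED_TERMINATION.
NodeDeathInfo ClassifySigterm(const std::optional<DrainRequest> &drain_request) {
  if (drain_request.has_value() && drain_request->is_preemption &&
      drain_request->deadline_timestamp_ms != 0 &&
      drain_request->deadline_timestamp_ms < CurrentTimeMs()) {
    return {NodeDeathInfo::AUTOSCALER_DRAIN_PREEMPTED, drain_request->reason_message};
  }
  return {NodeDeathInfo::EXPECTED_TERMINATION, "received SIGTERM"};
}

The design choice is to reuse the drain request already recorded by HandleDrainRaylet rather than storing a separate NodeDeathInfo, so the SIGTERM path and the idle-drain path derive the death reason from the same state.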

python/ray/tests/test_draining.py

Lines changed: 14 additions & 7 deletions
@@ -143,7 +143,11 @@ def ping(self):
     assert worker_node["DeathReasonMessage"] == "preemption"
 
 
-def test_preemption_after_draining_deadline(monkeypatch, ray_start_cluster):
+@pytest.mark.parametrize(
+    "graceful",
+    [True, False],
+)
+def test_preemption_after_draining_deadline(monkeypatch, ray_start_cluster, graceful):
     monkeypatch.setenv("RAY_health_check_failure_threshold", "3")
     monkeypatch.setenv("RAY_health_check_timeout_ms", "100")
     monkeypatch.setenv("RAY_health_check_period_ms", "1000")
@@ -182,9 +186,8 @@ def ping(self):
     )
     assert is_accepted
 
-    # Simulate node provider forcefully terminates the worker node
-    # after the draining deadline.
-    cluster.remove_node(worker_node, False)
+    # Simulate autoscaler terminates the worker node after the draining deadline.
+    cluster.remove_node(worker_node, graceful)
 
     wait_for_condition(
         lambda: {node["NodeID"] for node in ray.nodes() if (node["Alive"])}
@@ -384,7 +387,11 @@ def get_node_id():
     ray.get(obj, timeout=2) == head_node_id
 
 
-def test_draining_reason(ray_start_cluster):
+@pytest.mark.parametrize(
+    "graceful",
+    [False, True],
+)
+def test_draining_reason(ray_start_cluster, graceful):
     cluster = ray_start_cluster
     cluster.add_node(num_cpus=1, resources={"node1": 1})
     ray.init(
@@ -414,8 +421,8 @@ def ping(self):
     )
     assert is_accepted
 
-    # Simulate node provider forcefully terminates the worker node
-    cluster.remove_node(node2, False)
+    # Simulate autoscaler terminates the worker node after the draining deadline.
+    cluster.remove_node(node2, graceful)
     try:
         ray.get(actor.ping.remote())
         raise

src/ray/raylet/main.cc

Lines changed: 17 additions & 3 deletions
@@ -439,11 +439,25 @@ int main(int argc, char *argv[]) {
     raylet->Start();
   }));
 
-  auto signal_handler = [shutdown_raylet_gracefully_internal](
+  auto signal_handler = [&raylet, shutdown_raylet_gracefully_internal](
                             const boost::system::error_code &error, int signal_number) {
     ray::rpc::NodeDeathInfo node_death_info;
-    node_death_info.set_reason(ray::rpc::NodeDeathInfo::EXPECTED_TERMINATION);
-    node_death_info.set_reason_message("received SIGTERM");
+    optional<ray::rpc::DrainRayletRequest> drain_request =
+        raylet->node_manager().GetLocalDrainRequest();
+    RAY_LOG(INFO) << "received SIGTERM. Existing local drain request = "
+                  << (drain_request.has_value() ? drain_request->DebugString() : "None");
+    if (drain_request.has_value() &&
+        drain_request->reason() ==
+            ray::rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_PREEMPTION &&
+        drain_request->deadline_timestamp_ms() != 0 &&
+        drain_request->deadline_timestamp_ms() < current_time_ms()) {
+      node_death_info.set_reason(ray::rpc::NodeDeathInfo::AUTOSCALER_DRAIN_PREEMPTED);
+      node_death_info.set_reason_message(drain_request->reason_message());
+    } else {
+      node_death_info.set_reason(ray::rpc::NodeDeathInfo::EXPECTED_TERMINATION);
+      node_death_info.set_reason_message("received SIGTERM");
+    }
+
     shutdown_raylet_gracefully_internal(node_death_info);
   };
   boost::asio::signal_set signals(main_service);

src/ray/raylet/node_manager.cc

Lines changed: 2 additions & 9 deletions
@@ -1974,11 +1974,8 @@ void NodeManager::HandleDrainRaylet(rpc::DrainRayletRequest request,
     const bool is_idle =
         cluster_resource_scheduler_->GetLocalResourceManager().IsLocalNodeIdle();
     if (is_idle) {
-      rpc::NodeDeathInfo node_death_info;
-      node_death_info.set_reason(rpc::NodeDeathInfo::AUTOSCALER_DRAIN_IDLE);
-      node_death_info.set_reason_message(request.reason_message());
       cluster_resource_scheduler_->GetLocalResourceManager().SetLocalNodeDraining(
-          request.deadline_timestamp_ms(), node_death_info);
+          request);
       reply->set_is_accepted(true);
     } else {
       reply->set_is_accepted(false);
@@ -1989,11 +1986,7 @@ void NodeManager::HandleDrainRaylet(rpc::DrainRayletRequest request,
     // Non-rejectable draining request.
     RAY_CHECK_EQ(request.reason(),
                  rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_PREEMPTION);
-    rpc::NodeDeathInfo node_death_info;
-    node_death_info.set_reason(rpc::NodeDeathInfo::AUTOSCALER_DRAIN_PREEMPTED);
-    node_death_info.set_reason_message(request.reason_message());
-    cluster_resource_scheduler_->GetLocalResourceManager().SetLocalNodeDraining(
-        request.deadline_timestamp_ms(), node_death_info);
+    cluster_resource_scheduler_->GetLocalResourceManager().SetLocalNodeDraining(request);
     reply->set_is_accepted(true);
   }

src/ray/raylet/node_manager.h

Lines changed: 5 additions & 0 deletions
@@ -223,6 +223,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
     return mutable_object_provider_;
   }
 
+  /// Get the local drain request.
+  optional<rpc::DrainRayletRequest> GetLocalDrainRequest() const {
+    return cluster_resource_scheduler_->GetLocalResourceManager().GetLocalDrainRequest();
+  }
+
  private:
   void ReleaseWorker(const WorkerID &worker_id) {
     leased_workers_.erase(worker_id);

src/ray/raylet/placement_group_resource_manager_test.cc

Lines changed: 3 additions & 2 deletions
@@ -184,9 +184,10 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewPrepareBundleDuringDraining)
 
   ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles(bundle1_specs));
   // Drain the node, new bundle prepare will fail.
-  rpc::NodeDeathInfo node_death_info;
+  rpc::DrainRayletRequest drain_request;
+  drain_request.set_deadline_timestamp_ms(std::numeric_limits<int64_t>::max());
   cluster_resource_scheduler_->GetLocalResourceManager().SetLocalNodeDraining(
-      std::numeric_limits<int64_t>::max(), node_death_info);
+      drain_request);
   ASSERT_FALSE(new_placement_group_resource_manager_->PrepareBundles(bundle2_specs));
   // Prepared bundles can still be committed.
   new_placement_group_resource_manager_->CommitBundles(bundle1_specs);

src/ray/raylet/scheduling/cluster_resource_scheduler.h

Lines changed: 1 addition & 0 deletions
@@ -125,6 +125,7 @@ class ClusterResourceScheduler {
                                  bool requires_object_store_memory);
 
   LocalResourceManager &GetLocalResourceManager() { return *local_resource_manager_; }
+
   ClusterResourceManager &GetClusterResourceManager() {
     return *cluster_resource_manager_;
   }

src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc

Lines changed: 3 additions & 3 deletions
@@ -362,9 +362,9 @@ TEST_F(ClusterResourceSchedulerTest, NodeAffinitySchedulingStrategyTest) {
   ASSERT_TRUE(resource_scheduler.GetLocalResourceManager().AllocateLocalTaskResources(
       resource_request, task_allocation));
   // Drain the local node so that it's not schedulable for new tasks.
-  rpc::NodeDeathInfo node_death_info;
-  resource_scheduler.GetLocalResourceManager().SetLocalNodeDraining(
-      std::numeric_limits<int64_t>::max(), node_death_info);
+  rpc::DrainRayletRequest drain_request;
+  drain_request.set_deadline_timestamp_ms(std::numeric_limits<int64_t>::max());
+  resource_scheduler.GetLocalResourceManager().SetLocalNodeDraining(drain_request);
 
   scheduling_strategy.mutable_node_affinity_scheduling_strategy()->set_node_id(
       local_node_id.Binary());

src/ray/raylet/scheduling/cluster_task_manager_test.cc

Lines changed: 12 additions & 12 deletions
@@ -743,9 +743,9 @@ TEST_F(ClusterTaskManagerTest, DrainingWhileResolving) {
   ASSERT_EQ(pool_.workers.size(), 1);
 
   // Drain the local node.
-  rpc::NodeDeathInfo node_death_info;
-  scheduler_->GetLocalResourceManager().SetLocalNodeDraining(
-      std::numeric_limits<int64_t>::max(), node_death_info);
+  rpc::DrainRayletRequest drain_request;
+  drain_request.set_deadline_timestamp_ms(std::numeric_limits<int64_t>::max());
+  scheduler_->GetLocalResourceManager().SetLocalNodeDraining(drain_request);
 
   // Arg is resolved.
   missing_objects_.erase(missing_arg);
@@ -1078,9 +1078,9 @@ TEST_F(ClusterTaskManagerTest, NotOKPopWorkerAfterDrainingTest) {
   AddNode(remote_node_id, 5);
 
   // Drain the local node.
-  rpc::NodeDeathInfo node_death_info;
-  scheduler_->GetLocalResourceManager().SetLocalNodeDraining(
-      std::numeric_limits<int64_t>::max(), node_death_info);
+  rpc::DrainRayletRequest drain_request;
+  drain_request.set_deadline_timestamp_ms(std::numeric_limits<int64_t>::max());
+  scheduler_->GetLocalResourceManager().SetLocalNodeDraining(drain_request);
 
   pool_.callbacks[task1.GetTaskSpecification().GetRuntimeEnvHash()].front()(
       nullptr, PopWorkerStatus::WorkerPendingRegistration, "");
@@ -2637,9 +2637,9 @@ TEST_F(ClusterTaskManagerTest, PopWorkerBeforeDraining) {
   task_manager_.QueueAndScheduleTask(task, false, false, &reply, callback);
 
   // Drain the local node.
-  rpc::NodeDeathInfo node_death_info;
-  scheduler_->GetLocalResourceManager().SetLocalNodeDraining(
-      std::numeric_limits<int64_t>::max(), node_death_info);
+  rpc::DrainRayletRequest drain_request;
+  drain_request.set_deadline_timestamp_ms(std::numeric_limits<int64_t>::max());
+  scheduler_->GetLocalResourceManager().SetLocalNodeDraining(drain_request);
 
   std::shared_ptr<MockWorker> worker =
       std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
@@ -2677,9 +2677,9 @@ TEST_F(ClusterTaskManagerTest, UnscheduleableWhileDraining) {
   AddNode(remote_node_id, 5);
 
   // Drain the local node.
-  rpc::NodeDeathInfo node_death_info;
-  scheduler_->GetLocalResourceManager().SetLocalNodeDraining(
-      std::numeric_limits<int64_t>::max(), node_death_info);
+  rpc::DrainRayletRequest drain_request;
+  drain_request.set_deadline_timestamp_ms(std::numeric_limits<int64_t>::max());
+  scheduler_->GetLocalResourceManager().SetLocalNodeDraining(drain_request);
 
   RayTask spillback_task = CreateTask({{ray::kCPU_ResourceLabel, 1}});
   rpc::RequestWorkerLeaseReply spillback_reply;

src/ray/raylet/scheduling/local_resource_manager.cc

Lines changed: 23 additions & 11 deletions
@@ -254,9 +254,8 @@ NodeResources LocalResourceManager::ToNodeResources() const {
   node_resources.available = local_resources_.available.ToNodeResourceSet();
   node_resources.total = local_resources_.total.ToNodeResourceSet();
   node_resources.labels = local_resources_.labels;
-  node_resources.is_draining = is_local_node_draining_;
-  node_resources.draining_deadline_timestamp_ms =
-      local_node_draining_deadline_timestamp_ms_;
+  node_resources.is_draining = IsLocalNodeDraining();
+  node_resources.draining_deadline_timestamp_ms = GetDrainingDeadline();
   return node_resources;
 }
 
@@ -330,8 +329,7 @@ void LocalResourceManager::PopulateResourceViewSyncMessage(
   }
 
   resource_view_sync_message.set_is_draining(IsLocalNodeDraining());
-  resource_view_sync_message.set_draining_deadline_timestamp_ms(
-      local_node_draining_deadline_timestamp_ms_);
+  resource_view_sync_message.set_draining_deadline_timestamp_ms(GetDrainingDeadline());
 
   for (const auto &iter : last_idle_times_) {
     if (iter.second == absl::nullopt) {
@@ -383,7 +381,8 @@ std::optional<syncer::RaySyncMessage> LocalResourceManager::CreateSyncMessage(
 void LocalResourceManager::OnResourceOrStateChanged() {
   if (IsLocalNodeDraining() && IsLocalNodeIdle()) {
     RAY_LOG(INFO) << "The node is drained, continue to shut down raylet...";
-    shutdown_raylet_gracefully_(node_death_info_);
+    rpc::NodeDeathInfo node_death_info = DeathInfoFromDrainRequest();
+    shutdown_raylet_gracefully_(std::move(node_death_info));
   }
 
   ++version_;
@@ -393,6 +392,22 @@ void LocalResourceManager::OnResourceOrStateChanged() {
     resource_change_subscriber_(ToNodeResources());
 }
 
+rpc::NodeDeathInfo LocalResourceManager::DeathInfoFromDrainRequest() {
+  rpc::NodeDeathInfo death_info;
+  RAY_CHECK(drain_request_.has_value());
+  if (drain_request_->reason() ==
+      rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_IDLE_TERMINATION) {
+    death_info.set_reason(rpc::NodeDeathInfo::AUTOSCALER_DRAIN_IDLE);
+    death_info.set_reason_message(drain_request_->reason_message());
+  } else {
+    RAY_CHECK_EQ(drain_request_->reason(),
+                 rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_PREEMPTION);
+    death_info.set_reason(rpc::NodeDeathInfo::AUTOSCALER_DRAIN_PREEMPTED);
+    death_info.set_reason_message(drain_request_->reason_message());
+  }
+  return death_info;
+}
+
 bool LocalResourceManager::ResourcesExist(scheduling::ResourceID resource_id) const {
   return local_resources_.total.Has(resource_id);
 }
@@ -445,11 +460,8 @@ void LocalResourceManager::RecordMetrics() const {
 }
 
 void LocalResourceManager::SetLocalNodeDraining(
-    int64_t draining_deadline_timestamp_ms, const rpc::NodeDeathInfo &node_death_info) {
-  RAY_CHECK_GE(draining_deadline_timestamp_ms, 0);
-  is_local_node_draining_ = true;
-  local_node_draining_deadline_timestamp_ms_ = draining_deadline_timestamp_ms;
-  node_death_info_ = node_death_info;
+    const rpc::DrainRayletRequest &drain_request) {
+  drain_request_ = std::make_optional(drain_request);
   OnResourceOrStateChanged();
 }

src/ray/raylet/scheduling/local_resource_manager.h

Lines changed: 19 additions & 12 deletions
@@ -31,6 +31,7 @@
 #include "ray/gcs/gcs_client/gcs_client.h"
 #include "ray/util/logging.h"
 #include "src/ray/protobuf/gcs.pb.h"
+#include "src/ray/protobuf/node_manager.pb.h"
 
 namespace ray {
 
@@ -154,10 +155,17 @@ class LocalResourceManager : public syncer::ReporterInterface {
 
   /// Change the local node to the draining state.
   /// After that, no new tasks can be scheduled onto the local node.
-  void SetLocalNodeDraining(int64_t draining_deadline_timestamp_ms,
-                            const rpc::NodeDeathInfo &node_death_info);
+  void SetLocalNodeDraining(const rpc::DrainRayletRequest &drain_request);
 
-  bool IsLocalNodeDraining() const { return is_local_node_draining_; }
+  bool IsLocalNodeDraining() const { return drain_request_.has_value(); }
+
+  /// Get the local drain request.
+  std::optional<rpc::DrainRayletRequest> GetLocalDrainRequest() const {
+    return drain_request_;
+  }
+
+  /// Generate node death info from existing drain request.
+  rpc::NodeDeathInfo DeathInfoFromDrainRequest();
 
  private:
   struct ResourceUsage {
@@ -206,6 +214,12 @@ class LocalResourceManager : public syncer::ReporterInterface {
 
   absl::optional<absl::Time> GetResourceIdleTime() const;
 
+  /// Get the draining deadline if node is in draining state.
+  ///
+  /// \return The draining deadline if node is in draining state, otherwise -1.
+  int64_t GetDrainingDeadline() const {
+    return drain_request_.has_value() ? drain_request_->deadline_timestamp_ms() : -1;
+  }
   /// Identifier of local node.
   scheduling::NodeID local_node_id_;
   /// Resources of local node.
@@ -226,15 +240,8 @@ class LocalResourceManager : public syncer::ReporterInterface {
   // Version of this resource. It will incr by one whenever the state changed.
   int64_t version_ = 0;
 
-  // Whether the local node is being drained or not.
-  bool is_local_node_draining_ = false;
-  // The value is the timestamp when
-  // the node will be force killed.
-  // 0 if there is no deadline.
-  int64_t local_node_draining_deadline_timestamp_ms_ = -1;
-
-  /// This is set when the node is being drained and indicates the reason for draining.
-  rpc::NodeDeathInfo node_death_info_;
+  /// The draining request this node received.
+  std::optional<rpc::DrainRayletRequest> drain_request_;
 
   FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest);
   FRIEND_TEST(ClusterResourceSchedulerTest, AvailableResourceInstancesOpsTest);
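
As a footnote to the header change above: the old trio of draining fields is replaced by a single std::optional, and everything else is derived from it. A minimal sketch of that pattern, using Lite stand-in types rather than the real rpc::DrainRayletRequest and LocalResourceManager:

// Simplified illustration of the state consolidation (not the actual class).
// One optional drain request replaces is_local_node_draining_,
// local_node_draining_deadline_timestamp_ms_, and node_death_info_, so the
// derived values can never disagree with each other.
#include <cstdint>
#include <optional>

struct DrainRequestLite {        // stand-in for rpc::DrainRayletRequest
  int64_t deadline_timestamp_ms = 0;
};

class DrainStateLite {           // stand-in for the drain-related part of LocalResourceManager
 public:
  void SetLocalNodeDraining(const DrainRequestLite &request) { drain_request_ = request; }

  bool IsLocalNodeDraining() const { return drain_request_.has_value(); }

  // -1 when not draining, matching the documented GetDrainingDeadline() contract.
  int64_t GetDrainingDeadline() const {
    return drain_request_ ? drain_request_->deadline_timestamp_ms : -1;
  }

 private:
  std::optional<DrainRequestLite> drain_request_;
};

With this shape, callers such as the updated tests only need to populate a drain request and pass it to SetLocalNodeDraining, as the test diffs above show.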
