From 815202b299b4e42e2097a219cf88f45bf340b9a0 Mon Sep 17 00:00:00 2001 From: Binbin Date: Wed, 27 Aug 2025 15:22:15 +0800 Subject: [PATCH 1/2] Save cluster file in bio to avoid the stuck io latency When the cluster changes, we need to persist the cluster configuration, if I/O is delayed or blocked, possibly by disk contention, this may result in large latencies on the main thread. We should avoid synchronous I/O from the main thread. So in this commit, we will try to bio to save the config file. We add a bio job and send a sds version of the config file, which does the synchronous save, so there is some eventually consistent version consistently stored on disk. This may break our previous assumption that nodes.conf is in sync and has the strong consistency. For shutdown and cluster saveconfig, we will wait for the bio job to get drained and trigger a new save in a sync way. Closes #2424. Signed-off-by: Binbin --- src/bio.c | 21 ++++++++ src/bio.h | 2 + src/cluster.h | 2 +- src/cluster_legacy.c | 98 ++++++++++++++++++++++------------ tests/support/cluster_util.tcl | 2 +- 5 files changed, 90 insertions(+), 35 deletions(-) diff --git a/src/bio.c b/src/bio.c index b044f9124c..56f0fff7d5 100644 --- a/src/bio.c +++ b/src/bio.c @@ -62,6 +62,7 @@ #include "server.h" #include "connection.h" +#include "cluster.h" #include "bio.h" #include @@ -71,6 +72,7 @@ static unsigned int bio_job_to_worker[] = { [BIO_CLOSE_AOF] = 1, [BIO_LAZY_FREE] = 2, [BIO_RDB_SAVE] = 3, + [BIO_CLUSTER_SAVE] = 4, }; typedef struct { @@ -86,6 +88,7 @@ static bio_worker_data bio_workers[] = { {"bio_aof"}, {"bio_lazy_free"}, {"bio_rdb_save"}, + {"bio_cluster_save_save"}, }; static const bio_worker_data *const bio_worker_end = bio_workers + (sizeof bio_workers / sizeof *bio_workers); @@ -128,6 +131,12 @@ typedef union bio_job { connection *conn; /* Connection to download the RDB from */ int is_dual_channel; /* Single vs dual channel */ } save_to_disk_args; + + struct { + int type; + sds content; /* Cluster config file content. */ + unsigned do_fsync : 1; /* A flag to indicate that a fsync is required. */ + } cluster_save_args; } bio_job; void *bioProcessBackgroundJobs(void *arg); @@ -227,6 +236,14 @@ void bioCreateSaveRDBToDiskJob(connection *conn, int is_dual_channel) { bioSubmitJob(BIO_RDB_SAVE, job); } +void bioCreateClusterConfigSaveJob(sds content, int do_fsync) { + bio_job *job = zmalloc(sizeof(*job)); + job->cluster_save_args.content = content; + job->cluster_save_args.do_fsync = do_fsync; + + bioSubmitJob(BIO_CLUSTER_SAVE, job); +} + void *bioProcessBackgroundJobs(void *arg) { bio_worker_data *const bwd = arg; bio_job *job; @@ -304,6 +321,10 @@ void *bioProcessBackgroundJobs(void *arg) { job->free_args.free_fn(job->free_args.free_args); } else if (job_type == BIO_RDB_SAVE) { replicaReceiveRDBFromPrimaryToDisk(job->save_to_disk_args.conn, job->save_to_disk_args.is_dual_channel); + } else if (job_type == BIO_CLUSTER_SAVE) { + if (clusterSaveConfigFromBio(job->cluster_save_args.content, job->cluster_save_args.do_fsync) == C_ERR) { + serverLog(LL_WARNING, "Fail to save the cluster config file in bio."); + } } else { serverPanic("Wrong job type in bioProcessBackgroundJobs()."); } diff --git a/src/bio.h b/src/bio.h index 8ae76ec0a2..0bb801952a 100644 --- a/src/bio.h +++ b/src/bio.h @@ -42,6 +42,7 @@ void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache); void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache); void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...); void bioCreateSaveRDBToDiskJob(connection *conn, int is_dual_channel); +void bioCreateClusterConfigSaveJob(sds content, int do_fsync); int inBioThread(void); /* Background job opcodes */ @@ -51,6 +52,7 @@ enum { BIO_LAZY_FREE, /* Deferred objects freeing. */ BIO_CLOSE_AOF, /* Deferred close for AOF files. */ BIO_RDB_SAVE, /* Deferred save RDB to disk on replica */ + BIO_CLUSTER_SAVE, /* Deferred cluster config file save and fsync. */ BIO_NUM_OPS }; diff --git a/src/cluster.h b/src/cluster.h index 443b035127..250d5a8942 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -149,8 +149,8 @@ sds aggregateClientOutputBuffer(client *c); void resetClusterStats(void); unsigned int delKeysInSlot(unsigned int hashslot, int lazy, bool propagate_del, bool send_del_event); -unsigned int propagateSlotDeletionByKeys(unsigned int hashslot); void clusterUpdateState(void); +int clusterSaveConfigFromBio(sds content, int do_fsync); void clusterSaveConfigOrDie(int do_fsync); int clusterDelSlot(int slot); int clusterAddSlot(clusterNode *n, int slot); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index aa479edd30..b385931b68 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -44,6 +44,7 @@ #include "endianconv.h" #include "connection.h" #include "module.h" +#include "bio.h" #include #include @@ -886,6 +887,16 @@ int clusterLoadConfig(char *filename) { serverPanic("Unrecoverable error: corrupted cluster config file \"%s\".", line); } +/* Get the nodes description and concatenate our "vars" directive to + * save currentEpoch and lastVoteEpoch. */ +sds clusterGenNodesConfContent(void) { + sds content = clusterGenNodesDescription(NULL, CLUSTER_NODE_HANDSHAKE, 0); + content = sdscatfmt(content, "vars currentEpoch %U lastVoteEpoch %U\n", + (unsigned long long)server.cluster->currentEpoch, + (unsigned long long)server.cluster->lastVoteEpoch); + return content; +} + /* Cluster node configuration is exactly the same as CLUSTER NODES output. * * This function writes the node config and returns C_OK, on error C_ERR @@ -898,23 +909,14 @@ int clusterLoadConfig(char *filename) { * a single write to write the whole file. If the pre-existing file was * bigger we pad our payload with newlines that are anyway ignored and truncate * the file afterward. */ -int clusterSaveConfig(int do_fsync) { - sds ci, tmpfilename; +int clusterSaveConfigImpl(sds content, int from_bio, int do_fsync) { + sds tmpfilename; size_t content_size, offset = 0; ssize_t written_bytes; int fd = -1; int retval = C_ERR; mstime_t latency; - - server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG; - - /* Get the nodes description and concatenate our "vars" directive to - * save currentEpoch and lastVoteEpoch. */ - ci = clusterGenNodesDescription(NULL, CLUSTER_NODE_HANDSHAKE, 0); - ci = sdscatfmt(ci, "vars currentEpoch %U lastVoteEpoch %U\n", - (unsigned long long)server.cluster->currentEpoch, - (unsigned long long)server.cluster->lastVoteEpoch); - content_size = sdslen(ci); + content_size = sdslen(content); /* Create a temp file with the new content. */ tmpfilename = sdscatfmt(sdsempty(), "%s.tmp-%i-%I", server.cluster_configfile, (int)getpid(), mstime()); @@ -924,11 +926,11 @@ int clusterSaveConfig(int do_fsync) { goto cleanup; } latencyEndMonitor(latency); - latencyAddSampleIfNeeded("cluster-config-open", latency); - latencyTraceIfNeeded(cluster, cluster_config_open, latency); + if (!from_bio) latencyAddSampleIfNeeded("cluster-config-open", latency); + if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_open, latency); latencyStartMonitor(latency); while (offset < content_size) { - written_bytes = write(fd, ci + offset, content_size - offset); + written_bytes = write(fd, content + offset, content_size - offset); if (written_bytes <= 0) { if (errno == EINTR) continue; serverLog(LL_WARNING, "Failed after writing (%zd) bytes to tmp cluster config file: %s", offset, @@ -938,8 +940,8 @@ int clusterSaveConfig(int do_fsync) { offset += written_bytes; } latencyEndMonitor(latency); - latencyAddSampleIfNeeded("cluster-config-write", latency); - latencyTraceIfNeeded(cluster, cluster_config_write, latency); + if (!from_bio) latencyAddSampleIfNeeded("cluster-config-write", latency); + if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_write, latency); if (do_fsync) { latencyStartMonitor(latency); server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG; @@ -948,8 +950,8 @@ int clusterSaveConfig(int do_fsync) { goto cleanup; } latencyEndMonitor(latency); - latencyAddSampleIfNeeded("cluster-config-fsync", latency); - latencyTraceIfNeeded(cluster, cluster_config_fsync, latency); + if (!from_bio) latencyAddSampleIfNeeded("cluster-config-fsync", latency); + if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_fsync, latency); } latencyStartMonitor(latency); @@ -958,8 +960,8 @@ int clusterSaveConfig(int do_fsync) { goto cleanup; } latencyEndMonitor(latency); - latencyAddSampleIfNeeded("cluster-config-rename", latency); - latencyTraceIfNeeded(cluster, cluster_config_rename, latency); + if (!from_bio) latencyAddSampleIfNeeded("cluster-config-rename", latency); + if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_rename, latency); if (do_fsync) { latencyStartMonitor(latency); if (fsyncFileDir(server.cluster_configfile) == -1) { @@ -967,8 +969,8 @@ int clusterSaveConfig(int do_fsync) { goto cleanup; } latencyEndMonitor(latency); - latencyAddSampleIfNeeded("cluster-config-dir-fsync", latency); - latencyTraceIfNeeded(cluster, cluster_config_dir_fsync, latency); + if (!from_bio) latencyAddSampleIfNeeded("cluster-config-dir-fsync", latency); + if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_dir_fsync, latency); } retval = C_OK; /* If we reached this point, everything is fine. */ @@ -977,23 +979,48 @@ int clusterSaveConfig(int do_fsync) { latencyStartMonitor(latency); close(fd); latencyEndMonitor(latency); - latencyAddSampleIfNeeded("cluster-config-close", latency); - latencyTraceIfNeeded(cluster, cluster_config_close, latency); + if (!from_bio) latencyAddSampleIfNeeded("cluster-config-close", latency); + if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_close, latency); } if (retval == C_ERR) { latencyStartMonitor(latency); unlink(tmpfilename); latencyEndMonitor(latency); - latencyAddSampleIfNeeded("cluster-config-unlink", latency); - latencyTraceIfNeeded(cluster, cluster_config_unlink, latency); + if (!from_bio) latencyAddSampleIfNeeded("cluster-config-unlink", latency); + if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_unlink, latency); } sdsfree(tmpfilename); - sdsfree(ci); + sdsfree(content); return retval; } +/* Save cluster config file. + * + * This function writes the node config and returns C_OK, on error C_ERR + * is returned. It is possible to use bio, which can move I/O latency into + * the bio thread. If bio is used, it always returns C_OK. */ +int clusterSaveConfig(int bio, int do_fsync) { + server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG; + if (do_fsync) server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG; + + sds content = clusterGenNodesConfContent(); + if (bio) { + /* We can actually always fsync the file in bio, but anyway lets follow the old code. */ + bioCreateClusterConfigSaveJob(content, do_fsync); + return C_OK; + } else { + return clusterSaveConfigImpl(content, 0, do_fsync); + } +} + +/* Save the cluster file, it is called from the bio thread. */ +int clusterSaveConfigFromBio(sds content, int do_fsync) { + return clusterSaveConfigImpl(content, 1, do_fsync); +} + +/* Save the cluster file, if save fails, the process will exit. */ void clusterSaveConfigOrDie(int do_fsync) { - if (clusterSaveConfig(do_fsync) == C_ERR) { + if (clusterSaveConfig(0, do_fsync) == C_ERR) { serverLog(LL_WARNING, "Fatal: can't update cluster config file."); exit(1); } @@ -1417,7 +1444,8 @@ void clusterHandleServerShutdown(bool auto_failover) { /* The error logs have been logged in the save function if the save fails. */ serverLog(LL_NOTICE, "Saving the cluster configuration file before exiting."); - clusterSaveConfig(1); + bioDrainWorker(BIO_CLUSTER_SAVE); + clusterSaveConfig(0, 1); #if !defined(__sun) /* Unlock the cluster config file before shutdown, see clusterLockConfig. @@ -5907,7 +5935,7 @@ void clusterBeforeSleep(void) { /* Save the config, possibly using fsync. */ if (flags & CLUSTER_TODO_SAVE_CONFIG) { int fsync = flags & CLUSTER_TODO_FSYNC_CONFIG; - clusterSaveConfigOrDie(fsync); + clusterSaveConfig(1, fsync); } if (flags & CLUSTER_TODO_BROADCAST_ALL) { @@ -6309,7 +6337,10 @@ int verifyClusterConfigWithData(void) { delKeysInSlot(j, server.lazyfree_lazy_server_del, true, false); } } - if (update_config) clusterSaveConfigOrDie(1); + if (update_config) { + bioDrainWorker(BIO_CLUSTER_SAVE); + clusterSaveConfigOrDie(1); + } return C_OK; } @@ -7538,7 +7569,8 @@ int clusterCommandSpecial(client *c) { (unsigned long long)myself->configEpoch); addReplySds(c, reply); } else if (!strcasecmp(c->argv[1]->ptr, "saveconfig") && c->argc == 2) { - int retval = clusterSaveConfig(1); + bioDrainWorker(BIO_CLUSTER_SAVE); + int retval = clusterSaveConfig(0, 1); if (retval == C_OK) addReply(c, shared.ok); diff --git a/tests/support/cluster_util.tcl b/tests/support/cluster_util.tcl index d69a195409..e35b2b6a20 100644 --- a/tests/support/cluster_util.tcl +++ b/tests/support/cluster_util.tcl @@ -263,7 +263,7 @@ proc start_cluster {masters replicas options code {slot_allocator continuous_slo # Configure the starting of multiple servers. Set cluster node timeout # aggressively since many tests depend on ping/pong messages. - set cluster_options [list overrides [list cluster-enabled yes cluster-ping-interval 100 cluster-node-timeout 3000 cluster-databases 16 cluster-slot-stats-enabled yes]] + set cluster_options [list overrides [list cluster-enabled yes cluster-ping-interval 100 cluster-node-timeout 3000 cluster-databases 16 cluster-slot-stats-enabled yes latency-monitor-threshold 1]] set options [concat $cluster_options $options] # Cluster mode only supports a single database, so before executing the tests From 0e37d99c388902fdf122643988843d0e17a111eb Mon Sep 17 00:00:00 2001 From: Binbin Date: Fri, 29 Aug 2025 10:37:45 +0800 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: Harkrishn Patro Signed-off-by: Binbin --- src/bio.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/bio.c b/src/bio.c index 56f0fff7d5..aae1d34c50 100644 --- a/src/bio.c +++ b/src/bio.c @@ -88,7 +88,7 @@ static bio_worker_data bio_workers[] = { {"bio_aof"}, {"bio_lazy_free"}, {"bio_rdb_save"}, - {"bio_cluster_save_save"}, + {"bio_cluster_config_save"}, }; static const bio_worker_data *const bio_worker_end = bio_workers + (sizeof bio_workers / sizeof *bio_workers); @@ -240,7 +240,6 @@ void bioCreateClusterConfigSaveJob(sds content, int do_fsync) { bio_job *job = zmalloc(sizeof(*job)); job->cluster_save_args.content = content; job->cluster_save_args.do_fsync = do_fsync; - bioSubmitJob(BIO_CLUSTER_SAVE, job); } @@ -323,7 +322,7 @@ void *bioProcessBackgroundJobs(void *arg) { replicaReceiveRDBFromPrimaryToDisk(job->save_to_disk_args.conn, job->save_to_disk_args.is_dual_channel); } else if (job_type == BIO_CLUSTER_SAVE) { if (clusterSaveConfigFromBio(job->cluster_save_args.content, job->cluster_save_args.do_fsync) == C_ERR) { - serverLog(LL_WARNING, "Fail to save the cluster config file in bio."); + serverLog(LL_WARNING, "Failed to save the cluster config file in background."); } } else { serverPanic("Wrong job type in bioProcessBackgroundJobs().");