Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/bio.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

#include "server.h"
#include "connection.h"
#include "cluster.h"
#include "bio.h"
#include <stdatomic.h>

Expand All @@ -71,6 +72,7 @@ static unsigned int bio_job_to_worker[] = {
[BIO_CLOSE_AOF] = 1,
[BIO_LAZY_FREE] = 2,
[BIO_RDB_SAVE] = 3,
[BIO_CLUSTER_SAVE] = 4,
};

typedef struct {
Expand All @@ -86,6 +88,7 @@ static bio_worker_data bio_workers[] = {
{"bio_aof"},
{"bio_lazy_free"},
{"bio_rdb_save"},
{"bio_cluster_config_save"},
};
static const bio_worker_data *const bio_worker_end = bio_workers + (sizeof bio_workers / sizeof *bio_workers);

Expand Down Expand Up @@ -128,6 +131,12 @@ typedef union bio_job {
connection *conn; /* Connection to download the RDB from */
int is_dual_channel; /* Single vs dual channel */
} save_to_disk_args;

struct {
int type;
sds content; /* Cluster config file content. */
unsigned do_fsync : 1; /* A flag to indicate that a fsync is required. */
} cluster_save_args;
} bio_job;

void *bioProcessBackgroundJobs(void *arg);
Expand Down Expand Up @@ -227,6 +236,13 @@ void bioCreateSaveRDBToDiskJob(connection *conn, int is_dual_channel) {
bioSubmitJob(BIO_RDB_SAVE, job);
}

void bioCreateClusterConfigSaveJob(sds content, int do_fsync) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who owns/free’s content? bioCreateClusterConfigSaveJob(sds content, int do_fsync) enqueues an sds into job->cluster_save_args.content, then the BIO worker calls clusterSaveConfigFromBio(...). is there a clear free path for content afterwards—Would you please document ownership and ensure a single, deterministic free site to avoid leaks or double-frees.

bio_job *job = zmalloc(sizeof(*job));
job->cluster_save_args.content = content;
job->cluster_save_args.do_fsync = do_fsync;
bioSubmitJob(BIO_CLUSTER_SAVE, job);
}

void *bioProcessBackgroundJobs(void *arg) {
bio_worker_data *const bwd = arg;
bio_job *job;
Expand Down Expand Up @@ -304,6 +320,10 @@ void *bioProcessBackgroundJobs(void *arg) {
job->free_args.free_fn(job->free_args.free_args);
} else if (job_type == BIO_RDB_SAVE) {
replicaReceiveRDBFromPrimaryToDisk(job->save_to_disk_args.conn, job->save_to_disk_args.is_dual_channel);
} else if (job_type == BIO_CLUSTER_SAVE) {
if (clusterSaveConfigFromBio(job->cluster_save_args.content, job->cluster_save_args.do_fsync) == C_ERR) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to introduce exponential backoff and retry to mitigate transient errors. Can we include observability and metrics around the type of error we notice while trying to save the file/directory

serverLog(LL_WARNING, "Failed to save the cluster config file in background.");
}
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
Expand Down
2 changes: 2 additions & 0 deletions src/bio.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache);
void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache);
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...);
void bioCreateSaveRDBToDiskJob(connection *conn, int is_dual_channel);
void bioCreateClusterConfigSaveJob(sds content, int do_fsync);
int inBioThread(void);

/* Background job opcodes */
Expand All @@ -51,6 +52,7 @@ enum {
BIO_LAZY_FREE, /* Deferred objects freeing. */
BIO_CLOSE_AOF, /* Deferred close for AOF files. */
BIO_RDB_SAVE, /* Deferred save RDB to disk on replica */
BIO_CLUSTER_SAVE, /* Deferred cluster config file save and fsync. */
BIO_NUM_OPS
};

Expand Down
2 changes: 1 addition & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ sds aggregateClientOutputBuffer(client *c);
void resetClusterStats(void);
unsigned int delKeysInSlot(unsigned int hashslot, int lazy, bool propagate_del, bool send_del_event);

unsigned int propagateSlotDeletionByKeys(unsigned int hashslot);
void clusterUpdateState(void);
int clusterSaveConfigFromBio(sds content, int do_fsync);
void clusterSaveConfigOrDie(int do_fsync);
int clusterDelSlot(int slot);
int clusterAddSlot(clusterNode *n, int slot);
Expand Down
98 changes: 65 additions & 33 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "endianconv.h"
#include "connection.h"
#include "module.h"
#include "bio.h"

#include <stdlib.h>
#include <sys/types.h>
Expand Down Expand Up @@ -886,6 +887,16 @@ int clusterLoadConfig(char *filename) {
serverPanic("Unrecoverable error: corrupted cluster config file \"%s\".", line);
}

/* Get the nodes description and concatenate our "vars" directive to
* save currentEpoch and lastVoteEpoch. */
sds clusterGenNodesConfContent(void) {
sds content = clusterGenNodesDescription(NULL, CLUSTER_NODE_HANDSHAKE, 0);
content = sdscatfmt(content, "vars currentEpoch %U lastVoteEpoch %U\n",
(unsigned long long)server.cluster->currentEpoch,
(unsigned long long)server.cluster->lastVoteEpoch);
return content;
}

/* Cluster node configuration is exactly the same as CLUSTER NODES output.
*
* This function writes the node config and returns C_OK, on error C_ERR
Expand All @@ -898,23 +909,14 @@ int clusterLoadConfig(char *filename) {
* a single write to write the whole file. If the pre-existing file was
* bigger we pad our payload with newlines that are anyway ignored and truncate
* the file afterward. */
int clusterSaveConfig(int do_fsync) {
sds ci, tmpfilename;
int clusterSaveConfigImpl(sds content, int from_bio, int do_fsync) {
sds tmpfilename;
size_t content_size, offset = 0;
ssize_t written_bytes;
int fd = -1;
int retval = C_ERR;
mstime_t latency;

server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;

/* Get the nodes description and concatenate our "vars" directive to
* save currentEpoch and lastVoteEpoch. */
ci = clusterGenNodesDescription(NULL, CLUSTER_NODE_HANDSHAKE, 0);
ci = sdscatfmt(ci, "vars currentEpoch %U lastVoteEpoch %U\n",
(unsigned long long)server.cluster->currentEpoch,
(unsigned long long)server.cluster->lastVoteEpoch);
content_size = sdslen(ci);
content_size = sdslen(content);

/* Create a temp file with the new content. */
tmpfilename = sdscatfmt(sdsempty(), "%s.tmp-%i-%I", server.cluster_configfile, (int)getpid(), mstime());
Expand All @@ -924,11 +926,11 @@ int clusterSaveConfig(int do_fsync) {
goto cleanup;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("cluster-config-open", latency);
latencyTraceIfNeeded(cluster, cluster_config_open, latency);
if (!from_bio) latencyAddSampleIfNeeded("cluster-config-open", latency);
if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_open, latency);
latencyStartMonitor(latency);
while (offset < content_size) {
written_bytes = write(fd, ci + offset, content_size - offset);
written_bytes = write(fd, content + offset, content_size - offset);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If multiple saves are enqueued, can older content overwrite newer ones and amplify I/O. Can we Coalesce jobs so only the latest content is written (drop older items). Do we need some sort of versioning/epoch so the older jobs are dropped.

if (written_bytes <= 0) {
if (errno == EINTR) continue;
serverLog(LL_WARNING, "Failed after writing (%zd) bytes to tmp cluster config file: %s", offset,
Expand All @@ -938,8 +940,8 @@ int clusterSaveConfig(int do_fsync) {
offset += written_bytes;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("cluster-config-write", latency);
latencyTraceIfNeeded(cluster, cluster_config_write, latency);
if (!from_bio) latencyAddSampleIfNeeded("cluster-config-write", latency);
if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_write, latency);
if (do_fsync) {
latencyStartMonitor(latency);
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
Expand All @@ -948,8 +950,8 @@ int clusterSaveConfig(int do_fsync) {
goto cleanup;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("cluster-config-fsync", latency);
latencyTraceIfNeeded(cluster, cluster_config_fsync, latency);
if (!from_bio) latencyAddSampleIfNeeded("cluster-config-fsync", latency);
if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_fsync, latency);
}

latencyStartMonitor(latency);
Expand All @@ -958,17 +960,17 @@ int clusterSaveConfig(int do_fsync) {
goto cleanup;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("cluster-config-rename", latency);
latencyTraceIfNeeded(cluster, cluster_config_rename, latency);
if (!from_bio) latencyAddSampleIfNeeded("cluster-config-rename", latency);
if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_rename, latency);
if (do_fsync) {
latencyStartMonitor(latency);
if (fsyncFileDir(server.cluster_configfile) == -1) {
serverLog(LL_WARNING, "Could not sync cluster config file dir: %s", strerror(errno));
goto cleanup;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("cluster-config-dir-fsync", latency);
latencyTraceIfNeeded(cluster, cluster_config_dir_fsync, latency);
if (!from_bio) latencyAddSampleIfNeeded("cluster-config-dir-fsync", latency);
if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_dir_fsync, latency);
}
retval = C_OK; /* If we reached this point, everything is fine. */

Expand All @@ -977,23 +979,48 @@ int clusterSaveConfig(int do_fsync) {
latencyStartMonitor(latency);
close(fd);
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("cluster-config-close", latency);
latencyTraceIfNeeded(cluster, cluster_config_close, latency);
if (!from_bio) latencyAddSampleIfNeeded("cluster-config-close", latency);
if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_close, latency);
}
if (retval == C_ERR) {
latencyStartMonitor(latency);
unlink(tmpfilename);
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("cluster-config-unlink", latency);
latencyTraceIfNeeded(cluster, cluster_config_unlink, latency);
if (!from_bio) latencyAddSampleIfNeeded("cluster-config-unlink", latency);
if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_unlink, latency);
}
sdsfree(tmpfilename);
sdsfree(ci);
sdsfree(content);
return retval;
}

/* Save cluster config file.
*
* This function writes the node config and returns C_OK, on error C_ERR
* is returned. It is possible to use bio, which can move I/O latency into
* the bio thread. If bio is used, it always returns C_OK. */
int clusterSaveConfig(int bio, int do_fsync) {
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;
if (do_fsync) server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;

sds content = clusterGenNodesConfContent();
if (bio) {
/* We can actually always fsync the file in bio, but anyway lets follow the old code. */
bioCreateClusterConfigSaveJob(content, do_fsync);
return C_OK;
} else {
return clusterSaveConfigImpl(content, 0, do_fsync);
}
}

/* Save the cluster file, it is called from the bio thread. */
int clusterSaveConfigFromBio(sds content, int do_fsync) {
return clusterSaveConfigImpl(content, 1, do_fsync);
}

/* Save the cluster file, if save fails, the process will exit. */
void clusterSaveConfigOrDie(int do_fsync) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just realized clusterSaveConfigOrDie is always fsync, maybe we can get rid of the method param altogether.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can remove it once we make a decision at this one #2555 (comment)

if (clusterSaveConfig(do_fsync) == C_ERR) {
if (clusterSaveConfig(0, do_fsync) == C_ERR) {
serverLog(LL_WARNING, "Fatal: can't update cluster config file.");
exit(1);
}
Expand Down Expand Up @@ -1417,7 +1444,8 @@ void clusterHandleServerShutdown(bool auto_failover) {

/* The error logs have been logged in the save function if the save fails. */
serverLog(LL_NOTICE, "Saving the cluster configuration file before exiting.");
clusterSaveConfig(1);
bioDrainWorker(BIO_CLUSTER_SAVE);
clusterSaveConfig(0, 1);

#if !defined(__sun)
/* Unlock the cluster config file before shutdown, see clusterLockConfig.
Expand Down Expand Up @@ -5907,7 +5935,7 @@ void clusterBeforeSleep(void) {
/* Save the config, possibly using fsync. */
if (flags & CLUSTER_TODO_SAVE_CONFIG) {
int fsync = flags & CLUSTER_TODO_FSYNC_CONFIG;
clusterSaveConfigOrDie(fsync);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may require some adjustments. Previously, we synchronized saves and exited if they failed. Now, if we switch to bio, do we still need to retain the exit logic?

Also see #1032

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a comment on #1032

clusterSaveConfig(1, fsync);
}

if (flags & CLUSTER_TODO_BROADCAST_ALL) {
Expand Down Expand Up @@ -6309,7 +6337,10 @@ int verifyClusterConfigWithData(void) {
delKeysInSlot(j, server.lazyfree_lazy_server_del, true, false);
}
}
if (update_config) clusterSaveConfigOrDie(1);
if (update_config) {
bioDrainWorker(BIO_CLUSTER_SAVE);
clusterSaveConfigOrDie(1);
}
return C_OK;
}

Expand Down Expand Up @@ -7538,7 +7569,8 @@ int clusterCommandSpecial(client *c) {
(unsigned long long)myself->configEpoch);
addReplySds(c, reply);
} else if (!strcasecmp(c->argv[1]->ptr, "saveconfig") && c->argc == 2) {
int retval = clusterSaveConfig(1);
bioDrainWorker(BIO_CLUSTER_SAVE);
int retval = clusterSaveConfig(0, 1);

if (retval == C_OK)
addReply(c, shared.ok);
Expand Down
2 changes: 1 addition & 1 deletion tests/support/cluster_util.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ proc start_cluster {masters replicas options code {slot_allocator continuous_slo
# Configure the starting of multiple servers. Set cluster node timeout
# aggressively since many tests depend on ping/pong messages.

set cluster_options [list overrides [list cluster-enabled yes cluster-ping-interval 100 cluster-node-timeout 3000 cluster-databases 16 cluster-slot-stats-enabled yes]]
set cluster_options [list overrides [list cluster-enabled yes cluster-ping-interval 100 cluster-node-timeout 3000 cluster-databases 16 cluster-slot-stats-enabled yes latency-monitor-threshold 1]]
set options [concat $cluster_options $options]

# Cluster mode only supports a single database, so before executing the tests
Expand Down
Loading