Skip to content

Commit 4f42133

Browse files
committed
A major change specific to OkCupid and DSDC. Add the ability for Redis
to be able to take part in a DSDC cluster. This is accomplished by successfully implementing the protocol for registering with the dsdc_master controller, as well as the corresponding heartbeat protocol.
1 parent 5ba47b5 commit 4f42133

File tree

8 files changed

+2554
-1
lines changed

8 files changed

+2554
-1
lines changed

src/Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ endif
113113

114114
REDIS_SERVER_NAME=redis-server
115115
REDIS_SENTINEL_NAME=redis-sentinel
116-
REDIS_SERVER_OBJ=adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o
116+
REDIS_SERVER_OBJ=adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o dsdc.o dsdc_sunrpc.o
117117
REDIS_CLI_NAME=redis-cli
118118
REDIS_CLI_OBJ=anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o
119119
REDIS_BENCHMARK_NAME=redis-benchmark

src/config.c

+10
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,16 @@ void loadServerConfigFromString(char *config) {
160160
} else if (argc == 2 && !strcasecmp(argv[1],"")) {
161161
resetServerSaveParams();
162162
}
163+
} else if (!strcasecmp(argv[0], "dsdc_masters") && argc >= 2) {
164+
int j, dsdcs = argc-1;
165+
if (dsdcs > REDIS_DSDC_MAX_MASTERS) {
166+
err = "Too many DSDC masters specified"; goto loaderr;
167+
}
168+
for (j = 0; j < dsdcs; j++)
169+
server.dsdc_masters[j] = zstrdup(argv[j+1]);
170+
server.dsdc_master_count = dsdcs;
171+
} else if (!strcasecmp(argv[0], "dsdc_num_keys") && argc == 2) {
172+
server.dsdc_num_keys = atoi(argv[1]);
163173
} else if (!strcasecmp(argv[0],"dir") && argc == 2) {
164174
if (chdir(argv[1]) == -1) {
165175
redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",

src/dsdc.c

+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
#include "redis.h"
2+
#include "dsdc_sunrpc.h"
3+
#include "sha1.h"
4+
5+
//-----------------------------------------------------------------------------
6+
7+
void* dsdcHeartbeatLoop(void* arg);
8+
CLIENT* dsdcMasterConnection(int index);
9+
void dsdcMakekey(dsdc_key_t* out, const char* hostname, int port, int id);
10+
void dsdcSpawnMasterThread(int index);
11+
12+
//-----------------------------------------------------------------------------
13+
14+
void dsdcInit(void) {
15+
for (int i = 0; i < server.dsdc_master_count; i++) {
16+
dsdcSpawnMasterThread(i);
17+
}
18+
}
19+
20+
//-----------------------------------------------------------------------------
21+
22+
void dsdcSpawnMasterThread(int index) {
23+
pthread_attr_t attr;
24+
pthread_t thread;
25+
26+
CLIENT* clnt = dsdcMasterConnection(index);
27+
28+
pthread_attr_init(&attr);
29+
if (pthread_create(&thread,&attr,dsdcHeartbeatLoop,clnt) != 0) {
30+
redisLog(REDIS_WARNING,"Fatal: Can't initialize DSDC Heartbeat.");
31+
exit(1);
32+
}
33+
}
34+
35+
//-----------------------------------------------------------------------------
36+
37+
CLIENT* dsdcMasterConnection(int index) {
38+
char myname[256];
39+
char* master_host = server.dsdc_masters[index];
40+
int num_keys = server.dsdc_num_keys;
41+
42+
CLIENT* clnt = clnt_create(master_host, DSDC_PROG, DSDC_VERS, "tcp");
43+
44+
if (clnt) {
45+
printf("dsdc: connected to master: %s\n", master_host);
46+
} else {
47+
clnt_pcreateerror(master_host);
48+
exit(1);
49+
}
50+
51+
dsdc_register2_arg_t reg_arg;
52+
reg_arg.primary = TRUE;
53+
reg_arg.lock_server = FALSE;
54+
55+
gethostname(myname, 256);
56+
57+
dsdcx_slave2_t slave;
58+
slave.hostname = myname;
59+
slave.port = server.port;
60+
slave.slave_type = DSDC_REDIS_SLAVE;
61+
62+
dsdc_keyset_t keys;
63+
keys.dsdc_keyset_t_val = zmalloc(sizeof(dsdc_key_t) * num_keys);
64+
keys.dsdc_keyset_t_len = num_keys;
65+
66+
for (int i = 0; i < num_keys; i++) {
67+
dsdcMakekey(&keys.dsdc_keyset_t_val[i], myname, server.port, i);
68+
}
69+
70+
slave.keys = keys;
71+
reg_arg.slave = slave;
72+
73+
dsdc_res_t* res = dsdc_register2_1(&reg_arg, clnt);
74+
if (!(res && *res == DSDC_OK)) {
75+
clnt_perror(clnt, "dsdc: registration failed");
76+
exit(1);
77+
}
78+
79+
return clnt;
80+
}
81+
82+
//-----------------------------------------------------------------------------
83+
84+
void* dsdcHeartbeatLoop(void* arg) {
85+
86+
CLIENT* clnt = (CLIENT*) arg;
87+
88+
while (TRUE) {
89+
dsdc_res_t* res = dsdc_heartbeat_1(NULL, clnt);
90+
if (!res) {
91+
printf("dsdc: heartbeat failed\n");
92+
}
93+
94+
sleep(1);
95+
}
96+
97+
return NULL;
98+
}
99+
100+
//-----------------------------------------------------------------------------
101+
102+
void dsdcMakekey(dsdc_key_t* out, const char* hostname, int port,
103+
int index) {
104+
105+
char buffer[1024];
106+
SHA1_CTX ctx;
107+
108+
int len = snprintf(buffer, 1024, "%s-%d-%d", hostname, port, index);
109+
110+
SHA1Init(&ctx);
111+
SHA1Update(&ctx, (unsigned char*) buffer, len);
112+
SHA1Final( (unsigned char*) out, &ctx);
113+
114+
}

src/dsdc.h

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
2+
void dsdcInit(void);

0 commit comments

Comments
 (0)