Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 94 additions & 28 deletions cdb2api/cdb2api.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ static int cdb2_comdb2db_timeout_set_from_env = 0;
static int CDB2_API_CALL_TIMEOUT = 120000; /* defaults to 2 minute */
static int cdb2_api_call_timeout_set_from_env = 0;

static int CDB2_ENFORCE_API_CALL_TIMEOUT = 0;
static int cdb2_enforce_api_call_timeout_set_from_env = 0;

static int CDB2_SOCKET_TIMEOUT = 5000;
static int cdb2_socket_timeout_set_from_env = 0;

Expand Down Expand Up @@ -856,6 +859,37 @@ static int is_sql_read(const char *sqlstr)
#define HAVE_MSGHDR_MSG_CONTROL
#endif

/* Report whether the handle's API-call deadline has already passed.
 *
 * hndl->max_call_time is compared against the current gettimeofday() reading
 * expressed in milliseconds; a value of 0 is treated as "no deadline set".
 *
 * Returns 1 when a deadline is set and the current time is past it,
 * 0 otherwise (including when hndl is NULL). */
static int is_api_call_timedout(cdb2_hndl_tp *hndl)
{
    if (!hndl || !hndl->max_call_time)
        return 0;

    struct timeval tv;
    gettimeofday(&tv, NULL);

    /* Widen tv_sec before multiplying: where time_t is 32 bits wide,
     * seconds-since-epoch * 1000 overflows (signed overflow is undefined). */
    long long current_time = (long long)tv.tv_sec * 1000 + tv.tv_usec / 1000;

    return (hndl->max_call_time < current_time) ? 1 : 0;
}

/* Clamp a per-operation timeout to the time remaining before the handle's
 * API-call deadline.
 *
 * hndl    - handle whose max_call_time (milliseconds, compared against
 *           gettimeofday()) bounds the result; may be NULL.
 * timeout - caller's preferred timeout in milliseconds.
 *
 * Returns timeout unchanged when hndl is NULL or no deadline is set
 * (max_call_time == 0). Otherwise returns the smaller of timeout and the
 * milliseconds left until the deadline. Once the deadline has passed the
 * remaining time is reported as 1 rather than 0 or a negative value --
 * presumably because callers treat a non-positive timeout as "no timeout";
 * TODO confirm against the socket helpers. */
static long long get_call_timeout(const cdb2_hndl_tp *hndl, long long timeout)
{
    if (!hndl || !hndl->max_call_time)
        return timeout;

    struct timeval tv;
    gettimeofday(&tv, NULL);

    /* Widen tv_sec before multiplying: where time_t is 32 bits wide,
     * seconds-since-epoch * 1000 overflows (signed overflow is undefined). */
    long long current_time = (long long)tv.tv_sec * 1000 + tv.tv_usec / 1000;

    long long time_left = hndl->max_call_time - current_time;
    if (time_left <= 0)
        time_left = 1; /* deadline already passed: smallest positive wait */

    return (time_left < timeout) ? time_left : timeout;
}

/* Arm the handle's API-call deadline.
 *
 * Stores in hndl->max_call_time the current gettimeofday() reading in
 * milliseconds plus hndl->api_call_timeout. Does nothing when hndl is NULL. */
static void set_max_call_time(cdb2_hndl_tp *hndl)
{
    if (!hndl)
        return;

    struct timeval tv;
    gettimeofday(&tv, NULL);

    /* Widen tv_sec before multiplying: where time_t is 32 bits wide,
     * seconds-since-epoch * 1000 overflows (signed overflow is undefined). */
    long long now_ms = (long long)tv.tv_sec * 1000 + tv.tv_usec / 1000;

    hndl->max_call_time = now_ms + hndl->api_call_timeout;
}

enum {
PASSFD_SUCCESS = 0,
PASSFD_RECVMSG = -1, /* error with recvmsg() */
Expand Down Expand Up @@ -995,8 +1029,7 @@ static int recv_fd_int(int sockfd, void *data, size_t nbytes, int *fd_recvd, int
static int recv_fd(const cdb2_hndl_tp *hndl, int sockfd, void *data, size_t nbytes, int *fd_recvd)
{
int rc, timeoutms = hndl ? hndl->sockpool_recv_timeoutms : CDB2_SOCKPOOL_RECV_TIMEOUTMS;
if (hndl && hndl->api_call_timeout && (timeoutms > hndl->api_call_timeout))
timeoutms = hndl->api_call_timeout;
timeoutms = get_call_timeout(hndl, timeoutms);
rc = recv_fd_int(sockfd, data, nbytes, fd_recvd, timeoutms);
if (rc != 0 && *fd_recvd != -1) {
int errno_save = errno;
Expand Down Expand Up @@ -1116,8 +1149,7 @@ static int send_fd_to(int sockfd, const void *data, size_t nbytes,
/* Send `nbytes` of `data` together with the descriptor `fd_to_send` over
 * `sockfd`, choosing the send timeout from the handle.
 *
 * The timeout starts as the handle's sockpool_send_timeoutms (or the global
 * CDB2_SOCKPOOL_SEND_TIMEOUTMS when hndl is NULL), is capped at a non-zero
 * api_call_timeout, and is then passed through get_call_timeout().
 * Returns whatever send_fd_to() returns. */
static int send_fd(const cdb2_hndl_tp *hndl, int sockfd, const void *data, size_t nbytes, int fd_to_send)
{
    int timeoutms;

    if (!hndl) {
        timeoutms = CDB2_SOCKPOOL_SEND_TIMEOUTMS;
    } else {
        timeoutms = hndl->sockpool_send_timeoutms;
        /* A zero api_call_timeout means there is no cap to apply. */
        if (hndl->api_call_timeout && hndl->api_call_timeout < timeoutms) {
            timeoutms = hndl->api_call_timeout;
        }
    }

    timeoutms = get_call_timeout(hndl, timeoutms);

    return send_fd_to(sockfd, data, nbytes, fd_to_send, timeoutms);
}

Expand Down Expand Up @@ -1591,6 +1623,8 @@ static void read_comdb2db_environment_cfg(cdb2_hndl_tp *hndl, const char *comdb2
&cdb2_sockpool_recv_timeoutms_set_from_env);
process_env_var_int("COMDB2_CONFIG_API_CALL_TIMEOUT", &CDB2_API_CALL_TIMEOUT,
&cdb2_api_call_timeout_set_from_env);
process_env_var_int("COMDB2_CONFIG_ENFORCE_API_CALL_TIMEOUT", &CDB2_ENFORCE_API_CALL_TIMEOUT,
&cdb2_enforce_api_call_timeout_set_from_env);
process_env_var_int("COMDB2_CONFIG_COMDB2DB_TIMEOUT", &COMDB2DB_TIMEOUT, &cdb2_comdb2db_timeout_set_from_env);
process_env_var_int("COMDB2_CONFIG_SOCKET_TIMEOUT", &CDB2_SOCKET_TIMEOUT, &cdb2_socket_timeout_set_from_env);
process_env_var_int("COMDB2_CONFIG_PROTOBUF_SIZE", &CDB2_PROTOBUF_SIZE, &cdb2_protobuf_size_set_from_env);
Expand Down Expand Up @@ -1841,6 +1875,9 @@ static void read_comdb2db_cfg(cdb2_hndl_tp *hndl, SBUF2 *s, const char *comdb2db
hndl->api_call_timeout = atoi(tok);
else if (tok)
CDB2_API_CALL_TIMEOUT = atoi(tok);
} else if (!cdb2_api_call_timeout_set_from_env && (strcasecmp("enforce_api_call_timeout",tok) == 0)) {
tok = strtok_r(NULL, " :,", &last);
CDB2_ENFORCE_API_CALL_TIMEOUT = value_on_off(tok, &err);
} else if (strcasecmp("auto_consume_timeout", tok) == 0) {
tok = strtok_r(NULL, " :,", &last);
if (tok)
Expand Down Expand Up @@ -2125,6 +2162,8 @@ static void set_cdb2_timeouts(cdb2_hndl_tp *hndl)
hndl->comdb2db_timeout = hndl->api_call_timeout;
if (hndl->socket_timeout > hndl->api_call_timeout)
hndl->socket_timeout = hndl->api_call_timeout;

set_max_call_time(hndl);
}

/* Read all available comdb2 configuration files.
Expand Down Expand Up @@ -3688,8 +3727,22 @@ static int cdb2portmux_get(cdb2_hndl_tp *hndl, const char *type,

debugprint("name %s\n", name);

int connect_timeout = hndl->connect_timeout;

if (CDB2_ENFORCE_API_CALL_TIMEOUT) {
# ifdef CDB2API_TEST
printf("RETRY with timeout %d\n", get_call_timeout(hndl, connect_timeout));
# endif
if (is_api_call_timedout(hndl)) {
snprintf(hndl->errstr, sizeof(hndl->errstr), "%s:%d Timed out connecting to db\n", __func__, __LINE__);
port = -1;
goto after_callback;
}
connect_timeout = get_call_timeout(hndl, connect_timeout);
}

fd = cdb2_tcpconnecth_to(hndl, remote_host, CDB2_PORTMUXPORT, 0,
hndl->connect_timeout);
connect_timeout);
if (fd < 0) {
debugprint("cdb2_tcpconnecth_to returns fd=%d'\n", fd);
if (errno == EINPROGRESS) {
Expand All @@ -3715,7 +3768,18 @@ static int cdb2portmux_get(cdb2_hndl_tp *hndl, const char *type,
port = -1;
goto after_callback;
}
sbuf2settimeout(ss, hndl->connect_timeout, hndl->connect_timeout);
if (CDB2_ENFORCE_API_CALL_TIMEOUT) {
# ifdef CDB2API_TEST
printf("RETRY with timeout %d\n", get_call_timeout(hndl, connect_timeout));
# endif
if (is_api_call_timedout(hndl)) {
snprintf(hndl->errstr, sizeof(hndl->errstr), "%s:%d Timed out connecting to db\n", __func__, __LINE__);
port = -1;
goto after_callback;
}
connect_timeout = get_call_timeout(hndl, connect_timeout);
}
sbuf2settimeout(ss, connect_timeout, connect_timeout);
sbuf2printf(ss, "get %s\n", name);
sbuf2flush(ss);
res[0] = 0;
Expand Down Expand Up @@ -3904,6 +3968,17 @@ static int cdb2_read_record(cdb2_hndl_tp *hndl, uint8_t **buf, int *len, int *ty
goto after_callback;

retry:
if (CDB2_ENFORCE_API_CALL_TIMEOUT) {
int socket_timeout = get_call_timeout(hndl, hndl->socket_timeout);
# ifdef CDB2API_TEST
printf("GOT HEARTBEAT || Set timeout to %d\n", socket_timeout);
# endif
if (is_api_call_timedout(hndl)) {
snprintf(hndl->errstr, sizeof(hndl->errstr), "%s:%d Timed out reading response from the db\n", __func__, __LINE__);
rc = -1;
}
sbuf2settimeout(sb, socket_timeout, socket_timeout);
}
b_read = sbuf2fread((char *)&hdr, 1, sizeof(hdr), sb);
debugprint("READ HDR b_read=%d, sizeof(hdr)=(%zu):\n", b_read, sizeof(hdr));

Expand Down Expand Up @@ -4068,6 +4143,10 @@ static int cdb2_read_record(cdb2_hndl_tp *hndl, uint8_t **buf, int *len, int *ty

rc = 0;
after_callback:
// reset here
if (CDB2_ENFORCE_API_CALL_TIMEOUT) {
sbuf2settimeout(sb, hndl->socket_timeout, hndl->socket_timeout);
}
while ((e = cdb2_next_callback(hndl, CDB2_AFTER_READ_RECORD, e)) != NULL) {
callbackrc =
cdb2_invoke_callback(hndl, e, 1, CDB2_RETURN_VALUE, (intptr_t)rc);
Expand Down Expand Up @@ -5954,10 +6033,8 @@ static int cdb2_run_statement_typed_int(cdb2_hndl_tp *hndl, const char *sql, int
hndl->retry_all = 1;
int run_last = 1;

time_t max_time =
time(NULL) + (hndl->api_call_timeout - hndl->connect_timeout) / 1000;
if (max_time < 0)
max_time = 0;
set_max_call_time(hndl);

retry_queries:
debugprint(
"retry_queries: hndl->host=%d (%s)\n", hndl->connected_host,
Expand Down Expand Up @@ -5985,18 +6062,11 @@ static int cdb2_run_statement_typed_int(cdb2_hndl_tp *hndl, const char *sql, int
cdb2_get_dbhosts(hndl);
}

int tmsec = 0;

// Add wait if we have already tried on all the nodes.
if (!hndl->sb && (retries_done > hndl->num_hosts)) {
tmsec = (retries_done - hndl->num_hosts) * 100;
}

if (hndl->sslerr != 0)
PRINT_AND_RETURN(CDB2ERR_CONNECT_ERROR);

if ((retries_done > 1) && ((retries_done > hndl->max_retries) ||
((time(NULL) + (tmsec / 1000)) >= max_time))) {
(is_api_call_timedout(hndl)))) {
sprintf(hndl->errstr, "%s: Maximum number of retries done.", __func__);
if (is_hasql_commit) {
cleanup_query_list(hndl, &commit_query_list, __LINE__);
Expand Down Expand Up @@ -7873,17 +7943,13 @@ static int cdb2_get_dbhosts(cdb2_hndl_tp *hndl)
}
}

time_t max_time =
time(NULL) +
(hndl->api_call_timeout - (CDB2_POLL_TIMEOUT + hndl->connect_timeout)) /
1000;
if (max_time < 0)
max_time = 0;
if (!hndl->max_call_time)
set_max_call_time(hndl);

use_bmsd = cdb2_use_bmsd && (*cdb2_bmssuffix != '\0') && !hndl->num_shards; // cannot find shards via bmsd yet
retry:
if (rc) {
if (num_retry >= MAX_RETRIES || time(NULL) > max_time)
if (num_retry >= MAX_RETRIES || is_api_call_timedout(hndl))
goto after_callback;

num_retry++;
Expand All @@ -7901,7 +7967,7 @@ static int cdb2_get_dbhosts(cdb2_hndl_tp *hndl)
hndl, cdb2_default_cluster, comdb2db_name, comdb2db_num,
comdb2db_hosts[i], comdb2db_hosts, comdb2db_ports, &master,
&num_comdb2db_hosts, NULL);
if (rc == 0 || time(NULL) >= max_time) {
if (rc == 0 || is_api_call_timedout(hndl)) {
break;
}
}
Expand All @@ -7918,15 +7984,15 @@ static int cdb2_get_dbhosts(cdb2_hndl_tp *hndl)
hndl->hosts, &hndl->num_hosts, hndl->dbname, hndl->cluster, &hndl->dbnum,
&hndl->num_hosts_sameroom, num_retry, use_bmsd, hndl->shards, &hndl->num_shards,
&hndl->num_shards_sameroom);
if (rc == 0 || time(NULL) >= max_time) {
if (rc == 0 || is_api_call_timedout(hndl)) {
break;
} else if (use_bmsd) {
if (cdb2_comdb2db_fallback)
use_bmsd = 0;
goto retry;
}
}
if (rc == -1 && time(NULL) < max_time) {
if (rc == -1 && !is_api_call_timedout(hndl)) {
rc = comdb2db_get_dbhosts(hndl, comdb2db_name, comdb2db_num, comdb2db_hosts[master], comdb2db_ports[master],
hndl->hosts, &hndl->num_hosts, hndl->dbname, hndl->cluster, &hndl->dbnum,
&hndl->num_hosts_sameroom, num_retry, use_bmsd, hndl->shards, &hndl->num_shards,
Expand Down
1 change: 1 addition & 0 deletions cdb2api/cdb2api_hndl.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ struct cdb2_hndl {
int is_hasql;
int sent_client_info;
void *user_arg;
long long max_call_time;
int api_call_timeout;
int connect_timeout;
int comdb2db_timeout;
Expand Down
Loading