Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Significantly reduce file descriptors consumption #3085

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ extern int ipc_shared_fd_read;
#define IPC_TYPE_NONE (-1)
#define ipc_bad_handler_type(htype) ((htype) < 0)

#define IPC_FD_READ(_proc_no) pt[_proc_no].ipc_pipe[0]
#define IPC_FD_WRITE(_proc_no) pt[_proc_no].ipc_pipe[1]
#define IPC_FD_READ(_proc_no) pt[_proc_no].ipc_pipe
#define IPC_FD_WRITE(_proc_no) pt[_proc_no].ipc_pipe
#define IPC_FD_READ_SELF IPC_FD_READ(process_no)
#define IPC_FD_READ_SHARED ipc_shared_fd_read
#define IPC_FD_SYNC_READ(_proc_no) pt[_proc_no].ipc_sync_pipe[0]
#define IPC_FD_SYNC_WRITE(_proc_no) pt[_proc_no].ipc_sync_pipe[1]
#define IPC_FD_SYNC_READ(_proc_no) pt[_proc_no].ipc_sync_pipe
#define IPC_FD_SYNC_WRITE(_proc_no) pt[_proc_no].ipc_sync_pipe
#define IPC_FD_SYNC_READ_SELF IPC_FD_SYNC_READ(process_no)

/* prototype of IPC handler - function called by the IPC engine
Expand Down
97 changes: 79 additions & 18 deletions pt.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ int init_multi_proc_support(void)
/* reset fds to prevent bogus ops */
pt[i].unix_sock = -1;
pt[i].pid = -1;
pt[i].ipc_pipe[0] = pt[i].ipc_pipe[1] = -1;
pt[i].ipc_sync_pipe[0] = pt[i].ipc_sync_pipe[1] = -1;
pt[i].ipc_pipe = -1;
pt[i].ipc_sync_pipe = -1;
}

/* create the load-related stats (initially marked as hidden */
Expand Down Expand Up @@ -235,8 +235,8 @@ void reset_process_slot( int p_id )
pt[p_id].desc[0] = 0;
pt[p_id].flags = 0;

pt[p_id].ipc_pipe[0] = pt[p_id].ipc_pipe[1] = -1;
pt[p_id].ipc_sync_pipe[0] = pt[p_id].ipc_sync_pipe[1] = -1;
pt[p_id].ipc_pipe = -1;
pt[p_id].ipc_sync_pipe = -1;
pt[p_id].unix_sock = -1;

pt[p_id].log_level = pt[p_id].default_log_level = 0; /*not really needed*/
Expand All @@ -257,6 +257,63 @@ void reset_process_slot( int p_id )
#endif
}

static int close_unused_pipes(int proc_no, int idx, int is_parent)
{
int *fd;

fd = &pt[proc_no].ipc_pipe_holder[idx];
if (is_parent || *fd != -1) {
if (close(*fd) != 0) {
LM_BUG("failed to close pt[%d].ipc_pipe_holder[%d]"
" = %d, errno = %d\n", proc_no, idx, *fd, errno);
return -1;
}
if (is_parent)
*fd = -1;
}
fd = &pt[proc_no].ipc_sync_pipe_holder[idx];
if (is_parent || *fd != -1) {
if (close(*fd) != 0) {
LM_BUG("failed to close pt[%d].ipc_sync_pipe_holder[%d]"
" = %d, errno = %d\n", proc_no, idx, *fd, errno);
return -1;
}
if (is_parent)
*fd = -1;
}
return 0;
}

static int setup_child_ipc_pipes(int proc_no)
{
int fd1, fd2, eval = 0;

fd2 = pt[proc_no].ipc_pipe;
if (fd2 != -1) {
fd1 = pt[proc_no].ipc_pipe_holder[0];
if (dup2(fd1, fd2) < 0) {
LM_BUG("failed to dup2(%d, pt[%d].ipc_pipe"
" = %d), errno = %d\n", fd1, proc_no, fd2,
errno);
eval = -1;
}
}
fd2 = pt[proc_no].ipc_sync_pipe;
if (fd2 != -1) {
fd1 = pt[proc_no].ipc_sync_pipe_holder[0];
if (dup2(fd1, fd2) < 0) {
LM_BUG("failed to dup2(%d, pt[%d].ipc_sync_pipe"
" = %d), errno = %d\n", fd1, proc_no, fd2,
errno);
eval = -1;
}
}
for (int i = 0; i < counted_max_processes; i++) {
if (close_unused_pipes(i, 0, 0) != 0)
eval = -1;
}
return eval;
}

enum {CHLD_STARTING, CHLD_OK, CHLD_FAILED};

Expand All @@ -268,6 +325,9 @@ static __attribute__((__noreturn__)) void child_startup_failed(void)

static int internal_fork_child_setup(const struct internal_fork_params *ifpp)
{
if (setup_child_ipc_pipes(process_no) != 0)
return -1;

init_log_level();

tcp_connect_proc_to_tcp_main(process_no, 1);
Expand Down Expand Up @@ -317,18 +377,16 @@ int internal_fork(const struct internal_fork_params *ifpp)
/* set the IPC pipes */
if ( (ifpp->flags & OSS_PROC_NO_IPC) ) {
/* advertise no IPC to the rest of the procs */
pt[new_idx].ipc_pipe[0] = -1;
pt[new_idx].ipc_pipe[1] = -1;
pt[new_idx].ipc_sync_pipe[0] = -1;
pt[new_idx].ipc_sync_pipe[1] = -1;
/* NOTE: the IPC fds will remain open in the other processes,
* but they will not be known */
pt[new_idx].ipc_pipe = -1;
pt[new_idx].ipc_sync_pipe = -1;
for (int i = 0; i < 2; i++) {
if (close_unused_pipes(new_idx, i, 1) != 0)
return -1;
}
} else {
/* activate the IPC pipes */
pt[new_idx].ipc_pipe[0]=pt[new_idx].ipc_pipe_holder[0];
pt[new_idx].ipc_pipe[1]=pt[new_idx].ipc_pipe_holder[1];
pt[new_idx].ipc_sync_pipe[0]=pt[new_idx].ipc_sync_pipe_holder[0];
pt[new_idx].ipc_sync_pipe[1]=pt[new_idx].ipc_sync_pipe_holder[1];
pt[new_idx].ipc_pipe=pt[new_idx].ipc_pipe_holder[1];
pt[new_idx].ipc_sync_pipe=pt[new_idx].ipc_sync_pipe_holder[1];
}

pt[new_idx].pid = 0;
Expand Down Expand Up @@ -402,6 +460,11 @@ int internal_fork(const struct internal_fork_params *ifpp)
goto child_is_down;
}
pt[new_idx].flags |= OSS_PROC_IS_RUNNING;
if ( (ifpp->flags & OSS_PROC_NO_IPC)==0 ) {
/* close the child's end of the pipes */
if (close_unused_pipes(new_idx, 0, 1) != 0)
return -1;
}
tcp_connect_proc_to_tcp_main( new_idx, 0);
return new_idx;
child_is_down:
Expand Down Expand Up @@ -475,10 +538,8 @@ int count_child_processes(void)
void dynamic_process_final_exit(void)
{
/* prevent any more IPC */
pt[process_no].ipc_pipe[0] = -1;
pt[process_no].ipc_pipe[1] = -1;
pt[process_no].ipc_sync_pipe[0] = -1;
pt[process_no].ipc_sync_pipe[1] = -1;
pt[process_no].ipc_pipe = -1;
pt[process_no].ipc_sync_pipe = -1;

/* clear the per-process connection from the DB queues */
ql_force_process_disconnect(process_no);
Expand Down
8 changes: 3 additions & 5 deletions pt.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,8 @@ struct process_table {
/* various flags describing properties of this process */
unsigned int flags;

/* pipe used by the process to receive designated jobs (used by IPC)
* [1] for writting into by other process,
* [0] to listen on by this process */
int ipc_pipe[2];
/* pipe used by the process to receive designated jobs (used by IPC) */
int ipc_pipe;
/* same as above, but the holder used when the corresponding process
* does not exist */
int ipc_pipe_holder[2];
Expand All @@ -61,7 +59,7 @@ struct process_table {
* this pipe should only be used by a process to synchronously receive a
* message after he knows that some other process will send it for sure,
* and there's no other job that can overlap in the meantime */
int ipc_sync_pipe[2];
int ipc_sync_pipe;
/* same as above, but holder for non-existing processes */
int ipc_sync_pipe_holder[2];

Expand Down