Skip to content

Commit

Permalink
fix shutdown/close
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengyao-lin committed Dec 4, 2018
1 parent 853f722 commit ebd7bf3
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 23 deletions.
2 changes: 1 addition & 1 deletion app/vmecs/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ load_relay_config(toml_object_t *config)

void sigpipe_handler(int sig)
{
const char msg[] = "pipe broken\n";
const char msg[] = "broken pipe\n";
fd_write(STDERR_FILENO, (byte_t *)msg, sizeof(msg) - 1);
}

Expand Down
8 changes: 5 additions & 3 deletions proto/buf.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,16 @@ vbuffer_drain(vbuffer_t *vbuf)
mutex_unlock(vbuf->mut);
}

size_t
ssize_t
vbuffer_try_read(vbuffer_t *vbuf, byte_t *buf, size_t buf_size)
{
size_t n_read;
ssize_t n_read;

mutex_lock(vbuf->mut);

if (vbuf->w_idx) {
if (vbuf->eof) {
n_read = -1;
} else if (vbuf->w_idx) {
n_read = buf_size > vbuf->w_idx ? vbuf->w_idx : buf_size;

memcpy(buf, vbuf->buf, n_read);
Expand Down
2 changes: 1 addition & 1 deletion proto/buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ vbuffer_wait_drain(vbuffer_t *vbuf);
void
vbuffer_drain(vbuffer_t *vbuf);

size_t
ssize_t // return -1 if closed
vbuffer_try_read(vbuffer_t *vbuf, byte_t *buf, size_t buf_size);

// may block if no data is ready
Expand Down
8 changes: 6 additions & 2 deletions proto/native/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,18 @@ static int
_native_tcp_socket_close(tcp_socket_t *_sock)
{
native_tcp_socket_t *sock = (native_tcp_socket_t *)_sock;
return close(sock->sock);
return socket_shutdown_write(sock->sock);
}

static void
_native_tcp_socket_free(tcp_socket_t *_sock)
{
native_tcp_socket_t *sock = (native_tcp_socket_t *)_sock;
free(sock);

if (sock) {
close(sock->sock);
free(sock);
}
}

native_tcp_socket_t *
Expand Down
41 changes: 33 additions & 8 deletions proto/relay/etcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ typedef struct etcp_relay_conn_t_tag {

struct etcp_relay_conn_t_tag *pair;

// int nclosed;
int nclosed;
} etcp_relay_conn_t;

typedef struct {
Expand All @@ -42,7 +42,7 @@ etcp_relay_conn_new(fd_t in_fd, fd_t out_fd, tcp_socket_t *in, tcp_socket_t *out

ret->pair = NULL;

// ret->nclosed = 0;
ret->nclosed = 0;

return ret;
}
Expand All @@ -66,8 +66,10 @@ etcp_remove_conn(epoll_t epfd, etcp_relay_conn_t *conn)
fd_epoll_ctl(epfd, FD_EPOLL_DEL, conn->in_fd, NULL);
fd_epoll_ctl(epfd, FD_EPOLL_DEL, conn->out_fd, NULL);

tcp_socket_close(conn->in_sock);
tcp_socket_close(conn->out_sock);
if (!conn->nclosed) {
tcp_socket_close(conn->in_sock);
tcp_socket_close(conn->out_sock);
} // else already closed

tcp_socket_free(conn->in_sock);
tcp_socket_free(conn->out_sock);
Expand Down Expand Up @@ -129,7 +131,7 @@ etcp_handle(epoll_t epfd, tcp_outbound_t *outbound, etcp_relay_conn_t *conn, siz
etcp_relay_conn_link(conn1, conn2);

event = (epoll_event_t) {
.events = FD_EPOLL_READ | FD_EPOLL_ET,
.events = FD_EPOLL_READ | FD_EPOLL_RDHUP | FD_EPOLL_ET,
.data = {
.ptr = conn1
}
Expand All @@ -138,7 +140,7 @@ etcp_handle(epoll_t epfd, tcp_outbound_t *outbound, etcp_relay_conn_t *conn, siz
fd_epoll_ctl(epfd, FD_EPOLL_ADD, in_fd, &event);

event = (epoll_event_t) {
.events = FD_EPOLL_READ | FD_EPOLL_ET,
.events = FD_EPOLL_READ | FD_EPOLL_RDHUP | FD_EPOLL_ET,
.data = {
.ptr = conn2
}
Expand All @@ -161,8 +163,31 @@ etcp_handle(epoll_t epfd, tcp_outbound_t *outbound, etcp_relay_conn_t *conn, siz

free(buf);

if (res != -2) {
etcp_remove_conn(epfd, conn);
switch (res) {
case 0:
// read end cloesd writing
// shutdown write to out_sock

tcp_socket_close(conn->out_sock);

conn->nclosed++;
conn->pair->nclosed++;

if (conn->nclosed == 2) {
TRACE("conn closed %p %p", (void *)conn, (void *)conn->pair);
etcp_remove_conn(epfd, conn);
} else {
TRACE("conn in sock write closed %p", (void *)conn);
}

break;

case -2: break; // EAGAIN

case -1:
default:
// res > 0, but write failed
etcp_remove_conn(epfd, conn);
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion proto/socks/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ static int
_socks_tcp_socket_close(tcp_socket_t *_sock)
{
socks_tcp_socket_t *sock = (socks_tcp_socket_t *)_sock;
return close(sock->sock);
return socket_shutdown_write(sock->sock);
}

static void
Expand All @@ -352,6 +352,11 @@ _socks_tcp_socket_free(tcp_socket_t *_sock)

if (sock) {
target_id_free(sock->addr.proxy);

if (close(sock->sock)) {
perror("close");
}

free(sock);
}
}
Expand Down
4 changes: 2 additions & 2 deletions proto/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ typedef fd_t (*tcp_socket_revent_t)(struct tcp_socket_t_tag *sock);

typedef target_id_t *(*tcp_socket_target_t)(struct tcp_socket_t_tag *sock);

typedef int (*tcp_socket_close_t)(struct tcp_socket_t_tag *sock);
typedef void (*tcp_socket_free_t)(struct tcp_socket_t_tag *sock);
typedef int (*tcp_socket_close_t)(struct tcp_socket_t_tag *sock); // NOTE: close is similar to shutdown(SHUT_WR)
typedef void (*tcp_socket_free_t)(struct tcp_socket_t_tag *sock); // and free is similar to close

// an abstract layer for tcp connection
typedef struct tcp_socket_t_tag {
Expand Down
20 changes: 15 additions & 5 deletions proto/vmess/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ _vmess_tcp_socket_read_c(tcp_socket_t *_sock, byte_t *buf, size_t size, bool blo

ssize_t ret = vbuffer_try_read(sock->read_buf, buf, size);

if (ret == -1) {
// buffer closed
return 0;
}

if (!ret) {
if (block) {
res = rbuffer_read(sock->raw_buf, sock->sock, vmess_data_decoder,
Expand All @@ -35,6 +40,7 @@ _vmess_tcp_socket_read_c(tcp_socket_t *_sock, byte_t *buf, size_t size, bool blo
if (trunk.size == 0) {
// remote sent end signal
// no more read is needed
vbuffer_close(sock->read_buf);
ret = 0;
} else {
if (trunk.size > size) {
Expand Down Expand Up @@ -128,6 +134,12 @@ _vmess_tcp_socket_free(tcp_socket_t *_sock)
{
vmess_tcp_socket_t *sock = (vmess_tcp_socket_t *)_sock;

vbuffer_close(sock->read_buf);

if (close(sock->sock)) {
perror("close");
}

vbuffer_free(sock->read_buf);
rbuffer_free(sock->raw_buf);
vmess_config_free(sock->config);
Expand All @@ -145,13 +157,11 @@ _vmess_tcp_socket_close(tcp_socket_t *_sock)
size_t size;
const byte_t *trunk = vmess_serial_end(&size);

vbuffer_close(sock->read_buf);

// write ending trunk
fd_write(sock->sock, trunk, size);

if (close(sock->sock)) {
perror("close");
if (socket_shutdown_write(sock->sock)) {
perror("shutdown");
}

return 0;
Expand Down
2 changes: 2 additions & 0 deletions pub/epoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,7 @@ fd_epoll_create()
#define FD_EPOLL_READ EPOLLIN
#define FD_EPOLL_WRITE EPOLLOUT
#define FD_EPOLL_ET EPOLLET
#define FD_EPOLL_HUP EPOLLHUP
#define FD_EPOLL_RDHUP EPOLLRDHUP

#endif
7 changes: 7 additions & 0 deletions pub/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
#include "pub/time.h"
#include "pub/fd.h"

// shutdown read
INLINE int
socket_shutdown_write(fd_t sock)
{
return shutdown(sock, SHUT_WR);
}

INLINE int
socket_set_block(fd_t fd, bool blocking)
{
Expand Down

0 comments on commit ebd7bf3

Please sign in to comment.