Skip to content

Commit 694b3ec

Browse files
committed
fix: when kill a connection, it have a change make the killed thread wait forever
the killer thread: shutdown the killed thread vio, this action will close the connection fd and remove the fd from the red-black tree of the epoll_wait fd the killed thread: first run the start_io function, this function will bind the connection fd to the epoll_wait fd. second the connection wait from the epoll_wait when worker thread process a connection, first bind the connection fd to the epoll_wait fd, then killer thread close the connection fd, which will make the killed connection have no change to run any more.
1 parent cc63c04 commit 694b3ec

File tree

3 files changed

+32
-0
lines changed

3 files changed

+32
-0
lines changed

include/violite.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,10 @@ struct Vio {
407407
std::atomic_flag kevent_wakeup_flag = ATOMIC_FLAG_INIT;
408408
#endif
409409

410+
#ifdef HAVE_POOL_OF_THREADS
411+
std::atomic_flag epoll_shutdown_flag = ATOMIC_FLAG_INIT;
412+
#endif
413+
410414
#ifdef HAVE_SETNS
411415
/**
412416
Socket network namespace.

sql/threadpool_unix.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,8 @@ static connection_t *listener(thread_group_t *thread_group) {
712712
*/
713713
for (int i = (listener_picks_event) ? 1 : 0; i < cnt; i++) {
714714
connection_t *c = (connection_t *)native_event_get_userdata(&ev[i]);
715+
Vio* vio = c->thd->get_protocol_classic()->get_vio();
716+
vio->epoll_shutdown_flag.clear();
715717
if (connection_is_high_prio(*c)) {
716718
c->tickets--;
717719
thread_group->high_prio_queue.push_back(c);
@@ -724,6 +726,8 @@ static connection_t *listener(thread_group_t *thread_group) {
724726
if (listener_picks_event) {
725727
/* Handle the first event. */
726728
retval = (connection_t *)native_event_get_userdata(&ev[0]);
729+
Vio* vio = retval->thd->get_protocol_classic()->get_vio();
730+
vio->epoll_shutdown_flag.clear();
727731
mysql_mutex_unlock(&thread_group->mutex);
728732
break;
729733
}
@@ -1403,6 +1407,14 @@ static int start_io(connection_t *connection) {
14031407
Bind to poll descriptor if not yet done.
14041408
*/
14051409
Vio *vio = connection->thd->get_protocol_classic()->get_vio();
1410+
if (vio && vio->epoll_shutdown_flag.test_and_set()) {
1411+
/*
1412+
connection in the shutdown progress,
1413+
return -1 make sure abort this connection
1414+
*/
1415+
vio->epoll_shutdown_flag.clear();
1416+
return -1;
1417+
}
14061418
int fd = mysql_socket_getfd(vio->mysql_socket);
14071419
if (!connection->bound_to_poll_descriptor) {
14081420
connection->bound_to_poll_descriptor = true;

vio/viosocket.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,16 @@ static void vio_wait_until_woken(Vio *vio) {
527527
}
528528
#endif
529529

530+
531+
#ifdef HAVE_POOL_OF_THREADS
532+
static void vio_wait_until_epoll_woken(Vio *vio) {
533+
while (vio->epoll_shutdown_flag.test_and_set()) {
534+
// make connection wake up from epoll_wait in the worker thread.
535+
vio_cancel(vio, SHUT_RD);
536+
}
537+
}
538+
#endif
539+
530540
int vio_shutdown(Vio *vio, int how) {
531541
DBUG_TRACE;
532542

@@ -553,6 +563,12 @@ int vio_shutdown(Vio *vio, int how) {
553563
vio_wait_until_woken(vio);
554564
#endif
555565

566+
#ifdef HAVE_POOL_OF_THREADS
567+
if (vio->thread_id.value() != 0 && vio->epoll_shutdown_flag.test_and_set()) {
568+
vio_wait_until_epoll_woken(vio);
569+
}
570+
#endif
571+
556572
if (mysql_socket_close(vio->mysql_socket)) r = -1;
557573
#ifdef HAVE_KQUEUE
558574
if (vio->kq_fd == -1 || close(vio->kq_fd)) r = -1;

0 commit comments

Comments
 (0)