Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client is unable to receive a response after sending a message using the control stream #320

Open
funky-eyes opened this issue Jan 8, 2025 · 1 comment

Comments

@funky-eyes
Copy link

server

    private static Consumer<ChannelPipeline> configServerPipeline(URL url) {
        int heartbeat = UrlUtils.getHeartbeat(url);
        NettyHttp3ProtocolSelectorHandler selectorHandler =
                new NettyHttp3ProtocolSelectorHandler(url, ScopeModelUtil.getFrameworkModel(url.getScopeModel()));
        return pipeline -> {
            pipeline.addLast(new Http3ServerConnectionHandler(new ChannelInitializer<QuicStreamChannel>() {
                @Override
                protected void initChannel(QuicStreamChannel ch) {
                    ch.pipeline().addLast(new HttpWriteQueueHandler()).addLast(new FlushConsolidationHandler(64, true))
                        .addLast(NettyHttp3FrameCodec.INSTANCE).addLast(selectorHandler);
                }
            }, new TripleHttp3PingPongHandler(heartbeat), null, null, true));
            pipeline.addLast(new Http3TripleServerConnectionHandler());
        };

client

    private static Consumer<ChannelPipeline> configClientPipeline(URL url) {
        int heartbeat = UrlUtils.getHeartbeat(url);
        TripleHttp3PingPongHandler tripleHttp3PingPongHandler = new TripleHttp3PingPongHandler(heartbeat);
        return pipeline -> {
            pipeline.addLast(
                new Http3ClientConnectionHandler(tripleHttp3PingPongHandler, null, null, null, true));
            pipeline.addLast(Http3ClientFrameCodec.INSTANCE);
            pipeline.addLast(new IdleStateHandler(heartbeat, 0, 0, TimeUnit.MILLISECONDS));
            pipeline.addLast(tripleHttp3PingPongHandler);
        };

The server level shares the channehandle with the client side

public class TripleHttp3PingPongHandler extends TriplePingPongHandler {

    private static final ErrorTypeAwareLogger log = LoggerFactory.getErrorTypeAwareLogger(TripleHttp3PingPongHandler.class);

    private final AtomicBoolean alive = new AtomicBoolean(true);

    private static final int PING_PONG_TYPE = 0x45;

    private GracefulShutdown gracefulShutdown;

    public TripleHttp3PingPongHandler(long pingAckTimeout) {
        super(10000);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        QuicStreamChannel streamChannel = Http3.getLocalControlStream(ctx.channel());
        Optional.ofNullable(streamChannel).ifPresent(channel -> sendPingFrame(ctx, streamChannel));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof Http3UnknownFrame) {
            Http3UnknownFrame http3UnknownFrame = (Http3UnknownFrame)msg;
            if (http3UnknownFrame.type() == PING_PONG_TYPE) {
                sendPingFrame(ctx);
            }
        }
        if (msg instanceof Http3GoAwayFrame) {
            if (!alive.get()) {
                ctx.fireUserEventTriggered(new DefaultHttp2GoAwayFrame(((Http3GoAwayFrame)msg).id()));
            }
        }
        super.channelRead(ctx, msg);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        Optional.ofNullable(pingAckTimeoutFuture).ifPresent(future -> future.cancel(true));
        pingAckTimeoutFuture = null;
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        alive.set(false);
    }

    private void sendPingFrame(ChannelHandlerContext ctx) {
            sendPingFrame(ctx, ctx.channel());
    }

    private void sendPingFrame(ChannelHandlerContext ctx, Channel controlStream) {
        if (alive.get()) {
            pingAckTimeoutFuture = ctx.executor().schedule(new HealthCheckChannelTask(ctx, controlStream, alive),
                pingAckTimeout, TimeUnit.MILLISECONDS);
        } else if (gracefulShutdown == null) {
            gracefulShutdown = new GracefulShutdown(ctx, "app_requested", ctx.voidPromise());
            gracefulShutdown.gracefulHttp3Shutdown();
        }
    }

    private static class HealthCheckChannelTask implements Runnable {

        private final ChannelHandlerContext ctx;
        private final AtomicBoolean alive;
        private final Channel controlStream;
        public HealthCheckChannelTask(ChannelHandlerContext ctx,Channel controlStream, AtomicBoolean alive) {
            this.ctx = ctx;
            this.alive = alive;
            this.controlStream = controlStream;
        }

        @Override
        public void run() {
            Optional.ofNullable(controlStream).ifPresent(channel -> {
                DefaultHttp2PingFrame pingFrame = new DefaultHttp2PingFrame(0);
                Http2Flags flags = pingFrame.ack() ? new Http2Flags().ack(true) : new Http2Flags();
                ByteBuf buf = ctx.alloc().buffer(FRAME_HEADER_LENGTH + PING_FRAME_PAYLOAD_LENGTH);
                try {
                    buf.writeMedium(PING_FRAME_PAYLOAD_LENGTH);
                    buf.writeByte(PING);
                    buf.writeByte(flags.value());
                    buf.writeInt(0);
                    buf.writeLong(pingFrame.content());
                    Http3UnknownFrame frame = new DefaultHttp3UnknownFrame(PING_PONG_TYPE, buf);
                    channel.writeAndFlush(frame).addListener(future -> {
                        if (!future.isSuccess()) {
                            alive.compareAndSet(true, false);
                            ctx.close();
                        }
                        log.info("ping-pong");
                    });
                } catch (Exception e) {
                    log.error("Failed to send ping frame", e);
                }
            });
        }
    }

}

When the client's handle#channelActive is triggered, it obtains the controlStreamChannel to send a heartbeat message. After sending the heartbeat, the server's channelRead is able to receive the message and responds with a heartbeat. Based on the logs, the server successfully responds to the heartbeat. However, the client's handler is not triggered, and after debugging the ByteToMessageDecoder and Http3FrameCodec's channelRead methods, as well as the fireChannelRead method in DefaultChannelPipeline, none of the breakpoints are hit. As a result, after the client sends the first heartbeat, it is unable to send the subsequent heartbeat. How can I resolve this issue?

@funky-eyes
Copy link
Author

Here is an example for this issue: https://github.com/netty/netty-incubator-codec-http3/compare/main...funky-eyes:20251010?expand=1

In my fork branch, I have added the controlstream handler. As you can see, after the client sends the request, the server receives it and responds, but the client does not receive the response.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant