Skip to content

Commit

Permalink
[nexus] fix proxy nexus and add debug option
Browse files Browse the repository at this point in the history
  • Loading branch information
wkgcass committed Jun 30, 2024
1 parent a97d625 commit 6690001
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 11 deletions.
21 changes: 19 additions & 2 deletions extended/src/main/java/io/vproxy/vproxyx/ProxyNexus.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import io.vproxy.msquic.MsQuicUpcall;
import io.vproxy.msquic.MsQuicUtils;
import io.vproxy.msquic.QuicCertificateFile;
import io.vproxy.msquic.callback.ListenerCallback;
import io.vproxy.msquic.callback.ListenerCallbackList;
import io.vproxy.msquic.wrap.Configuration;
import io.vproxy.msquic.wrap.Listener;
import io.vproxy.pni.Allocator;
Expand All @@ -34,6 +36,7 @@ public class ProxyNexus {
certificate=<cert-pem-path> Certificate used by the QUIC
privatekey=<key-pem-path> Private key used by the QUIC
cacert=<ca-cert-pem-path> Ca-certificate used by the QUIC
debug=<on|off> Enable or disable debug logging
Api:
curl -X POST /apis/v1.0/proxies --data '{"node":"{nodeName}", "target":"{host}:{port}", "listen":"{port}"}'
curl -X GET /apis/v1.0/proxies
Expand All @@ -57,6 +60,7 @@ public static void main0(String[] args) throws Exception {
var certificatePath = "";
var privateKeyPath = "";
var cacertPath = "";
var debug = false;
for (var arg : args) {
if (arg.equals("-h") || arg.equals("--help") || arg.equals("-help") || arg.equals("help")) {
System.out.println(HELP_STR);
Expand Down Expand Up @@ -105,6 +109,15 @@ public static void main0(String[] args) throws Exception {
privateKeyPath = arg.substring("privatekey=".length()).trim();
} else if (arg.startsWith("cacert=")) {
cacertPath = arg.substring("cacert=".length()).trim();
} else if (arg.startsWith("debug=")) {
var value = arg.substring("debug=".length()).trim();
if (value.equals("on")) {
debug = true;
} else if (value.equals("off")) {
debug = false;
} else {
throw new IllegalArgumentException("debug=" + value + " is not valid");
}
} else {
throw new IllegalArgumentException("unknown argument: " + arg);
}
Expand Down Expand Up @@ -197,7 +210,7 @@ public static void main0(String[] args) throws Exception {

var nexus = new Nexus();
var resources = new ResHolder();
var nctx = new NexusContext(nodeName, nexus, resources, loop, reg, clientConf, serverConf);
var nctx = new NexusContext(nodeName, nexus, resources, loop, reg, clientConf, serverConf, debug);

var self = new NexusNode(nodeName, null);
nexus.setSelfNode(self);
Expand All @@ -208,7 +221,11 @@ public static void main0(String[] args) throws Exception {

if (serverPort > 0) {
var listenerAllocator = Allocator.ofUnsafe();
var lsn = new Listener(new Listener.Options(reg, listenerAllocator, new NexusQuicListenerCallback(nctx),
ListenerCallback cb = new NexusQuicListenerCallback(nctx);
if (debug) {
cb = ListenerCallbackList.withLog(cb);
}
var lsn = new Listener(new Listener.Options(reg, listenerAllocator, cb,
ref -> reg.opts.registrationQ.openListener(
MsQuicUpcall.listenerCallback, ref.MEMORY, retCode, listenerAllocator
)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@ public class NexusContext {
public final Registration registration;
public final Configuration clientConfiguration;
public final Configuration serverConfiguration;
public final boolean debug;

public NexusContext(String selfNodeName, Nexus nexus, ResHolder resources, NetEventLoop loop, Registration registration,
Configuration clientConfiguration, Configuration serverConfiguration) {
Configuration clientConfiguration, Configuration serverConfiguration,
boolean debug) {
this.selfNodeName = selfNodeName;
this.nexus = nexus;
this.resources = resources;
this.loop = loop;
this.registration = registration;
this.clientConfiguration = clientConfiguration;
this.serverConfiguration = serverConfiguration;
this.debug = debug;
}
}
26 changes: 20 additions & 6 deletions extended/src/main/java/io/vproxy/vproxyx/nexus/NexusPeer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.vproxy.base.util.coll.RingQueue;
import io.vproxy.msquic.*;
import io.vproxy.msquic.callback.ConnectionCallback;
import io.vproxy.msquic.callback.ConnectionCallbackList;
import io.vproxy.msquic.wrap.Connection;
import io.vproxy.msquic.wrap.Listener;
import io.vproxy.pni.Allocator;
Expand Down Expand Up @@ -41,8 +42,15 @@ public static int createAccepted(NexusContext nctx, IPPort remote,
QuicConnection connQ, Listener listener, QuicListenerEventNewConnection data, Allocator allocator) {
var peer = new NexusPeer(nctx, remote);
peer.isServer = true;
peer.quicConn = new Connection(new Connection.Options(listener, allocator, peer.new NexusNodeConnectionCallback(), connQ));
ConnectionCallback cb = peer.new NexusNodeConnectionCallback();
if (nctx.debug) {
cb = ConnectionCallbackList.withLog(cb, true);
}
peer.quicConn = new Connection(new Connection.Options(listener, allocator, cb, connQ));
peer.quicConn.setConnectionInfo(data);
if (nctx.debug) {
peer.quicConn.enableTlsSecretDebug();
}
connQ.setCallbackHandler(MsQuicUpcall.connectionCallback, peer.quicConn.ref.MEMORY);
var err = connQ.setConfiguration(nctx.serverConfiguration.opts.configurationQ);
if (err != 0) {
Expand Down Expand Up @@ -76,8 +84,11 @@ private void doConnect() {
Connection conn;
try (var tmpAllocator = Allocator.ofConfined()) {
var returnStatus = new IntArray(tmpAllocator, 1);
conn = new Connection(new Connection.Options(nctx.registration, allocator,
new NexusNodeConnectionCallback(),
ConnectionCallback cb = new NexusNodeConnectionCallback();
if (nctx.debug) {
cb = ConnectionCallbackList.withLog(cb, true);
}
conn = new Connection(new Connection.Options(nctx.registration, allocator, cb,
ref -> nctx.registration.opts.registrationQ.openConnection(
MsQuicUpcall.connectionCallback, ref.MEMORY, returnStatus, allocator
)));
Expand All @@ -87,6 +98,9 @@ private void doConnect() {
return;
}
}
if (nctx.debug) {
conn.enableTlsSecretDebug();
}
int errcode = conn.start(nctx.clientConfiguration, remoteAddress);
if (errcode != 0) {
Logger.error(LogType.CONN_ERROR, "starting quic connection to " + remoteAddress + " failed, errcode=" + errcode);
Expand Down Expand Up @@ -180,7 +194,7 @@ private void initializeServerActiveControlStream() {
}
QuicSocketFD fd;
try {
fd = QuicSocketFD.newStream(quicConn);
fd = QuicSocketFD.newStream(nctx.debug, quicConn);
} catch (IOException e) {
Logger.error(LogType.CONN_ERROR, "failed to initiate quic stream to " + quicConn.getRemoteAddress(), e);
terminate(quicConn, "failed to initiate quic stream");
Expand All @@ -200,7 +214,7 @@ public int connected(Connection conn, QuicConnectionEventConnected data) {
nctx.loop.getSelectorEventLoop().nextTick(() -> {
QuicSocketFD fd;
try {
fd = QuicSocketFD.newStream(conn);
fd = QuicSocketFD.newStream(nctx.debug, conn);
} catch (IOException e) {
Logger.error(LogType.SOCKET_ERROR, "failed to create stream from " + conn, e);
conn.close();
Expand All @@ -219,7 +233,7 @@ public int shutdownComplete(Connection conn, QuicConnectionEventConnectionShutdo

@Override
public int peerStreamStarted(Connection conn, QuicConnectionEventPeerStreamStarted data) {
var fd = QuicSocketFD.wrapAcceptedStream(conn, data.getStream());
var fd = QuicSocketFD.wrapAcceptedStream(nctx.debug, conn, data.getStream());
if (isInitialized) {
StreamHandlers.INSTANCE.handleAccepted(nctx, NexusPeer.this, fd);
} else if (isServer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ object StreamHandlers {

val quicConn = nextNode.peer.quicConnection ?: return resultCallback(400, "node $nextNode is disconnected")

val nextFD = QuicSocketFD.newStream(quicConn)
val nextFD = QuicSocketFD.newStream(nctx.debug, quicConn)
nextConn = ConnectableConnection.wrap(
nextFD, nextNode.peer.remoteAddress, ConnectionOpts().setTimeout(NexusContext.GENERAL_TIMEOUT),
RingBuffer.allocateDirect(4096), RingBuffer.allocateDirect(4096)
Expand Down Expand Up @@ -406,7 +406,6 @@ object StreamHandlers {
Logger.warn(LogType.INVALID_EXTERNAL_DATA, "unexpected request from ${httpconn.conn.remote()}: ${req.method} ${req.uri}")
httpconn.response(404).send()
}
handleLinkStatusAdvertisement(nctx, httpconn, req)
}

private suspend fun handleLinkStatusAdvertisement(nctx: NexusContext, httpconn: CoroutineHttp1ServerConnection, req: Request) {
Expand Down

0 comments on commit 6690001

Please sign in to comment.