diff --git a/base/src/main/java/io/vproxy/base/connection/Connection.java b/base/src/main/java/io/vproxy/base/connection/Connection.java index 1a4e025c8..b4539300b 100644 --- a/base/src/main/java/io/vproxy/base/connection/Connection.java +++ b/base/src/main/java/io/vproxy/base/connection/Connection.java @@ -13,6 +13,7 @@ import java.nio.channels.CancelledKeyException; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; public class Connection implements NetFlowRecorder { /** @@ -495,10 +496,13 @@ public void UNSAFE_replaceBuffer(RingBuffer in, RingBuffer out, boolean cleanBuf this.outBuffer = out; } - public void runNoQuickWrite(Runnable r) { + public void runNoQuickWrite(Consumer r) { noQuickWrite = true; - r.run(); - noQuickWrite = false; + try { + r.accept(this); + } finally { + noQuickWrite = false; + } } public static Connection wrap(SocketFD fd, diff --git a/base/src/main/java/io/vproxy/base/util/ByteArray.java b/base/src/main/java/io/vproxy/base/util/ByteArray.java index 3126177e0..4eec6470f 100644 --- a/base/src/main/java/io/vproxy/base/util/ByteArray.java +++ b/base/src/main/java/io/vproxy/base/util/ByteArray.java @@ -40,9 +40,8 @@ static ByteArray from(ByteBuffer buf) { if (buf.hasArray()) { return from(buf.array()).sub(buf.position(), len); } else { - byte[] array = Utils.allocateByteArray(len); - buf.get(array); - return from(array); + var seg = MemorySegment.ofBuffer(buf); + return new MemorySegmentByteArray(seg); } } @@ -132,6 +131,10 @@ default ByteArrayChannel toFullChannel() { return ByteArrayChannel.fromFull(this); } + default ByteArrayChannel toEmptyChannel() { + return ByteArrayChannel.fromEmpty(this); + } + default int uint24(int offset) { return uint8(offset) << 16 | uint8(offset + 1) << 8 | uint8(offset + 2); } diff --git a/base/src/main/java/io/vproxy/base/util/RingBuffer.java b/base/src/main/java/io/vproxy/base/util/RingBuffer.java index c6e6de86d..0c6627c5c 100644 --- a/base/src/main/java/io/vproxy/base/util/RingBuffer.java +++ b/base/src/main/java/io/vproxy/base/util/RingBuffer.java @@ -29,6 +29,10 @@ default int storeBytesFrom(ByteArrayChannel channel) { } } + default int storeBytesFrom(ByteArray array) { + return storeBytesFrom(array.toFullChannel()); + } + int storeBytesFrom(ReadableByteStream channel) throws IOException; default int storeBytesFrom(ByteBuffer buf) { @@ -50,6 +54,10 @@ default int writeTo(ByteArrayChannel channel) { } } + default int writeTo(ByteArray array) { + return writeTo(array.toEmptyChannel()); + } + default int writeTo(WritableByteStream channel) throws IOException { return writeTo(channel, Integer.MAX_VALUE); } diff --git a/base/src/main/java/io/vproxy/base/util/Utils.java b/base/src/main/java/io/vproxy/base/util/Utils.java index 3c0084db0..1813bc186 100644 --- a/base/src/main/java/io/vproxy/base/util/Utils.java +++ b/base/src/main/java/io/vproxy/base/util/Utils.java @@ -765,7 +765,11 @@ public static ByteArray buildPseudoIPv6Header(Ipv6Packet ipv6, int upperType, in } public static int calculateChecksum(ByteArray array, int limit) { - int sum = 0; + var intermediate = calculateChecksumIntermediate(0, array, limit); + return calculateChecksumDoFinal(intermediate); + } + + public static int calculateChecksumIntermediate(int sum, ByteArray array, int limit) { for (int i = 0; i < limit / 2; ++i) { sum += array.uint16(i * 2); while (sum > 0xffff) { @@ -778,6 +782,10 @@ public static int calculateChecksum(ByteArray array, int limit) { sum = (sum & 0xffff) + 1; } } + return sum; + } + + public static int calculateChecksumDoFinal(int sum) { return 0xffff - sum; } diff --git a/base/src/main/java/io/vproxy/base/util/objectpool/ConcurrentObjectPool.java b/base/src/main/java/io/vproxy/base/util/objectpool/ConcurrentObjectPool.java index 66ad18015..8bdda45d1 100644 --- a/base/src/main/java/io/vproxy/base/util/objectpool/ConcurrentObjectPool.java +++ b/base/src/main/java/io/vproxy/base/util/objectpool/ConcurrentObjectPool.java @@ -86,23 +86,9 @@ public Partition(int capacity) { } public boolean add(E e) { - return add(e, 1); - } - - private boolean add(E e, int retry) { - if (retry > 10) { // max retry for 10 times - return false; // too many retries - } - - StorageArray read = this.read.get(); StorageArray write = this.write; - // read and write may be the same when they are swapping - if (read == write) { - // is swapping, try again - return add(e, retry + 1); - } - + // adding is always safe //noinspection RedundantIfStatement if (write.add(e)) { return true; @@ -123,12 +109,7 @@ private E poll(int retry) { StorageArray read = this.read.get(); StorageArray write = this.write; - // read and write may be the same when they are swapping - if (read == write) { - // is swapping, try again - return poll(retry + 1); - } - + // polling is always safe E ret = read.poll(); if (ret != null) { return ret; @@ -138,13 +119,12 @@ private E poll(int retry) { // check whether we can swap (whether $write is full) int writeEnd = write.end.get(); - int writeEndIndicator = write.endIndicator.get(); if (writeEnd < write.capacity) { return null; // capacity not reached, do not swap and return nothing // no retry here because the write array will not change until something written into it } // also we should check whether there are no elements being stored - if (writeEnd != writeEndIndicator) { // element is being stored into the array + if (write.storing.get() != 0) { // element is being stored into the array return poll(retry + 1); // try again } // now we can know that writing operations will not happen in this partition @@ -168,9 +148,9 @@ public int size() { private static class StorageArray { private final int capacity; private final AtomicReferenceArray array; - private final AtomicInteger start = new AtomicInteger(0); - private final AtomicInteger endIndicator = new AtomicInteger(0); - private final AtomicInteger end = new AtomicInteger(0); + private final AtomicInteger start = new AtomicInteger(-1); + private final AtomicInteger end = new AtomicInteger(-1); + private final AtomicInteger storing = new AtomicInteger(0); private StorageArray(int capacity) { this.capacity = capacity; @@ -178,46 +158,51 @@ private StorageArray(int capacity) { } boolean add(E e) { - if (end.get() >= capacity || endIndicator.get() >= capacity) { + storing.incrementAndGet(); + + if (end.get() >= capacity) { + storing.decrementAndGet(); return false; // exceeds capacity } - int index = endIndicator.getAndIncrement(); - // it could still have concurrency between the capacity check and actual $end increment or $endIndicator increment + int index = end.incrementAndGet(); if (index < capacity) { // storing should succeed array.set(index, e); - // increase $end after element actually stored - end.getAndIncrement(); + storing.decrementAndGet(); return true; } else { - // storing should fail - // decrease the endIndicator - endIndicator.getAndDecrement(); + // storing failed + storing.decrementAndGet(); return false; } } E poll() { - if (start.get() >= end.get()) { - return null; // no elements to retrieve + if (start.get() + 1 >= end.get() || start.get() + 1 >= capacity) { + return null; } - int idx = start.getAndIncrement(); - if (idx >= end.get()) { + int idx = start.incrementAndGet(); + if (idx >= end.get() || idx >= capacity) { return null; // concurrent polling } return array.get(idx); } int size() { - int n = endIndicator.get() - start.get(); - //noinspection ManualMinMaxCalculation - return n < 0 ? 0 : n; + int start = this.start.get() + 1; + if (start > capacity) { + return 0; + } + int cap = end.get(); + if (cap > capacity) { + cap = capacity; + } + return cap - start; } void reset() { - end.set(0); - endIndicator.set(0); - start.set(0); + end.set(-1); + start.set(-1); } } } diff --git a/core/src/main/java/io/vproxy/component/proxy/ProcessorConnectionHandler.java b/core/src/main/java/io/vproxy/component/proxy/ProcessorConnectionHandler.java index 3da3dec72..ab084a190 100644 --- a/core/src/main/java/io/vproxy/component/proxy/ProcessorConnectionHandler.java +++ b/core/src/main/java/io/vproxy/component/proxy/ProcessorConnectionHandler.java @@ -96,9 +96,9 @@ void utilWriteData(BackendConnectionHandler.ByteFlow flow, } else { assert Logger.lowLevelDebug("choose to run without zero copy"); - targetConnection.runNoQuickWrite(() -> { + targetConnection.runNoQuickWrite(c -> { int n = sourceConnection.getInBuffer() - .writeTo(targetConnection.getOutBuffer(), flow.currentSegment.bytesToProxy); + .writeTo(c.getOutBuffer(), flow.currentSegment.bytesToProxy); flow.currentSegment.bytesToProxy -= n; assert Logger.lowLevelDebug("proxied " + n + " bytes, still have " + flow.currentSegment.bytesToProxy + " left"); }); @@ -111,8 +111,8 @@ void utilWriteData(BackendConnectionHandler.ByteFlow flow, } else { assert flow.currentSegment.chnl != null; assert Logger.lowLevelDebug("sending bytes, flow.chnl.used = " + flow.currentSegment.chnl.used()); - targetConnection.runNoQuickWrite(() -> - targetConnection.getOutBuffer().storeBytesFrom(flow.currentSegment.chnl)); + targetConnection.runNoQuickWrite(c -> + c.getOutBuffer().storeBytesFrom(flow.currentSegment.chnl)); // check whether this batch sending is done assert Logger.lowLevelDebug("now flow.chnl.used == " + flow.currentSegment.chnl.used()); if (flow.currentSegment.chnl.used() == 0) { diff --git a/extended/src/main/java/io/vproxy/vproxyx/ProxyNexus.java b/extended/src/main/java/io/vproxy/vproxyx/ProxyNexus.java index d4429d17f..c2f883ece 100644 --- a/extended/src/main/java/io/vproxy/vproxyx/ProxyNexus.java +++ b/extended/src/main/java/io/vproxy/vproxyx/ProxyNexus.java @@ -16,7 +16,7 @@ import io.vproxy.pni.array.IntArray; import io.vproxy.vfd.IPPort; import io.vproxy.vproxyx.nexus.*; -import io.vproxy.vproxyx.nexus.entity.ConnectInfo; +import io.vproxy.vproxyx.nexus.entity.PeerAddressInfo; import io.vproxy.vproxyx.uot.FromTcpToUdp; import io.vproxy.vproxyx.uot.FromUdpToTcp; @@ -62,7 +62,7 @@ public static void main0(String[] args) throws Exception { int serverPort = 0; String loadPath = null; var existingConnectTargets = new HashSet(); - var connect = new ArrayList(); + var connect = new ArrayList(); var certificatePath = ""; var privateKeyPath = ""; var cacertPath = ""; @@ -103,22 +103,22 @@ public static void main0(String[] args) throws Exception { int uotPort = 0; if (s.startsWith("uot:")) { s = s.substring("uot:".length()); - } - if (s.contains(":")) { - var uotPortStr = s.substring(s.indexOf(":")); - if (!Utils.isPortInteger(uotPortStr)) { - throw new IllegalArgumentException(uotPortStr + " is not a valid port"); + if (s.contains(":")) { + var uotPortStr = s.substring(0, s.indexOf(":")); + if (!Utils.isPortInteger(uotPortStr)) { + throw new IllegalArgumentException(uotPortStr + " is not a valid port"); + } + uotPort = Integer.parseInt(uotPortStr); + s = s.substring(s.indexOf(":") + 1); + } else { + throw new IllegalArgumentException("uot listening port must be specified by `uot::<...>`"); } - uotPort = Integer.parseInt(uotPortStr); - s = s.substring(s.indexOf(":") + 1); - } else { - throw new IllegalArgumentException("uot listening port must be specified by `uot::<...>`"); } if (!IPPort.validL4AddrStr(s)) { throw new IllegalArgumentException(s + " is not valid ipport in `connect`"); } var ipport = new IPPort(s); - var info = new ConnectInfo(ipport, uotPort); + var info = new PeerAddressInfo(ipport, uotPort); if (existingConnectTargets.contains(ipport)) { throw new IllegalArgumentException(s + " is already specified in `connect`"); } @@ -244,10 +244,9 @@ public static void main0(String[] args) throws Exception { new FromUdpToTcp(loop, uotIPPort, ipport).start(); Logger.alert("uot udp->tcp server listens on " + uotIPPort.formatToIPPortString() + ", will redirect traffic to " + ipport.formatToIPPortString()); - ipport = uotIPPort; } - var peer = NexusPeer.create(nctx, ipport); + var peer = NexusPeer.create(nctx, target); peer.start(); } diff --git a/extended/src/main/java/io/vproxy/vproxyx/nexus/NexusPeer.java b/extended/src/main/java/io/vproxy/vproxyx/nexus/NexusPeer.java index 4578e1acc..31135aadc 100644 --- a/extended/src/main/java/io/vproxy/vproxyx/nexus/NexusPeer.java +++ b/extended/src/main/java/io/vproxy/vproxyx/nexus/NexusPeer.java @@ -13,13 +13,14 @@ import io.vproxy.pni.Allocator; import io.vproxy.pni.array.IntArray; import io.vproxy.vfd.IPPort; +import io.vproxy.vproxyx.nexus.entity.PeerAddressInfo; import io.vproxy.vproxyx.nexus.entity.LinkReq; import java.io.IOException; public class NexusPeer { public final NexusContext nctx; - public final IPPort remoteAddress; + public final PeerAddressInfo remoteAddress; private boolean isServer = false; private volatile boolean isInitialized = false; // controls whether quicConn is returned @@ -29,18 +30,18 @@ public class NexusPeer { private final RingQueue linkUpdateEvents = new RingQueue<>(); - private NexusPeer(NexusContext nctx, IPPort remoteAddress) { + private NexusPeer(NexusContext nctx, PeerAddressInfo remoteAddress) { this.nctx = nctx; this.remoteAddress = remoteAddress; } - public static NexusPeer create(NexusContext nctx, IPPort connectTo) { + public static NexusPeer create(NexusContext nctx, PeerAddressInfo connectTo) { return new NexusPeer(nctx, connectTo); } public static int createAccepted(NexusContext nctx, IPPort remote, QuicConnection connQ, Listener listener, QuicListenerEventNewConnection data, Allocator allocator) { - var peer = new NexusPeer(nctx, remote); + var peer = new NexusPeer(nctx, new PeerAddressInfo(remote, 0)); peer.isServer = true; ConnectionCallback cb = peer.new NexusNodeConnectionCallback(); if (nctx.debug) { @@ -101,14 +102,14 @@ private void doConnect() { if (nctx.debug) { conn.enableTlsSecretDebug(); } - int errcode = conn.start(nctx.clientConfiguration, remoteAddress); + int errcode = conn.start(nctx.clientConfiguration, remoteAddress.target()); if (errcode != 0) { Logger.error(LogType.CONN_ERROR, "starting quic connection to " + remoteAddress + " failed, errcode=" + errcode); conn.close(); return; } quicConn = conn; - Logger.warn(LogType.ALERT, "trying to connect to " + remoteAddress.formatToIPPortString() + " ..."); + Logger.warn(LogType.ALERT, "trying to connect to " + remoteAddress + " ..."); } public void linkUpdateEvent(LinkReq req) { @@ -147,9 +148,9 @@ public void terminate(Connection terminateQuicConn, String reason) { } if (quicConn != null) { if (quicConn.isConnected()) { - Logger.warn(LogType.ALERT, "quic connection " + remoteAddress.formatToIPPortString() + " terminated: " + reason); + Logger.warn(LogType.ALERT, "quic connection " + remoteAddress + " terminated: " + reason); } else { - Logger.warn(LogType.ALERT, "quic connection " + remoteAddress.formatToIPPortString() + " terminated before connected"); + Logger.warn(LogType.ALERT, "quic connection " + remoteAddress + " terminated before connected"); } quicConn.close(); } @@ -180,7 +181,7 @@ public void initialize(Connection quicConn, String nodeName) { nctx.nexus.addNode(self, node, Integer.MAX_VALUE); isInitialized = true; - Logger.alert("connection " + remoteAddress.formatToIPPortString() + " is initialized"); + Logger.alert("connection " + remoteAddress + " is initialized"); if (isServer) { initializeServerActiveControlStream(); @@ -207,10 +208,10 @@ private class NexusNodeConnectionCallback implements ConnectionCallback { @Override public int connected(Connection conn, QuicConnectionEventConnected data) { if (isServer) { - Logger.alert("connection from " + remoteAddress.formatToIPPortString() + " established"); + Logger.alert("connection from " + remoteAddress + " established"); return 0; } - Logger.alert("connected to " + remoteAddress.formatToIPPortString()); + Logger.alert("connected to " + remoteAddress); nctx.loop.getSelectorEventLoop().nextTick(() -> { QuicSocketFD fd; try { diff --git a/extended/src/main/java/io/vproxy/vproxyx/nexus/NexusUtils.java b/extended/src/main/java/io/vproxy/vproxyx/nexus/NexusUtils.java index eaca74b2c..62b045bea 100644 --- a/extended/src/main/java/io/vproxy/vproxyx/nexus/NexusUtils.java +++ b/extended/src/main/java/io/vproxy/vproxyx/nexus/NexusUtils.java @@ -1,5 +1,11 @@ package io.vproxy.vproxyx.nexus; +import io.vproxy.base.util.LogType; +import io.vproxy.base.util.Logger; +import io.vproxy.msquic.QuicStream; +import io.vproxy.pni.Allocator; +import io.vproxy.pni.array.ShortArray; + public class NexusUtils { private NexusUtils() { } @@ -19,4 +25,16 @@ public static boolean isNotValidNodeName(String name) { } return false; } + + public static void setControlStreamPriority(QuicStream stream) { + try (var allocator = Allocator.ofConfined()) { + short priority = (short) 0xffff; + var n = new ShortArray(allocator, 1); + n.set(0, priority); + int err = stream.setParam(0x08000003, 2, n.MEMORY); + if (err != 0) { + Logger.warn(LogType.SYS_ERROR, "setting priority to " + (priority & 0xffff) + " failed with errcode=" + err); + } + } + } } diff --git a/extended/src/main/java/io/vproxy/vproxyx/nexus/StreamHandlers.kt b/extended/src/main/java/io/vproxy/vproxyx/nexus/StreamHandlers.kt index c1c3b0e7c..a5bc47571 100644 --- a/extended/src/main/java/io/vproxy/vproxyx/nexus/StreamHandlers.kt +++ b/extended/src/main/java/io/vproxy/vproxyx/nexus/StreamHandlers.kt @@ -43,6 +43,7 @@ object StreamHandlers { val target: IPPort var nextNodeName = "" val req: Request? + var isControlStream = false try { req = httpconn.readRequest() if (req == null) { @@ -52,6 +53,8 @@ object StreamHandlers { } if (req.method != "CONNECT") { defer { peer.terminate(fd.stream.opts.connection, "passive control stream") } + isControlStream = true + NexusUtils.setControlStreamPriority(fd.stream.streamQ) handleControlRequest(nctx, httpconn, req) handlePassiveControlStreamLoop(nctx, httpconn) return@launch @@ -102,7 +105,11 @@ object StreamHandlers { Logger.access("proxy(" + traceId + "): client=$clientIP:$clientPort src=$srcNode next=$nextNodeName path=$pendingPath dst=$dstNode target=${target.formatToIPPortString()}") } } catch (e: Exception) { - Logger.warn(LogType.INVALID_EXTERNAL_DATA, "failed to handle passive stream from ${peer.remoteAddress}", e) + if (isControlStream) { + Logger.warn(LogType.INVALID_EXTERNAL_DATA, "failed to handle passive control stream from ${peer.remoteAddress}", e) + } else { + Logger.warn(LogType.INVALID_EXTERNAL_DATA, "failed to handle passive stream from ${peer.remoteAddress}", e) + } conn.close() return@launch } @@ -150,7 +157,7 @@ object StreamHandlers { val nextFD = QuicSocketFD.newStream(nctx.debug, quicConn) nextConn = ConnectableConnection.wrap( - nextFD, nextNode.peer.remoteAddress, ConnectionOpts().setTimeout(NexusContext.GENERAL_TIMEOUT), + nextFD, nextNode.peer.remoteAddress.target(), ConnectionOpts().setTimeout(NexusContext.GENERAL_TIMEOUT), RingBuffer.allocateDirect(4096), RingBuffer.allocateDirect(4096) ) nextCosock = CoroutineConnection(nctx.loop, nextConn) @@ -252,10 +259,12 @@ object StreamHandlers { } fun handleActiveControlStream(nctx: NexusContext, peer: NexusPeer, fd: QuicSocketFD, sendEstablishMsg: Boolean) { + NexusUtils.setControlStreamPriority(fd.stream.streamQ) + val conn: ConnectableConnection try { conn = ConnectableConnection.wrap( - fd, peer.remoteAddress, ConnectionOpts().setTimeout(NexusContext.CONTROL_STREAM_TIMEOUT), + fd, peer.remoteAddress.target(), ConnectionOpts().setTimeout(NexusContext.CONTROL_STREAM_TIMEOUT), RingBuffer.allocateDirect(4096), RingBuffer.allocateDirect(4096) ) } catch (e: IOException) { diff --git a/extended/src/main/java/io/vproxy/vproxyx/nexus/entity/ConnectInfo.java b/extended/src/main/java/io/vproxy/vproxyx/nexus/entity/ConnectInfo.java deleted file mode 100644 index a6657cb88..000000000 --- a/extended/src/main/java/io/vproxy/vproxyx/nexus/entity/ConnectInfo.java +++ /dev/null @@ -1,6 +0,0 @@ -package io.vproxy.vproxyx.nexus.entity; - -import io.vproxy.vfd.IPPort; - -public record ConnectInfo(IPPort ipport, int uotPort) { -} diff --git a/extended/src/main/java/io/vproxy/vproxyx/nexus/entity/PeerAddressInfo.java b/extended/src/main/java/io/vproxy/vproxyx/nexus/entity/PeerAddressInfo.java new file mode 100644 index 000000000..a84be880b --- /dev/null +++ b/extended/src/main/java/io/vproxy/vproxyx/nexus/entity/PeerAddressInfo.java @@ -0,0 +1,25 @@ +package io.vproxy.vproxyx.nexus.entity; + +import io.vproxy.vfd.IPPort; + +public record PeerAddressInfo(IPPort ipport, int uotPort) { + public boolean isUOT() { + return uotPort != 0; + } + + public IPPort target() { + if (isUOT()) { + return new IPPort("127.0.0.1", uotPort); + } else { + return ipport; + } + } + + @Override + public String toString() { + if (isUOT()) { + return "uot:" + uotPort + ":" + ipport.formatToIPPortString(); + } + return ipport.formatToIPPortString(); + } +} diff --git a/extended/src/main/java/io/vproxy/vproxyx/uot/ConnectionPair.java b/extended/src/main/java/io/vproxy/vproxyx/uot/ConnectionPair.java new file mode 100644 index 000000000..6f97691af --- /dev/null +++ b/extended/src/main/java/io/vproxy/vproxyx/uot/ConnectionPair.java @@ -0,0 +1,8 @@ +package io.vproxy.vproxyx.uot; + +import io.vproxy.base.connection.Connection; + +public class ConnectionPair { + public Connection small; + public Connection large; +} diff --git a/extended/src/main/java/io/vproxy/vproxyx/uot/FromTcpToUdp.java b/extended/src/main/java/io/vproxy/vproxyx/uot/FromTcpToUdp.java index 619891ce1..1dc9af847 100644 --- a/extended/src/main/java/io/vproxy/vproxyx/uot/FromTcpToUdp.java +++ b/extended/src/main/java/io/vproxy/vproxyx/uot/FromTcpToUdp.java @@ -6,17 +6,20 @@ import io.vproxy.base.util.*; import io.vproxy.base.util.coll.Tuple; import io.vproxy.base.util.direct.DirectMemoryUtils; -import io.vproxy.base.util.nio.ByteArrayChannel; import io.vproxy.vfd.*; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; public class FromTcpToUdp { private final NetEventLoop loop; private final IPPort fromIPPort; private final IPPort toIPPort; private final ServerSock serverSock; + private final Map connIdToDatagramFD = new HashMap<>(); public FromTcpToUdp(NetEventLoop loop, IPPort fromIPPort, IPPort toIPPort) throws IOException { this.loop = loop; @@ -38,45 +41,13 @@ public void acceptFail(ServerHandlerContext ctx, IOException err) { @Override public void connection(ServerHandlerContext ctx, Connection connection) { - DatagramFD udp = null; - try { - udp = FDProvider.get().getProvided().openDatagramFD(); - udp.connect(toIPPort); - } catch (IOException e) { - Logger.error(LogType.SOCKET_ERROR, "failed to open udp sock to " + toIPPort, e); - if (udp != null) { - try { - udp.close(); - } catch (IOException ignore) { - } - } - connection.close(); - return; - } - try { - loop.getSelectorEventLoop().add(udp, EventSet.read(), null, new UDPHandler(connection)); - } catch (IOException e) { - Logger.error(LogType.SYS_ERROR, "failed to add udp sock to event loop", e); - connection.close(); - try { - udp.close(); - } catch (IOException ignore) { - } - return; - } connection.setTimeout(60 * 1000); try { - loop.addConnection(connection, null, new TCPHandler(udp)); + loop.addConnection(connection, null, new TCPHandler()); } catch (IOException e) { Logger.error(LogType.SYS_ERROR, "failed to add connection to event loop", e); connection.close(); - try { - udp.close(); - } catch (IOException ignore) { - } - return; } - Logger.access("received connection from " + connection.remote); } @Override @@ -92,12 +63,16 @@ public void removed(ServerHandlerContext ctx) { } private class UDPHandler implements Handler { - private final Connection conn; - private final ByteBufferEx _buf = DirectMemoryUtils.allocateDirectBuffer(65536); + final DatagramFD udp; + private final long connId; + private final ConnectionPair pair = new ConnectionPair(); + private ByteBufferEx _buf = DirectMemoryUtils.allocateDirectBuffer(65536); private final ByteBuffer buf = _buf.realBuffer(); - private UDPHandler(Connection connection) { - this.conn = connection; + private UDPHandler(DatagramFD udp, long connId, Connection initialConnection) { + this.udp = udp; + this.connId = connId; + this.pair.small = initialConnection; } @Override @@ -112,11 +87,6 @@ public void connected(HandlerContext ctx) { @Override public void readable(HandlerContext ctx) { - if (conn.isClosed()) { - loop.getSelectorEventLoop().remove(ctx.getChannel()); - return; - } - buf.limit(buf.capacity()).position(0); int len; try { @@ -128,18 +98,47 @@ public void readable(HandlerContext ctx) { } buf.flip(); - if (conn.getOutBuffer().free() < TLVConsts.TL_LEN + len) { - Logger.warn(LogType.ALERT, "tcp buffer free size is " + conn.getOutBuffer().free() + - ", while receiving udp packet with length " + len + " from " + toIPPort.formatToIPPortString()); + if (pair.small == null && pair.large == null) { + assert Logger.lowLevelDebug("both small and large connections are null"); return; } - var tl = ByteArray.allocate(TLVConsts.TL_LEN); - tl.set(0, (byte) TLVConsts.TYPE_PACKET); - tl.int16(1, len); - var chnl = ByteArrayChannel.fromFull(tl); + String connRef; + Connection conn; + if (len >= UOTUtils.LARGE_PACKET_LIMIT) { + conn = pair.large; + connRef = "large"; + } else if (len < UOTUtils.SMALL_PACKET_LIMIT) { + conn = pair.small; + connRef = "small"; + } else if (pair.small == null) { + conn = pair.large; + connRef = "large"; + } else if (pair.large == null) { + conn = pair.small; + connRef = "small"; + } else if (pair.large.getOutBuffer().free() > pair.small.getOutBuffer().free()) { + conn = pair.large; + connRef = "large"; + } else { + conn = pair.small; + connRef = "small"; + } - conn.getOutBuffer().storeBytesFrom(chnl); + if (conn == null) { + Logger.warn(LogType.INVALID_STATE, "required connection for " + connRef + " packet is not established yet, udp: " + + getLocalAddress() + " -> " + getRemoteAddress()); + return; + } + + if (conn.getOutBuffer().free() < UOTUtils.HEADER_LEN + len) { + assert Logger.lowLevelDebug("tcp buffer free size is " + conn.getOutBuffer().free() + + ", while receiving udp packet with length " + len + " from " + toIPPort.formatToIPPortString()); + return; + } + + var tl = UOTUtils.buildHeader(UOTUtils.TYPE_PACKET, len); + conn.runNoQuickWrite(c -> c.getOutBuffer().storeBytesFrom(tl)); conn.getOutBuffer().storeBytesFrom(buf); } @@ -154,29 +153,73 @@ public void removed(HandlerContext ctx) { ctx.getChannel().close(); } catch (IOException ignore) { } - _buf.clean(); - conn.close(); + if (_buf != null) { + _buf.clean(); + _buf = null; + } + if (pair.small != null) { + pair.small.close(); + pair.small = null; + } + if (pair.large != null) { + pair.large.close(); + pair.large = null; + } + connIdToDatagramFD.remove(connId); + } + + private IPPort localAddress; + + public String getLocalAddress() { + if (localAddress != null) { + return localAddress.formatToIPPortString(); + } + try { + localAddress = udp.getLocalAddress(); + } catch (IOException e) { + return null; + } + return localAddress.formatToIPPortString(); + } + + private IPPort remoteAddress; + + public String getRemoteAddress() { + if (remoteAddress != null) { + return remoteAddress.formatToIPPortString(); + } + try { + remoteAddress = udp.getRemoteAddress(); + } catch (IOException e) { + return null; + } + return remoteAddress.formatToIPPortString(); } } private class TCPHandler implements ConnectableConnectionHandler { - final DatagramFD udp; - private final TLVParser parser = new TLVParser(); + UDPHandler udp; + private final UOTHeaderParser parser = new UOTHeaderParser(); - private TCPHandler(DatagramFD udp) { - this.udp = udp; + private TCPHandler() { } @Override public void connected(ConnectableConnectionHandlerContext ctx) { - if (!udp.isOpen()) { + if (!udp.udp.isOpen()) { closed(ctx); } } @Override public void readable(ConnectionHandlerContext ctx) { - if (!udp.isOpen()) { + while (ctx.connection.getInBuffer().used() > 0) { + doReadable(ctx); + } + } + + private void doReadable(ConnectionHandlerContext ctx) { + if (udp != null && !udp.udp.isOpen()) { closed(ctx); return; } @@ -185,22 +228,93 @@ public void readable(ConnectionHandlerContext ctx) { return; } - if (parser.type != TLVConsts.TYPE_PACKET) { - Logger.error(LogType.INVALID_EXTERNAL_DATA, "unknown type " + parser.type); + if (parser.type == UOTUtils.TYPE_CONN_ID) { + if (udp != null) { + Logger.warn(LogType.INVALID_EXTERNAL_DATA, + "received CONN_ID message from " + ctx.connection.remote + ", but the message is already received"); + return; + } + if (parser.len != 8) { + Logger.warn(LogType.INVALID_EXTERNAL_DATA, + "received CONN_ID message from " + ctx.connection.remote + ", but message len is not 8"); + return; + } + var connId = parser.buf.getLong(); + initUDPSock(connId, ctx.connection); + return; + } + if (parser.type != UOTUtils.TYPE_PACKET) { + parser.logInvalidExternalData("unknown type " + parser.type); + return; + } + if (udp == null) { + Logger.warn(LogType.INVALID_STATE, + "received PACKET message from " + ctx.connection.remote + ", but udp sock is not initialized yet"); return; } - parser.buf.position(0); try { - udp.write(parser.buf); + udp.udp.write(parser.buf); } catch (IOException e) { Logger.error(LogType.SOCKET_ERROR, "failed to send udp packet to " + toIPPort, e); } } + private void initUDPSock(long connId, Connection connection) { + var udpHandler = connIdToDatagramFD.get(connId); + if (udpHandler != null) { + udp = udpHandler; + if (udpHandler.pair.small == null) { + udpHandler.pair.small = connection; + Logger.access("received connection for small packet: " + connection + + " -> " + udp.getLocalAddress() + " -> " + udp.getRemoteAddress()); + } else if (udpHandler.pair.large == null) { + udpHandler.pair.large = connection; + Logger.access("received connection for large packet: " + connection + + " -> " + udp.getLocalAddress() + " -> " + udp.getRemoteAddress()); + } else { + Logger.access("received orphan connection: " + connection + + " -> " + udp.getLocalAddress() + " -> " + udp.getRemoteAddress()); + } + return; + } + DatagramFD udp = null; + try { + udp = FDProvider.get().getProvided().openDatagramFD(); + udp.configureBlocking(false); + udp.connect(toIPPort); + } catch (IOException e) { + Logger.error(LogType.SOCKET_ERROR, "failed to open udp sock to " + toIPPort, e); + if (udp != null) { + try { + udp.close(); + } catch (IOException ignore) { + } + } + connection.close(); + return; + } + udpHandler = new UDPHandler(udp, connId, connection); + try { + loop.getSelectorEventLoop().add(udp, EventSet.read(), null, udpHandler); + } catch (IOException e) { + Logger.error(LogType.SYS_ERROR, "failed to add udp sock to event loop", e); + connection.close(); + try { + udp.close(); + } catch (IOException ignore) { + } + return; + } + this.udp = udpHandler; + Logger.access("received connection for small packet from " + connection.remote + + " -> " + udpHandler.getLocalAddress() + " -> " + udpHandler.getRemoteAddress()); + connIdToDatagramFD.put(connId, udpHandler); + } + @Override public void writable(ConnectionHandlerContext ctx) { - if (!udp.isOpen()) { + if (!udp.udp.isOpen()) { closed(ctx); } } @@ -216,12 +330,31 @@ public void remoteClosed(ConnectionHandlerContext ctx) { closed(ctx); } + private final AtomicBoolean isClosed = new AtomicBoolean(false); + @Override public void closed(ConnectionHandlerContext ctx) { + if (!isClosed.compareAndSet(false, true)) { + return; + } + ctx.connection.close(); - try { - udp.close(); - } catch (IOException ignore) { + if (udp != null) { + if (udp.pair.small == ctx.connection) { + udp.pair.small = null; + Logger.warn(LogType.ACCESS, "connection for small packet is removed: " + ctx.connection + + " -> " + udp.getLocalAddress() + " -> " + udp.getRemoteAddress()); + } else if (udp.pair.large == ctx.connection) { + udp.pair.large = null; + Logger.warn(LogType.ACCESS, "connection for large packet is removed: " + ctx.connection + + " -> " + udp.getLocalAddress() + " -> " + udp.getRemoteAddress()); + } else { + Logger.warn(LogType.ACCESS, "orphan connection is removed: " + ctx.connection + + " -> " + udp.getLocalAddress() + " -> " + udp.getRemoteAddress()); + } + if (udp.pair.small == null && udp.pair.large == null) { + loop.getSelectorEventLoop().remove(udp.udp); + } } parser.clean(); } diff --git a/extended/src/main/java/io/vproxy/vproxyx/uot/FromUdpToTcp.java b/extended/src/main/java/io/vproxy/vproxyx/uot/FromUdpToTcp.java index 9fd5e02fc..5bccec347 100644 --- a/extended/src/main/java/io/vproxy/vproxyx/uot/FromUdpToTcp.java +++ b/extended/src/main/java/io/vproxy/vproxyx/uot/FromUdpToTcp.java @@ -8,7 +8,6 @@ import io.vproxy.base.util.LogType; import io.vproxy.base.util.Logger; import io.vproxy.base.util.direct.DirectMemoryUtils; -import io.vproxy.base.util.nio.ByteArrayChannel; import io.vproxy.vfd.DatagramFD; import io.vproxy.vfd.EventSet; import io.vproxy.vfd.FDProvider; @@ -18,6 +17,8 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; public class FromUdpToTcp { private final NetEventLoop loop; @@ -26,7 +27,7 @@ public class FromUdpToTcp { private final DatagramFD udp; // remote udp endpoint -> to server connection - private final Map connections = new HashMap<>(); + private final Map connections = new HashMap<>(); public FromUdpToTcp(NetEventLoop loop, IPPort fromIPPort, IPPort toIPPort) throws IOException { this.loop = loop; @@ -34,6 +35,7 @@ public FromUdpToTcp(NetEventLoop loop, IPPort fromIPPort, IPPort toIPPort) throw this.toIPPort = toIPPort; udp = FDProvider.get().getProvided().openDatagramFD(); + udp.configureBlocking(false); } public void start() throws IOException { @@ -42,7 +44,7 @@ public void start() throws IOException { } private class UDPHandler implements Handler { - private final ByteBufferEx _buf = DirectMemoryUtils.allocateDirectBuffer(65536); + private ByteBufferEx _buf = DirectMemoryUtils.allocateDirectBuffer(65536); private final ByteBuffer buf = _buf.realBuffer(); @Override @@ -72,36 +74,36 @@ public void readable(HandlerContext ctx) { buf.flip(); int len = buf.limit(); - var conn = connections.get(remote); - if (conn == null) { - try { - conn = ConnectableConnection.create(toIPPort); - } catch (IOException e) { - Logger.error(LogType.SOCKET_ERROR, "failed to connect to " + toIPPort + " when received udp packet from " + remote, e); - return; - } - connections.put(remote, conn); - conn.setTimeout(60 * 1000); - try { - loop.addConnectableConnection(conn, null, new TCPHandler(remote)); - } catch (IOException e) { - Logger.error(LogType.SYS_ERROR, "failed to add connection " + conn + " to event loop when received udp packet from " + remote, e); - conn.close(); - return; + var pair = connections.get(remote); + if (pair == null) { + pair = new ConnectionPair(); + initPair(pair, remote); + connections.put(remote, pair); + } else { + initPair(pair, remote); + } + + Connection conn; + if (len >= UOTUtils.LARGE_PACKET_LIMIT) { + conn = pair.large; + } else if (len <= UOTUtils.SMALL_PACKET_LIMIT) { + conn = pair.small; + } else { + if (pair.large.getOutBuffer().free() > pair.small.getOutBuffer().free()) { + conn = pair.large; + } else { + conn = pair.small; } } - if (conn.getOutBuffer().free() < TLVConsts.TL_LEN + len) { - Logger.warn(LogType.ALERT, "tcp buffer free size is " + conn.getOutBuffer().free() + - ", while receiving udp packet with length " + len + " from " + remote.formatToIPPortString()); + + if (conn.getOutBuffer().free() < UOTUtils.HEADER_LEN + len) { + assert Logger.lowLevelDebug("tcp buffer free size is " + conn.getOutBuffer().free() + + ", while receiving udp packet with length " + len + " from " + remote.formatToIPPortString()); return; } - var tl = ByteArray.allocate(TLVConsts.TL_LEN); - tl.set(0, (byte) TLVConsts.TYPE_PACKET); - tl.int16(1, len); - var chnl = ByteArrayChannel.fromFull(tl); - - conn.getOutBuffer().storeBytesFrom(chnl); + var tl = UOTUtils.buildHeader(UOTUtils.TYPE_PACKET, len); + conn.runNoQuickWrite(c -> c.getOutBuffer().storeBytesFrom(tl)); conn.getOutBuffer().storeBytesFrom(buf); } @@ -117,15 +119,78 @@ public void removed(HandlerContext ctx) { udp.close(); } catch (IOException ignore) { } - _buf.clean(); + if (_buf != null) { + _buf.clean(); + _buf = null; + } + } + } + + private void initPair(ConnectionPair pair, IPPort remote) { + if (pair.small != null && pair.large != null) { + return; + } + int needConn; + if (pair.small == null && pair.large == null) { + needConn = 2; + } else { + needConn = 1; + } + + var conns = new ConnectableConnection[needConn]; + var __prefaceValue = new byte[8]; + ThreadLocalRandom.current().nextBytes(__prefaceValue); + var prefaceValue = ByteArray.from(__prefaceValue); + + for (int i = 0; i < conns.length; ++i) { + ConnectableConnection conn; + try { + conn = ConnectableConnection.create(toIPPort); + } catch (IOException e) { + Logger.error(LogType.SOCKET_ERROR, "failed to connect to " + toIPPort + " when received udp packet from " + remote, e); + destroyConns(conns, i); + return; + } + conn.setTimeout(5 * 1000); + ByteArray prefaceTLV; + { + prefaceTLV = UOTUtils.buildHeader(UOTUtils.TYPE_CONN_ID, 8); + prefaceTLV = prefaceTLV.concat(prefaceValue); + } + conn.getOutBuffer().storeBytesFrom(prefaceTLV); + try { + loop.addConnectableConnection(conn, null, new TCPHandler(pair, remote)); + } catch (IOException e) { + Logger.error(LogType.SYS_ERROR, "failed to add connection " + conn + " to event loop when received udp packet from " + remote, e); + conn.close(); + destroyConns(conns, i); + return; + } + conns[i] = conn; + } + int idx = 0; + if (pair.small == null) { + pair.small = conns[idx++]; + } + if (pair.large == null) { + //noinspection UnusedAssignment + pair.large = conns[idx++]; + } + } + + private void destroyConns(ConnectableConnection[] conns, int endIndexExclusive) { + for (int i = 0; i < endIndexExclusive; ++i) { + conns[i].close(); } } private class TCPHandler implements ConnectableConnectionHandler { + private final ConnectionPair pair; final IPPort udpRemote; - private final TLVParser parser = new TLVParser(); + private final UOTHeaderParser parser = new UOTHeaderParser(); - private TCPHandler(IPPort udpRemote) { + private TCPHandler(ConnectionPair pair, IPPort udpRemote) { + this.pair = pair; this.udpRemote = udpRemote; } @@ -135,11 +200,24 @@ public void connected(ConnectableConnectionHandlerContext ctx) { closed(ctx); return; } - Logger.access("new connection established, initiated by udp stream from " + udpRemote.formatToIPPortString()); + ctx.connection.setTimeout(60 * 1000); + if (pair.small == ctx.connection) { + Logger.access("new connection for small packet established " + udpRemote.formatToIPPortString() + " -> " + ctx.connection); + } else if (pair.large == ctx.connection) { + Logger.access("new connection for large packet established " + udpRemote.formatToIPPortString() + " -> " + ctx.connection); + } else { + Logger.access("new orphan connection established " + udpRemote.formatToIPPortString() + " -> " + ctx.connection); + } } @Override public void readable(ConnectionHandlerContext ctx) { + while (ctx.connection.getInBuffer().used() > 0) { + doReadable(ctx); + } + } + + private void doReadable(ConnectionHandlerContext ctx) { if (!udp.isOpen()) { closed(ctx); return; @@ -149,12 +227,11 @@ public void readable(ConnectionHandlerContext ctx) { return; } - if (parser.type != TLVConsts.TYPE_PACKET) { - Logger.error(LogType.INVALID_EXTERNAL_DATA, "unknown type " + parser.type); + if (parser.type != UOTUtils.TYPE_PACKET) { + parser.logInvalidExternalData("unknown type " + parser.type); return; } - parser.buf.position(0); try { udp.send(parser.buf, udpRemote); } catch (IOException e) { @@ -180,11 +257,28 @@ public void remoteClosed(ConnectionHandlerContext ctx) { closed(ctx); } + private final AtomicBoolean isClosed = new AtomicBoolean(false); + @Override public void closed(ConnectionHandlerContext ctx) { + if (!isClosed.compareAndSet(false, true)) { + return; + } + ctx.connection.close(); - connections.remove(udpRemote); parser.clean(); + if (pair.small == ctx.connection) { + pair.small = null; + Logger.warn(LogType.ACCESS, "connection for small packet is removed: " + udpRemote.formatToIPPortString() + " -> " + ctx.connection); + } else if (pair.large == ctx.connection) { + pair.large = null; + Logger.warn(LogType.ACCESS, "connection for large packet is removed: " + udpRemote.formatToIPPortString() + " -> " + ctx.connection); + } else { + Logger.warn(LogType.ACCESS, "orphan connection is removed: " + udpRemote.formatToIPPortString() + " -> " + ctx.connection); + } + if (pair.small == null && pair.large == null) { + connections.remove(udpRemote); + } } @Override diff --git a/extended/src/main/java/io/vproxy/vproxyx/uot/TLVConsts.java b/extended/src/main/java/io/vproxy/vproxyx/uot/TLVConsts.java deleted file mode 100644 index 5f1cee89f..000000000 --- a/extended/src/main/java/io/vproxy/vproxyx/uot/TLVConsts.java +++ /dev/null @@ -1,9 +0,0 @@ -package io.vproxy.vproxyx.uot; - -public class TLVConsts { - public static final int TYPE_PACKET = 1; - public static final int TL_LEN = 1 + 2; - - private TLVConsts() { - } -} diff --git a/extended/src/main/java/io/vproxy/vproxyx/uot/TLVParser.java b/extended/src/main/java/io/vproxy/vproxyx/uot/TLVParser.java deleted file mode 100644 index a130a9f14..000000000 --- a/extended/src/main/java/io/vproxy/vproxyx/uot/TLVParser.java +++ /dev/null @@ -1,75 +0,0 @@ -package io.vproxy.vproxyx.uot; - -import io.vproxy.base.connection.ConnectionHandlerContext; -import io.vproxy.base.util.ByteBufferEx; -import io.vproxy.base.util.direct.DirectMemoryUtils; -import io.vproxy.base.util.nio.ByteArrayChannel; - -import java.nio.ByteBuffer; - -public class TLVParser { - private int state = 0; - // 0 -> idle, expecting type - // 1 -> type, expecting len - // 2 -> len[0], expecting len[1] - // 3 -> len[1], expecting data - private final ByteBufferEx _buf = DirectMemoryUtils.allocateDirectBuffer(65536); - public final ByteBuffer buf = _buf.realBuffer(); - private final ByteArrayChannel chnl = ByteArrayChannel.fromEmpty(1); - public int type; - public int len; - - public TLVParser() { - } - - public boolean parse(ConnectionHandlerContext ctx) { - int n; - switch (state) { - case 0: - chnl.reset(); - n = ctx.connection.getInBuffer().writeTo(chnl); - if (n == 0) { - // nothing read ... - return false; - } - type = chnl.getArray().get(0) & 0xff; - state = 1; - case 1: - chnl.reset(); - n = ctx.connection.getInBuffer().writeTo(chnl); - if (n == 0) { - // nothing read ... - return false; - } - len = chnl.getArray().get(0) & 0xff; - state = 2; - case 2: - chnl.reset(); - n = ctx.connection.getInBuffer().writeTo(chnl); - if (n == 0) { - // nothing read ... - return false; - } - len = len << 8; - len |= chnl.getArray().get(0) & 0xff; - state = 3; - case 3: - if (len == 0) { - state = 0; - break; - } - buf.limit(len).position(0); - ctx.connection.getInBuffer().writeTo(buf); - if (buf.limit() == buf.position()) { - state = 0; - break; - } - return false; - } - return true; - } - - public void clean() { - _buf.clean(); - } -} diff --git a/extended/src/main/java/io/vproxy/vproxyx/uot/UOTHeaderParser.java b/extended/src/main/java/io/vproxy/vproxyx/uot/UOTHeaderParser.java new file mode 100644 index 000000000..c10fa4e93 --- /dev/null +++ b/extended/src/main/java/io/vproxy/vproxyx/uot/UOTHeaderParser.java @@ -0,0 +1,79 @@ +package io.vproxy.vproxyx.uot; + +import io.vproxy.base.connection.ConnectionHandlerContext; +import io.vproxy.base.util.ByteArray; +import io.vproxy.base.util.ByteBufferEx; +import io.vproxy.base.util.LogType; +import io.vproxy.base.util.Logger; +import io.vproxy.base.util.direct.DirectMemoryUtils; +import io.vproxy.base.util.nio.ByteArrayChannel; + +import java.nio.ByteBuffer; + +public class UOTHeaderParser { + private int state; + // 0: idle, expecting header + // 1: reading header, expecting value + // 2: value + private ByteBufferEx _buf = DirectMemoryUtils.allocateDirectBuffer(65536); + public final ByteBuffer buf = _buf.realBuffer(); + private final ByteArrayChannel chnl = ByteArrayChannel.fromEmpty(UOTUtils.HEADER_LEN); + public byte type; + public int len; + + public UOTHeaderParser() { + } + + public boolean parse(ConnectionHandlerContext ctx) { + int n; + switch (state) { + case 0: + chnl.reset(); + state = 1; + case 1: + n = ctx.connection.getInBuffer().writeTo(chnl); + if (n == 0) { + // nothing read + return false; + } + if (chnl.free() != 0) { + // not fully read yet + return false; + } + type = chnl.getArray().get(0); + len = chnl.getArray().uint16(2); + buf.limit(len).position(0); + state = 2; + case 2: + if (len == 0) { + state = 0; + break; + } + ctx.connection.getInBuffer().writeTo(buf); + if (buf.limit() == buf.position()) { + if (buf.limit() != len) { + Logger.shouldNotHappen("buf.limit() == " + buf.limit() + ", but len == " + len); + } + state = 0; + buf.position(0); + break; + } + return false; + } + return true; + } + + public void logInvalidExternalData(String msg) { + Logger.error(LogType.INVALID_EXTERNAL_DATA, msg + + ", type=" + type + ", len=" + len + + ", data[" + buf.position() + ":" + buf.limit() + "]\n" + + ByteArray.from(buf).hexDump()); + } + + public void clean() { + if (_buf != null) { + _buf.clean(); + _buf = null; + } + } +} diff --git a/extended/src/main/java/io/vproxy/vproxyx/uot/UOTUtils.java b/extended/src/main/java/io/vproxy/vproxyx/uot/UOTUtils.java new file mode 100644 index 000000000..961bac9cf --- /dev/null +++ b/extended/src/main/java/io/vproxy/vproxyx/uot/UOTUtils.java @@ -0,0 +1,24 @@ +package io.vproxy.vproxyx.uot; + +import io.vproxy.base.util.ByteArray; + +public class UOTUtils { + public static final byte TYPE_CONN_ID = 1; + public static final byte TYPE_PACKET = 2; + + public static final int HEADER_LEN = 1 /*type*/ + 1 /*flags*/ + 2 /*len*/; + + public static final int LARGE_PACKET_LIMIT = 1000; + public static final int SMALL_PACKET_LIMIT = 200; + + private UOTUtils() { + } + + public static ByteArray buildHeader(byte type, int len) { + var ret = ByteArray.allocate(HEADER_LEN); + ret.set(0, type); + ret.int16(2, len); + + return ret; + } +}