Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* @param maxSocketAcceptThreads maximum amount of threads which will be spawned to handle incoming SSL socket
* accepts, needed because of length SSL handshake; at same time, we don't want it to
* be unlimited, to not run out of threads on some kind of DOS
* @param quickAck use TCP_QUICKACK on sync sockets after each read
*/
@ConfigData("socket")
public record SocketConfig(
Expand All @@ -37,4 +38,5 @@ public record SocketConfig(
@ConfigProperty(defaultValue = "true") boolean tcpNoDelay,
@ConfigProperty(defaultValue = "false") boolean gzipCompression,
@ConfigProperty(defaultValue = "10") int waitBetweenConnectionRetries,
@ConfigProperty(defaultValue = "30") int maxSocketAcceptThreads) {}
@ConfigProperty(defaultValue = "30") int maxSocketAcceptThreads,
@ConfigProperty(defaultValue = "true") boolean quickAck) {}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public interface Connection {

/**
* Is there currently a valid connection from me to the member at the given index in the address book?
*
* <p>
* If this method returns {@code true}, the underlying socket is guaranteed to be non-null.
*
* @return true is connected, false otherwise.
Expand All @@ -58,26 +58,23 @@ public interface Connection {
* Returns the current timeout value of this connection.
*
* @return the current timeout value in milliseconds
* @throws SocketException
* if there is an error in the underlying protocol, such as a TCP error.
* @throws SocketException if there is an error in the underlying protocol, such as a TCP error.
*/
int getTimeout() throws SocketException;

/**
* Sets the timeout of this connection.
*
* @param timeoutMillis
* The timeout value to set in milliseconds. A value of zero is treated as an infinite timeout.
* @throws SocketException
* if there is an error in the underlying protocol, such as a TCP error.
* @param timeoutMillis The timeout value to set in milliseconds. A value of zero is treated as an infinite
* timeout.
* @throws SocketException if there is an error in the underlying protocol, such as a TCP error.
*/
void setTimeout(final long timeoutMillis) throws SocketException;

/**
* Initialize {@code this} instance for a gossip session.
*
* @throws IOException
* if the connection is broken
* @throws IOException if the connection is broken
*/
void initForSync() throws IOException;

Expand All @@ -103,4 +100,10 @@ public interface Connection {
default String generateDescription() {
return String.format("%s %s %s", getSelfId(), isOutbound() ? "->" : "<-", getOtherId());
}

/**
* Optional call by client to indicate that read operation has finished. Some implementations of Connection might
* use it to optimize network communication (by sending quick ack for example)
*/
default void afterRead() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.net.SocketException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import jdk.net.ExtendedSocketOptions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hiero.base.io.exceptions.BadIOException;
Expand All @@ -36,6 +37,7 @@
private final boolean outbound;
private final String description;
private final Configuration configuration;
private final boolean quickAck;

/**
* @param connectionTracker tracks open connections
Expand Down Expand Up @@ -71,6 +73,7 @@
this.dis = dis;
this.dos = dos;
this.configuration = configuration;
this.quickAck = configuration.getConfigData(SocketConfig.class).quickAck();
Comment on lines 73 to +76
Copy link
Contributor

@mxtartaglia-sl mxtartaglia-sl Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we consolidate both config and quickACk and store
this.socketConfig = configuration.getConfigData(SocketConfig.class); instead?

i see many of these cases where we store both the configuration instance and one particular value of the configuration as a field and its is confusing IMO

}

/**
Expand Down Expand Up @@ -197,4 +200,20 @@
public String getDescription() {
return description;
}

/**
* {@inheritDoc}
*/
@Override
public void afterRead() {
try {
if (quickAck) {
// quick ack has to be set after each read - it just sends instant acknowledgement
// it is NOT implemented on Mac as of 2025, but shouldn't hurt, except for some wasted cpu cycles
getSocket().setOption(ExtendedSocketOptions.TCP_QUICKACK, true);

Check warning on line 213 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/network/SocketConnection.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/network/SocketConnection.java#L213

Added line #L213 was not covered by tests
}
} catch (final IOException e) {

Check warning on line 215 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/network/SocketConnection.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/network/SocketConnection.java#L215

Added line #L215 was not covered by tests
// we can silently ignore it, as it is optional
}
}

Check warning on line 218 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/network/SocketConnection.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/network/SocketConnection.java#L217-L218

Added lines #L217 - L218 were not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,8 @@ private void readMessages(@NonNull final Connection connection) throws IOExcepti
pingHandler.handleIncomingPingReply(pingReply);
break;
}

connection.afterRead();
}
}
} finally {
Expand Down
Loading