Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public interface EthProtocolConfiguration {
int DEFAULT_MAX_GET_RECEIPTS = 256;
int DEFAULT_MAX_GET_NODE_DATA = 384;
int DEFAULT_MAX_GET_POOLED_TRANSACTIONS = 256;
int DEFAULT_MAX_TRANSACTIONS_PER_MESSAGE = 4096;
int DEFAULT_MAX_CAPABILITY = Integer.MAX_VALUE;
int DEFAULT_MIN_CAPABILITY = 0;

Expand Down Expand Up @@ -73,6 +74,11 @@ default int getMaxGetPooledTransactions() {
return DEFAULT_MAX_GET_POOLED_TRANSACTIONS;
}

@Value.Default
default int getMaxTransactionsPerMessage() {
return DEFAULT_MAX_TRANSACTIONS_PER_MESSAGE;
}

@Value.Default
default int getMaxEthCapability() {
return DEFAULT_MAX_CAPABILITY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,13 @@ static TransactionPool createTransactionPool(
final TransactionsMessageHandler transactionsMessageHandler =
new TransactionsMessageHandler(
ethContext.getScheduler(),
new TransactionsMessageProcessor(transactionTracker, transactionPool, metrics),
transactionPoolConfiguration.getUnstable().getTxMessageKeepAliveSeconds());
new TransactionsMessageProcessor(
transactionTracker,
transactionPool,
metrics,
ethProtocolConfiguration.getMaxTransactionsPerMessage()),
transactionPoolConfiguration.getUnstable().getTxMessageKeepAliveSeconds(),
ethProtocolConfiguration.getMaxMessageSize());

final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler =
new NewPooledTransactionHashesMessageHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,34 @@ class TransactionsMessageHandler implements EthMessages.MessageCallback {
private final TransactionsMessageProcessor transactionsMessageProcessor;
private final EthScheduler scheduler;
private final Duration txMsgKeepAlive;
private final int maxMessageSize;
private final AtomicBoolean isEnabled = new AtomicBoolean(false);

public TransactionsMessageHandler(
final EthScheduler scheduler,
final TransactionsMessageProcessor transactionsMessageProcessor,
final int txMsgKeepAliveSeconds) {
final int txMsgKeepAliveSeconds,
final int maxMessageSize) {
this.scheduler = scheduler;
this.transactionsMessageProcessor = transactionsMessageProcessor;
this.txMsgKeepAlive = Duration.ofSeconds(txMsgKeepAliveSeconds);
this.maxMessageSize = maxMessageSize;
}

@Override
public void exec(final EthMessage message) {
if (isEnabled.get()) {
final MessageData rawMessage = message.getData();
if (rawMessage.getSize() > maxMessageSize) {
LOG.debug(
"Oversized transactions message received ({} bytes), disconnecting: {}",
rawMessage.getSize(),
message.getPeer());
message
.getPeer()
.disconnect(DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
return;
}
final Instant startedAt = now();
scheduler.scheduleTxWorkerTask(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,17 @@ class TransactionsMessageProcessor {
private final TransactionPool transactionPool;

private final TransactionPoolMetrics metrics;
private final int maxTransactionsPerMessage;

public TransactionsMessageProcessor(
final PeerTransactionTracker transactionTracker,
final TransactionPool transactionPool,
final TransactionPoolMetrics metrics) {
final TransactionPoolMetrics metrics,
final int maxTransactionsPerMessage) {
this.transactionTracker = transactionTracker;
this.transactionPool = transactionPool;
this.metrics = metrics;
this.maxTransactionsPerMessage = maxTransactionsPerMessage;
metrics.initExpiredMessagesCounter(METRIC_LABEL);
}

Expand Down Expand Up @@ -76,6 +79,17 @@ private void processTransactionsMessage(
final EthPeer peer, final TransactionsMessage transactionsMessage) {
try {
final List<Transaction> incomingTransactions = transactionsMessage.transactions();

if (incomingTransactions.size() > maxTransactionsPerMessage) {
LOG.debug(
"Transactions message contains too many transactions ({} > {}), disconnecting: {}",
incomingTransactions.size(),
maxTransactionsPerMessage,
peer);
peer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
return;
}

final Collection<Transaction> freshTransactions =
transactionTracker.receivedTransactions(peer, incomingTransactions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,15 @@

import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.messages.TransactionsMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.metrics.StubMetricsSystem;

import java.util.ArrayList;
import java.util.List;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -58,7 +63,10 @@ public void setup() {

messageHandler =
new TransactionsMessageProcessor(
transactionTracker, transactionPool, new TransactionPoolMetrics(metricsSystem));
transactionTracker,
transactionPool,
new TransactionPoolMetrics(metricsSystem),
EthProtocolConfiguration.DEFAULT_MAX_TRANSACTIONS_PER_MESSAGE);
}

@Test
Expand Down Expand Up @@ -124,4 +132,27 @@ peer1, asList(transaction1, transaction2, transaction3)))
verify(transactionPool).addRemoteTransactions(singletonList(transaction1));
verifyNoMoreInteractions(transactionPool);
}

@Test
public void shouldDisconnectPeerWhenTooManyTransactionsInMessage() {
final int maxPerMessage = 2;
final TransactionsMessageProcessor strictHandler =
new TransactionsMessageProcessor(
transactionTracker,
transactionPool,
new TransactionPoolMetrics(metricsSystem),
maxPerMessage);

final List<Transaction> tooMany = new ArrayList<>();
for (int i = 0; i < maxPerMessage + 1; i++) {
tooMany.add(generator.transaction());
}

strictHandler.processTransactionsMessage(
peer1, TransactionsMessage.create(tooMany), now(), ofMinutes(1));

verify(peer1).disconnect(DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
verifyNoInteractions(transactionPool);
verifyNoInteractions(transactionTracker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import org.hyperledger.besu.util.NetworkUtility;
import org.hyperledger.besu.util.number.ByteUnits;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -24,12 +25,14 @@
import java.util.Optional;

public class RlpxConfiguration {
public static final int DEFAULT_MAX_MESSAGE_SIZE = 10 * ByteUnits.MEGABYTE;
public static final float DEFAULT_FRACTION_REMOTE_CONNECTIONS_ALLOWED = 0.6f;
private String clientId = "TestClient/1.0.0";
private String bindHost = NetworkUtility.INADDR_ANY;
private int bindPort = 30303;
private Optional<String> bindHostIpv6 = Optional.empty();
private Optional<Integer> bindPortIpv6 = Optional.empty();
private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
private List<SubProtocol> supportedProtocols = Collections.emptyList();

public static RlpxConfiguration create() {
Expand Down Expand Up @@ -90,6 +93,15 @@ public boolean isDualStackEnabled() {
return bindHostIpv6.isPresent() && bindPortIpv6.isPresent();
}

public int getMaxMessageSize() {
return maxMessageSize;
}

public RlpxConfiguration setMaxMessageSize(final int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
return this;
}

public String getClientId() {
return clientId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ public CompletableFuture<Integer> start(final int tcpPort) {
return CompletableFuture.failedFuture(e);
}

peerPermissions.subscribeUpdate(this::handlePermissionsUpdate);

return system
.start()
.thenApply(
Expand Down Expand Up @@ -496,7 +498,7 @@ private Stream<DiscoveryPeer> candidatePeers(final Collection<NodeRecord> newPee
* @param remotePeer the remote peer to check
* @return {@code true} if the peer is permitted
*/
private boolean isPeerPermitted(final Peer localNode, final DiscoveryPeer remotePeer) {
private boolean isPeerPermitted(final Peer localNode, final Peer remotePeer) {
if (localNode == null) {
// Local node not yet initialized — reject rather than bypass identity checks.
// The peer will be re-discovered on the next FINDNODE round.
Expand Down Expand Up @@ -572,4 +574,26 @@ private void registerMetrics(final MutableDiscoverySystem system) {
"Current number of total nodes tracked by the DiscV5 discovery system",
() -> system.getBucketStats().getTotalNodeCount());
}

private void handlePermissionsUpdate(
final boolean addRestrictions, final Optional<List<Peer>> affectedPeers) {
if (addRestrictions) {
nodeRecordManager
.getLocalNode()
.ifPresent(
((localNode) -> {
affectedPeers.ifPresentOrElse(
(peers) ->
peers.stream()
.filter((peer) -> !isPeerPermitted(localNode, peer))
.forEach(this::dropPeer),
() ->
discoverySystem.get().getNodeRecordBuckets().stream()
.flatMap(List::stream)
.map(nr -> DiscoveryPeerFactory.fromNodeRecord(nr, preferIpv6Outbound))
.filter((peer) -> !isPeerPermitted(localNode, peer))
.forEach(this::dropPeer));
}));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler<Byte
private final FramerProvider framerProvider;
private final boolean inboundInitiated;
private final PeerLookup peerLookup;
private final int maxMessageSize;

AbstractHandshakeHandler(
final List<SubProtocol> subProtocols,
Expand All @@ -73,7 +74,8 @@ abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler<Byte
final HandshakerProvider handshakerProvider,
final FramerProvider framerProvider,
final boolean inboundInitiated,
final PeerLookup peerLookup) {
final PeerLookup peerLookup,
final int maxMessageSize) {
this.subProtocols = subProtocols;
this.localNode = localNode;
this.expectedPeer = expectedPeer;
Expand All @@ -84,6 +86,7 @@ abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler<Byte
this.framerProvider = framerProvider;
this.inboundInitiated = inboundInitiated;
this.peerLookup = peerLookup;
this.maxMessageSize = maxMessageSize;
}

/**
Expand Down Expand Up @@ -126,7 +129,8 @@ protected final void channelRead0(final ChannelHandlerContext ctx, final ByteBuf
connectionFuture,
metricsSystem,
inboundInitiated,
peerLookup);
peerLookup,
maxMessageSize);

ctx.channel()
.pipeline()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ final class DeFramer extends ByteToMessageDecoder {
private final PeerLookup peerLookup;
private boolean hellosExchanged;
private final LabelledMetric<Counter> outboundMessagesCounter;
private final int maxMessageSize;
private final LabelledMetric<Counter> outboundBytesCounter;

DeFramer(
Expand All @@ -87,7 +88,8 @@ final class DeFramer extends ByteToMessageDecoder {
final CompletableFuture<PeerConnection> connectFuture,
final MetricsSystem metricsSystem,
final boolean inboundInitiated,
final PeerLookup peerLookup) {
final PeerLookup peerLookup,
final int maxMessageSize) {
this.framer = framer;
this.subProtocols = subProtocols;
this.localNode = localNode;
Expand All @@ -96,6 +98,7 @@ final class DeFramer extends ByteToMessageDecoder {
this.connectionEventDispatcher = connectionEventDispatcher;
this.inboundInitiated = inboundInitiated;
this.peerLookup = peerLookup;
this.maxMessageSize = maxMessageSize;
this.outboundMessagesCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.NETWORK,
Expand All @@ -121,6 +124,24 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L

if (hellosExchanged) {

if (message.getSize() > maxMessageSize) {
LOG.debug(
"Oversized message received ({} bytes > {} max), disconnecting peer {}",
message.getSize(),
maxMessageSize,
expectedPeer.map(Peer::getEnodeURLString).orElse("unknown"));
if (connectFuture.isDone() && !connectFuture.isCompletedExceptionally()) {
connectFuture
.join()
.disconnect(
DisconnectMessage.DisconnectReason
.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
} else {
ctx.close();
}
return;
}

out.add(message);

} else if (message.getCode() == WireMessageCodes.HELLO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public HandshakeHandlerInbound(
final MetricsSystem metricsSystem,
final HandshakerProvider handshakerProvider,
final FramerProvider framerProvider,
final PeerLookup peerLookup) {
final PeerLookup peerLookup,
final int maxMessageSize) {
super(
subProtocols,
localNode,
Expand All @@ -53,7 +54,8 @@ public HandshakeHandlerInbound(
handshakerProvider,
framerProvider,
true,
peerLookup);
peerLookup,
maxMessageSize);
handshaker.prepareResponder(nodeKey);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public HandshakeHandlerOutbound(
final MetricsSystem metricsSystem,
final HandshakerProvider handshakerProvider,
final FramerProvider framerProvider,
final PeerLookup peerLookup) {
final PeerLookup peerLookup,
final int maxMessageSize) {
super(
subProtocols,
localNode,
Expand All @@ -63,7 +64,8 @@ public HandshakeHandlerOutbound(
handshakerProvider,
framerProvider,
false,
peerLookup);
peerLookup,
maxMessageSize);
handshaker.prepareInitiator(
nodeKey, SignatureAlgorithmFactory.getInstance().createPublicKey(peer.getId()));
this.first = handshaker.firstMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ private HandshakeHandlerInbound inboundHandler(
metricsSystem,
this,
this,
peerLookup);
peerLookup,
config.getMaxMessageSize());
}

@NotNull
Expand All @@ -365,7 +366,8 @@ private HandshakeHandlerOutbound outboundHandler(
metricsSystem,
this,
this,
peerLookup);
peerLookup,
config.getMaxMessageSize());
}

@NotNull
Expand Down
Loading
Loading