Skip to content

Commit ecb7356

Browse files
fix assemble + rename
1 parent c433948 commit ecb7356

File tree

3 files changed

+17
-16
lines changed

3 files changed

+17
-16
lines changed

networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PPeer.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import org.apache.logging.log4j.LogManager;
3030
import org.apache.logging.log4j.Logger;
3131
import tech.pegasys.teku.infrastructure.async.SafeFuture;
32-
import tech.pegasys.teku.networking.p2p.libp2p.rpc.PeerRpcHandler;
3332
import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler;
33+
import tech.pegasys.teku.networking.p2p.libp2p.rpc.ThrottlingRpcHandler;
3434
import tech.pegasys.teku.networking.p2p.network.PeerAddress;
3535
import tech.pegasys.teku.networking.p2p.peer.DisconnectReason;
3636
import tech.pegasys.teku.networking.p2p.peer.DisconnectRequestHandler;
@@ -47,7 +47,7 @@
4747
public class LibP2PPeer implements Peer {
4848
private static final Logger LOG = LogManager.getLogger();
4949

50-
private final Map<RpcMethod<?, ?, ?>, PeerRpcHandler<?, ?, ?>> rpcHandlers;
50+
private final Map<RpcMethod<?, ?, ?>, ThrottlingRpcHandler<?, ?, ?>> rpcHandlers;
5151
private final ReputationManager reputationManager;
5252
private final Function<PeerId, Double> peerScoreFunction;
5353
private final Connection connection;
@@ -74,7 +74,7 @@ public LibP2PPeer(
7474
this.connection = connection;
7575
this.rpcHandlers =
7676
rpcHandlers.stream()
77-
.collect(Collectors.toMap(RpcHandler::getRpcMethod, PeerRpcHandler::new));
77+
.collect(Collectors.toMap(RpcHandler::getRpcMethod, ThrottlingRpcHandler::new));
7878
this.reputationManager = reputationManager;
7979
this.peerScoreFunction = peerScoreFunction;
8080
this.peerId = connection.secureSession().getRemoteId();
@@ -207,8 +207,8 @@ SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequest(
207207
final TRequest request,
208208
final RespHandler responseHandler) {
209209
@SuppressWarnings("unchecked")
210-
final PeerRpcHandler<TOutgoingHandler, TRequest, RespHandler> rpcHandler =
211-
(PeerRpcHandler<TOutgoingHandler, TRequest, RespHandler>) rpcHandlers.get(rpcMethod);
210+
final ThrottlingRpcHandler<TOutgoingHandler, TRequest, RespHandler> rpcHandler =
211+
(ThrottlingRpcHandler<TOutgoingHandler, TRequest, RespHandler>) rpcHandlers.get(rpcMethod);
212212
if (rpcHandler == null) {
213213
throw new IllegalArgumentException(
214214
"Unknown rpc method invoked: " + String.join(",", rpcMethod.getIds()));

networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/PeerRpcHandler.java networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/ThrottlingRpcHandler.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,22 @@
2121
import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController;
2222
import tech.pegasys.teku.spec.constants.NetworkConstants;
2323

24-
public class PeerRpcHandler<
24+
public class ThrottlingRpcHandler<
2525
TOutgoingHandler extends RpcRequestHandler,
2626
TRequest,
2727
TRespHandler extends RpcResponseHandler<?>> {
2828

2929
private final RpcHandler<TOutgoingHandler, TRequest, TRespHandler> delegate;
30-
private final ThrottlingTaskQueue outgoingQueue =
30+
private final ThrottlingTaskQueue requestsQueue =
3131
ThrottlingTaskQueue.create(NetworkConstants.MAX_CONCURRENT_REQUESTS);
3232

33-
public PeerRpcHandler(final RpcHandler<TOutgoingHandler, TRequest, TRespHandler> delegate) {
33+
public ThrottlingRpcHandler(final RpcHandler<TOutgoingHandler, TRequest, TRespHandler> delegate) {
3434
this.delegate = delegate;
3535
}
3636

3737
public SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequest(
3838
final Connection connection, final TRequest request, final TRespHandler responseHandler) {
39-
return outgoingQueue.queueTask(
39+
return requestsQueue.queueTask(
4040
() -> delegate.sendRequest(connection, request, responseHandler));
4141
}
4242
}

networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/PeerRpcHandlerTest.java networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/rpc/ThrottlingRpcHandlerTest.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController;
3030
import tech.pegasys.teku.spec.constants.NetworkConstants;
3131

32-
public class PeerRpcHandlerTest {
32+
public class ThrottlingRpcHandlerTest {
3333

3434
private final Connection connection = mock(Connection.class);
3535

@@ -41,15 +41,16 @@ public class PeerRpcHandlerTest {
4141
private final RpcMethod<RpcRequestHandler, Object, RpcResponseHandler<Void>> rpcMethod =
4242
mock(RpcMethod.class);
4343

44-
private PeerRpcHandler<RpcRequestHandler, Object, RpcResponseHandler<Void>> peerRpcHandler;
44+
private ThrottlingRpcHandler<RpcRequestHandler, Object, RpcResponseHandler<Void>>
45+
throttlingRpcHandler;
4546

4647
@BeforeEach
4748
public void init() {
4849
when(delegate.getRpcMethod()).thenReturn(rpcMethod);
49-
peerRpcHandler = new PeerRpcHandler<>(delegate);
50+
throttlingRpcHandler = new ThrottlingRpcHandler<>(delegate);
5051
}
5152

52-
@SuppressWarnings("unchecked")
53+
@SuppressWarnings({"unchecked", "FutureReturnValueIgnored"})
5354
@Test
5455
public void sendRequest_throttlesRequests() {
5556

@@ -61,16 +62,16 @@ public void sendRequest_throttlesRequests() {
6162
final SafeFuture<RpcStreamController<RpcRequestHandler>> future =
6263
new SafeFuture<>();
6364
when(delegate.sendRequest(connection, null, null)).thenReturn(future);
64-
peerRpcHandler.sendRequest(connection, null, null);
65+
throttlingRpcHandler.sendRequest(connection, null, null);
6566
return future;
6667
})
6768
.toList();
6869

69-
when(peerRpcHandler.sendRequest(connection, null, null))
70+
when(throttlingRpcHandler.sendRequest(connection, null, null))
7071
.thenReturn(SafeFuture.completedFuture(mock(RpcStreamController.class)));
7172

7273
final SafeFuture<RpcStreamController<RpcRequestHandler>> throttledRequest =
73-
peerRpcHandler.sendRequest(connection, null, null);
74+
throttlingRpcHandler.sendRequest(connection, null, null);
7475

7576
// completed request should be throttled
7677
assertThat(throttledRequest).isNotDone();

0 commit comments

Comments
 (0)