Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.1.1 Release #367

Merged
merged 12 commits into from
May 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
name: Bug Report
description: Create a bug report for jvm-libp2p

body:
- type: markdown
attributes:
value: |
Thank you for filing a bug report!
- type: textarea
attributes:
label: Summary
description: Please provide a short summary of the bug, along with any information you feel relevant to replicate the bug.
validations:
required: true
- type: textarea
attributes:
label: Expected behavior
description: Describe what you expect to happen.
validations:
required: true
- type: textarea
attributes:
label: Actual behavior
description: Describe what actually happens.
validations:
required: true
- type: textarea
attributes:
label: Relevant log output
description: Please copy and paste any relevant log output. This will be automatically formatted into code, so no need for backticks.
render: shell
validations:
required: false
- type: textarea
attributes:
label: Possible Solution
description: Suggest a fix/reason for the bug, or ideas how to implement the addition or change.
validations:
required: false
- type: textarea
attributes:
label: Version
description: Which version of libp2p are you using? libp2p version (version number, commit, or branch)
validations:
required: false
- type: dropdown
attributes:
label: Would you like to work on fixing this bug ?
description: Any contribution towards fixing the bug is greatly appreciated. We are more than happy to provide help on the process.
options:
- "Yes"
- "No"
- Maybe
validations:
required: true
8 changes: 8 additions & 0 deletions .github/ISSUE_TEMPLATE/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
blank_issues_enabled: true
contact_links:
- name: Technical Questions
url: https://github.com/libp2p/jvm-libp2p/discussions/new?category=q-a
about: Please ask technical questions in the jvm-libp2p Github Discussions forum.
- name: Community-wide libp2p Discussion
url: https://discuss.libp2p.io
about: Discussions and questions about the libp2p community.
31 changes: 31 additions & 0 deletions .github/ISSUE_TEMPLATE/enhancement.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
name: Enhancement
description: Suggest an improvement to an existing jvm-libp2p feature.
body:
- type: textarea
attributes:
label: Description
description: Describe the enhancement that you are proposing.
validations:
required: true
- type: textarea
attributes:
label: Motivation
description: Explain why this enhancement is beneficial.
validations:
required: true
- type: textarea
attributes:
label: Current Implementation
description: Describe the current implementation.
validations:
required: true
- type: dropdown
attributes:
label: Are you planning to do it yourself in a pull request ?
description: Any contribution is greatly appreciated. We are more than happy to provide help on the process.
options:
- "Yes"
- "No"
- Maybe
validations:
required: true
42 changes: 42 additions & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
name: Feature request
description: Suggest a new feature in jvm-libp2p
body:
- type: markdown
attributes:
value: |
If you'd like to suggest a feature related to libp2p but not specifically related to the JVM implementation, please file an issue at https://github.com/libp2p/specs instead.
- type: textarea
attributes:
label: Description
description: Briefly describe the feature that you are requesting.
validations:
required: true
- type: textarea
attributes:
label: Motivation
description: Explain why this feature is needed.
validations:
required: true
- type: textarea
attributes:
label: Requirements
description: Write a list of what you want this feature to do.
placeholder: "1."
validations:
required: true
- type: textarea
attributes:
label: Open questions
description: Use this section to ask any questions that are related to the feature.
validations:
required: false
- type: dropdown
attributes:
label: Are you planning to do it yourself in a pull request ?
description: Any contribution is greatly appreciated. We are more than happy to provide help on the process.
options:
- "Yes"
- "No"
- Maybe
validations:
required: true
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ List of components in the Libp2p spec and their JVM implementation status
| **Stream Multiplexing** | [yamux](https://github.com/libp2p/specs/blob/master/yamux/README.md) | :lemon: |
| | [mplex](https://github.com/libp2p/specs/blob/master/mplex/README.md) | :green_apple: |
| **NAT Traversal** | [circuit-relay-v2](https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md) | :lemon: |
| | [autonat](https://github.com/libp2p/specs/tree/master/autonat) | |
| | [autonat](https://github.com/libp2p/specs/tree/master/autonat) | :lemon: |
| | [hole-punching](https://github.com/libp2p/specs/blob/master/connections/hole-punching.md) | |
| **Discovery** | [bootstrap](https://github.com/libp2p/specs/blob/master/kad-dht/README.md#bootstrap-process) | |
| | random-walk | |
4 changes: 2 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ plugins {
id("io.gitlab.arturbosch.detekt").version("1.22.0")
id("java")
id("maven-publish")
id("org.jetbrains.dokka").version("1.9.0")
id("org.jetbrains.dokka").version("1.9.20")
id("com.diffplug.spotless").version("6.21.0")
id("java-test-fixtures")
id("io.spring.dependency-management").version("1.1.3")
@@ -37,7 +37,7 @@ configure(
}
) {
group = "io.libp2p"
version = "1.1.0-RELEASE"
version = "1.1.1-RELEASE"

apply(plugin = "kotlin")
apply(plugin = "idea")
172 changes: 172 additions & 0 deletions libp2p/src/main/java/io/libp2p/protocol/autonat/AutonatProtocol.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package io.libp2p.protocol.autonat;

import com.google.protobuf.*;
import io.libp2p.core.*;
import io.libp2p.core.Stream;
import io.libp2p.core.multiformats.*;
import io.libp2p.core.multistream.*;
import io.libp2p.protocol.*;
import io.libp2p.protocol.autonat.pb.*;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
import org.jetbrains.annotations.*;

public class AutonatProtocol extends ProtobufProtocolHandler<AutonatProtocol.AutoNatController> {

public static class Binding extends StrictProtocolBinding<AutoNatController> {
public Binding() {
super("/libp2p/autonat/v1.0.0", new AutonatProtocol());
}
}

public interface AutoNatController {
CompletableFuture<Autonat.Message> rpc(Autonat.Message req);

default CompletableFuture<Autonat.Message.DialResponse> requestDial(
PeerId ourId, List<Multiaddr> us) {
if (us.isEmpty())
throw new IllegalStateException("Requested autonat dial with no addresses!");
return rpc(Autonat.Message.newBuilder()
.setType(Autonat.Message.MessageType.DIAL)
.setDial(
Autonat.Message.Dial.newBuilder()
.setPeer(
Autonat.Message.PeerInfo.newBuilder()
.addAllAddrs(
us.stream()
.map(a -> ByteString.copyFrom(a.serialize()))
.collect(Collectors.toList()))
.setId(ByteString.copyFrom(ourId.getBytes()))))
.build())
.thenApply(msg -> msg.getDialResponse());
}
}

public static class Sender implements ProtocolMessageHandler<Autonat.Message>, AutoNatController {
private final Stream stream;
private final LinkedBlockingDeque<CompletableFuture<Autonat.Message>> queue =
new LinkedBlockingDeque<>();

public Sender(Stream stream) {
this.stream = stream;
}

@Override
public void onMessage(@NotNull Stream stream, Autonat.Message msg) {
queue.poll().complete(msg);
}

public CompletableFuture<Autonat.Message> rpc(Autonat.Message req) {
CompletableFuture<Autonat.Message> res = new CompletableFuture<>();
queue.add(res);
stream.writeAndFlush(req);
return res;
}
}

private static boolean sameIP(Multiaddr a, Multiaddr b) {
if (a.has(Protocol.IP4))
return a.getFirstComponent(Protocol.IP4).equals(b.getFirstComponent(Protocol.IP4));
if (a.has(Protocol.IP6))
return a.getFirstComponent(Protocol.IP6).equals(b.getFirstComponent(Protocol.IP6));
return false;
}

private static boolean reachableIP(Multiaddr a) {
try {
if (a.has(Protocol.IP4))
return InetAddress.getByName(a.getFirstComponent(Protocol.IP4).getStringValue())
.isReachable(1000);
if (a.has(Protocol.IP6))
return InetAddress.getByName(a.getFirstComponent(Protocol.IP6).getStringValue())
.isReachable(1000);
} catch (IOException e) {
}
return false;
}

public static class Receiver
implements ProtocolMessageHandler<Autonat.Message>, AutoNatController {
private final Stream p2pstream;

public Receiver(Stream p2pstream) {
this.p2pstream = p2pstream;
}

@Override
public void onMessage(@NotNull Stream stream, Autonat.Message msg) {
switch (msg.getType()) {
case DIAL:
{
Autonat.Message.Dial dial = msg.getDial();
PeerId peerId = new PeerId(dial.getPeer().getId().toByteArray());
List<Multiaddr> requestedDials =
dial.getPeer().getAddrsList().stream()
.map(s -> Multiaddr.deserialize(s.toByteArray()))
.collect(Collectors.toList());
PeerId streamPeerId = stream.remotePeerId();
if (!peerId.equals(streamPeerId)) {
p2pstream.close();
return;
}

Multiaddr remote = stream.getConnection().remoteAddress();
Optional<Multiaddr> reachable =
requestedDials.stream()
.filter(a -> sameIP(a, remote))
.filter(a -> !a.has(Protocol.P2PCIRCUIT))
.filter(a -> reachableIP(a))
.findAny();
Autonat.Message.Builder resp =
Autonat.Message.newBuilder().setType(Autonat.Message.MessageType.DIAL_RESPONSE);
if (reachable.isPresent()) {
resp =
resp.setDialResponse(
Autonat.Message.DialResponse.newBuilder()
.setStatus(Autonat.Message.ResponseStatus.OK)
.setAddr(ByteString.copyFrom(reachable.get().serialize())));
} else {
resp =
resp.setDialResponse(
Autonat.Message.DialResponse.newBuilder()
.setStatus(Autonat.Message.ResponseStatus.E_DIAL_ERROR));
}
p2pstream.writeAndFlush(resp);
}
default:
{
}
}
}

public CompletableFuture<Autonat.Message> rpc(Autonat.Message msg) {
return CompletableFuture.failedFuture(
new IllegalStateException("Cannot send form a receiver!"));
}
}

private static final int TRAFFIC_LIMIT = 2 * 1024;

public AutonatProtocol() {
super(Autonat.Message.getDefaultInstance(), TRAFFIC_LIMIT, TRAFFIC_LIMIT);
}

@NotNull
@Override
protected CompletableFuture<AutoNatController> onStartInitiator(@NotNull Stream stream) {
Sender replyPropagator = new Sender(stream);
stream.pushHandler(replyPropagator);
return CompletableFuture.completedFuture(replyPropagator);
}

@NotNull
@Override
protected CompletableFuture<AutoNatController> onStartResponder(@NotNull Stream stream) {
Receiver dialer = new Receiver(stream);
stream.pushHandler(dialer);
return CompletableFuture.completedFuture(dialer);
}
}
9 changes: 8 additions & 1 deletion libp2p/src/main/kotlin/io/libp2p/discovery/MDnsDiscovery.kt
Original file line number Diff line number Diff line change
@@ -76,7 +76,14 @@ class MDnsDiscovery(
val address = host.listenAddresses().find {
it.has(Protocol.IP4)
}
val str = address?.getFirstComponent(Protocol.TCP)?.stringValue!!
val ipv6OnlyAddress = if (address == null) {
host.listenAddresses().find {
it.has(Protocol.IP6)
}
} else {
address
}
val str = ipv6OnlyAddress?.getFirstComponent(Protocol.TCP)?.stringValue!!
return Integer.parseInt(str)
}

6 changes: 5 additions & 1 deletion libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
@@ -323,7 +323,11 @@ abstract class AbstractRouter(

override fun getPeerTopics(): CompletableFuture<Map<PeerId, Set<Topic>>> {
return submitOnEventThread {
peersTopics.asFirstToSecondMap().mapKeys { it.key.peerId }
peersTopics.asFirstToSecondMap()
.map { (key, value) ->
key.peerId to value.toSet()
}
.toMap()
}
}

10 changes: 7 additions & 3 deletions libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
@@ -317,6 +317,10 @@ open class GossipRouter(
}

private fun handleIHave(msg: Rpc.ControlIHave, peer: PeerHandler) {
// we ignore IHAVE gossip for unknown topics
if (msg.hasTopicID() && !mesh.containsKey(msg.topicID)) {
return
}
val peerScore = score.score(peer.peerId)
// we ignore IHAVE gossip from any peer whose score is below the gossip threshold
if (peerScore < scoreParams.gossipThreshold) return
@@ -544,7 +548,7 @@ open class GossipRouter(

peers.shuffled(random)
.take(max((params.gossipFactor * peers.size).toInt(), params.DLazy))
.forEach { enqueueIhave(it, shuffledMessageIds) }
.forEach { enqueueIhave(it, shuffledMessageIds, topic) }
}

private fun graft(peer: PeerHandler, topic: Topic) {
@@ -587,8 +591,8 @@ open class GossipRouter(
private fun enqueueIwant(peer: PeerHandler, messageIds: List<MessageId>) =
pendingRpcParts.getQueue(peer).addIWants(messageIds)

private fun enqueueIhave(peer: PeerHandler, messageIds: List<MessageId>) =
pendingRpcParts.getQueue(peer).addIHaves(messageIds)
private fun enqueueIhave(peer: PeerHandler, messageIds: List<MessageId>, topic: Topic) =
pendingRpcParts.getQueue(peer).addIHaves(messageIds, topic)

data class AcceptRequestsWhitelistEntry(val whitelistedTill: Long, val messagesAccepted: Int = 0) {
fun incrementMessageCount() = AcceptRequestsWhitelistEntry(whitelistedTill, messagesAccepted + 1)
Original file line number Diff line number Diff line change
@@ -10,8 +10,8 @@ import pubsub.pb.Rpc

interface GossipRpcPartsQueue : RpcPartsQueue {

fun addIHave(messageId: MessageId)
fun addIHaves(messageIds: Collection<MessageId>) = messageIds.forEach { addIHave(it) }
fun addIHave(messageId: MessageId, topic: Topic)
fun addIHaves(messageIds: Collection<MessageId>, topic: Topic) = messageIds.forEach { addIHave(it, topic) }
fun addIWant(messageId: MessageId)
fun addIWants(messageIds: Collection<MessageId>) = messageIds.forEach { addIWant(it) }

@@ -37,14 +37,13 @@ open class DefaultGossipRpcPartsQueue(
private val params: GossipParams
) : DefaultRpcPartsQueue(), GossipRpcPartsQueue {

protected data class IHavePart(val messageId: MessageId) : AbstractPart {
protected data class IHavePart(val messageId: MessageId, val topic: Topic) : AbstractPart {
override fun appendToBuilder(builder: Rpc.RPC.Builder) {
val ctrlBuilder = builder.controlBuilder
val iHaveBuilder = if (ctrlBuilder.ihaveBuilderList.isEmpty()) {
ctrlBuilder.addIhaveBuilder()
} else {
ctrlBuilder.getIhaveBuilder(0)
}
val iHaveBuilder = ctrlBuilder.ihaveBuilderList
.find { it.topicID == topic }
?: ctrlBuilder.addIhaveBuilder().setTopicID(topic)

iHaveBuilder.addMessageIDs(messageId.toProtobuf())
}
}
@@ -82,8 +81,8 @@ open class DefaultGossipRpcPartsQueue(
}
}

override fun addIHave(messageId: MessageId) {
addPart(IHavePart(messageId))
override fun addIHave(messageId: MessageId, topic: Topic) {
addPart(IHavePart(messageId, topic))
}

override fun addIWant(messageId: MessageId) {
Original file line number Diff line number Diff line change
@@ -97,7 +97,8 @@ class NoiseIoHandshake(
private var sentNoiseKeyPayload = false
private var instancePayload: ByteArray? = null
private var activated = false
private var remotePeerId: PeerId? = null
private var remotePubKey: PubKey? = null
private val remotePeerId: PeerId? get() = remotePubKey?.let { PeerId.fromPubKey(it) }
private var expectedRemotePeerId: PeerId? = null

init {
@@ -139,7 +140,7 @@ class NoiseIoHandshake(
// the remote public key has been provided by the XX protocol
val derivedRemotePublicKey = handshakeState.remotePublicKey
if (derivedRemotePublicKey.hasPublicKey()) {
remotePeerId = verifyPayload(ctx, instancePayload!!, derivedRemotePublicKey)
remotePubKey = verifyPayload(ctx, instancePayload!!, derivedRemotePublicKey)
if (role == Role.INIT && expectedRemotePeerId != remotePeerId) {
throw InvalidRemotePubKey()
}
@@ -248,7 +249,7 @@ class NoiseIoHandshake(
ctx: ChannelHandlerContext,
payload: ByteArray,
remotePublicKeyState: DHState
): PeerId {
): PubKey {
log.debug("Verifying noise static key payload")

val (pubKeyFromMessage, signatureFromMessage) = unpackKeyAndSignature(payload)
@@ -264,7 +265,7 @@ class NoiseIoHandshake(
handshakeFailed(ctx, InvalidRemotePubKey())
}

return PeerId.fromPubKey(pubKeyFromMessage)
return pubKeyFromMessage
} // verifyPayload

private fun unpackKeyAndSignature(payload: ByteArray): Pair<PubKey, ByteArray> {
@@ -287,7 +288,7 @@ class NoiseIoHandshake(
val secureSession = NoiseSecureChannelSession(
PeerId.fromPubKey(localKey.publicKey()),
remotePeerId!!,
localKey.publicKey(),
remotePubKey!!,
aliceSplit,
bobSplit
)
37 changes: 37 additions & 0 deletions libp2p/src/main/proto/autonat.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
syntax = "proto2";

package io.libp2p.protocol.autonat.pb;

message Message {
enum MessageType {
DIAL = 0;
DIAL_RESPONSE = 1;
}

enum ResponseStatus {
OK = 0;
E_DIAL_ERROR = 100;
E_DIAL_REFUSED = 101;
E_BAD_REQUEST = 200;
E_INTERNAL_ERROR = 300;
}

message PeerInfo {
optional bytes id = 1;
repeated bytes addrs = 2;
}

message Dial {
optional PeerInfo peer = 1;
}

message DialResponse {
optional ResponseStatus status = 1;
optional string statusText = 2;
optional bytes addr = 3;
}

optional MessageType type = 1;
optional Dial dial = 2;
optional DialResponse dialResponse = 3;
}
73 changes: 73 additions & 0 deletions libp2p/src/test/java/io/libp2p/core/AutonatTestJava.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.libp2p.core;

import io.libp2p.core.dsl.*;
import io.libp2p.core.multiformats.*;
import io.libp2p.core.mux.*;
import io.libp2p.protocol.*;
import io.libp2p.protocol.autonat.*;
import io.libp2p.protocol.autonat.pb.*;
import io.libp2p.security.noise.*;
import io.libp2p.transport.tcp.*;
import java.util.concurrent.*;
import org.junit.jupiter.api.*;

public class AutonatTestJava {

@Test
void autonatDial() throws Exception {
Host clientHost =
new HostBuilder()
.transport(TcpTransport::new)
.secureChannel(NoiseXXSecureChannel::new)
.muxer(StreamMuxerProtocol::getYamux)
.protocol(new Ping())
.protocol(new AutonatProtocol.Binding())
.listen("/ip4/127.0.0.1/tcp/0")
.build();

Host serverHost =
new HostBuilder()
.transport(TcpTransport::new)
.secureChannel(NoiseXXSecureChannel::new)
.muxer(StreamMuxerProtocol::getYamux)
.protocol(new Ping())
.protocol(new AutonatProtocol.Binding())
.listen("/ip4/127.0.0.1/tcp/0")
.build();

CompletableFuture<Void> clientStarted = clientHost.start();
CompletableFuture<Void> serverStarted = serverHost.start();
clientStarted.get(5, TimeUnit.SECONDS);
System.out.println("Client started");
serverStarted.get(5, TimeUnit.SECONDS);
System.out.println("Server started");

StreamPromise<AutonatProtocol.AutoNatController> autonat =
clientHost
.getNetwork()
.connect(serverHost.getPeerId(), serverHost.listenAddresses().get(0))
.thenApply(it -> it.muxerSession().createStream(new AutonatProtocol.Binding()))
.get(5, TimeUnit.SECONDS);

Stream autonatStream = autonat.getStream().get(5, TimeUnit.SECONDS);
System.out.println("Autonat stream created");
AutonatProtocol.AutoNatController autonatCtr = autonat.getController().get(5, TimeUnit.SECONDS);
System.out.println("Autonat controller created");

Autonat.Message.DialResponse resp =
autonatCtr
.requestDial(clientHost.getPeerId(), clientHost.listenAddresses())
.get(5, TimeUnit.SECONDS);
Assertions.assertEquals(resp.getStatus(), Autonat.Message.ResponseStatus.OK);
Multiaddr received = Multiaddr.deserialize(resp.getAddr().toByteArray());
Assertions.assertEquals(received, clientHost.listenAddresses().get(0));

autonatStream.close().get(5, TimeUnit.SECONDS);
System.out.println("Autonat stream closed");

clientHost.stop().get(5, TimeUnit.SECONDS);
System.out.println("Client stopped");
serverHost.stop().get(5, TimeUnit.SECONDS);
System.out.println("Server stopped");
}
}
10 changes: 3 additions & 7 deletions libp2p/src/test/java/io/libp2p/core/RelayTestJava.java
Original file line number Diff line number Diff line change
@@ -31,8 +31,6 @@ private static void enableRelay(BuilderJ b, List<RelayTransport.CandidateRelay>

@Test
void pingOverLocalRelay() throws Exception {
String localListenAddress = "/ip4/127.0.0.1/tcp/40002";

Host relayHost =
new HostBuilder()
.builderModifier(b -> enableRelay(b, Collections.emptyList()))
@@ -79,7 +77,7 @@ void pingOverLocalRelay() throws Exception {
.secureChannel(NoiseXXSecureChannel::new)
.muxer(StreamMuxerProtocol::getYamux)
.protocol(new Ping())
.listen(localListenAddress)
.listen("/ip4/127.0.0.1/tcp/0")
.listen(relayAddr + "/p2p-circuit")
.build();
serverHost.getNetwork().getTransports().stream()
@@ -130,8 +128,6 @@ void pingOverLocalRelay() throws Exception {

@Test
void relayStreamsAreLimited() throws Exception {
String localListenAddress = "/ip4/127.0.0.1/tcp/40002";

Host relayHost =
new HostBuilder()
.builderModifier(b -> enableRelay(b, Collections.emptyList()))
@@ -181,7 +177,7 @@ void relayStreamsAreLimited() throws Exception {
.secureChannel(NoiseXXSecureChannel::new)
.muxer(StreamMuxerProtocol::getYamux)
.protocol(new Blob(blobSize))
.listen(localListenAddress)
.listen("/ip4/127.0.0.1/tcp/0")
.listen(relayAddr + "/p2p-circuit")
.build();
serverHost.getNetwork().getTransports().stream()
@@ -215,7 +211,7 @@ void relayStreamsAreLimited() throws Exception {
System.out.println("Blob controller created");

Assertions.assertThrows(
ExecutionException.class, () -> blobCtr.blob().get(5, TimeUnit.SECONDS));
ExecutionException.class, () -> blobCtr.blob().get(30, TimeUnit.SECONDS));

clientHost.stop().get(5, TimeUnit.SECONDS);
System.out.println("Client stopped");
21 changes: 21 additions & 0 deletions libp2p/src/test/kotlin/io/libp2p/discovery/MDnsDiscoveryTest.kt
Original file line number Diff line number Diff line change
@@ -24,6 +24,18 @@ class MDnsDiscoveryTest {
}
}

val hostIpv6 = object : NullHost() {
override val peerId: PeerId = PeerId.fromPubKey(
generateEcdsaKeyPair().second
)

override fun listenAddresses(): List<Multiaddr> {
return listOf(
Multiaddr("/ip6/::/tcp/4001")
)
}
}

val otherHost = object : NullHost() {
override val peerId: PeerId = PeerId.fromPubKey(
generateEcdsaKeyPair().second
@@ -47,6 +59,15 @@ class MDnsDiscoveryTest {
discoverer.stop().get(1, TimeUnit.SECONDS)
}

@Test
fun `start and stop discovery ipv6`() {
val discoverer = MDnsDiscovery(hostIpv6, testServiceTag)

discoverer.start().get(1, TimeUnit.SECONDS)
TimeUnit.MILLISECONDS.sleep(100)
discoverer.stop().get(1, TimeUnit.SECONDS)
}

@Test
fun `start discovery and listen for self`() {
var peerInfo: PeerInfo? = null
65 changes: 63 additions & 2 deletions libp2p/src/test/kotlin/io/libp2p/pubsub/PubsubRouterTest.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package io.libp2p.pubsub

import io.libp2p.core.pubsub.*
import io.libp2p.core.pubsub.MessageApi
import io.libp2p.core.pubsub.RESULT_INVALID
import io.libp2p.core.pubsub.RESULT_VALID
import io.libp2p.core.pubsub.Subscriber
import io.libp2p.core.pubsub.Topic
import io.libp2p.core.pubsub.ValidationResult
import io.libp2p.core.pubsub.Validator
import io.libp2p.core.pubsub.createPubsubApi
import io.libp2p.etc.types.seconds
import io.libp2p.etc.types.toByteBuf
import io.libp2p.etc.types.toBytesBigEndian
@@ -10,6 +16,7 @@ import io.libp2p.pubsub.gossip.GossipRouter
import io.libp2p.tools.TestChannel.TestConnection
import io.netty.handler.logging.LogLevel
import io.netty.util.ResourceLeakDetector
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import pubsub.pb.Rpc
@@ -279,7 +286,10 @@ abstract class PubsubRouterTest(val routerFactory: DeterministicFuzzRouterFactor
doTenNeighborsTopology()
}

fun doTenNeighborsTopology(randomSeed: Int = 0, routerFactory: DeterministicFuzzRouterFactory = this.routerFactory) {
fun doTenNeighborsTopology(
randomSeed: Int = 0,
routerFactory: DeterministicFuzzRouterFactory = this.routerFactory
) {
val fuzz = DeterministicFuzz().also {
it.randomSeed = randomSeed.toLong()
}
@@ -398,6 +408,7 @@ abstract class PubsubRouterTest(val routerFactory: DeterministicFuzzRouterFactor
routers[1].connectSemiDuplex(routers[2], pubsubLogs = LogLevel.ERROR)

val apis = routers.map { createPubsubApi(it.router) }

class RecordingSubscriber : Subscriber {
var count = 0
override fun accept(t: MessageApi) {
@@ -460,4 +471,54 @@ abstract class PubsubRouterTest(val routerFactory: DeterministicFuzzRouterFactor
Assertions.assertEquals(2, subs2[2].count)
Assertions.assertEquals(0, subs2[3].count)
}

@Test
fun `getPeerTopics() should return immutable snapshot`() {
val fuzz = DeterministicFuzz()

fun <T> executeAsyncNow(asyncTask: () -> CompletableFuture<T>): T {
val future = asyncTask()
fuzz.timeController.addTime(Duration.ofMillis(1))
if (!future.isDone) throw AssertionError("Async task was not complete within virtual 1ms")
return future.join()
}

val router1 = fuzz.createTestRouter(routerFactory)
val router2 = fuzz.createTestRouter(routerFactory)
router2.router.subscribe("topic1")

router1.connectSemiDuplex(router2, LogLevel.DEBUG, LogLevel.DEBUG)

val peerTopics1 = executeAsyncNow { router1.router.getPeerTopics() }
val peerTopics1MapIt = peerTopics1.entries.iterator()
val peerTopics1SetIt = peerTopics1.entries.first().value.iterator()

router2.router.subscribe("topic2")

val router3 = fuzz.createTestRouter(routerFactory)
router3.router.subscribe("topic3")
router1.connectSemiDuplex(router3, LogLevel.DEBUG, LogLevel.DEBUG)

val peerTopics2 = executeAsyncNow { router1.router.getPeerTopics() }

assertThat(peerTopics2)
.containsExactlyInAnyOrderEntriesOf(
mapOf(
router2.peerId to setOf("topic1", "topic2"),
router3.peerId to setOf("topic3")
)
)

assertThat(peerTopics1)
.containsExactlyInAnyOrderEntriesOf(
mapOf(
router2.peerId to setOf("topic1")
)
)

assertThat(peerTopics1MapIt.next().key).isEqualTo(router2.peerId)
assertThat(peerTopics1MapIt.hasNext()).isFalse()
assertThat(peerTopics1SetIt.next()).isEqualTo("topic1")
assertThat(peerTopics1SetIt.hasNext()).isFalse()
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.libp2p.pubsub.gossip

import io.libp2p.pubsub.Topic
import io.libp2p.pubsub.gossip.builders.GossipParamsBuilder
import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder
import io.libp2p.tools.protobuf.RpcBuilder
@@ -35,6 +36,8 @@ class GossipRouterListLimitsTest {
private val routerWithLimits = GossipRouterBuilder(params = gossipParamsWithLimits).build()
private val routerWithNoLimits = GossipRouterBuilder(params = gossipParamsNoLimits).build()

private val topic: Topic = "topic1"

@Test
fun validateProtobufLists_validMessage() {
val msg = fullMsgBuilder().build()
@@ -96,7 +99,7 @@ class GossipRouterListLimitsTest {
@Test
fun validateProtobufLists_tooManyIHaves() {
val builder = fullMsgBuilder()
builder.addIHaves(maxIHaveLength, 1)
builder.addIHaves(maxIHaveLength, 1, topic)
val msg = builder.build()

Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse()
@@ -105,7 +108,7 @@ class GossipRouterListLimitsTest {
@Test
fun validateProtobufLists_tooManyIHaveMsgIds() {
val builder = fullMsgBuilder()
builder.addIHaves(1, maxIHaveLength)
builder.addIHaves(1, maxIHaveLength, topic)
val msg = builder.build()

Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse()
@@ -186,7 +189,7 @@ class GossipRouterListLimitsTest {
@Test
fun validateProtobufLists_maxIHaves() {
val builder = fullMsgBuilder()
builder.addIHaves(maxIHaveLength - 1, 1)
builder.addIHaves(maxIHaveLength - 1, 1, topic)
val msg = builder.build()

Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue()
@@ -195,7 +198,7 @@ class GossipRouterListLimitsTest {
@Test
fun validateProtobufLists_maxIHaveMsgIds() {
val builder = fullMsgBuilder()
builder.addIHaves(1, maxIHaveLength - 1)
builder.addIHaves(1, maxIHaveLength - 1, topic)
val msg = builder.build()

Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue()
@@ -256,7 +259,7 @@ class GossipRouterListLimitsTest {
// Add some data to all possible fields
builder.addSubscriptions(listSize)
builder.addPublishMessages(listSize, listSize)
builder.addIHaves(listSize, listSize)
builder.addIHaves(listSize, listSize, topic)
builder.addIWants(listSize, listSize)
builder.addGrafts(listSize)
builder.addPrunes(listSize, listSize)
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package io.libp2p.pubsub.gossip
import io.libp2p.core.PeerId
import io.libp2p.etc.types.toProtobuf
import io.libp2p.etc.types.toWBytes
import io.libp2p.pubsub.Topic
import io.libp2p.pubsub.gossip.builders.GossipParamsBuilder
import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder
import org.assertj.core.api.Assertions.assertThat
@@ -49,7 +50,7 @@ class GossipRpcPartsQueueTest {
queue.addPublish(createRpcMessage("topic-$it", "data"))
}
(1..iHaves).forEach {
queue.addIHave(byteArrayOf(it.toByte()).toWBytes())
queue.addIHave(byteArrayOf(it.toByte()).toWBytes(), "topic-$it")
}
(1..iWants).forEach {
queue.addIWant(byteArrayOf(it.toByte()).toWBytes())
@@ -259,4 +260,50 @@ class GossipRpcPartsQueueTest {
assertThat(msgs).hasSize(3)
assertThat(msgs.merge()).isEqualTo(single)
}

@Test
fun `check that resulting IHAVE sets the topic ID`() {
val topic1: Topic = "topic1"
val messageId1 = "1111".toWBytes()
val topic2: Topic = "topic2"
val messageId2 = "2222".toWBytes()
val partsQueue = TestGossipQueue(gossipParamsWithLimits)
partsQueue.addIHave(messageId1, topic1)
partsQueue.addIHave(messageId2, topic2)
val res = partsQueue.takeMerged().first()

val serialized = res.toByteArray()
val deserializedRpc = Rpc.RPC.parseFrom(serialized)
assertThat(deserializedRpc.control.ihaveList).containsExactlyInAnyOrder(
Rpc.ControlIHave.newBuilder().setTopicID(topic1).addMessageIDs(messageId1.toProtobuf()).build(),
Rpc.ControlIHave.newBuilder().setTopicID(topic2).addMessageIDs(messageId2.toProtobuf()).build(),
)
}

@Test
fun `check that resulting IHAVE correctly groups topics`() {
val partsQueue = TestGossipQueue(gossipParamsWithLimits)

partsQueue.addIHave("1111".toWBytes(), "topic1")
partsQueue.addIHave("2222".toWBytes(), "topic2")
partsQueue.addIHave("3333".toWBytes(), "topic1")

val res = partsQueue.takeMerged().first()

val serialized = res.toByteArray()
val deserializedRpc = Rpc.RPC.parseFrom(serialized)
assertThat(deserializedRpc.control.ihaveList).containsExactlyInAnyOrder(
Rpc.ControlIHave.newBuilder()
.setTopicID("topic1")
.addAllMessageIDs(
listOf(
"1111".toWBytes().toProtobuf(),
"3333".toWBytes().toProtobuf()
)
).build(),
Rpc.ControlIHave.newBuilder()
.setTopicID("topic2")
.addMessageIDs("2222".toWBytes().toProtobuf()).build(),
)
}
}
Original file line number Diff line number Diff line change
@@ -16,6 +16,33 @@ import java.util.concurrent.TimeUnit.SECONDS
abstract class CipherSecureChannelTest(secureChannelCtor: SecureChannelCtor, muxers: List<StreamMuxer>, announce: String) :
SecureChannelTestBase(secureChannelCtor, muxers, announce) {

@Test
fun `verify secure session`() {
val (privKey1, pubKey1) = generateKeyPair(KeyType.ECDSA)
val (privKey2, pubKey2) = generateKeyPair(KeyType.ECDSA)

val protocolSelect1 = makeSelector(privKey1, muxerIds)
val protocolSelect2 = makeSelector(privKey2, muxerIds)

val eCh1 = makeDialChannel("#1", protocolSelect1, PeerId.fromPubKey(pubKey2))
val eCh2 = makeListenChannel("#2", protocolSelect2)

logger.debug("Connecting channels...")
val connection = TestChannel.interConnect(eCh1, eCh2)

val secSession1 = protocolSelect1.selectedFuture.join()
assertThat(secSession1.localId).isEqualTo(PeerId.fromPubKey(pubKey1))
assertThat(secSession1.remoteId).isEqualTo(PeerId.fromPubKey(pubKey2))
assertThat(secSession1.remotePubKey).isEqualTo(pubKey2)

val secSession2 = protocolSelect2.selectedFuture.join()
assertThat(secSession2.localId).isEqualTo(PeerId.fromPubKey(pubKey2))
assertThat(secSession2.remoteId).isEqualTo(PeerId.fromPubKey(pubKey1))
assertThat(secSession2.remotePubKey).isEqualTo(pubKey1)

logger.debug("Connection made: $connection")
}

@Test
fun `incorrect initiator remote PeerId should throw`() {
val (privKey1, _) = generateKeyPair(KeyType.ECDSA)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.libp2p.tools.protobuf

import io.libp2p.etc.types.toProtobuf
import io.libp2p.pubsub.Topic
import pubsub.pb.Rpc
import kotlin.random.Random

@@ -28,9 +29,9 @@ class RpcBuilder {
}
}

fun addIHaves(iHaveCount: Int, messageIdCount: Int) {
fun addIHaves(iHaveCount: Int, messageIdCount: Int, topic: Topic) {
for (i in 0 until iHaveCount) {
val iHaveBuilder = Rpc.ControlIHave.newBuilder()
val iHaveBuilder = Rpc.ControlIHave.newBuilder().setTopicID(topic)
for (j in 0 until messageIdCount) {
iHaveBuilder.addMessageIDs(Random.nextBytes(6).toProtobuf())
}
2 changes: 1 addition & 1 deletion versions.gradle
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@ dependencyManagement {
entry 'protobuf-java'
entry 'protoc'
}
dependencySet(group: "io.netty", version: "4.1.97.Final") {
dependencySet(group: "io.netty", version: "4.1.108.Final") {
entry 'netty-common'
entry 'netty-handler'
entry 'netty-transport'