Skip to content

Allow creating a connection with ufrag and pwd #82

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 54 additions & 4 deletions src/aioice/ice.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import ipaddress
import logging
import random
import re
import secrets
import socket
import threading
Expand Down Expand Up @@ -136,6 +137,16 @@ def pair_priority(pair: CandidatePair) -> int:
pairs.sort(key=pair_priority)


def validate_password(value: str) -> None:
"""
Check the password is well-formed.

See RFC 5245 - 15.4. "ice-ufrag" and "ice-pwd" Attributes
"""
if not re.match("^[a-z0-9+/]{22,256}$", value):
raise ValueError("Password must satisfy 22*256ice-char")


def validate_remote_candidate(candidate: Candidate) -> Candidate:
"""
Check the remote candidate is supported.
Expand All @@ -146,6 +157,16 @@ def validate_remote_candidate(candidate: Candidate) -> Candidate:
return candidate


def validate_username(value: str) -> None:
"""
Check the username is well-formed.

See RFC 5245 - 15.4. "ice-ufrag" and "ice-pwd" Attributes
"""
if not re.match("^[a-z0-9+/]{4,256}$", value):
raise ValueError("Username must satisfy 4*256ice-char")


class CandidatePair:
def __init__(self, protocol, remote_candidate: Candidate) -> None:
self.task: Optional[asyncio.Task] = None
Expand Down Expand Up @@ -295,6 +316,10 @@ class Connection:
:param use_ipv4: Whether to use IPv4 candidates.
:param use_ipv6: Whether to use IPv6 candidates.
:param transport_policy: Transport policy.
:param local_username: An optional local username, otherwise a random one
will be generated.
:param local_password: An optional local password, otherwise a random one
will be generated.
"""

def __init__(
Expand All @@ -310,12 +335,21 @@ def __init__(
use_ipv4: bool = True,
use_ipv6: bool = True,
transport_policy: TransportPolicy = TransportPolicy.ALL,
local_username: Optional[str] = None,
local_password: Optional[str] = None,
) -> None:
self.ice_controlling = ice_controlling
#: Local username, automatically set to a random value.
self.local_username = random_string(4)
#: Local password, automatically set to a random value.
self.local_password = random_string(22)

if local_username is None:
local_username = random_string(4)
else:
validate_username(local_username)

if local_password is None:
local_password = random_string(22)
else:
validate_password(local_password)

#: Whether the remote party is an ICE Lite implementation.
self.remote_is_lite = False
#: Remote username, which you need to set.
Expand Down Expand Up @@ -345,6 +379,8 @@ def __init__(
self._local_candidates: List[Candidate] = []
self._local_candidates_end = False
self._local_candidates_start = False
self._local_password = local_password
self._local_username = local_username
self._nominated: Dict[int, CandidatePair] = {}
self._nominating: Set[int] = set()
self._protocols: List[StunProtocol] = []
Expand Down Expand Up @@ -374,6 +410,20 @@ def local_candidates(self) -> List[Candidate]:
"""
return self._local_candidates[:]

@property
def local_password(self) -> str:
"""
Local password, set at construction time.
"""
return self._local_password

@property
def local_username(self) -> str:
"""
Local username, set at construction time.
"""
return self._local_username

@property
def remote_candidates(self) -> List[Candidate]:
"""
Expand Down
26 changes: 26 additions & 0 deletions tests/test_ice.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,32 @@ def tearDown(self):
ice.CONSENT_INTERVAL = 5
stun.RETRY_MAX = 6

@asynctest
async def test_local_username_and_password(self):
# No username or password.
connection = ice.Connection(ice_controlling=True)
self.assertEqual(len(connection.local_username), 4)
self.assertEqual(len(connection.local_password), 22)

# Valid username and password.
connection = ice.Connection(
ice_controlling=True,
local_username="test+user",
local_password="some+password/that+is/long+enough",
)
self.assertEqual(connection.local_username, "test+user")
self.assertEqual(connection.local_password, "some+password/that+is/long+enough")

# Invalid username.
with self.assertRaises(ValueError) as cm:
ice.Connection(ice_controlling=True, local_username="a")
self.assertEqual(str(cm.exception), "Username must satisfy 4*256ice-char")

# Invalid password.
with self.assertRaises(ValueError) as cm:
ice.Connection(ice_controlling=True, local_password="aaaaaa")
self.assertEqual(str(cm.exception), "Password must satisfy 22*256ice-char")

@mock.patch("ifaddr.get_adapters")
def test_get_host_addresses(self, mock_get_adapters):
mock_get_adapters.return_value = [
Expand Down
Loading