Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Es fix scribe test #3587

Closed
wants to merge 36 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
461687f
check that the stored blob is at least 1 prefix byte close to peer id
shyba Feb 11, 2022
76bd59d
extract min_prefix_colliding_bits to a constant
shyba Feb 19, 2022
dc7cd54
extract method and avoid using hash builtin name
shyba Feb 21, 2022
f0e47aa
add get_colliding_prefix_bits, docs and tests
shyba Feb 22, 2022
ca65c1e
replace duplicated code
shyba Feb 22, 2022
8019f4b
stop after finding what to download
shyba Feb 22, 2022
1aa4d9d
simplify, generalize to any size and fix tests
shyba Feb 23, 2022
3fdadee
dont probe and ignore bad peers
shyba Jan 25, 2022
d762d67
closest peer is only ready when it was contacted and isn't known to b…
shyba Feb 4, 2022
511e57c
fix distance sorting and improve logging
shyba Feb 4, 2022
0faa2d3
bump split index to 2
shyba Feb 4, 2022
fb6e342
add peers from shortlist regardless, but check from other nodes
shyba Feb 7, 2022
b78929f
reset closest peer on failure
shyba Feb 8, 2022
af1a6ed
only return good (contacted) peers
shyba Feb 8, 2022
9a79b33
wait until k peers are ready. do not double add peers
shyba Feb 8, 2022
0b2b10f
bump bottom out limit of peer search so people can use 100 concurrent…
shyba Feb 8, 2022
3876e03
log bottom out of peer search in debug, show short key id for find value
shyba Feb 8, 2022
7d4966e
use a dict for the active queue
shyba Feb 8, 2022
c14915d
don't probe peers too far from the top closest
shyba Feb 8, 2022
6ff867e
bottoming out is now warning and no results for peer search
shyba Feb 8, 2022
5d71372
no stop condition, let it exhaust
shyba Feb 10, 2022
b3614d9
remove all references to bottoming out
shyba Feb 11, 2022
612dbcb
allow running some extra probes for k replacements
shyba Feb 12, 2022
a0e34b0
make timeout handler immune to asyncio time tricks
shyba Feb 18, 2022
868a620
add a way to wait for announcements to finish so tests are reliable
shyba Feb 18, 2022
8b10091
better representation of kademliapeer on debug logs
shyba Feb 18, 2022
a76a0ac
simplify dht mock and restore clock after accelerating
shyba Feb 18, 2022
441cc95
fix and enable test_blob_announcer
shyba Feb 18, 2022
f69747b
timeout is now supported on dht tests
shyba Feb 18, 2022
c2478d4
remove unused search rounds
shyba Feb 19, 2022
7ded8a1
make active an explicit ordered dict
shyba Feb 19, 2022
f05943f
implement announcer as a consumer task on gather
shyba Feb 15, 2022
ca4ba19
fixes #3577
eukreign Mar 14, 2022
bb54190
fix tests
eukreign Mar 14, 2022
ad489ed
Merge pull request #3581 from lbryio/deterministic_channel_keys_post_…
eukreign Mar 14, 2022
1134a5a
tests for es should fix
jessopb Mar 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lbry/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ class Config(CLIConfig):
"Routing table bucket index below which we always split the bucket if given a new key to add to it and "
"the bucket is full. As this value is raised the depth of the routing table (and number of peers in it) "
"will increase. This setting is used by seed nodes, you probably don't want to change it during normal "
"use.", 1
"use.", 2
)

# protocol timeouts
Expand Down
44 changes: 25 additions & 19 deletions lbry/dht/blob_announcer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,24 @@ def __init__(self, loop: asyncio.AbstractEventLoop, node: 'Node', storage: 'SQLi
self.storage = storage
self.announce_task: asyncio.Task = None
self.announce_queue: typing.List[str] = []
self._done = asyncio.Event()
self.announced = set()

async def _submit_announcement(self, blob_hash):
try:

peers = len(await self.node.announce_blob(blob_hash))
self.announcements_sent_metric.labels(peers=peers, error=False).inc()
if peers > 4:
return blob_hash
else:
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
except Exception as err:
self.announcements_sent_metric.labels(peers=0, error=True).inc()
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise err
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
async def _run_consumer(self):
while self.announce_queue:
try:
blob_hash = self.announce_queue.pop()
peers = len(await self.node.announce_blob(blob_hash))
self.announcements_sent_metric.labels(peers=peers, error=False).inc()
if peers > 4:
self.announced.add(blob_hash)
else:
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
except Exception as err:
self.announcements_sent_metric.labels(peers=0, error=True).inc()
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise err
log.warning("error announcing %s: %s", blob_hash[:8], str(err))

async def _announce(self, batch_size: typing.Optional[int] = 10):
while batch_size:
Expand All @@ -56,14 +59,14 @@ async def _announce(self, batch_size: typing.Optional[int] = 10):
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
while len(self.announce_queue) > 0:
log.info("%i blobs to announce", len(self.announce_queue))
announced = await asyncio.gather(*[
self._submit_announcement(
self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue
], loop=self.loop)
announced = list(filter(None, announced))
await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)], loop=self.loop)
announced = list(filter(None, self.announced))
if announced:
await self.storage.update_last_announced_blobs(announced)
log.info("announced %i blobs", len(announced))
self.announced.clear()
self._done.set()
self._done.clear()

def start(self, batch_size: typing.Optional[int] = 10):
assert not self.announce_task or self.announce_task.done(), "already running"
Expand All @@ -72,3 +75,6 @@ def start(self, batch_size: typing.Optional[int] = 10):
def stop(self):
if self.announce_task and not self.announce_task.done():
self.announce_task.cancel()

def wait(self):
return self._done.wait()
1 change: 0 additions & 1 deletion lbry/dht/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5
RPC_ID_LENGTH = 20
PROTOCOL_VERSION = 1
BOTTOM_OUT_LIMIT = 3
MSG_SIZE_LIMIT = 1400


Expand Down
10 changes: 4 additions & 6 deletions lbry/dht/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,25 +202,23 @@ def start(self, interface: str, known_node_urls: typing.Optional[typing.List[typ
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))

def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT,
max_results: int = constants.K) -> IterativeNodeFinder:

return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, bottom_out_limit, max_results, None, shortlist)
key, max_results, None, shortlist)

def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
bottom_out_limit: int = 40,
max_results: int = -1) -> IterativeValueFinder:

return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, bottom_out_limit, max_results, None, shortlist)
key, max_results, None, shortlist)

async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
bottom_out_limit=20, shortlist: typing.Optional[typing.List['KademliaPeer']] = None
shortlist: typing.Optional[typing.List['KademliaPeer']] = None
) -> typing.List['KademliaPeer']:
peers = []
async for iteration_peers in self.get_iterative_node_finder(
node_id, shortlist=shortlist, bottom_out_limit=bottom_out_limit, max_results=max_results):
node_id, shortlist=shortlist, max_results=max_results):
peers.extend(iteration_peers)
distance = Distance(node_id)
peers.sort(key=lambda peer: distance(peer.node_id))
Expand Down
3 changes: 3 additions & 0 deletions lbry/dht/peer.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,6 @@ def compact_address_tcp(self) -> bytearray:

def compact_ip(self):
return make_compact_ip(self.address)

def __str__(self):
return f"{self.__class__.__name__}({self.node_id.hex()[:8]}@{self.address}:{self.udp_port}-{self.tcp_port})"
122 changes: 46 additions & 76 deletions lbry/dht/protocol/iterative_find.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from itertools import chain
from collections import defaultdict
from collections import defaultdict, OrderedDict
import typing
import logging
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -74,7 +74,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
class IterativeFinder:
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
if len(key) != constants.HASH_LENGTH:
Expand All @@ -85,28 +85,22 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
self.protocol = protocol

self.key = key
self.bottom_out_limit = bottom_out_limit
self.max_results = max_results
self.max_results = max(constants.K, max_results)
self.exclude = exclude or []

self.active: typing.Set['KademliaPeer'] = set()
self.active: typing.Dict['KademliaPeer', int] = OrderedDict() # peer: distance, sorted
self.contacted: typing.Set['KademliaPeer'] = set()
self.distance = Distance(key)

self.closest_peer: typing.Optional['KademliaPeer'] = None
self.prev_closest_peer: typing.Optional['KademliaPeer'] = None

self.iteration_queue = asyncio.Queue(loop=self.loop)

self.running_probes: typing.Set[asyncio.Task] = set()
self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
self.iteration_count = 0
self.bottom_out_count = 0
self.running = False
self.tasks: typing.List[asyncio.Task] = []
self.delayed_calls: typing.List[asyncio.Handle] = []
for peer in get_shortlist(routing_table, key, shortlist):
if peer.node_id:
self._add_active(peer)
self._add_active(peer, force=True)
else:
# seed nodes
self._schedule_probe(peer)
Expand Down Expand Up @@ -138,15 +132,14 @@ def get_initial_result(self) -> typing.List['KademliaPeer']: #pylint: disable=n
"""
return []

def _is_closer(self, peer: 'KademliaPeer') -> bool:
return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id)

def _add_active(self, peer):
def _add_active(self, peer, force=False):
if not force and self.peer_manager.peer_is_good(peer) is False:
return
if peer in self.contacted:
return
if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
self.active.add(peer)
if self._is_closer(peer):
self.prev_closest_peer = self.closest_peer
self.closest_peer = peer
self.active[peer] = self.distance(peer.node_id)
self.active = OrderedDict(sorted(self.active.items(), key=lambda item: item[1]))

async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
self._add_active(peer)
Expand All @@ -158,33 +151,43 @@ async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindRespons
log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address,
peer.udp_port, address, udp_port)
self.check_result_ready(response)
self._log_state()

def _reset_closest(self, peer):
if peer in self.active:
del self.active[peer]

async def _send_probe(self, peer: 'KademliaPeer'):
try:
response = await self.send_probe(peer)
except asyncio.TimeoutError:
self.active.discard(peer)
self._reset_closest(peer)
return
except ValueError as err:
log.warning(str(err))
self.active.discard(peer)
self._reset_closest(peer)
return
except TransportNotConnected:
return self.aclose()
except RemoteException:
self._reset_closest(peer)
return
return await self._handle_probe_result(peer, response)

async def _search_round(self):
def _search_round(self):
"""
Send up to constants.alpha (5) probes to closest active peers
"""

added = 0
to_probe = list(self.active - self.contacted)
to_probe.sort(key=lambda peer: self.distance(self.key))
for peer in to_probe:
if added >= constants.ALPHA:
for index, peer in enumerate(self.active.keys()):
if index == 0:
log.debug("closest to probe: %s", peer.node_id.hex()[:8])
if peer in self.contacted:
continue
if len(self.running_probes) >= constants.ALPHA:
break
if index > (constants.K + len(self.running_probes)):
break
origin_address = (peer.address, peer.udp_port)
if origin_address in self.exclude:
Expand All @@ -206,33 +209,22 @@ def _schedule_probe(self, peer: 'KademliaPeer'):
t = self.loop.create_task(self._send_probe(peer))

def callback(_):
self.running_probes.difference_update({
probe for probe in self.running_probes if probe.done() or probe == t
})
if not self.running_probes:
self.tasks.append(self.loop.create_task(self._search_task(0.0)))
self.running_probes.pop(peer, None)
if self.running:
self._search_round()

t.add_done_callback(callback)
self.running_probes.add(t)

async def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
try:
if self.running:
await self._search_round()
if self.running:
self.delayed_calls.append(self.loop.call_later(delay, self._search))
except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected):
if self.running:
self.loop.call_soon(self.aclose)
self.running_probes[peer] = t

def _search(self):
self.tasks.append(self.loop.create_task(self._search_task()))
def _log_state(self):
log.debug("[%s] check result: %i active nodes %i contacted",
self.key.hex()[:8], len(self.active), len(self.contacted))

def __aiter__(self):
if self.running:
raise Exception("already running")
self.running = True
self._search()
self.loop.call_soon(self._search_round)
return self

async def __anext__(self) -> typing.List['KademliaPeer']:
Expand All @@ -252,20 +244,19 @@ async def __anext__(self) -> typing.List['KademliaPeer']:
def aclose(self):
self.running = False
self.iteration_queue.put_nowait(None)
for task in chain(self.tasks, self.running_probes, self.delayed_calls):
for task in chain(self.tasks, self.running_probes.values()):
task.cancel()
self.tasks.clear()
self.running_probes.clear()
self.delayed_calls.clear()


class IterativeNodeFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
shortlist)
self.yielded_peers: typing.Set['KademliaPeer'] = set()

Expand All @@ -276,14 +267,14 @@ async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse:
return FindNodeResponse(self.key, response)

def search_exhausted(self):
self.put_result(self.active, finish=True)
self.put_result(self.active.keys(), finish=True)

def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
not_yet_yielded = [
peer for peer in from_iter
if peer not in self.yielded_peers
and peer.node_id != self.protocol.node_id
and self.peer_manager.peer_is_good(peer) is not False
and self.peer_manager.peer_is_good(peer) is True # return only peers who answered
]
not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id))
to_yield = not_yet_yielded[:max(constants.K, self.max_results)]
Expand All @@ -298,26 +289,16 @@ def check_result_ready(self, response: FindNodeResponse):

if found:
log.debug("found")
return self.put_result(self.active, finish=True)
if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer):
# log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted),
# self.bottom_out_count, self.iteration_count)
self.bottom_out_count = 0
elif self.prev_closest_peer and self.closest_peer:
self.bottom_out_count += 1
log.info("bottom out %i %i %i", len(self.active), len(self.contacted), self.bottom_out_count)
if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
log.info("limit hit")
self.put_result(self.active, True)
return self.put_result(self.active.keys(), finish=True)


class IterativeValueFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
shortlist)
self.blob_peers: typing.Set['KademliaPeer'] = set()
# this tracks the index of the most recent page we requested from each peer
Expand Down Expand Up @@ -362,23 +343,12 @@ def check_result_ready(self, response: FindValueResponse):
blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
for compact_addr in response.found_compact_addresses]
to_yield = []
self.bottom_out_count = 0
for blob_peer in blob_peers:
if blob_peer not in self.blob_peers:
self.blob_peers.add(blob_peer)
to_yield.append(blob_peer)
if to_yield:
# log.info("found %i new peers for blob", len(to_yield))
self.iteration_queue.put_nowait(to_yield)
# if self.max_results and len(self.blob_peers) >= self.max_results:
# log.info("enough blob peers found")
# if not self.finished.is_set():
# self.finished.set()
elif self.prev_closest_peer and self.closest_peer:
self.bottom_out_count += 1
if self.bottom_out_count >= self.bottom_out_limit:
log.info("blob peer search bottomed out")
self.iteration_queue.put_nowait(None)

def get_initial_result(self) -> typing.List['KademliaPeer']:
if self.protocol.data_store.has_peers_for_blob(self.key):
Expand Down
Loading