Add shard connection backoff policy #473
base: master
Conversation
Shouldn't we have some warning / info level log when backoff is taking place?

I would rather not do it; it is not useful and can potentially pollute the log.

Do you know what caused the test failure? It is a unit test that at first glance should be fully deterministic. Failure is unexpected.

It is a known issue; the conversion goes wrong somewhere.
General comment: integration tests for new policies are definitely needed here.
The patchset lacks documentation, which would help in understanding the feature and when/how to use it. Is documentation in a separate repo / commit?
Pull Request Overview
This PR adds shard‐aware reconnection policies with support for scheduling constraints. Key changes include new policy implementations and schedulers in cassandra/policies.py, modifications to connection management in cassandra/pool.py and cassandra/cluster.py, and comprehensive tests in both unit and integration suites to validate the new behavior.
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated no comments.
File | Description |
---|---|
tests/unit/test_shard_aware.py | Adds tests for both immediate and delayed reconnection behavior using new policies. |
tests/unit/test_policies.py | Introduces extensive tests for scope bucket and scheduler behavior. |
tests/unit/test_host_connection_pool.py | Updates HostConnectionPool tests to integrate the new scheduler. |
tests/integration/long/test_policies.py | Validates backoff policies and correct connection formation across shards. |
tests/integration/__init__.py | Adds a marker for tests designed for Scylla-specific behavior. |
cassandra/pool.py | Refactors connection replacements to use the new scheduler instead of direct submission. |
cassandra/policies.py | Implements new scheduler classes and backoff policies for shard connections. |
cassandra/cluster.py | Exposes a new property and uses the scheduler for initializing shard connections. |
I have added documentation to all classes.

I don't think it's such a small feature, and I think details might be missing. I did skim briefly over the code - so I might have missed it - where is the random jitter discussed, so that multiple clients doing a concurrent backoff don't collide? (again - may have missed it!)
Add abstract classes `ShardReconnectionPolicy` and `ShardReconnectionScheduler`, and implementations: `NoDelayShardReconnectionPolicy` - a policy that represents the old behavior of having no delay and no concurrency restriction; `NoConcurrentShardReconnectionPolicy` - a policy that limits concurrent reconnections to 1 per scope and introduces a delay between reconnections within the scope.

Inject the shard reconnection policy into cluster, session, connection and host pool. Drop the pending-connections tracking logic, since the policy does that. Fix some tests that mock Cluster, session, connection or host pool.
ok, I will add it, jitter comes from
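For context (illustrative only - not the actual driver code referenced above), jitter is typically added by randomizing each delay produced by a backoff schedule, for example:

```python
import random
from itertools import count

def exponential_schedule_with_jitter(base_delay=1.0, max_delay=60.0):
    """Yield exponentially growing delays with +/-15% random jitter so that
    many clients backing off at the same time do not reconnect in lockstep."""
    for attempt in count():
        delay = min(base_delay * (2 ** attempt), max_delay)
        yield delay * random.uniform(0.85, 1.15)
```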
```python
@abstractmethod
def schedule(
        self,
        host_id: str,
        shard_id: int,
        method: Callable[..., None],
        *args: List[Any],
        **kwargs: dict[Any, Any]) -> None:
    """
    Schedules request to create connection to given host and shard according to the policy.
    At no point request is executed on the call, it is always running in a separate thread,
    this method is non-blocking in this regard.

    ``host_id`` - an id of the host of the shard.
    ``shard_id`` - an id of the shard.
    ``method`` - a callable that creates connection and stores it in the connection pool.
    Currently, it is `HostConnection._open_connection_to_missing_shard`.
    ``*args`` and ``**kwargs`` are passed to ``method`` when policy executes it.
    """
    raise NotImplementedError()
```
This comment does not give me enough info to be able to implement the scheduler. I understand that the main job of `schedule` is to call `method` with `args` and `kwargs`.

- Does it need to call it just once? I assume not, because connection creation could fail. Should the scheduler catch those exceptions and retry?
- Why does it get `args` and `kwargs`? The interface would be simpler if the callable had them already bound.
- What happens if I call `method` again after it successfully finishes?
- "At no point request is executed on the call, it is always running in a separate thread, this method is non-blocking in this regard." - I can't tell if this is some guarantee about `method` (e.g. that it is non-blocking) or a restriction for implementors of this method.
```python
class ShardConnectionBackoffPolicy(ABC):
    """
    Base class for shard connection backoff policies.
    These policies allow user to control pace of establishing new connections.

    On `new_scheduler` instantiate a scheduler that behaves according to the policy.
    """

    @abstractmethod
    def new_scheduler(self, session: Session) -> ShardConnectionScheduler:
        raise NotImplementedError()
```
Why does a scheduler need a whole `Session`? If it is only to be able to schedule on the executor, imo it is better to provide it with some callable or interface that enables just that instead of the whole session. It would be easier to test, but also to understand the code.
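As a sketch of the narrower interface suggested here (hypothetical, not part of the PR), the policy could receive just a submit callable instead of the whole Session:

```python
from abc import ABC, abstractmethod
from typing import Any, Callable

# Anything that can run a callable on a worker thread, e.g. an executor's
# submit method (assumption for illustration, not from the PR).
SubmitFn = Callable[..., Any]

class ShardConnectionScheduler(ABC):
    """Stand-in for the scheduler interface introduced in this PR."""

class ShardConnectionBackoffPolicy(ABC):
    @abstractmethod
    def new_scheduler(self, submit: SubmitFn) -> ShardConnectionScheduler:
        # The policy only needs a way to submit work, not the whole Session,
        # which makes implementations easier to test in isolation.
        raise NotImplementedError()
```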
```python
class NoDelayShardConnectionBackoffPolicy(ShardConnectionBackoffPolicy):
    """
    A shard connection backoff policy with no delay between attempts and no concurrency restrictions.
    Ensures that at most one pending connection per (host, shard) pair.
    If connection attempts for the same (host, shard) it is silently dropped.
```
> Ensures that at most one pending connection per (host, shard) pair.

This sentence is missing a verb.

> and no concurrency restrictions
> Ensures that at most one pending connection per (host, shard) pair.

Could you elaborate wdym by "no concurrency restrictions"? "at most one pending connection per (host, shard) pair" absolutely does sound like a concurrency restriction.
""" | ||
session: Session | ||
already_scheduled: dict[str, bool] | ||
lock: threading.Lock | ||
|
||
def __init__(self, session: Session): |
Why do you use a dict with bool values? From the code I don't see a semantic difference between no value in the dict and `False`, so a `set` should be a better fit here.
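For comparison, the same bookkeeping with a set could look like this (a sketch with hypothetical names, not code from the PR):

```python
import threading

already_scheduled = set()
lock = threading.Lock()

def try_mark_scheduled(key) -> bool:
    """Return True if the key was newly marked, False if it was already pending."""
    with lock:
        if key in already_scheduled:
            return False
        already_scheduled.add(key)
        return True
```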
```python
        self.session = weakref.proxy(session)
        self.already_scheduled = {}
        self.lock = threading.Lock()
```
`self.session = weakref.proxy(session)` is another argument for both more descriptive interface docs and for not using `Session` here. How is the dev implementing such a policy after reading its doc comment supposed to know that they should use `weakref.proxy` here?
Commit: "feat(cluster): inject shard reconnection policy"

The commit message says that this commit fixes some tests, but that does not seem to be the case. Instead, the test fixes are done in a previous commit. Let's fix the commit structure.

My proposal:
- Introduce only the APIs
- Introduce the policy that will be the default one (+ its unit tests)
- Perform the necessary plumbing in driver code to make it use the new policy
- Introduce other policies (one by one) (+ their unit tests if they have any)
- Integration test
```python
def _test_backoff(self, shard_connection_backoff_policy: ShardConnectionBackoffPolicy):
    backoff_policy = None
    if isinstance(shard_connection_backoff_policy, LimitedConcurrencyShardConnectionBackoffPolicy):
        backoff_policy = shard_connection_backoff_policy.backoff_policy

    cluster = TestCluster(
        shard_connection_backoff_policy=shard_connection_backoff_policy,
        reconnection_policy=ConstantReconnectionPolicy(0),
    )
    session = cluster.connect()
    sharding_info = get_sharding_info(session)

    # even if backoff is set and there is no sharding info
    # behavior should be the same as if there is no backoff policy
    if not backoff_policy or not sharding_info:
        time.sleep(2)
        expected_connections = 1
        if sharding_info:
            expected_connections = sharding_info.shards_count
        for host_id, connections_count in get_connections_per_host(session).items():
            self.assertEqual(connections_count, expected_connections)
        return

    sleep_time = 0
    schedule = backoff_policy.new_schedule()
    # Calculate total time it will need to establish all connections
    if shard_connection_backoff_policy.scope == ShardConnectionBackoffScope.Cluster:
        for _ in session.hosts:
            for _ in range(sharding_info.shards_count - 1):
                sleep_time += next(schedule)
        sleep_time /= shard_connection_backoff_policy.max_concurrent
    elif shard_connection_backoff_policy.scope == ShardConnectionBackoffScope.Host:
        for _ in range(sharding_info.shards_count - 1):
            sleep_time += next(schedule)
        sleep_time /= shard_connection_backoff_policy.max_concurrent
    else:
        raise ValueError("Unknown scope {}".format(shard_connection_backoff_policy.scope))

    time.sleep(sleep_time / 2)
    self.assertFalse(
        is_connection_filled(shard_connection_backoff_policy.scope, session, sharding_info.shards_count))
    time.sleep(sleep_time / 2 + 1)
    self.assertTrue(
        is_connection_filled(shard_connection_backoff_policy.scope, session, sharding_info.shards_count))
```
If policies accepted a scheduler instead of `Session`, then this test could avoid having any sleeps - we could artificially control the scheduler. Much more could be unit tested as well!
```python
        with self.lock:
            if self.already_scheduled.get(scheduled_key):
                return False
            self.already_scheduled[scheduled_key] = True

            scope_info = self.scopes.get(scope_hash)
            if not scope_info:
                scope_info = _ScopeBucket(self.session, self.backoff_policy, self.max_concurrent)
                self.scopes[scope_hash] = scope_info
            scope_info.schedule_new_connection(CreateConnectionCallback(self._execute, scheduled_key, method, *args, **kwargs))
            return True
```
`schedule` is not supposed to return anything according to the base class.
```python
schedule = backoff_policy.new_schedule()
for _ in range(shard_count):
    sleep_time += next(schedule)
if sleep_time > 0:
    time.sleep(sleep_time / 2)
    # Check that connections are not being established quicker than expected
    assert len(pool._connections) < expected_after
    time.sleep(sleep_time / 2 + 1)
```
Can we somehow write this test without sleeps? Python Driver tests are already unbearably slow, and I don't want to make them any slower unless there is absolutely no way to avoid it.
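One way to avoid real sleeps (a hypothetical sketch, assuming the delay mechanism could be injected into the policy/scheduler) is a test double that records requested delays and fires callbacks immediately:

```python
class FakeTimer:
    """Test double: records each requested delay and runs the callback at once,
    so backoff behaviour can be asserted without real time.sleep calls."""

    def __init__(self):
        self.requested_delays = []

    def call_later(self, delay, callback, *args, **kwargs):
        self.requested_delays.append(delay)
        callback(*args, **kwargs)

# A test would hand FakeTimer().call_later to the scheduler instead of a real
# timer and then assert on requested_delays and the resulting pool state.
```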
Introduce `ShardConnectionBackoffPolicy` and its implementations:

- `NoDelayShardConnectionBackoffPolicy`: no delay or concurrency limit, ensures at most one pending connection per host+shard.
- `LimitedConcurrencyShardConnectionBackoffPolicy`: limits pending concurrent connections to `max_concurrent` per scope (Cluster or Host) using a backoff policy.

The idea of this PR is to shift the responsibility of scheduling `HostConnection._open_connection_to_missing_shard` from `HostConnection` to `ShardConnectionBackoffPolicy`, which gives `ShardConnectionBackoffPolicy` control over the process of opening connections. This feature enables finer control over the process of creating per-shard connections, helping to prevent connection storms.

Fixes: #483
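For reference, configuring the new policy would look roughly like this (a sketch based on the class and parameter names quoted in this PR; exact constructor signatures may differ):

```python
from cassandra.cluster import Cluster
from cassandra.policies import (
    ConstantReconnectionPolicy,
    LimitedConcurrencyShardConnectionBackoffPolicy,
    ShardConnectionBackoffScope,
)

# At most one pending shard connection per host, with a constant delay between
# attempts (argument names are assumptions based on the PR text).
cluster = Cluster(
    contact_points=["127.0.0.1"],
    shard_connection_backoff_policy=LimitedConcurrencyShardConnectionBackoffPolicy(
        scope=ShardConnectionBackoffScope.Host,
        backoff_policy=ConstantReconnectionPolicy(delay=0.1),
        max_concurrent=1,
    ),
)
session = cluster.connect()
```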
Solutions tested and rejected

Naive delay

Description

The policy would introduce a delay instead of executing a connection creation request right away. The policy would remember the last time a connection creation was scheduled, and when it tries to schedule the next request, it would make sure that the time between the old and new request execution is equal to or greater than the `delay` it is configured with.

Results

It worked fine when the cluster operates in a normal way. However, during testing with artificial delays, it became clear that this approach breaks down when the time to establish a connection exceeds the configured delay. In such cases, connections begin to pile up - the greater the connection initialization time relative to the delay, the faster they accumulate. This becomes especially problematic during connection storms. As the cluster becomes overloaded and connection initialization slows down, the delay-based throttling loses its effectiveness. In other words, the more the cluster suffers, the less effective the policy becomes.

Solution

The solution was to give the policy direct control over the connection initialization process. This allows the policy to track how many connections are currently pending and apply delays after connections are created, rather than before. That change ensures the policy remains effective even under heavy load. This behavior is exactly what has been implemented in this PR.
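A minimal sketch of that idea (illustrative only, not the PR's actual code): each scope processes connection requests one at a time and starts the backoff delay only after an attempt has finished, so slow connections cannot pile up:

```python
import queue
import threading
import time

def scope_worker(requests: queue.Queue, schedule, stop: threading.Event):
    """Run queued connection attempts one at a time; the delay is applied *after*
    each attempt completes, so at most one connection per scope is in flight."""
    while not stop.is_set():
        try:
            create_connection = requests.get(timeout=0.1)
        except queue.Empty:
            continue
        try:
            create_connection()        # may take arbitrarily long under load
        except Exception:
            pass                        # failures are handled by the pool's own retry logic
        time.sleep(next(schedule))      # backoff starts only once the attempt is done
```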
Pre-review checklist

- Documentation under `./docs/source/`.
- `Fixes:` annotations in the PR description.