|
11 | 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
12 | 12 | # See the License for the specific language governing permissions and
|
13 | 13 | # limitations under the License.
|
14 |
| - |
| 14 | +import os |
| 15 | +import time |
15 | 16 | import unittest
|
| 17 | +from typing import Optional |
| 18 | +from unittest.mock import Mock |
16 | 19 |
|
17 | 20 | from cassandra import ConsistencyLevel, Unavailable
|
18 |
| -from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT |
| 21 | +from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT, Session |
| 22 | +from cassandra.policies import LimitedConcurrencyShardConnectionBackoffPolicy, ConstantReconnectionPolicy, \ |
| 23 | + ShardConnectionBackoffPolicy, NoDelayShardConnectionBackoffPolicy, _ScopeBucket, \ |
| 24 | + _NoDelayShardConnectionBackoffScheduler |
| 25 | +from cassandra.shard_info import _ShardingInfo |
19 | 26 |
|
20 |
| -from tests.integration import use_cluster, get_cluster, get_node, TestCluster |
| 27 | +from tests.integration import use_cluster, get_cluster, get_node, TestCluster, remove_cluster |
21 | 28 |
|
22 | 29 |
|
def setup_module():
    """Spin up the shared two-node test cluster for this module.

    SCYLLA_EXT_OPTS is set before the cluster starts so Scylla boots with
    8 shards, giving the shard-connection-backoff tests multiple shards
    per host to connect to.
    """
    os.environ['SCYLLA_EXT_OPTS'] = "--smp 8"
    use_cluster('test_cluster', [2])
| 34 | + |
def teardown_module():
    """Tear down the module's test cluster and undo the env override.

    Cleanup is wrapped in try/finally so the SCYLLA_EXT_OPTS variable is
    removed even if remove_cluster() raises, and pop() with a default is
    used instead of `del` so teardown does not raise a KeyError when
    setup_module failed before setting the variable.
    """
    try:
        remove_cluster()
    finally:
        os.environ.pop('SCYLLA_EXT_OPTS', None)
25 | 38 |
|
26 | 39 |
|
27 | 40 | class RetryPolicyTests(unittest.TestCase):
|
@@ -65,3 +78,90 @@ def test_should_rethrow_on_unvailable_with_default_policy_if_cas(self):
|
65 | 78 | self.assertEqual(exception.consistency, ConsistencyLevel.SERIAL)
|
66 | 79 | self.assertEqual(exception.required_replicas, 2)
|
67 | 80 | self.assertEqual(exception.alive_replicas, 1)
|
| 81 | + |
| 82 | + |
class ShardBackoffPolicyTests(unittest.TestCase):
    """Integration tests for ShardConnectionBackoffPolicy implementations.

    Verifies that per-shard connection attempts are scheduled with the
    delay dictated by the configured policy (or bypass the backoff
    machinery entirely on a non-shard-aware backend).
    """

    def test_limited_concurrency_1_connection_per_host(self):
        self._test_backoff(
            LimitedConcurrencyShardConnectionBackoffPolicy(
                backoff_policy=ConstantReconnectionPolicy(0.1),
                max_concurrent=1,
            )
        )

    def test_limited_concurrency_2_connection_per_host(self):
        self._test_backoff(
            LimitedConcurrencyShardConnectionBackoffPolicy(
                backoff_policy=ConstantReconnectionPolicy(0.1),
                # Fixed copy-paste bug: this test exercises two concurrent
                # connections per host, the previous version passed 1.
                max_concurrent=2,
            )
        )

    def test_no_delay(self):
        self._test_backoff(NoDelayShardConnectionBackoffPolicy())

    def _test_backoff(self, shard_connection_backoff_policy: ShardConnectionBackoffPolicy):
        """Connect a cluster under the given policy and assert that every
        shard-connection attempt was scheduled with the policy's delay.

        :param shard_connection_backoff_policy: policy under test; when it is
            a LimitedConcurrencyShardConnectionBackoffPolicy, the expected
            per-call delay is taken from its backoff_policy schedule,
            otherwise a delay of 0 is expected.
        """
        backoff_policy = None
        if isinstance(shard_connection_backoff_policy, LimitedConcurrencyShardConnectionBackoffPolicy):
            backoff_policy = shard_connection_backoff_policy.backoff_policy

        cluster = TestCluster(
            shard_connection_backoff_policy=shard_connection_backoff_policy,
            reconnection_policy=ConstantReconnectionPolicy(0),
        )

        # Record every scheduled call, but execute it immediately (delay 0)
        # so the test does not actually wait out the backoff intervals.
        scheduler_calls = []
        original_schedule = cluster.scheduler.schedule
        pending = 0

        def new_schedule(delay, fn, *args, **kwargs):
            nonlocal pending
            pending += 1

            def execute():
                nonlocal pending
                try:
                    fn(*args, **kwargs)
                finally:
                    pending -= 1

            scheduler_calls.append((delay, fn, args, kwargs))
            return original_schedule(0, execute)

        cluster.scheduler.schedule = Mock(side_effect=new_schedule)

        # Shut the cluster down even when an assertion fails, so one failing
        # test does not leak connections into the rest of the module.
        try:
            session = cluster.connect()
            sharding_info = get_sharding_info(session)

            # Scheduled calls run on a separate thread; wait for them to drain.
            while pending > 0:
                time.sleep(0.01)

            if not sharding_info:
                # Non-Scylla backend: connections must be created directly,
                # never via the shard connection backoff scheduler.
                for delay, fn, args, kwargs in scheduler_calls:
                    if fn.__self__.__class__ in (_ScopeBucket, _NoDelayShardConnectionBackoffScheduler):
                        self.fail(
                            "in non-shard-aware case connection should be created directly, not involving ShardConnectionBackoffScheduler")
                return

            expected_delay = 0
            if backoff_policy:
                expected_delay = next(iter(backoff_policy.new_schedule()))

            # Every shard-connection call must carry the policy's delay, and
            # there must be at least one call per missing shard connection
            # (shards_count - 1 extra connections per host).
            found_related_calls = 0
            for delay, fn, args, kwargs in scheduler_calls:
                if fn.__self__.__class__ in (_ScopeBucket, _NoDelayShardConnectionBackoffScheduler):
                    found_related_calls += 1
                    self.assertEqual(delay, expected_delay)
            self.assertLessEqual(len(session.hosts) * (sharding_info.shards_count - 1), found_related_calls)
        finally:
            cluster.shutdown()
| 162 | + |
def get_sharding_info(session: Session) -> Optional[_ShardingInfo]:
    """Return the sharding info of the first host that reports one, or None
    when no host is shard-aware (i.e. the backend is not Scylla)."""
    return next(
        (host.sharding_info for host in session.hosts if host.sharding_info),
        None,
    )
0 commit comments