Skip to content

Commit efdaa65

Browse files
authored
Allow customization of APScheduler (#223)
* Remove deceptive boostrap argument. * Allow customization of the APS scheduler. * Go home VScode you're drunk. * Allow customization of the APS scheduler. * Go home VScode you're drunk. * Remove some exploratory code. * Test with lowewr interval, possibly change for the future.
1 parent 0fcfb6f commit efdaa65

File tree

3 files changed

+81
-8
lines changed

3 files changed

+81
-8
lines changed

UnleashClient/__init__.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
# pylint: disable=invalid-name
22
import warnings
3+
import random
4+
import string
35
from datetime import datetime, timezone
46
from typing import Callable, Optional
57
from apscheduler.job import Job
8+
from apscheduler.schedulers.base import BaseScheduler
69
from apscheduler.schedulers.background import BackgroundScheduler
710
from apscheduler.triggers.interval import IntervalTrigger
11+
from apscheduler.executors.pool import ThreadPoolExecutor
812
from UnleashClient.api import register_client
913
from UnleashClient.periodic_tasks import fetch_and_load_features, aggregate_and_send_metrics
1014
from UnleashClient.strategies import ApplicationHostname, Default, GradualRolloutRandom, \
@@ -35,6 +39,8 @@ class UnleashClient:
3539
:param cache_directory: Location of the cache directory. When unset, FCache will determine the location.
3640
:param verbose_log_level: Numerical log level (https://docs.python.org/3/library/logging.html#logging-levels) for cases where checking a feature flag fails.
3741
:param cache: Custom cache implementation that extends UnleashClient.cache.BaseCache. When unset, UnleashClient will use Fcache.
42+
:param scheduler: Custom APScheduler object. Use this if you want to customize jobstore or executors. When unset, UnleashClient will create it's own scheduler.
43+
:param scheduler_executor: Name of APSCheduler executor to use if using a custom scheduler.
3844
"""
3945
def __init__(self,
4046
url: str,
@@ -53,7 +59,9 @@ def __init__(self,
5359
cache_directory: Optional[str] = None,
5460
project_name: str = None,
5561
verbose_log_level: int = 30,
56-
cache: Optional[BaseCache] = None) -> None:
62+
cache: Optional[BaseCache] = None,
63+
scheduler: Optional[BaseScheduler] = None,
64+
scheduler_executor: Optional[str] = None) -> None:
5765
custom_headers = custom_headers or {}
5866
custom_options = custom_options or {}
5967
custom_strategies = custom_strategies or {}
@@ -80,7 +88,6 @@ def __init__(self,
8088

8189
# Class objects
8290
self.features: dict = {}
83-
self.scheduler = BackgroundScheduler()
8491
self.fl_job: Job = None
8592
self.metric_job: Job = None
8693

@@ -91,6 +98,27 @@ def __init__(self,
9198
})
9299
self.unleash_bootstrapped = self.cache.bootstrapped
93100

101+
# Scheduler bootstrapping
102+
# - Figure out the Unleash executor name.
103+
if scheduler and scheduler_executor:
104+
self.unleash_executor_name = scheduler_executor
105+
elif scheduler and not scheduler_executor:
106+
raise ValueError("If using a custom scheduler, you must specify a executor.")
107+
else:
108+
if not scheduler:
109+
LOGGER.warning("scheduler_executor should only be used with a custom scheduler.")
110+
111+
self.unleash_executor_name = f"unleash_executor_{''.join(random.choices(string.ascii_uppercase + string.digits, k=6))}"
112+
113+
# Set up the scheduler.
114+
if scheduler:
115+
self.unleash_scheduler = scheduler
116+
else:
117+
executors = {
118+
self.unleash_executor_name: ThreadPoolExecutor()
119+
}
120+
self.unleash_scheduler = BackgroundScheduler(executors=executors)
121+
94122
# Mappings
95123
default_strategy_mapping = {
96124
"applicationHostname": ApplicationHostname,
@@ -184,20 +212,22 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
184212

185213
job_func(**job_args) # type: ignore
186214
# Start periodic jobs
187-
self.scheduler.start()
188-
self.fl_job = self.scheduler.add_job(job_func,
215+
self.unleash_scheduler.start()
216+
self.fl_job = self.unleash_scheduler.add_job(job_func,
189217
trigger=IntervalTrigger(
190218
seconds=int(self.unleash_refresh_interval),
191219
jitter=self.unleash_refresh_jitter,
192220
),
221+
executor=self.unleash_executor_name,
193222
kwargs=job_args)
194223

195224
if not self.unleash_disable_metrics:
196-
self.metric_job = self.scheduler.add_job(aggregate_and_send_metrics,
225+
self.metric_job = self.unleash_scheduler.add_job(aggregate_and_send_metrics,
197226
trigger=IntervalTrigger(
198227
seconds=int(self.unleash_metrics_interval),
199228
jitter=self.unleash_metrics_jitter,
200229
),
230+
executor=self.unleash_executor_name,
201231
kwargs=metrics_args)
202232
except Exception as excep:
203233
# Log exceptions during initialization. is_initialized will remain false.
@@ -218,7 +248,7 @@ def destroy(self) -> None:
218248
self.fl_job.remove()
219249
if self.metric_job:
220250
self.metric_job.remove()
221-
self.scheduler.shutdown()
251+
self.unleash_scheduler.shutdown()
222252
self.cache.destroy()
223253

224254
@staticmethod

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ disable = [
2828
]
2929
max-attributes = 25
3030
max-args = 25
31-
max-locals = 20
31+
max-locals = 25
3232
extension-pkg-allow-list = ["mmh3"]
3333

3434
[tool.setuptools_scm]

tests/unit_tests/test_client.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
import pytest
77
import responses
8+
from apscheduler.schedulers.background import BackgroundScheduler
9+
from apscheduler.executors.pool import ThreadPoolExecutor
810
from UnleashClient import UnleashClient
911
from UnleashClient.strategies import Strategy
1012
from tests.utilities.testing_constants import URL, ENVIRONMENT, APP_NAME, INSTANCE_ID, REFRESH_INTERVAL, REFRESH_JITTER, \
@@ -228,7 +230,7 @@ def test_uc_dirty_cache(unleash_client_nodestroy):
228230
unleash_client.initialize_client()
229231
time.sleep(5)
230232
assert unleash_client.is_enabled("testFlag")
231-
unleash_client.scheduler.shutdown()
233+
unleash_client.unleash_scheduler.shutdown()
232234

233235
# Check that everything works if previous cache exists.
234236
unleash_client.initialize_client()
@@ -492,3 +494,44 @@ def test_uc_cache_bootstrap_url(cache):
492494
)
493495
assert len(unleash_client.features) >= 4
494496
assert unleash_client.is_enabled("testFlag")
497+
498+
499+
@responses.activate
500+
def test_uc_custom_scheduler():
501+
# Set up API
502+
responses.add(responses.POST, URL + REGISTER_URL, json={}, status=202)
503+
responses.add(responses.GET, URL + FEATURES_URL, json=MOCK_FEATURE_RESPONSE, status=200, headers={'etag': ETAG_VALUE})
504+
responses.add(responses.POST, URL + METRICS_URL, json={}, status=202)
505+
506+
# Set up UnleashClient
507+
custom_executors = {
508+
'hamster_executor': ThreadPoolExecutor()
509+
}
510+
511+
custom_scheduler = BackgroundScheduler(
512+
executors=custom_executors
513+
)
514+
515+
unleash_client = UnleashClient(
516+
URL,
517+
APP_NAME,
518+
refresh_interval=5,
519+
metrics_interval=10,
520+
scheduler=custom_scheduler,
521+
scheduler_executor='hamster_executor'
522+
)
523+
524+
# Create Unleash client and check initial load
525+
unleash_client.initialize_client()
526+
time.sleep(1)
527+
assert unleash_client.is_initialized
528+
assert len(unleash_client.features) >= 4
529+
530+
# Simulate caching
531+
responses.add(responses.GET, URL + FEATURES_URL, json={}, status=304, headers={'etag': ETAG_VALUE})
532+
time.sleep(6)
533+
534+
# Simulate server provisioning change
535+
responses.add(responses.GET, URL + FEATURES_URL, json=MOCK_ALL_FEATURES, status=200, headers={'etag': 'W/somethingelse'})
536+
time.sleep(6)
537+
assert len(unleash_client.features) >= 9

0 commit comments

Comments
 (0)