Skip to content

Commit c69231a

Browse files
feat: Add support for provider events. (#12)
Co-authored-by: Matthew M. Keeler <[email protected]>
1 parent 256d4ac commit c69231a

File tree

4 files changed

+321
-8
lines changed

4 files changed

+321
-8
lines changed

ld_openfeature/provider.py

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1+
import threading
12
from typing import Any, List, Optional, Union
23

34
from ldclient import LDClient, Config
5+
from ldclient.interfaces import DataSourceStatus, FlagChange, DataSourceState
46
from openfeature.evaluation_context import EvaluationContext
5-
from openfeature.exception import ErrorCode
7+
from openfeature.exception import ErrorCode, ProviderFatalError
68
from openfeature.flag_evaluation import FlagResolutionDetails, FlagType, Reason
79
from openfeature.hook import Hook
810
from openfeature.provider.metadata import Metadata
9-
from openfeature.provider.provider import AbstractProvider
11+
from openfeature.provider import AbstractProvider
12+
from openfeature.event import ProviderEventDetails
1013

1114
from ld_openfeature.impl.context_converter import EvaluationContextConverter
1215
from ld_openfeature.impl.details_converter import ResolutionDetailsConverter
@@ -19,7 +22,60 @@ def __init__(self, config: Config):
1922
self.__context_converter = EvaluationContextConverter()
2023
self.__details_converter = ResolutionDetailsConverter()
2124

25+
def __handle_data_source_status(self, status: DataSourceStatus):
26+
state = status.state
27+
if state == DataSourceState.INITIALIZING:
28+
return
29+
elif state == DataSourceState.VALID:
30+
self.emit_provider_ready(ProviderEventDetails())
31+
elif state == DataSourceState.OFF:
32+
error_message = self.__get_message(status,
33+
"the provider has encountered a permanent error or has been shutdown")
34+
self.emit_provider_error(ProviderEventDetails(error_code=ErrorCode.PROVIDER_FATAL,
35+
message=error_message))
36+
elif state == DataSourceState.INTERRUPTED:
37+
error_message = self.__get_message(status, "encountered an unknown error")
38+
self.emit_provider_stale(ProviderEventDetails(message=error_message))
39+
40+
# For now treat an unknown state as no change.
41+
42+
def __handle_flag_change(self, change: FlagChange):
43+
self.emit_provider_configuration_changed(ProviderEventDetails(flags_changed=[change.key]))
44+
pass
45+
46+
def initialize(self, evaluation_context: EvaluationContext):
47+
ready_event = threading.Event()
48+
49+
def ready_handler(status: DataSourceStatus):
50+
if status.state == DataSourceState.VALID:
51+
ready_event.set()
52+
elif status.state == DataSourceState.OFF:
53+
ready_event.set()
54+
55+
# We listen just to handle the ready event. We do not emit events because the client emits them for us.
56+
self.__client.data_source_status_provider.add_listener(ready_handler)
57+
58+
# Check for conditions that may have happened before we added the listener.
59+
if self.__client.data_source_status_provider.status.state == DataSourceState.OFF:
60+
ready_event.set()
61+
62+
if self.__client.is_initialized():
63+
ready_event.set()
64+
65+
ready_event.wait()
66+
67+
self.__client.data_source_status_provider.remove_listener(ready_handler)
68+
69+
if not self.__client.is_initialized():
70+
raise ProviderFatalError(error_message="launchdarkly client initialization failed")
71+
72+
# Listen to new status events and emit them.
73+
self.__client.data_source_status_provider.add_listener(self.__handle_data_source_status)
74+
self.__client.flag_tracker.add_listener(self.__handle_flag_change)
75+
2276
def shutdown(self):
77+
self.__client.data_source_status_provider.remove_listener(self.__handle_data_source_status)
78+
self.__client.flag_tracker.remove_listener(self.__handle_flag_change)
2379
self.__client.close()
2480

2581
def get_metadata(self) -> Metadata:
@@ -73,7 +129,8 @@ def resolve_object_details(
73129
"""Resolves the flag value for the provided flag key as a list or dictionary"""
74130
return self.__resolve_value(FlagType(FlagType.OBJECT), flag_key, default_value, evaluation_context)
75131

76-
def __resolve_value(self, flag_type: FlagType, flag_key: str, default_value: Any, evaluation_context: Optional[EvaluationContext] = None) -> FlagResolutionDetails:
132+
def __resolve_value(self, flag_type: FlagType, flag_key: str, default_value: Any,
133+
evaluation_context: Optional[EvaluationContext] = None) -> FlagResolutionDetails:
77134
if evaluation_context is None:
78135
return FlagResolutionDetails(
79136
value=default_value,
@@ -103,9 +160,16 @@ def __resolve_value(self, flag_type: FlagType, flag_key: str, default_value: Any
103160

104161
return self.__details_converter.to_resolution_details(result)
105162

106-
def __mismatched_type_details(self, default_value: Any) -> FlagResolutionDetails:
163+
@staticmethod
164+
def __mismatched_type_details(default_value: Any) -> FlagResolutionDetails:
107165
return FlagResolutionDetails(
108166
value=default_value,
109167
reason=Reason(Reason.ERROR),
110168
error_code=ErrorCode.TYPE_MISMATCH
111169
)
170+
171+
@staticmethod
172+
def __get_message(status: DataSourceStatus, default: str):
173+
if status.error and status.error.message:
174+
return status.error.message
175+
return default

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ packages = [
2828

2929
[tool.poetry.dependencies]
3030
python = "^3.8"
31-
openfeature-sdk = ">=0.4.2,<1"
31+
openfeature-sdk = ">=0.7.0,<1"
3232
launchdarkly-server-sdk = "<10"
3333

3434

tests/test_data_sources.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import threading
2+
import time
3+
from typing import Optional
4+
5+
from ldclient import Config
6+
from ldclient.integrations.test_data import TestData
7+
from ldclient.interfaces import UpdateProcessor, DataSourceUpdateSink, DataSourceState, DataSourceErrorInfo, \
8+
DataSourceErrorKind
9+
from ldclient.versioned_data_kind import FEATURES
10+
11+
12+
class FailingDataSource(UpdateProcessor):
13+
def __init__(self, config: Config, store, ready: threading.Event):
14+
self._data_source_update_sink: Optional[DataSourceUpdateSink] = config.data_source_update_sink
15+
self._ready = ready
16+
17+
def start(self):
18+
if self._data_source_update_sink is None:
19+
return
20+
21+
self._ready.set()
22+
23+
self._data_source_update_sink.update_status(
24+
DataSourceState.OFF,
25+
DataSourceErrorInfo(
26+
DataSourceErrorKind.ERROR_RESPONSE,
27+
401,
28+
time.time(),
29+
str("Bad things")
30+
)
31+
)
32+
33+
def stop(self):
34+
pass
35+
36+
def is_alive(self):
37+
return False
38+
39+
def initialized(self):
40+
return False
41+
42+
43+
class DelayedFailingDataSource(UpdateProcessor):
44+
def __init__(self, config: Config, store, ready: threading.Event):
45+
self._data_source_update_sink: Optional[DataSourceUpdateSink] = config.data_source_update_sink
46+
self._ready = ready
47+
48+
def start(self):
49+
if self._data_source_update_sink is None:
50+
return
51+
52+
self._ready.set()
53+
54+
def data_source_failure():
55+
self._data_source_update_sink.update_status(
56+
DataSourceState.OFF,
57+
DataSourceErrorInfo(
58+
DataSourceErrorKind.ERROR_RESPONSE,
59+
401,
60+
time.time(),
61+
str("Bad things")
62+
)
63+
)
64+
65+
threading.Timer(0.1, data_source_failure).start()
66+
67+
def stop(self):
68+
pass
69+
70+
def is_alive(self):
71+
return False
72+
73+
def initialized(self):
74+
return False
75+
76+
77+
class StaleDataSource(UpdateProcessor):
78+
def __init__(self, config: Config, store, ready: threading.Event):
79+
self._data_source_update_sink: Optional[DataSourceUpdateSink] = config.data_source_update_sink
80+
self._ready = ready
81+
82+
def start(self):
83+
self._ready.set()
84+
self._data_source_update_sink.update_status(DataSourceState.VALID, None)
85+
86+
def data_source_interrupted():
87+
self._data_source_update_sink.update_status(
88+
DataSourceState.INTERRUPTED,
89+
DataSourceErrorInfo(
90+
DataSourceErrorKind.ERROR_RESPONSE,
91+
408,
92+
time.time(),
93+
str("Less bad things")
94+
)
95+
)
96+
97+
threading.Timer(0.1, data_source_interrupted).start()
98+
99+
def stop(self):
100+
pass
101+
102+
def is_alive(self):
103+
return False
104+
105+
def initialized(self):
106+
return True
107+
108+
109+
class UpdatingDataSource(UpdateProcessor):
110+
def __init__(self, config: Config, store, ready: threading.Event):
111+
self._data_source_update_sink: Optional[DataSourceUpdateSink] = config.data_source_update_sink
112+
self._ready = ready
113+
114+
def start(self):
115+
self._ready.set()
116+
self._data_source_update_sink.init({})
117+
self._data_source_update_sink.update_status(DataSourceState.VALID, None)
118+
119+
def update_data():
120+
# The test_data_source is only used to access the flag builder.
121+
# We call _build here, once TestData supports change handlers we should remove this.
122+
self._data_source_update_sink.upsert(FEATURES,
123+
TestData().data_source().flag("potato").on(True)._build(1))
124+
125+
threading.Timer(0.1, update_data).start()
126+
127+
def stop(self):
128+
pass
129+
130+
def is_alive(self):
131+
return False
132+
133+
def initialized(self):
134+
return True

0 commit comments

Comments
 (0)