Skip to content

Commit 26e0684

Browse files
committed
chore: Add early support for FDv2-based test data source
1 parent 61b71b2 commit 26e0684

File tree

4 files changed

+1346
-0
lines changed

4 files changed

+1346
-0
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
import threading
2+
import time
3+
from typing import Generator
4+
from queue import Queue, Empty
5+
6+
from ldclient.impl.datasystem import Update, BasisResult
7+
from ldclient.impl.datasystem.protocolv2 import (
8+
Basis, ChangeSetBuilder, ObjectKind, IntentCode, Selector
9+
)
10+
from ldclient.impl.util import _Success, _Fail, current_time_millis
11+
from ldclient.interfaces import DataSourceState, DataSourceErrorInfo, DataSourceErrorKind
12+
13+
14+
class _TestDataSourceV2:
15+
"""
16+
Internal implementation of both Initializer and Synchronizer protocols for TestDataV2.
17+
18+
This component bridges the test data management in TestDataV2 with the FDv2 protocol
19+
interfaces. Each instance implements both Initializer and Synchronizer protocols
20+
and receives change notifications for dynamic updates.
21+
"""
22+
23+
def __init__(self, test_data):
24+
self._test_data = test_data
25+
self._closed = False
26+
self._update_queue = Queue()
27+
self._lock = threading.Lock()
28+
29+
# Always register for change notifications
30+
self._test_data._add_instance(self)
31+
32+
# Locking strategy:
33+
# The threading.Lock instance (_lock) ensures thread safety for shared resources:
34+
# - Used in `fetch` and `close` to prevent concurrent modification of `_closed`.
35+
# - Added to `upsert_flag` to address potential race conditions.
36+
# - The `sync` method relies on Queue's thread-safe properties for updates.
37+
38+
def fetch(self) -> BasisResult:
39+
"""
40+
Implementation of the Initializer.fetch method.
41+
42+
Returns the current test data as a Basis for initial data loading.
43+
"""
44+
try:
45+
with self._lock:
46+
if self._closed:
47+
return _Fail("TestDataV2 source has been closed")
48+
49+
# Get all current flags from test data
50+
init_data = self._test_data._make_init_data()
51+
version = self._test_data._get_version()
52+
53+
# Build a full transfer changeset
54+
builder = ChangeSetBuilder()
55+
builder.start(IntentCode.TRANSFER_FULL)
56+
57+
# Add all flags to the changeset
58+
for key, flag_data in init_data.items():
59+
builder.add_put(
60+
ObjectKind.FLAG,
61+
key,
62+
flag_data.get('version', 1),
63+
flag_data
64+
)
65+
66+
# Create selector for this version
67+
selector = Selector.new_selector(str(version), version)
68+
change_set = builder.finish(selector)
69+
70+
basis = Basis(
71+
change_set=change_set,
72+
persist=False,
73+
environment_id=None
74+
)
75+
76+
return _Success(basis)
77+
78+
except Exception as e:
79+
return _Fail(f"Error fetching test data: {str(e)}")
80+
81+
def sync(self) -> Generator[Update, None, None]:
82+
"""
83+
Implementation of the Synchronizer.sync method.
84+
85+
Yields updates as test data changes occur.
86+
"""
87+
88+
89+
# First yield initial data
90+
initial_result = self.fetch()
91+
if isinstance(initial_result, _Fail):
92+
yield Update(
93+
state=DataSourceState.OFF,
94+
error=DataSourceErrorInfo(
95+
kind=DataSourceErrorKind.STORE_ERROR,
96+
status_code=0,
97+
time=current_time_millis(),
98+
message=initial_result.error
99+
)
100+
)
101+
return
102+
103+
# Yield the initial successful state
104+
yield Update(
105+
state=DataSourceState.VALID,
106+
change_set=initial_result.value.change_set
107+
)
108+
109+
# Continue yielding updates as they arrive
110+
while not self._closed:
111+
try:
112+
# Wait for updates with a timeout to allow checking closed status
113+
try:
114+
update = self._update_queue.get(timeout=1.0)
115+
except Empty:
116+
continue
117+
118+
if update is None: # Sentinel value for shutdown
119+
break
120+
121+
yield update
122+
123+
except Exception as e:
124+
yield Update(
125+
state=DataSourceState.OFF,
126+
error=DataSourceErrorInfo(
127+
kind=DataSourceErrorKind.UNKNOWN,
128+
status_code=0,
129+
time=current_time_millis(),
130+
message=f"Error in test data synchronizer: {str(e)}"
131+
)
132+
)
133+
break
134+
135+
def close(self):
136+
"""Close the data source and clean up resources."""
137+
with self._lock:
138+
if self._closed:
139+
return
140+
self._closed = True
141+
142+
self._test_data._closed_instance(self)
143+
# Signal shutdown to sync generator
144+
self._update_queue.put(None)
145+
146+
def upsert_flag(self, flag_data: dict):
147+
"""
148+
Called by TestDataV2 when a flag is updated.
149+
150+
This method converts the flag update into an FDv2 changeset and
151+
queues it for delivery through the sync() generator.
152+
"""
153+
with self._lock:
154+
if self._closed:
155+
return
156+
157+
try:
158+
version = self._test_data._get_version()
159+
160+
# Build a changes transfer changeset
161+
builder = ChangeSetBuilder()
162+
builder.start(IntentCode.TRANSFER_CHANGES)
163+
164+
# Add the updated flag
165+
builder.add_put(
166+
ObjectKind.FLAG,
167+
flag_data['key'],
168+
flag_data.get('version', 1),
169+
flag_data
170+
)
171+
172+
# Create selector for this version
173+
selector = Selector.new_selector(str(version), version)
174+
change_set = builder.finish(selector)
175+
176+
# Queue the update
177+
update = Update(
178+
state=DataSourceState.VALID,
179+
change_set=change_set
180+
)
181+
182+
self._update_queue.put(update)
183+
184+
except Exception as e:
185+
# Queue an error update
186+
error_update = Update(
187+
state=DataSourceState.OFF,
188+
error=DataSourceErrorInfo(
189+
kind=DataSourceErrorKind.STORE_ERROR,
190+
status_code=0,
191+
time=current_time_millis(),
192+
message=f"Error processing flag update: {str(e)}"
193+
)
194+
)
195+
self._update_queue.put(error_update)

0 commit comments

Comments
 (0)