|
| 1 | +import threading |
| 2 | +import time |
| 3 | +from queue import Empty, Queue |
| 4 | +from typing import Generator |
| 5 | + |
| 6 | +from ldclient.impl.datasystem import BasisResult, Update |
| 7 | +from ldclient.impl.datasystem.protocolv2 import ( |
| 8 | + Basis, |
| 9 | + ChangeSetBuilder, |
| 10 | + IntentCode, |
| 11 | + ObjectKind, |
| 12 | + Selector |
| 13 | +) |
| 14 | +from ldclient.impl.util import _Fail, _Success, current_time_millis |
| 15 | +from ldclient.interfaces import ( |
| 16 | + DataSourceErrorInfo, |
| 17 | + DataSourceErrorKind, |
| 18 | + DataSourceState |
| 19 | +) |
| 20 | + |
| 21 | + |
| 22 | +class _TestDataSourceV2: |
| 23 | + """ |
| 24 | + Internal implementation of both Initializer and Synchronizer protocols for TestDataV2. |
| 25 | +
|
| 26 | + This component bridges the test data management in TestDataV2 with the FDv2 protocol |
| 27 | + interfaces. Each instance implements both Initializer and Synchronizer protocols |
| 28 | + and receives change notifications for dynamic updates. |
| 29 | + """ |
| 30 | + |
| 31 | + def __init__(self, test_data): |
| 32 | + self._test_data = test_data |
| 33 | + self._closed = False |
| 34 | + self._update_queue = Queue() |
| 35 | + self._lock = threading.Lock() |
| 36 | + |
| 37 | + # Always register for change notifications |
| 38 | + self._test_data._add_instance(self) |
| 39 | + |
| 40 | + # Locking strategy: |
| 41 | + # The threading.Lock instance (_lock) ensures thread safety for shared resources: |
| 42 | + # - Used in `fetch` and `close` to prevent concurrent modification of `_closed`. |
| 43 | + # - Added to `upsert_flag` to address potential race conditions. |
| 44 | + # - The `sync` method relies on Queue's thread-safe properties for updates. |
| 45 | + |
| 46 | + def fetch(self) -> BasisResult: |
| 47 | + """ |
| 48 | + Implementation of the Initializer.fetch method. |
| 49 | +
|
| 50 | + Returns the current test data as a Basis for initial data loading. |
| 51 | + """ |
| 52 | + try: |
| 53 | + with self._lock: |
| 54 | + if self._closed: |
| 55 | + return _Fail("TestDataV2 source has been closed") |
| 56 | + |
| 57 | + # Get all current flags from test data |
| 58 | + init_data = self._test_data._make_init_data() |
| 59 | + version = self._test_data._get_version() |
| 60 | + |
| 61 | + # Build a full transfer changeset |
| 62 | + builder = ChangeSetBuilder() |
| 63 | + builder.start(IntentCode.TRANSFER_FULL) |
| 64 | + |
| 65 | + # Add all flags to the changeset |
| 66 | + for key, flag_data in init_data.items(): |
| 67 | + builder.add_put( |
| 68 | + ObjectKind.FLAG, |
| 69 | + key, |
| 70 | + flag_data.get('version', 1), |
| 71 | + flag_data |
| 72 | + ) |
| 73 | + |
| 74 | + # Create selector for this version |
| 75 | + selector = Selector.new_selector(str(version), version) |
| 76 | + change_set = builder.finish(selector) |
| 77 | + |
| 78 | + basis = Basis( |
| 79 | + change_set=change_set, |
| 80 | + persist=False, |
| 81 | + environment_id=None |
| 82 | + ) |
| 83 | + |
| 84 | + return _Success(basis) |
| 85 | + |
| 86 | + except Exception as e: |
| 87 | + return _Fail(f"Error fetching test data: {str(e)}") |
| 88 | + |
| 89 | + def sync(self) -> Generator[Update, None, None]: |
| 90 | + """ |
| 91 | + Implementation of the Synchronizer.sync method. |
| 92 | +
|
| 93 | + Yields updates as test data changes occur. |
| 94 | + """ |
| 95 | + |
| 96 | + |
| 97 | + # First yield initial data |
| 98 | + initial_result = self.fetch() |
| 99 | + if isinstance(initial_result, _Fail): |
| 100 | + yield Update( |
| 101 | + state=DataSourceState.OFF, |
| 102 | + error=DataSourceErrorInfo( |
| 103 | + kind=DataSourceErrorKind.STORE_ERROR, |
| 104 | + status_code=0, |
| 105 | + time=current_time_millis(), |
| 106 | + message=initial_result.error |
| 107 | + ) |
| 108 | + ) |
| 109 | + return |
| 110 | + |
| 111 | + # Yield the initial successful state |
| 112 | + yield Update( |
| 113 | + state=DataSourceState.VALID, |
| 114 | + change_set=initial_result.value.change_set |
| 115 | + ) |
| 116 | + |
| 117 | + # Continue yielding updates as they arrive |
| 118 | + while not self._closed: |
| 119 | + try: |
| 120 | + # Wait for updates with a timeout to allow checking closed status |
| 121 | + try: |
| 122 | + update = self._update_queue.get(timeout=1.0) |
| 123 | + except Empty: |
| 124 | + continue |
| 125 | + |
| 126 | + if update is None: # Sentinel value for shutdown |
| 127 | + break |
| 128 | + |
| 129 | + yield update |
| 130 | + |
| 131 | + except Exception as e: |
| 132 | + yield Update( |
| 133 | + state=DataSourceState.OFF, |
| 134 | + error=DataSourceErrorInfo( |
| 135 | + kind=DataSourceErrorKind.UNKNOWN, |
| 136 | + status_code=0, |
| 137 | + time=current_time_millis(), |
| 138 | + message=f"Error in test data synchronizer: {str(e)}" |
| 139 | + ) |
| 140 | + ) |
| 141 | + break |
| 142 | + |
| 143 | + def close(self): |
| 144 | + """Close the data source and clean up resources.""" |
| 145 | + with self._lock: |
| 146 | + if self._closed: |
| 147 | + return |
| 148 | + self._closed = True |
| 149 | + |
| 150 | + self._test_data._closed_instance(self) |
| 151 | + # Signal shutdown to sync generator |
| 152 | + self._update_queue.put(None) |
| 153 | + |
| 154 | + def upsert_flag(self, flag_data: dict): |
| 155 | + """ |
| 156 | + Called by TestDataV2 when a flag is updated. |
| 157 | +
|
| 158 | + This method converts the flag update into an FDv2 changeset and |
| 159 | + queues it for delivery through the sync() generator. |
| 160 | + """ |
| 161 | + with self._lock: |
| 162 | + if self._closed: |
| 163 | + return |
| 164 | + |
| 165 | + try: |
| 166 | + version = self._test_data._get_version() |
| 167 | + |
| 168 | + # Build a changes transfer changeset |
| 169 | + builder = ChangeSetBuilder() |
| 170 | + builder.start(IntentCode.TRANSFER_CHANGES) |
| 171 | + |
| 172 | + # Add the updated flag |
| 173 | + builder.add_put( |
| 174 | + ObjectKind.FLAG, |
| 175 | + flag_data['key'], |
| 176 | + flag_data.get('version', 1), |
| 177 | + flag_data |
| 178 | + ) |
| 179 | + |
| 180 | + # Create selector for this version |
| 181 | + selector = Selector.new_selector(str(version), version) |
| 182 | + change_set = builder.finish(selector) |
| 183 | + |
| 184 | + # Queue the update |
| 185 | + update = Update( |
| 186 | + state=DataSourceState.VALID, |
| 187 | + change_set=change_set |
| 188 | + ) |
| 189 | + |
| 190 | + self._update_queue.put(update) |
| 191 | + |
| 192 | + except Exception as e: |
| 193 | + # Queue an error update |
| 194 | + error_update = Update( |
| 195 | + state=DataSourceState.OFF, |
| 196 | + error=DataSourceErrorInfo( |
| 197 | + kind=DataSourceErrorKind.STORE_ERROR, |
| 198 | + status_code=0, |
| 199 | + time=current_time_millis(), |
| 200 | + message=f"Error processing flag update: {str(e)}" |
| 201 | + ) |
| 202 | + ) |
| 203 | + self._update_queue.put(error_update) |
0 commit comments