|
| 1 | +import threading |
| 2 | +import time |
| 3 | +from typing import Generator |
| 4 | +from queue import Queue, Empty |
| 5 | + |
| 6 | +from ldclient.impl.datasystem import Update, BasisResult |
| 7 | +from ldclient.impl.datasystem.protocolv2 import ( |
| 8 | + Basis, ChangeSetBuilder, ObjectKind, IntentCode, Selector |
| 9 | +) |
| 10 | +from ldclient.impl.util import _Success, _Fail, current_time_millis |
| 11 | +from ldclient.interfaces import DataSourceState, DataSourceErrorInfo, DataSourceErrorKind |
| 12 | + |
| 13 | + |
| 14 | +class _TestDataSourceV2: |
| 15 | + """Internal implementation of both Initializer and Synchronizer protocols for TestDataV2. |
| 16 | +
|
| 17 | + This component bridges the test data management in TestDataV2 with the FDv2 protocol |
| 18 | + interfaces. Each instance implements both Initializer and Synchronizer protocols |
| 19 | + and receives change notifications for dynamic updates. |
| 20 | + """ |
| 21 | + |
| 22 | + def __init__(self, test_data): |
| 23 | + self._test_data = test_data |
| 24 | + self._closed = False |
| 25 | + self._update_queue = Queue() |
| 26 | + self._lock = threading.Lock() |
| 27 | + |
| 28 | + # Always register for change notifications |
| 29 | + self._test_data._add_instance(self) |
| 30 | + |
| 31 | + # Locking strategy: |
| 32 | + # The threading.Lock instance (_lock) ensures thread safety for shared resources: |
| 33 | + # - Used in `fetch` and `close` to prevent concurrent modification of `_closed`. |
| 34 | + # - Added to `upsert_flag` to address potential race conditions. |
| 35 | + # - The `sync` method relies on Queue's thread-safe properties for updates. |
| 36 | + |
| 37 | + def fetch(self) -> BasisResult: |
| 38 | + """Implementation of the Initializer.fetch method. |
| 39 | +
|
| 40 | + Returns the current test data as a Basis for initial data loading. |
| 41 | + """ |
| 42 | + try: |
| 43 | + with self._lock: |
| 44 | + if self._closed: |
| 45 | + return _Fail("TestDataV2 source has been closed") |
| 46 | + |
| 47 | + # Get all current flags from test data |
| 48 | + init_data = self._test_data._make_init_data() |
| 49 | + version = self._test_data._get_version() |
| 50 | + |
| 51 | + # Build a full transfer changeset |
| 52 | + builder = ChangeSetBuilder() |
| 53 | + builder.start(IntentCode.TRANSFER_FULL) |
| 54 | + |
| 55 | + # Add all flags to the changeset |
| 56 | + for key, flag_data in init_data.items(): |
| 57 | + builder.add_put( |
| 58 | + ObjectKind.FLAG, |
| 59 | + key, |
| 60 | + flag_data.get('version', 1), |
| 61 | + flag_data |
| 62 | + ) |
| 63 | + |
| 64 | + # Create selector for this version |
| 65 | + selector = Selector.new_selector(str(version), version) |
| 66 | + change_set = builder.finish(selector) |
| 67 | + |
| 68 | + basis = Basis( |
| 69 | + change_set=change_set, |
| 70 | + persist=False, |
| 71 | + environment_id=None |
| 72 | + ) |
| 73 | + |
| 74 | + return _Success(basis) |
| 75 | + |
| 76 | + except Exception as e: |
| 77 | + return _Fail(f"Error fetching test data: {str(e)}") |
| 78 | + |
| 79 | + def sync(self) -> Generator[Update, None, None]: |
| 80 | + """Implementation of the Synchronizer.sync method. |
| 81 | +
|
| 82 | + Yields updates as test data changes occur. |
| 83 | + """ |
| 84 | + |
| 85 | + |
| 86 | + # First yield initial data |
| 87 | + initial_result = self.fetch() |
| 88 | + if isinstance(initial_result, _Fail): |
| 89 | + yield Update( |
| 90 | + state=DataSourceState.OFF, |
| 91 | + error=DataSourceErrorInfo( |
| 92 | + kind=DataSourceErrorKind.STORE_ERROR, |
| 93 | + status_code=0, |
| 94 | + time=current_time_millis(), |
| 95 | + message=initial_result.error |
| 96 | + ) |
| 97 | + ) |
| 98 | + return |
| 99 | + |
| 100 | + # Yield the initial successful state |
| 101 | + yield Update( |
| 102 | + state=DataSourceState.VALID, |
| 103 | + change_set=initial_result.value.change_set |
| 104 | + ) |
| 105 | + |
| 106 | + # Continue yielding updates as they arrive |
| 107 | + while not self._closed: |
| 108 | + try: |
| 109 | + # Wait for updates with a timeout to allow checking closed status |
| 110 | + try: |
| 111 | + update = self._update_queue.get(timeout=1.0) |
| 112 | + except Empty: |
| 113 | + continue |
| 114 | + |
| 115 | + if update is None: # Sentinel value for shutdown |
| 116 | + break |
| 117 | + |
| 118 | + yield update |
| 119 | + |
| 120 | + except Exception as e: |
| 121 | + yield Update( |
| 122 | + state=DataSourceState.OFF, |
| 123 | + error=DataSourceErrorInfo( |
| 124 | + kind=DataSourceErrorKind.UNKNOWN, |
| 125 | + status_code=0, |
| 126 | + time=current_time_millis(), |
| 127 | + message=f"Error in test data synchronizer: {str(e)}" |
| 128 | + ) |
| 129 | + ) |
| 130 | + break |
| 131 | + |
| 132 | + def close(self): |
| 133 | + """Close the data source and clean up resources.""" |
| 134 | + with self._lock: |
| 135 | + if self._closed: |
| 136 | + return |
| 137 | + self._closed = True |
| 138 | + |
| 139 | + self._test_data._closed_instance(self) |
| 140 | + # Signal shutdown to sync generator |
| 141 | + self._update_queue.put(None) |
| 142 | + |
| 143 | + def upsert_flag(self, flag_data: dict): |
| 144 | + """Called by TestDataV2 when a flag is updated. |
| 145 | +
|
| 146 | + This method converts the flag update into an FDv2 changeset and |
| 147 | + queues it for delivery through the sync() generator. |
| 148 | + """ |
| 149 | + with self._lock: |
| 150 | + if self._closed: |
| 151 | + return |
| 152 | + |
| 153 | + try: |
| 154 | + version = self._test_data._get_version() |
| 155 | + |
| 156 | + # Build a changes transfer changeset |
| 157 | + builder = ChangeSetBuilder() |
| 158 | + builder.start(IntentCode.TRANSFER_CHANGES) |
| 159 | + |
| 160 | + # Add the updated flag |
| 161 | + builder.add_put( |
| 162 | + ObjectKind.FLAG, |
| 163 | + flag_data['key'], |
| 164 | + flag_data.get('version', 1), |
| 165 | + flag_data |
| 166 | + ) |
| 167 | + |
| 168 | + # Create selector for this version |
| 169 | + selector = Selector.new_selector(str(version), version) |
| 170 | + change_set = builder.finish(selector) |
| 171 | + |
| 172 | + # Queue the update |
| 173 | + update = Update( |
| 174 | + state=DataSourceState.VALID, |
| 175 | + change_set=change_set |
| 176 | + ) |
| 177 | + |
| 178 | + self._update_queue.put(update) |
| 179 | + |
| 180 | + except Exception as e: |
| 181 | + # Queue an error update |
| 182 | + error_update = Update( |
| 183 | + state=DataSourceState.OFF, |
| 184 | + error=DataSourceErrorInfo( |
| 185 | + kind=DataSourceErrorKind.STORE_ERROR, |
| 186 | + status_code=0, |
| 187 | + time=current_time_millis(), |
| 188 | + message=f"Error processing flag update: {str(e)}" |
| 189 | + ) |
| 190 | + ) |
| 191 | + self._update_queue.put(error_update) |
0 commit comments