3939from opal_common .schemas .store import TransactionType
4040from opal_common .security .sslcontext import get_custom_ssl_context
4141from opal_common .synchronization .hierarchical_lock import HierarchicalLock
42- from opal_common .utils import get_authorization_header
42+ from opal_common .utils import (
43+ get_authorization_header ,
44+ tuple_to_dict ,
45+ )
4346from pydantic .json import pydantic_encoder
4447
4548
4649class DataUpdater :
50+ async def get_base_policy_data (
51+ self , config_url : str = None , data_fetch_reason = "Initial load"
52+ ):
53+ pass
54+
55+ async def stop (self ):
56+ pass
57+
58+ async def wait_until_done (self ):
59+ pass
60+
61+
62+ class DefaultDataUpdater (DataUpdater ):
4763 """The DataUpdater is responsible for synchronizing data sources with the
4864 policy store (e.g. OPA). It listens to Pub/Sub topics for data updates,
4965 fetches the updated data, and writes it into the policy store. The updater
@@ -142,11 +158,11 @@ def __init__(
142158 self ._opal_client_id = opal_client_id
143159
144160 # Prepare any extra headers (token, shard id, etc.)
145- self ._extra_headers = []
146- if self ._token is not None :
147- self ._extra_headers . append (get_authorization_header (self ._token ))
161+ self ._extra_headers = {}
162+ if self ._token is not None and opal_common_config . AUTH_TYPE != "oauth2" :
163+ self ._extra_headers = tuple_to_dict (get_authorization_header (self ._token ))
148164 if self ._shard_id is not None :
149- self ._extra_headers . append (( "X-Shard-ID" , self ._shard_id ))
165+ self ._extra_headers [ "X-Shard-ID" ] = self ._shard_id
150166 if len (self ._extra_headers ) == 0 :
151167 self ._extra_headers = None
152168
@@ -236,11 +252,11 @@ async def get_policy_data_config(self, url: str = None) -> DataSourceConfig:
236252 url = self ._data_sources_config_url
237253 logger .info ("Getting data-sources configuration from '{source}'" , source = url )
238254
239-
240255 headers = {}
241256 if self ._extra_headers is not None :
242257 headers = self ._extra_headers .copy ()
243- headers ['Accept' ] = "application/json"
258+ headers ["Accept" ] = "application/json"
259+ await self ._authenticator .authenticate (headers )
244260
245261 try :
246262 response = await self ._load_policy_data_config (url , headers )
@@ -256,7 +272,9 @@ async def get_policy_data_config(self, url: str = None) -> DataSourceConfig:
256272 logger .exception ("Failed to load data sources config" )
257273 raise
258274
259- async def _load_policy_data_config (self , url : str , headers ) -> aiohttp .ClientResponse :
275+ async def _load_policy_data_config (
276+ self , url : str , headers
277+ ) -> aiohttp .ClientResponse :
260278 async with ClientSession (headers = headers , trust_env = True ) as session :
261279 return await session .get (url , ** self ._ssl_context_kwargs )
262280
@@ -370,8 +388,7 @@ async def _subscriber(self):
370388 methods_class = TenantAwareRpcEventClientMethods ,
371389 on_connect = [self .on_connect , * self ._on_connect_callbacks ],
372390 on_disconnect = [self .on_disconnect , * self ._on_disconnect_callbacks ],
373- additional_headers = self ._extra_headers ,
374- extra_headers = headers ,
391+ additional_headers = headers ,
375392 keep_alive = opal_client_config .KEEP_ALIVE_INTERVAL ,
376393 server_uri = self ._server_url ,
377394 ** self ._ssl_context_kwargs ,
0 commit comments