1414 INJECTED_COMPONENTS_PY ,
1515 INJECTED_COMPONENTS_PY_CHECKSUMS ,
1616)
17+ from airbyte_cdk .utils .airbyte_secrets_utils import filter_secrets
1718
1819from ..api_models import (
1920 CheckRequest ,
2021 CheckResponse ,
2122 DiscoverRequest ,
2223 DiscoverResponse ,
24+ ErrorResponse ,
2325 FullResolveRequest ,
2426 Manifest ,
2527 ManifestResponse ,
@@ -64,7 +66,13 @@ def safe_build_source(
6466)
6567
6668
67- @router .post ("/test_read" , operation_id = "testRead" )
69+ @router .post (
70+ "/test_read" ,
71+ operation_id = "testRead" ,
72+ responses = {
73+ 400 : {"description" : "Bad Request - Error processing request" , "model" : ErrorResponse }
74+ },
75+ )
6876def test_read (request : StreamTestReadRequest ) -> StreamReadResponse :
6977 """
7078 Test reading from a specific stream in the manifest.
@@ -109,18 +117,29 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
109117 )
110118
111119 runner = ManifestCommandProcessor (source )
112- cdk_result = runner .test_read (
113- config_dict ,
114- catalog ,
115- converted_state ,
116- request .record_limit ,
117- request .page_limit ,
118- request .slice_limit ,
119- )
120- return StreamReadResponse .model_validate (asdict (cdk_result ))
121-
122-
123- @router .post ("/check" , operation_id = "check" )
120+ try :
121+ cdk_result = runner .test_read (
122+ config_dict ,
123+ catalog ,
124+ converted_state ,
125+ request .record_limit ,
126+ request .page_limit ,
127+ request .slice_limit ,
128+ )
129+ return StreamReadResponse .model_validate (asdict (cdk_result ))
130+ except Exception as exc :
131+ # Filter secrets from error message before returning to client
132+ sanitized_message = filter_secrets (f"Error reading stream: { str (exc )} " )
133+ raise HTTPException (status_code = 400 , detail = sanitized_message )
134+
135+
136+ @router .post (
137+ "/check" ,
138+ operation_id = "check" ,
139+ responses = {
140+ 400 : {"description" : "Bad Request - Error processing request" , "model" : ErrorResponse }
141+ },
142+ )
124143def check (request : CheckRequest ) -> CheckResponse :
125144 """Check configuration against a manifest"""
126145 # Apply trace tags from context if provided
@@ -130,13 +149,24 @@ def check(request: CheckRequest) -> CheckResponse:
130149 project_id = request .context .project_id ,
131150 )
132151
133- source = safe_build_source (request .manifest .model_dump (), request .config .model_dump ())
134- runner = ManifestCommandProcessor (source )
135- success , message = runner .check_connection (request .config .model_dump ())
136- return CheckResponse (success = success , message = message )
137-
138-
139- @router .post ("/discover" , operation_id = "discover" )
152+ try :
153+ source = safe_build_source (request .manifest .model_dump (), request .config .model_dump ())
154+ runner = ManifestCommandProcessor (source )
155+ success , message = runner .check_connection (request .config .model_dump ())
156+ return CheckResponse (success = success , message = message )
157+ except Exception as exc :
158+ # Filter secrets from error message before returning to client
159+ sanitized_message = filter_secrets (f"Error checking connection: { str (exc )} " )
160+ raise HTTPException (status_code = 400 , detail = sanitized_message )
161+
162+
163+ @router .post (
164+ "/discover" ,
165+ operation_id = "discover" ,
166+ responses = {
167+ 400 : {"description" : "Bad Request - Error processing request" , "model" : ErrorResponse }
168+ },
169+ )
140170def discover (request : DiscoverRequest ) -> DiscoverResponse :
141171 """Discover streams from a manifest"""
142172 # Apply trace tags from context if provided
@@ -146,15 +176,31 @@ def discover(request: DiscoverRequest) -> DiscoverResponse:
146176 project_id = request .context .project_id ,
147177 )
148178
149- source = safe_build_source (request .manifest .model_dump (), request .config .model_dump ())
150- runner = ManifestCommandProcessor (source )
151- catalog = runner .discover (request .config .model_dump ())
152- if catalog is None :
153- raise HTTPException (status_code = 422 , detail = "Connector did not return a discovered catalog" )
154- return DiscoverResponse (catalog = catalog )
155-
156-
157- @router .post ("/resolve" , operation_id = "resolve" )
179+ try :
180+ source = safe_build_source (request .manifest .model_dump (), request .config .model_dump ())
181+ runner = ManifestCommandProcessor (source )
182+ catalog = runner .discover (request .config .model_dump ())
183+ if catalog is None :
184+ raise HTTPException (
185+ status_code = 422 , detail = "Connector did not return a discovered catalog"
186+ )
187+ return DiscoverResponse (catalog = catalog )
188+ except HTTPException :
189+ # Re-raise HTTPExceptions as-is (like the catalog None check above)
190+ raise
191+ except Exception as exc :
192+ # Filter secrets from error message before returning to client
193+ sanitized_message = filter_secrets (f"Error discovering streams: { str (exc )} " )
194+ raise HTTPException (status_code = 400 , detail = sanitized_message )
195+
196+
197+ @router .post (
198+ "/resolve" ,
199+ operation_id = "resolve" ,
200+ responses = {
201+ 400 : {"description" : "Bad Request - Error processing request" , "model" : ErrorResponse }
202+ },
203+ )
158204def resolve (request : ResolveRequest ) -> ManifestResponse :
159205 """Resolve a manifest to its final configuration."""
160206 # Apply trace tags from context if provided
@@ -164,11 +210,22 @@ def resolve(request: ResolveRequest) -> ManifestResponse:
164210 project_id = request .context .project_id ,
165211 )
166212
167- source = safe_build_source (request .manifest .model_dump (), {})
168- return ManifestResponse (manifest = Manifest (** source .resolved_manifest ))
169-
170-
171- @router .post ("/full_resolve" , operation_id = "fullResolve" )
213+ try :
214+ source = safe_build_source (request .manifest .model_dump (), {})
215+ return ManifestResponse (manifest = Manifest (** source .resolved_manifest ))
216+ except Exception as exc :
217+ # Filter secrets from error message before returning to client
218+ sanitized_message = filter_secrets (f"Error resolving manifest: { str (exc )} " )
219+ raise HTTPException (status_code = 400 , detail = sanitized_message )
220+
221+
222+ @router .post (
223+ "/full_resolve" ,
224+ operation_id = "fullResolve" ,
225+ responses = {
226+ 400 : {"description" : "Bad Request - Error processing request" , "model" : ErrorResponse }
227+ },
228+ )
172229def full_resolve (request : FullResolveRequest ) -> ManifestResponse :
173230 """
174231 Fully resolve a manifest, including dynamic streams.
@@ -182,21 +239,26 @@ def full_resolve(request: FullResolveRequest) -> ManifestResponse:
182239 project_id = request .context .project_id ,
183240 )
184241
185- source = safe_build_source (request .manifest .model_dump (), request .config .model_dump ())
186- manifest = {** source .resolved_manifest }
187- streams = manifest .get ("streams" , [])
188- for stream in streams :
189- stream ["dynamic_stream_name" ] = None
190-
191- mapped_streams : Dict [str , List [Dict [str , Any ]]] = {}
192- for stream in source .dynamic_streams :
193- generated_streams = mapped_streams .setdefault (stream ["dynamic_stream_name" ], [])
194-
195- if len (generated_streams ) < request .stream_limit :
196- generated_streams += [stream ]
197-
198- for generated_streams_list in mapped_streams .values ():
199- streams .extend (generated_streams_list )
200-
201- manifest ["streams" ] = streams
202- return ManifestResponse (manifest = Manifest (** manifest ))
242+ try :
243+ source = safe_build_source (request .manifest .model_dump (), request .config .model_dump ())
244+ manifest = {** source .resolved_manifest }
245+ streams = manifest .get ("streams" , [])
246+ for stream in streams :
247+ stream ["dynamic_stream_name" ] = None
248+
249+ mapped_streams : Dict [str , List [Dict [str , Any ]]] = {}
250+ for stream in source .dynamic_streams :
251+ generated_streams = mapped_streams .setdefault (stream ["dynamic_stream_name" ], [])
252+
253+ if len (generated_streams ) < request .stream_limit :
254+ generated_streams += [stream ]
255+
256+ for generated_streams_list in mapped_streams .values ():
257+ streams .extend (generated_streams_list )
258+
259+ manifest ["streams" ] = streams
260+ return ManifestResponse (manifest = Manifest (** manifest ))
261+ except Exception as exc :
262+ # Filter secrets from error message before returning to client
263+ sanitized_message = filter_secrets (f"Error full resolving manifest: { str (exc )} " )
264+ raise HTTPException (status_code = 400 , detail = sanitized_message )
0 commit comments