1
+ // SSEHTTPQUERY.tsx
1
2
import { Dropdown , ValueFromOption } from "components/Dropdown" ;
2
3
import { QueryConfigItemWrapper , QueryConfigLabel , QueryConfigWrapper } from "components/query" ;
3
4
import { valueComp , withDefault } from "comps/generators" ;
4
5
import { trans } from "i18n" ;
5
6
import { includes } from "lodash" ;
6
7
import { CompAction , MultiBaseComp } from "lowcoder-core" ;
7
8
import { keyValueListControl } from "../../controls/keyValueListControl" ;
8
- import { ParamsJsonControl , ParamsStringControl , ParamsControlType } from "../../controls/paramsControl" ;
9
+ import { ParamsJsonControl , ParamsStringControl } from "../../controls/paramsControl" ;
9
10
import { withTypeAndChildrenAbstract } from "../../generators/withType" ;
10
- import { QueryResult } from "../queryComp" ;
11
- import { QUERY_EXECUTION_ERROR , QUERY_EXECUTION_OK } from "constants/queryConstants" ;
12
- import { JSONValue } from "util/jsonTypes" ;
13
- import { FunctionProperty } from "../queryCompUtils" ;
11
+ import { toSseQueryView } from "../queryCompUtils" ;
14
12
import {
15
13
HttpHeaderPropertyView ,
16
14
HttpParametersPropertyView ,
@@ -52,7 +50,9 @@ const CommandMap = {
52
50
const childrenMap = {
53
51
httpMethod : valueComp < HttpMethodValue > ( "GET" ) ,
54
52
path : ParamsStringControl ,
55
- headers : withDefault ( keyValueListControl ( ) , [ { key : "" , value : "" } ] ) ,
53
+ headers : withDefault ( keyValueListControl ( ) , [
54
+ { key : "Accept" , value : "text/event-stream" }
55
+ ] ) ,
56
56
params : withDefault ( keyValueListControl ( ) , [ { key : "" , value : "" } ] ) ,
57
57
bodyFormData : withDefault (
58
58
keyValueListControl ( true , [
@@ -61,6 +61,8 @@ const childrenMap = {
61
61
] as const ) ,
62
62
[ { key : "" , value : "" , type : "text" } ]
63
63
) ,
64
+ // Add SSE-specific configuration
65
+ streamingEnabled : valueComp < boolean > ( true ) ,
64
66
} ;
65
67
66
68
const SseHttpTmpQuery = withTypeAndChildrenAbstract (
@@ -72,9 +74,6 @@ const SseHttpTmpQuery = withTypeAndChildrenAbstract(
72
74
) ;
73
75
74
76
export class SseHttpQuery extends SseHttpTmpQuery {
75
- private eventSource : EventSource | undefined ;
76
- private controller : AbortController | undefined ;
77
-
78
77
isWrite ( action : CompAction ) {
79
78
return (
80
79
action . path . includes ( "httpMethod" ) && "value" in action && ! includes ( [ "GET" ] , action . value )
@@ -89,241 +88,13 @@ export class SseHttpQuery extends SseHttpTmpQuery {
89
88
...children . bodyFormData . getQueryParams ( ) ,
90
89
...children . path . getQueryParams ( ) ,
91
90
...children . body . getQueryParams ( ) ,
91
+ // Add streaming flag to params
92
+ { key : "_streaming" , value : ( ) => "true" } ,
93
+ { key : "_streamingEnabled" , value : ( ) => children . streamingEnabled . getView ( ) }
92
94
] ;
93
95
94
- return this . createStreamingQueryView ( params ) ;
95
- }
96
-
97
- private createStreamingQueryView ( params : FunctionProperty [ ] ) {
98
- return async ( props : {
99
- queryId : string ;
100
- applicationId : string ;
101
- applicationPath : string [ ] ;
102
- args ?: Record < string , unknown > ;
103
- variables ?: any ;
104
- timeout : InstanceType < ParamsControlType > ;
105
- callback ?: ( result : QueryResult ) => void ;
106
- } ) : Promise < QueryResult > => {
107
-
108
- try {
109
- const timer = performance . now ( ) ;
110
-
111
- // Process parameters like toQueryView does
112
- const processedParams = this . processParameters ( params , props ) ;
113
-
114
- // Build request from processed parameters
115
- const { url, headers, method, body } = this . buildRequestFromParams ( processedParams , props . args ) ;
116
-
117
- // Execute streaming logic
118
- if ( method === "GET" ) {
119
- return this . handleEventSource ( url , headers , props , timer ) ;
120
- } else {
121
- return this . handleStreamingFetch ( url , headers , method , body , props , timer ) ;
122
- }
123
-
124
- } catch ( error ) {
125
- return this . createErrorResponse ( ( error as Error ) . message ) ;
126
- }
127
- } ;
128
- }
129
-
130
- private processParameters ( params : FunctionProperty [ ] , props : any ) {
131
- let mappedVariables : Array < { key : string , value : string } > = [ ] ;
132
- Object . keys ( props . variables || { } )
133
- . filter ( k => k !== "$queryName" )
134
- . forEach ( key => {
135
- const value = Object . hasOwn ( props . variables [ key ] , 'value' ) ? props . variables [ key ] . value : props . variables [ key ] ;
136
- mappedVariables . push ( {
137
- key : `${ key } .value` ,
138
- value : value || ""
139
- } ) ;
140
- } ) ;
141
-
142
- return [
143
- ...params . filter ( param => {
144
- return ! mappedVariables . map ( v => v . key ) . includes ( param . key ) ;
145
- } ) . map ( ( { key, value } ) => ( { key, value : value ( props . args ) } ) ) ,
146
- ...Object . entries ( props . timeout . getView ( ) ) . map ( ( [ key , value ] ) => ( {
147
- key,
148
- value : ( value as any ) ( props . args ) ,
149
- } ) ) ,
150
- ...mappedVariables ,
151
- ] ;
152
- }
153
-
154
- private buildRequestFromParams ( processedParams : Array < { key : string , value : any } > , args : Record < string , unknown > = { } ) {
155
- // Hardcoded values from the screenshot for testing
156
- const url = "http://localhost:11434/api/generate" ;
157
- const headers = {
158
- "Content-Type" : "application/json" ,
159
- "Accept" : "text/event-stream"
160
- } ;
161
- const method = "POST" ;
162
- const body = JSON . stringify ( {
163
- "model" : "gemma3" ,
164
- "prompt" : "Tell me a short story about a robot" ,
165
- "stream" : true
166
- } ) ;
167
-
168
- console . log ( "Hardcoded request:" , { url, headers, method, body } ) ;
169
-
170
- return { url, headers, method, body } ;
171
- }
172
-
173
- private async handleEventSource (
174
- url : string ,
175
- headers : Record < string , string > ,
176
- props : any ,
177
- timer : number
178
- ) : Promise < QueryResult > {
179
- return new Promise ( ( resolve , reject ) => {
180
- // Clean up any existing connection
181
- this . cleanup ( ) ;
182
-
183
- this . eventSource = new EventSource ( url ) ;
184
-
185
- this . eventSource . onopen = ( ) => {
186
- resolve ( this . createSuccessResponse ( "SSE connection established" , timer ) ) ;
187
- } ;
188
-
189
- this . eventSource . onmessage = ( event ) => {
190
- try {
191
- const data = JSON . parse ( event . data ) ;
192
- props . callback ?.( this . createSuccessResponse ( data ) ) ;
193
- } catch ( error ) {
194
- // Handle non-JSON data
195
- props . callback ?.( this . createSuccessResponse ( event . data ) ) ;
196
- }
197
- } ;
198
-
199
- this . eventSource . onerror = ( error ) => {
200
- this . cleanup ( ) ;
201
- reject ( this . createErrorResponse ( "SSE connection error" ) ) ;
202
- } ;
203
- } ) ;
204
- }
205
-
206
- private async handleStreamingFetch (
207
- url : string ,
208
- headers : Record < string , string > ,
209
- method : string ,
210
- body : string | FormData | undefined ,
211
- props : any ,
212
- timer : number
213
- ) : Promise < QueryResult > {
214
- // Clean up any existing connection
215
- this . cleanup ( ) ;
216
-
217
- this . controller = new AbortController ( ) ;
218
-
219
- const response = await fetch ( url , {
220
- method,
221
- headers : {
222
- ...headers ,
223
- 'Accept' : 'text/event-stream' ,
224
- } ,
225
- body,
226
- signal : this . controller . signal ,
227
- } ) ;
228
-
229
- if ( ! response . ok ) {
230
- throw new Error ( `HTTP ${ response . status } : ${ response . statusText } ` ) ;
231
- }
232
-
233
- // Handle streaming response
234
- const reader = response . body ?. getReader ( ) ;
235
- const decoder = new TextDecoder ( ) ;
236
-
237
- if ( ! reader ) {
238
- throw new Error ( "No readable stream available" ) ;
239
- }
240
-
241
- // Process stream in background
242
- this . processStream ( reader , decoder , props . callback ) ;
243
-
244
- return this . createSuccessResponse ( "Stream connection established" , timer ) ;
245
- }
246
-
247
- private async processStream (
248
- reader : ReadableStreamDefaultReader < Uint8Array > ,
249
- decoder : TextDecoder ,
250
- callback ?: ( result : QueryResult ) => void
251
- ) {
252
- let buffer = '' ;
253
-
254
- try {
255
- while ( true ) {
256
- const { done, value } = await reader . read ( ) ;
257
-
258
- if ( done ) break ;
259
-
260
- buffer += decoder . decode ( value , { stream : true } ) ;
261
-
262
- // Process complete JSON objects or SSE events
263
- const lines = buffer . split ( '\n' ) ;
264
- buffer = lines . pop ( ) || '' ;
265
-
266
- for ( const line of lines ) {
267
- if ( line . trim ( ) ) {
268
- try {
269
- // Handle SSE format: data: {...}
270
- let jsonData = line . trim ( ) ;
271
- if ( jsonData . startsWith ( 'data: ' ) ) {
272
- jsonData = jsonData . substring ( 6 ) ;
273
- }
274
-
275
- // Skip SSE control messages
276
- if ( jsonData === '[DONE]' || jsonData . startsWith ( 'event:' ) || jsonData . startsWith ( 'id:' ) ) {
277
- continue ;
278
- }
279
-
280
- const data = JSON . parse ( jsonData ) ;
281
- callback ?.( this . createSuccessResponse ( data ) ) ;
282
- } catch ( error ) {
283
- // Handle non-JSON lines or plain text
284
- if ( line . trim ( ) !== '' ) {
285
- callback ?.( this . createSuccessResponse ( line . trim ( ) ) ) ;
286
- }
287
- }
288
- }
289
- }
290
- }
291
- } catch ( error : any ) {
292
- if ( error . name !== 'AbortError' ) {
293
- callback ?.( this . createErrorResponse ( ( error as Error ) . message ) ) ;
294
- }
295
- } finally {
296
- reader . releaseLock ( ) ;
297
- }
298
- }
299
-
300
- private createSuccessResponse ( data : JSONValue , runTime ?: number ) : QueryResult {
301
- return {
302
- data,
303
- runTime : runTime || 0 ,
304
- success : true ,
305
- code : QUERY_EXECUTION_OK ,
306
- } ;
307
- }
308
-
309
- private createErrorResponse ( message : string ) : QueryResult {
310
- return {
311
- message,
312
- data : "" ,
313
- success : false ,
314
- code : QUERY_EXECUTION_ERROR ,
315
- } ;
316
- }
317
-
318
- public cleanup ( ) {
319
- if ( this . eventSource ) {
320
- this . eventSource . close ( ) ;
321
- this . eventSource = undefined ;
322
- }
323
- if ( this . controller ) {
324
- this . controller . abort ( ) ;
325
- this . controller = undefined ;
326
- }
96
+ // Use SSE-specific query view
97
+ return toSseQueryView ( params ) ;
327
98
}
328
99
329
100
propertyView ( props : {
@@ -410,6 +181,13 @@ const SseHttpQueryPropertyView = (props: {
410
181
let headers = children . headers
411
182
. toJsonValue ( )
412
183
. filter ( ( header ) => header . key !== ContentTypeKey ) ;
184
+
185
+ // Always ensure Accept: text/event-stream for SSE
186
+ const hasAcceptHeader = headers . some ( h => h . key === "Accept" ) ;
187
+ if ( ! hasAcceptHeader ) {
188
+ headers . push ( { key : "Accept" , value : "text/event-stream" } ) ;
189
+ }
190
+
413
191
if ( value !== "none" ) {
414
192
headers = [
415
193
{
@@ -430,6 +208,15 @@ const SseHttpQueryPropertyView = (props: {
430
208
< QueryConfigLabel />
431
209
< QueryConfigItemWrapper > { showBodyConfig ( children ) } </ QueryConfigItemWrapper >
432
210
</ QueryConfigWrapper >
211
+
212
+ < QueryConfigWrapper >
213
+ < QueryConfigLabel > Streaming Options</ QueryConfigLabel >
214
+ < QueryConfigItemWrapper >
215
+ < div style = { { fontSize : "13px" , color : "#8B8FA3" } } >
216
+ This query will establish a Server-Sent Events connection for real-time data streaming.
217
+ </ div >
218
+ </ QueryConfigItemWrapper >
219
+ </ QueryConfigWrapper >
433
220
</ >
434
221
) ;
435
222
} ;
0 commit comments