@@ -16,6 +16,10 @@ typedef struct {
1616 int32 timeout_milliseconds ;
1717} CurlData ;
1818
19+ static SPIPlanPtr del_response_plan = NULL ;
20+ static SPIPlanPtr del_return_queue_plan = NULL ;
21+ static SPIPlanPtr ins_response_plan = NULL ;
22+
1923static size_t
2024body_cb (void * contents , size_t size , size_t nmemb , void * userp )
2125{
@@ -139,19 +143,29 @@ void set_curl_mhandle(WorkerState *wstate){
139143uint64 delete_expired_responses (char * ttl , int batch_size ){
140144 SPI_connect ();
141145
142- int ret_code = SPI_execute_with_args ("\
143- WITH\
144- rows AS (\
145- SELECT ctid\
146- FROM net._http_response\
147- WHERE created < now() - $1\
148- ORDER BY created\
149- LIMIT $2\
150- )\
151- DELETE FROM net._http_response r\
152- USING rows WHERE r.ctid = rows.ctid" ,
153- 2 ,
154- (Oid []){INTERVALOID , INT4OID },
146+ if (del_response_plan == NULL ) {
147+ SPIPlanPtr tmp = SPI_prepare ("\
148+ WITH\
149+ rows AS (\
150+ SELECT ctid\
151+ FROM net._http_response\
152+ WHERE created < now() - $1\
153+ ORDER BY created\
154+ LIMIT $2\
155+ )\
156+ DELETE FROM net._http_response r\
157+ USING rows WHERE r.ctid = rows.ctid" ,
158+ 2 ,
159+ (Oid []){INTERVALOID , INT4OID });
160+ if (tmp == NULL )
161+ ereport (ERROR , errmsg ("SPI_prepare failed: %s" , SPI_result_code_string (SPI_result )));
162+
163+ del_response_plan = SPI_saveplan (tmp );
164+ if (del_response_plan == NULL )
165+ ereport (ERROR , errmsg ("SPI_saveplan failed" ));
166+ }
167+
168+ int ret_code = SPI_execute_plan (del_response_plan ,
155169 (Datum []){
156170 DirectFunctionCall3 (interval_in , CStringGetDatum (ttl ), ObjectIdGetDatum (InvalidOid ), Int32GetDatum (-1 ))
157171 , Int32GetDatum (batch_size )
@@ -172,21 +186,30 @@ uint64 delete_expired_responses(char *ttl, int batch_size){
172186uint64 consume_request_queue (CURLM * curl_mhandle , int batch_size , MemoryContext curl_memctx ){
173187 SPI_connect ();
174188
175- int ret_code = SPI_execute_with_args ("\
176- WITH\
177- rows AS (\
178- SELECT id\
179- FROM net.http_request_queue\
180- ORDER BY id\
181- LIMIT $1\
182- )\
183- DELETE FROM net.http_request_queue q\
184- USING rows WHERE q.id = rows.id\
185- RETURNING q.id, q.method, q.url, timeout_milliseconds, array(select key || ': ' || value from jsonb_each_text(q.headers)), q.body" ,
186- 1 ,
187- (Oid []){INT4OID },
188- (Datum []){Int32GetDatum (batch_size )},
189- NULL , false, 0 );
189+ if (del_return_queue_plan == NULL ) {
190+ SPIPlanPtr tmp = SPI_prepare ("\
191+ WITH\
192+ rows AS (\
193+ SELECT id\
194+ FROM net.http_request_queue\
195+ ORDER BY id\
196+ LIMIT $1\
197+ )\
198+ DELETE FROM net.http_request_queue q\
199+ USING rows WHERE q.id = rows.id\
200+ RETURNING q.id, q.method, q.url, timeout_milliseconds, array(select key || ': ' || value from jsonb_each_text(q.headers)), q.body" ,
201+ 1 ,
202+ (Oid []){INT4OID });
203+
204+ if (tmp == NULL )
205+ ereport (ERROR , errmsg ("SPI_prepare failed: %s" , SPI_result_code_string (SPI_result )));
206+
207+ del_return_queue_plan = SPI_saveplan (tmp );
208+ if (del_return_queue_plan == NULL )
209+ ereport (ERROR , errmsg ("SPI_saveplan failed" ));
210+ }
211+
212+ int ret_code = SPI_execute_plan (del_return_queue_plan , (Datum []){Int32GetDatum (batch_size )}, NULL , false, 0 );
190213
191214 if (ret_code != SPI_OK_DELETE_RETURNING )
192215 ereport (ERROR , errmsg ("Error getting http request queue: %s" , SPI_result_code_string (ret_code )));
@@ -305,12 +328,23 @@ static void insert_response(CURL *ez_handle, CurlData *cdata, CURLcode curl_retu
305328 }
306329 }
307330
308- int ret_code = SPI_execute_with_args ("\
309- insert into net._http_response(id, status_code, content, headers, content_type, timed_out, error_msg) values ($1, $2, $3, $4, $5, $6, $7)" ,
310- nparams ,
311- (Oid [nparams ]){INT8OID , INT4OID , TEXTOID , JSONBOID , TEXTOID , BOOLOID , TEXTOID },
312- vals , nulls ,
313- false, 1 );
331+ if (ins_response_plan == NULL ) {
332+ SPIPlanPtr tmp = SPI_prepare ("\
333+ insert into net._http_response(id, status_code, content, headers, content_type, timed_out, error_msg) values ($1, $2, $3, $4, $5, $6, $7)" ,
334+ nparams ,
335+ (Oid [nparams ]){INT8OID , INT4OID , TEXTOID , JSONBOID , TEXTOID , BOOLOID , TEXTOID });
336+
337+ if (tmp == NULL )
338+ ereport (ERROR , errmsg ("SPI_prepare failed: %s" , SPI_result_code_string (SPI_result )));
339+
340+ ins_response_plan = SPI_saveplan (tmp );
341+ if (ins_response_plan == NULL )
342+ ereport (ERROR , errmsg ("SPI_saveplan failed" ));
343+
344+ SPI_freeplan (tmp );
345+ }
346+
347+ int ret_code = SPI_execute_plan (ins_response_plan , vals , nulls , false, 0 );
314348
315349 if (ret_code != SPI_OK_INSERT )
316350 {
0 commit comments