@@ -7,10 +7,6 @@ use std::{
7
7
use anyhow:: Context ;
8
8
use async_trait:: async_trait;
9
9
use convex_fivetran_common:: config:: Config ;
10
- use convex_fivetran_source:: api_types:: {
11
- DocumentDeltasArgs ,
12
- ListSnapshotArgs ,
13
- } ;
14
10
use derive_more:: {
15
11
Display ,
16
12
From ,
@@ -20,6 +16,7 @@ use headers::{
20
16
HeaderName ,
21
17
HeaderValue ,
22
18
} ;
19
+ use maplit:: btreemap;
23
20
use serde:: {
24
21
de:: DeserializeOwned ,
25
22
Deserialize ,
@@ -69,70 +66,35 @@ pub struct ConvexApi {
69
66
}
70
67
71
68
impl ConvexApi {
72
- /// Performs a GET HTTP request to a given endpoint of the Convex API.
73
- async fn get < T : DeserializeOwned > ( & self , endpoint : & str ) -> anyhow:: Result < T > {
74
- let url = self
75
- . config
76
- . deploy_url
77
- . join ( "api/" )
78
- . unwrap ( )
79
- . join ( endpoint)
80
- . unwrap ( ) ;
81
-
82
- match reqwest:: Client :: new ( )
83
- . get ( url)
84
- . header ( CONVEX_CLIENT_HEADER , & * CONVEX_CLIENT_HEADER_VALUE )
85
- . header (
86
- reqwest:: header:: AUTHORIZATION ,
87
- format ! ( "Convex {}" , self . config. deploy_key) ,
88
- )
89
- . send ( )
90
- . await
91
- {
92
- Ok ( resp) if resp. status ( ) . is_success ( ) => Ok ( resp
93
- . json :: < T > ( )
94
- . await
95
- . context ( "Failed to deserialize query result" ) ?) ,
96
- Ok ( resp) => {
97
- if let Ok ( text) = resp. text ( ) . await {
98
- anyhow:: bail!(
99
- "Call to {endpoint} on {} returned an unsuccessful response: {text}" ,
100
- self . config. deploy_url
101
- )
102
- } else {
103
- anyhow:: bail!(
104
- "Call to {endpoint} on {} returned no response" ,
105
- self . config. deploy_url
106
- )
107
- }
108
- } ,
109
- Err ( e) => anyhow:: bail!( e. to_string( ) ) ,
110
- }
111
- }
112
-
113
- /// Performs a POST HTTP request to a given endpoint of the Convex API using
114
- /// the given parameters as a JSON body.
115
- async fn post < P : Serialize , T : DeserializeOwned > (
69
+ /// Performs a GET HTTP request to a given endpoint of the Convex API using
70
+ /// the given query parameters.
71
+ async fn get < T : DeserializeOwned > (
116
72
& self ,
117
73
endpoint : & str ,
118
- parameters : P ,
74
+ parameters : BTreeMap < & str , Option < String > > ,
119
75
) -> anyhow:: Result < T > {
120
- let url = self
76
+ let non_null_parameters: BTreeMap < & str , String > = parameters
77
+ . into_iter ( )
78
+ . filter_map ( |( key, value) | value. map ( |value| ( key, value) ) )
79
+ . collect ( ) ;
80
+
81
+ let mut url = self
121
82
. config
122
83
. deploy_url
123
84
. join ( "api/" )
124
85
. unwrap ( )
125
86
. join ( endpoint)
126
87
. unwrap ( ) ;
127
88
89
+ url. query_pairs_mut ( ) . extend_pairs ( non_null_parameters) ;
90
+
128
91
match reqwest:: Client :: new ( )
129
- . post ( url)
92
+ . get ( url)
130
93
. header ( CONVEX_CLIENT_HEADER , & * CONVEX_CLIENT_HEADER_VALUE )
131
94
. header (
132
95
reqwest:: header:: AUTHORIZATION ,
133
96
format ! ( "Convex {}" , self . config. deploy_key) ,
134
97
)
135
- . json ( & parameters)
136
98
. send ( )
137
99
. await
138
100
{
@@ -161,22 +123,21 @@ impl ConvexApi {
161
123
#[ async_trait]
162
124
impl Source for ConvexApi {
163
125
async fn test_streaming_export_connection ( & self ) -> anyhow:: Result < ( ) > {
164
- self . get ( "test_streaming_export_connection" ) . await
126
+ self . get ( "test_streaming_export_connection" , btreemap ! { } )
127
+ . await
165
128
}
166
129
167
130
async fn list_snapshot (
168
131
& self ,
169
132
snapshot : Option < i64 > ,
170
133
cursor : Option < ListSnapshotCursor > ,
171
134
) -> anyhow:: Result < ListSnapshotResponse > {
172
- self . post (
135
+ self . get (
173
136
"list_snapshot" ,
174
- ListSnapshotArgs {
175
- snapshot,
176
- cursor : cursor. map ( |c| c. into ( ) ) ,
177
- table_name : None ,
178
- component : None ,
179
- format : Some ( "convex_encoded_json" . to_string ( ) ) ,
137
+ btreemap ! {
138
+ "snapshot" => snapshot. map( |n| n. to_string( ) ) ,
139
+ "cursor" => cursor. map( |n| n. to_string( ) ) ,
140
+ "format" => Some ( "convex_encoded_json" . to_string( ) ) ,
180
141
} ,
181
142
)
182
143
. await
@@ -186,21 +147,19 @@ impl Source for ConvexApi {
186
147
& self ,
187
148
cursor : DocumentDeltasCursor ,
188
149
) -> anyhow:: Result < DocumentDeltasResponse > {
189
- self . post (
150
+ self . get (
190
151
"document_deltas" ,
191
- DocumentDeltasArgs {
192
- cursor : Some ( cursor. into ( ) ) ,
193
- table_name : None ,
194
- component : None ,
195
- format : Some ( "convex_encoded_json" . to_string ( ) ) ,
152
+ btreemap ! {
153
+ "cursor" => Some ( cursor. to_string( ) ) ,
154
+ "format" => Some ( "convex_encoded_json" . to_string( ) ) ,
196
155
} ,
197
156
)
198
157
. await
199
158
}
200
159
201
160
async fn get_tables_and_columns ( & self ) -> anyhow:: Result < BTreeMap < TableName , Vec < FieldName > > > {
202
161
let tables_to_columns: BTreeMap < TableName , Vec < String > > =
203
- self . get ( "get_tables_and_columns" ) . await ?;
162
+ self . get ( "get_tables_and_columns" , btreemap ! { } ) . await ?;
204
163
205
164
tables_to_columns
206
165
. into_iter ( )
0 commit comments