Skip to content

Commit 7c81ffe

Browse files
committed
HTTP-99 Generic Json url query creator
Signed-off-by: davidradl <[email protected]>
1 parent 0ee7808 commit 7c81ffe

21 files changed

+1083
-42
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@
22

33
## [Unreleased]
44

5+
- Added support for generic json and URL query creator
6+
57
## [0.19.0] - 2025-03-20
68

79
- OIDC token request to not flow during explain
810

911
## [0.18.0] - 2025-01-15
1012

13+
### Fixed
14+
1115
- Ignore Eclipse files in .gitignore
1216
- Support Flink 1.20
1317

README.md

+36-27
Large diffs are not rendered by default.

pom.xml

+20-9
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,26 @@ under the License.
154154
<scope>provided</scope>
155155
</dependency>
156156

157+
<dependency>
158+
<groupId>org.apache.flink</groupId>
159+
<artifactId>flink-format-common</artifactId>
160+
<version>${flink.version}</version>
161+
<scope>provided</scope>
162+
</dependency>
163+
164+
<dependency>
165+
<groupId>org.apache.flink</groupId>
166+
<artifactId>flink-json</artifactId>
167+
<version>${flink.version}</version>
168+
<scope>provided</scope>
169+
</dependency>
170+
171+
<dependency>
172+
<groupId>com.fasterxml.jackson.core</groupId>
173+
<artifactId>jackson-core</artifactId>
174+
<version>${jackson.version}</version>
175+
</dependency>
176+
157177
<dependency>
158178
<groupId>com.fasterxml.jackson.core</groupId>
159179
<artifactId>jackson-databind</artifactId>
@@ -226,15 +246,6 @@ under the License.
226246
<scope>test</scope>
227247
</dependency>
228248

229-
<!-- Serializers for tests -->
230-
<dependency>
231-
<groupId>org.apache.flink</groupId>
232-
<artifactId>flink-json</artifactId>
233-
<version>${flink.version}</version>
234-
<scope>test</scope>
235-
</dependency>
236-
<!-- ***** -->
237-
238249
<dependency>
239250
<groupId>org.apache.flink</groupId>
240251
<artifactId>flink-runtime-web</artifactId>

src/main/java/com/getindata/connectors/http/LookupQueryCreatorFactory.java

+3
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
public interface LookupQueryCreatorFactory extends Factory, Serializable {
3838

3939
/**
40+
* @param readableConfig readable config
41+
* @param lookupRow lookup row
42+
* @param dynamicTableFactoryContext context
4043
* @return {@link LookupQueryCreator} custom lookup query creator instance
4144
*/
4245
LookupQueryCreator createLookupQueryCreator(

src/main/java/com/getindata/connectors/http/internal/table/lookup/RowDataSingleValueLookupSchemaEntry.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public List<LookupArg> convertToLookupArg(RowData lookupKeyRow) {
4747
}
4848

4949
if (!(value instanceof BinaryStringData)) {
50-
log.debug("Unsupported Key Type {}. Trying simple toString(), wish me luck...",
50+
log.debug("Unsupported Key Type {}. Trying simple toString().",
5151
value.getClass());
5252
}
5353

src/main/java/com/getindata/connectors/http/internal/table/lookup/TableSourceHelper.java

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public final class TableSourceHelper {
2222
* <p>Note: This method returns an empty list for every {@link DataType} that is not a
2323
* composite
2424
* type.
25+
* @param type logical type
26+
* @return List of field names
2527
*/
2628
public static List<String> getFieldNames(LogicalType type) {
2729

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
/*
2+
* © Copyright IBM Corp. 2025
3+
*/
4+
5+
package com.getindata.connectors.http.internal.table.lookup.querycreators;
6+
7+
import java.io.IOException;
8+
import java.io.UnsupportedEncodingException;
9+
import java.net.URLEncoder;
10+
import java.nio.charset.StandardCharsets;
11+
import java.util.*;
12+
13+
import org.apache.flink.annotation.VisibleForTesting;
14+
import org.apache.flink.api.common.serialization.SerializationSchema;
15+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
16+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
17+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
18+
import org.apache.flink.table.api.DataTypes.Field;
19+
import org.apache.flink.table.data.GenericRowData;
20+
import org.apache.flink.table.data.RowData;
21+
import org.apache.flink.table.types.DataType;
22+
import org.apache.flink.table.types.FieldsDataType;
23+
import org.apache.flink.types.Row;
24+
import org.apache.flink.util.FlinkRuntimeException;
25+
import org.apache.flink.util.Preconditions;
26+
import org.apache.logging.log4j.LogManager;
27+
import org.apache.logging.log4j.Logger;
28+
29+
import com.getindata.connectors.http.LookupArg;
30+
import com.getindata.connectors.http.LookupQueryCreator;
31+
import com.getindata.connectors.http.internal.table.lookup.LookupQueryInfo;
32+
import com.getindata.connectors.http.internal.table.lookup.LookupRow;
33+
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;
34+
35+
/**
36+
* Generic JSON and URL query creator; in addition to be able to map columns to json requests,
37+
* it allows url inserts to be mapped to column names using templating.
38+
* <br>
39+
* For GETs, column names are mapped to query parameters. e.g. for
40+
* <code>GenericJsonAndUrlQueryCreator.REQUEST_PARAM_FIELDS</code> = "id1;id2"
41+
* and url of http://base. At lookup time with values of id1=1 and id2=2 a call of
42+
* http/base?id1=1&amp;id2=2 will be issued.
43+
* <br>
44+
* For PUT and POST, parameters are mapped to the json body e.g. for
45+
* REQUEST_PARAM_FIELDS = "id1;id2" and url of http://base. At lookup time with values of id1=1 and
46+
* id2=2 as call of http/base will be issued with a json payload of {"id1":1,"id2":2}
47+
* <br>
48+
* For all http methods, url segments can be used to include lookup up values. Using the map from
49+
* <code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code> which has a key of the insert and the
50+
* value of the associated column.
51+
* e.g. for <code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code> = "key1":"col1"
52+
* and url of http://base/{key1}. At lookup time with values of col1="aaaa" a call of
53+
* http/base/aaaa will be issued.
54+
*
55+
*/
56+
public class GenericJsonAndUrlQueryCreator implements LookupQueryCreator {
57+
private static final long serialVersionUID = 1L;
58+
private static final Logger log = LogManager.getLogger(GenericJsonAndUrlQueryCreator.class);
59+
60+
// not final so we can mutate for unit test
61+
private SerializationSchema<RowData> serializationSchema;
62+
private boolean schemaOpened = false;
63+
private LookupRow lookupRow;
64+
private final String httpMethod;
65+
private final List<String> requestQueryParamsFields;
66+
private final List<String> requestBodyFields;
67+
private final Map<String, String> requestUrlMap;
68+
69+
/**
70+
* Construct a Generic JSON and URL query creator.
71+
*
72+
* @param httpMethod the requested http method
73+
* @param serializationSchema serialization schema for RowData
74+
* @param requestQueryParamsFields query param fields
75+
* @param requestBodyFields body fields used for PUT and POSTs
76+
* @param requestUrlMap url map
77+
* @param lookupRow lookup row itself.
78+
*/
79+
public GenericJsonAndUrlQueryCreator(final String httpMethod,
80+
final SerializationSchema<RowData>
81+
serializationSchema,
82+
final List<String> requestQueryParamsFields,
83+
final List<String> requestBodyFields,
84+
final Map<String, String> requestUrlMap,
85+
final LookupRow lookupRow) {
86+
this.httpMethod = httpMethod;
87+
this.serializationSchema = serializationSchema;
88+
this.lookupRow = lookupRow;
89+
this.requestQueryParamsFields = requestQueryParamsFields;
90+
this.requestBodyFields = requestBodyFields;
91+
this.requestUrlMap = requestUrlMap;
92+
}
93+
@VisibleForTesting
94+
void setSerializationSchema(SerializationSchema<RowData>
95+
serializationSchema) {
96+
this.serializationSchema = serializationSchema;
97+
}
98+
99+
@Override
100+
public LookupQueryInfo createLookupQuery(final RowData lookupDataRow) {
101+
this.checkOpened();
102+
103+
final String lookupQuery;
104+
Map<String, String> bodyBasedUrlQueryParams = new HashMap<>();
105+
final Collection<LookupArg> lookupArgs =
106+
lookupRow.convertToLookupArgs(lookupDataRow);
107+
ObjectNode jsonObject;
108+
try {
109+
jsonObject = (ObjectNode) ObjectMapperAdapter.instance().readTree(
110+
serializationSchema.serialize(lookupDataRow));
111+
} catch (IOException e) {
112+
String message = "Unable to parse the lookup arguments to json.";
113+
log.error(message, e);
114+
throw new RuntimeException(message, e);
115+
}
116+
// Parameters are encoded as query params for GET and none GET.
117+
// Later code will turn these query params into the body for PUTs and POSTs
118+
ObjectNode jsonObjectForQueryParams = ObjectMapperAdapter.instance().createObjectNode();
119+
for (String requestColumnName : this.requestQueryParamsFields) {
120+
jsonObjectForQueryParams.set(requestColumnName, jsonObject.get(requestColumnName));
121+
}
122+
// TODO can we convertToQueryParameters for all ops
123+
// and not use/deprecate bodyBasedUrlQueryParams
124+
if (httpMethod.equalsIgnoreCase("GET")) {
125+
// add the query parameters
126+
lookupQuery = convertToQueryParameters(jsonObjectForQueryParams,
127+
StandardCharsets.UTF_8.toString());
128+
} else {
129+
// Body-based queries
130+
// serialize to a string for the body.
131+
try {
132+
lookupQuery = ObjectMapperAdapter.instance()
133+
.writeValueAsString(jsonObject.retain(requestBodyFields));
134+
} catch (JsonProcessingException e) {
135+
final String message = "Unable to convert Json Object to a string";
136+
log.error(message, e);
137+
throw new RuntimeException(message,e);
138+
}
139+
// body parameters
140+
// use the request json object to scope the required fields and the lookupArgs as values
141+
bodyBasedUrlQueryParams = createBodyBasedParams(lookupArgs,
142+
jsonObjectForQueryParams);
143+
}
144+
// add the path map
145+
final Map<String, String> pathBasedUrlParams = createPathBasedParams(lookupArgs,
146+
requestUrlMap);
147+
148+
return new LookupQueryInfo(lookupQuery, bodyBasedUrlQueryParams, pathBasedUrlParams);
149+
150+
}
151+
152+
/**
153+
* Create a Row from a RowData and DataType
154+
* @param lookupRowData the lookup RowData
155+
* @param rowType the datatype
156+
* @return row return row
157+
*/
158+
@VisibleForTesting
159+
static Row rowDataToRow(final RowData lookupRowData, final DataType rowType) {
160+
Preconditions.checkNotNull(lookupRowData);
161+
Preconditions.checkNotNull(rowType);
162+
163+
final Row row = Row.withNames();
164+
final List<Field> rowFields = FieldsDataType.getFields(rowType);
165+
166+
for (int idx = 0; idx < rowFields.size(); idx++) {
167+
final String fieldName = rowFields.get(idx).getName();
168+
final Object fieldValue = ((GenericRowData) lookupRowData).getField(idx);
169+
row.setField(fieldName, fieldValue);
170+
}
171+
return row;
172+
}
173+
174+
/**
175+
* Create map of the json key to the lookup argument
176+
* value. This is used for body based content.
177+
* @param args lookup arguments
178+
* @param objectNode object node
179+
* @return map of field content to the lookup argument value.
180+
*/
181+
private Map<String, String> createBodyBasedParams(final Collection<LookupArg> args,
182+
ObjectNode objectNode ) {
183+
Map<String, String> mapOfJsonKeyToLookupArg = new LinkedHashMap<>();
184+
Iterator<Map.Entry<String, JsonNode>> iterator = objectNode.fields();
185+
iterator.forEachRemaining(field -> {
186+
for (final LookupArg arg : args) {
187+
if (arg.getArgName().equals(field.getKey())) {
188+
String keyForMap = field.getKey();
189+
mapOfJsonKeyToLookupArg.put(
190+
keyForMap, arg.getArgValue());
191+
}
192+
}
193+
});
194+
195+
return mapOfJsonKeyToLookupArg;
196+
}
197+
/**
198+
* Create map of the json key to the lookup argument
199+
* value. This is used for body based content.
200+
* @param args lookup arguments
201+
* @param urlMap map of insert name to column name
202+
* @return map of field content to the lookup argument value.
203+
*/
204+
private Map<String, String> createPathBasedParams(final Collection<LookupArg> args,
205+
Map<String, String> urlMap ) {
206+
Map<String, String> mapOfJsonKeyToLookupArg = new LinkedHashMap<>();
207+
if (urlMap != null) {
208+
for (String key: urlMap.keySet()) {
209+
for (final LookupArg arg : args) {
210+
if (arg.getArgName().equals(key)) {
211+
mapOfJsonKeyToLookupArg.put(
212+
urlMap.get(key), arg.getArgValue());
213+
}
214+
}
215+
}
216+
}
217+
return mapOfJsonKeyToLookupArg;
218+
}
219+
/**
220+
* Convert json object to query params string
221+
* @param jsonObject supplies json object
222+
* @param enc encoding string - used in unit test to drive unsupported encoding
223+
* @return query params string
224+
*/
225+
@VisibleForTesting
226+
static String convertToQueryParameters(final ObjectNode jsonObject, String enc) {
227+
Preconditions.checkNotNull(jsonObject);
228+
229+
final StringJoiner result = new StringJoiner("&");
230+
jsonObject.fields().forEachRemaining(field -> {
231+
final String fieldName = field.getKey();
232+
final String fieldValue = field.getValue().asText();
233+
234+
try {
235+
result.add(fieldName + "="
236+
+ URLEncoder.encode(fieldValue, enc));
237+
} catch (UnsupportedEncodingException e) {
238+
final String message =
239+
"Failed to encode the value of the query parameter name "
240+
+ fieldName
241+
+ ": "
242+
+ fieldValue;
243+
log.error(message, e);
244+
throw new RuntimeException(message, e);
245+
}
246+
});
247+
248+
return result.toString();
249+
}
250+
251+
private void checkOpened() {
252+
if (!this.schemaOpened) {
253+
try {
254+
this.serializationSchema.open(
255+
SerializationSchemaUtils
256+
.createSerializationInitContext(
257+
GenericJsonAndUrlQueryCreator.class));
258+
this.schemaOpened = true;
259+
} catch (final Exception e) {
260+
final String message =
261+
"Failed to initialize serialization schema for "
262+
+ GenericJsonAndUrlQueryCreator.class;
263+
log.error(message, e);
264+
throw new FlinkRuntimeException(message, e);
265+
}
266+
}
267+
}
268+
}

0 commit comments

Comments
 (0)