Skip to content

Commit 4b3d7fb

Browse files
committed
HTTP-179 Allow HTTP version to be specified
Signed-off-by: davidradl <[email protected]>
1 parent e832eb1 commit 4b3d7fb

File tree

12 files changed

+81
-34
lines changed

12 files changed

+81
-34
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
## [Unreleased]
44

5-
- Added http 2 support.
5+
- Ability to specify http versions for http lookups.
6+
- Amend to not log HTTP request response and header values by default.
7+
68

79
## [0.22.0] - 2025-10-03
810

README.md

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,8 @@ The HTTP TableLookup connector that allows for pulling data from external system
99
Please use [releases](https://github.com/getindata/flink-http-connector/releases) instead of the `main` branch in order to get a stable set of binaries.
1010

1111
The goal for HTTP TableLookup connector was to use it in Flink SQL statement as a standard table that can be later joined with other stream using pure SQL Flink.
12-
13-
Currently, HTTP source connector supports only Lookup Joins (TableLookup) [1] in Table/SQL API.
14-
`HttpSink` supports both Streaming API (when using [HttpSink](src/main/java/com/getindata/connectors/http/internal/sink/HttpSink.java) built using [HttpSinkBuilder](src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkBuilder.java)) and the Table API (using connector created in [HttpDynamicTableSinkFactory](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicTableSinkFactory.java)).
15-
Note that the connector will work with both http 1.1 and http 2 endpoints.
12+
13+
`HttpSink` supports both Streaming API (when using [HttpSink](src/main/java/com/getindata/connectors/http/internal/sink/HttpSink.java) built using [HttpSinkBuilder](src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkBuilder.java)) and the Table API (using connector created in [HttpDynamicTableSinkFactory](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicTableSinkFactory.java)).
1614

1715
## Updating the connector
1816
In case of updating http-connector please see [Breaking changes](#breaking-changes) section.
@@ -428,10 +426,9 @@ that implements interface `HttpPostRequestCallbackFactory<HttpRequest>` to creat
428426
of class `CustomHttpSinkPostRequestCallbackFactory` in `resources/META-INF/services/org.apache.flink.table.factories.Factory` file
429427
and then reference identifier `rest-sink-logger` in the HttpSink DDL property field `gid.connector.http.sink.request-callback`.
430428

431-
A default implementation that logs those pairs as *INFO* level logs using Slf4j
432-
([Slf4jHttpPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java))
433-
is provided.
434-
429+
A default implementation that logs those pairs as *INFO* level logs using Slf4j ([Slf4jHttpPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java)) is provided.
430+
If you would like to log more http content (that maybe contain sensitive information), then you can provide a customized version
431+
of this callback; for inspiration on how to customize in this way, look back in the git history of this file.
435432

436433
- Http Lookup Source processes responses that it gets from the HTTP endpoint along their respective requests. One can customize the
437434
behaviour of the additional stage of processing done by Table Function API by implementing
@@ -582,6 +579,7 @@ be requested if the current time is later than the cached token expiry time minu
582579
| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued |
583580
| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. |
584581
| gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. |
582+
| gid.connector.http.source.lookup.http-version | optional | Version of HTTP to use for lookup http requests. The valid values are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2 respectively. This option may be required as HTTP_1_1, if the endpoint is HTTP 1.1, because some http 1.1 endpoints reject HTTP Version 2 calls, with 'Invalid HTTP request received' and 'HTTP/2 upgrade not supported'. |
585583
| gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. |
586584
| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. |
587585
| gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. |

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ public final class HttpConnectorConfigConstants {
5959
public static final String SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER =
6060
SOURCE_LOOKUP_PREFIX + "query-creator";
6161

62+
public static final String SOURCE_LOOKUP_QUERY_HTTP_VERSION =
63+
SOURCE_LOOKUP_PREFIX + "http-version";
64+
6265
// -------------- HTTPS security settings --------------
6366
public static final String ALLOW_SELF_SIGNED =
6467
GID_CONNECTOR_HTTP + "security.cert.server.allowSelfSigned";

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry> reqeustBatch, UR
124124
Builder requestBuilder = java.net.http.HttpRequest
125125
.newBuilder()
126126
.uri(endpointUri)
127-
.version(Version.HTTP_2)
127+
.version(Version.HTTP_1_1) // consider allowing users to specify http 2 to override
128128
.timeout(Duration.ofSeconds(httpRequestTimeOutSeconds))
129129
.method(method, publisher);
130130

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/PerRequestSubmitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private HttpRequest buildHttpRequest(HttpSinkRequestEntry requestEntry, URI endp
6262
Builder requestBuilder = java.net.http.HttpRequest
6363
.newBuilder()
6464
.uri(endpointUri)
65-
.version(Version.HTTP_2)
65+
.version(Version.HTTP_1_1) // consider allowing users to specify http 2 to override
6666
.timeout(Duration.ofSeconds(httpRequestTimeOutSeconds))
6767
.method(requestEntry.method,
6868
BodyPublishers.ofByteArray(requestEntry.element));

src/main/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import java.net.http.HttpRequest;
66
import java.net.http.HttpRequest.BodyPublishers;
77
import java.net.http.HttpRequest.Builder;
8-
import java.time.Duration;
98

109
import lombok.extern.slf4j.Slf4j;
1110
import org.slf4j.Logger;
@@ -42,10 +41,11 @@ public BodyBasedRequestFactory(
4241
*/
4342
@Override
4443
protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) {
45-
return HttpRequest.newBuilder()
44+
HttpRequest.Builder builder = super.setUpRequestMethod(lookupQueryInfo);
45+
builder
4646
.uri(constructUri(lookupQueryInfo))
47-
.method(methodName, BodyPublishers.ofString(lookupQueryInfo.getLookupQuery()))
48-
.timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
47+
.method(methodName, BodyPublishers.ofString(lookupQueryInfo.getLookupQuery()));
48+
return builder;
4949
}
5050

5151
@Override

src/main/java/com/getindata/connectors/http/internal/table/lookup/GetRequestFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import java.net.URISyntaxException;
55
import java.net.http.HttpRequest;
66
import java.net.http.HttpRequest.Builder;
7-
import java.time.Duration;
87

98
import lombok.extern.slf4j.Slf4j;
109
import org.slf4j.Logger;
@@ -48,10 +47,11 @@ protected Logger getLogger() {
4847
*/
4948
@Override
5049
protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) {
51-
return HttpRequest.newBuilder()
50+
HttpRequest.Builder builder = super.setUpRequestMethod(lookupQueryInfo);
51+
builder
5252
.uri(constructGetUri(lookupQueryInfo))
53-
.GET()
54-
.timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
53+
.GET();
54+
return builder;
5555
}
5656

5757
URI constructGetUri(LookupQueryInfo lookupQueryInfo) {

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,17 @@ public class HttpLookupConnectorOptions {
3939
.stringType()
4040
.noDefaultValue();
4141

42+
public static final ConfigOption<String> LOOKUP_HTTP_VERSION =
43+
ConfigOptions.key(SOURCE_LOOKUP_QUERY_HTTP_VERSION)
44+
.stringType()
45+
.noDefaultValue()
46+
.withDescription("Version of HTTP to use for lookup HTTP requests. " +
47+
"The valid values are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2" +
48+
" respectively. This option may be required as HTTP_1_1, if the" +
49+
" endpoint is http 1.1, because some http 1.1 endpoints reject HTTP" +
50+
" Version 2 calls, with 'Invalid HTTP request received' and " +
51+
" 'HTTP/2 upgrade not supported'.");
52+
4253
public static final ConfigOption<String> LOOKUP_REQUEST_FORMAT =
4354
ConfigOptions.key("lookup-request.format")
4455
.stringType()

src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,11 +210,9 @@ private HttpRowDataWrapper processHttpResponse(
210210
boolean isError) throws IOException {
211211

212212
this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap());
213-
214213
var responseBody = response.body();
215214

216-
log.debug("Received status code [{}] for RestTableSource request with Server response body [{}] ",
217-
response.statusCode(), responseBody);
215+
log.debug("Received status code [{}] for RestTableSource request", response.statusCode());
218216
if (!isError && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response))) {
219217
return HttpRowDataWrapper.builder()
220218
.data(Collections.emptyList())

src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package com.getindata.connectors.http.internal.table.lookup;
22

3+
import java.net.http.HttpClient;
34
import java.net.http.HttpRequest;
45
import java.net.http.HttpRequest.Builder;
6+
import java.time.Duration;
57
import java.util.Arrays;
68
import java.util.Map;
7-
import java.util.stream.Collectors;
89

910
import lombok.extern.slf4j.Slf4j;
1011
import org.apache.flink.annotation.VisibleForTesting;
@@ -39,6 +40,7 @@ public abstract class RequestFactoryBase implements HttpRequestFactory {
3940
*/
4041
private final String[] headersAndValues;
4142
private final HttpLookupConfig options;
43+
final HttpClient.Version httpVersion;
4244

4345
public RequestFactoryBase(
4446
LookupQueryCreator lookupQueryCreator,
@@ -60,17 +62,18 @@ public RequestFactoryBase(
6062
);
6163

6264
this.headersAndValues = HttpHeaderUtils.toHeaderAndValueArray(headerMap);
63-
64-
log.debug("RequestFactoryBase headersAndValues: " +
65-
Arrays.stream(headersAndValues)
66-
.map(Object::toString)
67-
.collect(Collectors.joining(",")));
6865
this.httpRequestTimeOutSeconds = Integer.parseInt(
6966
options.getProperties().getProperty(
7067
HttpConnectorConfigConstants.LOOKUP_HTTP_TIMEOUT_SECONDS,
7168
DEFAULT_REQUEST_TIMEOUT_SECONDS
7269
)
7370
);
71+
String httpVersionFromConfig = options.getReadableConfig().get(HttpLookupConnectorOptions.LOOKUP_HTTP_VERSION);
72+
if (httpVersionFromConfig == null) {
73+
httpVersion = null;
74+
} else {
75+
httpVersion = HttpClient.Version.valueOf(httpVersionFromConfig);
76+
}
7477
}
7578

7679
@Override
@@ -94,7 +97,14 @@ public HttpLookupSourceRequestEntry buildLookupRequest(RowData lookupRow) {
9497
* @param lookupQuery lookup query used for request query parameters or body.
9598
* @return {@link HttpRequest.Builder} for given lookupQuery.
9699
*/
97-
protected abstract Builder setUpRequestMethod(LookupQueryInfo lookupQuery);
100+
protected Builder setUpRequestMethod(LookupQueryInfo lookupQuery) {
101+
HttpRequest.Builder builder = HttpRequest.newBuilder()
102+
.timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
103+
if (httpVersion !=null) {
104+
builder.version(httpVersion);
105+
}
106+
return builder;
107+
}
98108

99109
protected static StringBuilder resolvePathParameters(LookupQueryInfo lookupQueryInfo,
100110
StringBuilder resolvedUrl) {

0 commit comments

Comments
 (0)