From 8275b59d138df089cc3b066282a1bb54c9516af1 Mon Sep 17 00:00:00 2001
From: zaki
Date: Sat, 13 Apr 2024 21:20:12 +0800
Subject: [PATCH 01/26] feat: Add HTTP Sink Connector
---
.../eventmesh-connector-http/build.gradle | 1 +
.../http/server/HttpConnectServer.java | 4 +-
.../http/sink/config/HttpSinkConfig.java | 31 ++++
.../http/sink/config/SinkConnectorConfig.java | 30 ++++
.../sink/connector/HttpSinkConnector.java | 152 ++++++++++++++++++
.../src/main/resources/sink-config.yml | 30 ++++
6 files changed, 247 insertions(+), 1 deletion(-)
create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpSinkConfig.java
create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml
diff --git a/eventmesh-connectors/eventmesh-connector-http/build.gradle b/eventmesh-connectors/eventmesh-connector-http/build.gradle
index 734b2fc622..55a59bd51c 100644
--- a/eventmesh-connectors/eventmesh-connector-http/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-http/build.gradle
@@ -20,6 +20,7 @@ dependencies {
implementation project(":eventmesh-common")
implementation 'io.cloudevents:cloudevents-http-vertx:2.3.0'
implementation 'io.vertx:vertx-web:4.4.6'
+ implementation 'com.squareup.okhttp3:okhttp'
testImplementation "org.apache.httpcomponents:httpclient"
compileOnly 'org.projectlombok:lombok'
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java
index bd94fed126..d7e4ce3a78 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java
@@ -18,6 +18,7 @@
package org.apache.eventmesh.connector.http.server;
import org.apache.eventmesh.connector.http.config.HttpServerConfig;
+import org.apache.eventmesh.connector.http.sink.connector.HttpSinkConnector;
import org.apache.eventmesh.connector.http.source.connector.HttpSourceConnector;
import org.apache.eventmesh.openconnect.Application;
import org.apache.eventmesh.openconnect.util.ConfigUtil;
@@ -33,7 +34,8 @@ public static void main(String[] args) throws Exception {
}
if (serverConfig.isSinkEnable()) {
- // TODO support sink connector
+ Application httpSinkApp = new Application();
+ httpSinkApp.run(HttpSinkConnector.class);
}
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpSinkConfig.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpSinkConfig.java
new file mode 100644
index 0000000000..38cf529e59
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpSinkConfig.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.config;
+
+
+import org.apache.eventmesh.openconnect.api.config.SinkConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class HttpSinkConfig extends SinkConfig {
+
+ public SinkConnectorConfig connectorConfig;
+}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
new file mode 100644
index 0000000000..59c55ddec6
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.config;
+
+import lombok.Data;
+
+@Data
+public class SinkConnectorConfig {
+
+ private String connectorName;
+
+ private String address;
+
+ private String path;
+}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
new file mode 100644
index 0000000000..931fcf9e90
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.connector;
+
+
+import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+
+@Slf4j
+public class HttpSinkConnector implements Sink {
+
+ private OkHttpClient okHttpClient;
+
+ private HttpSinkConfig httpSinkConfig;
+
+ private String messageSendUrl;
+
+ private volatile boolean isRunning = false;
+
+
+ @Override
+ public Class extends Config> configClass() {
+ return HttpSinkConfig.class;
+ }
+
+ @Override
+ public void init(Config config) throws Exception {
+ httpSinkConfig = (HttpSinkConfig) config;
+ doInit();
+ }
+
+ @Override
+ public void init(ConnectorContext connectorContext) throws Exception {
+ SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext;
+ this.httpSinkConfig = (HttpSinkConfig) sinkConnectorContext.getSinkConfig();
+ doInit();
+ }
+
+ @SneakyThrows
+ private void doInit() {
+ this.messageSendUrl = this.httpSinkConfig.getConnectorConfig().getAddress() + this.httpSinkConfig.getConnectorConfig().getPath();
+ this.okHttpClient = new OkHttpClient.Builder()
+ .connectTimeout(60, TimeUnit.SECONDS)
+ .readTimeout(60, TimeUnit.SECONDS)
+ .writeTimeout(60, TimeUnit.SECONDS)
+ .retryOnConnectionFailure(true)
+ .build();
+ }
+
+ @Override
+ public void start() throws Exception {
+ this.isRunning = true;
+ }
+
+ @Override
+ public void commit(ConnectRecord record) {
+
+ }
+
+ @Override
+ public String name() {
+ return this.httpSinkConfig.getConnectorConfig().getConnectorName();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ this.isRunning = false;
+ }
+
+ public boolean isRunning() {
+ return isRunning;
+ }
+
+ @Override
+ public void put(List sinkRecords) {
+ for (ConnectRecord sinkRecord : sinkRecords) {
+ try {
+ if (Objects.isNull(sinkRecord)) {
+ log.warn("ConnectRecord data is null, ignore.");
+ continue;
+ }
+ } catch (Exception e) {
+ log.error("Failed to sink message via HTTP.", e);
+ }
+ sendMessage(sinkRecord);
+ }
+ }
+
+ @SneakyThrows
+ private void sendMessage(ConnectRecord record) {
+ // Construct HTTP request
+ MediaType mediaType = MediaType.parse("application/json; charset=utf-8");
+ String data = new String((byte[]) record.getData(), StandardCharsets.UTF_8);
+ RequestBody body = RequestBody.create(mediaType, data);
+ Request request = new Request.Builder()
+ .url(this.messageSendUrl)
+ .post(body)
+ .build();
+
+ // Send HTTP request
+ Response response = okHttpClient.newCall(request).execute();
+
+ // Verify HTTP response
+ if (!response.isSuccessful()) {
+ log.error("server response: {}", ToStringBuilder.reflectionToString(response));
+ throw new IOException("Unexpected code " + response.code());
+ }
+ ResponseBody responseBody = response.body();
+ if (responseBody == null) {
+ throw new IOException("Response body is null.");
+ }
+ // TODO Define the response result template
+ }
+
+}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml
new file mode 100644
index 0000000000..3ba9ec900a
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TopicTest
+ idc: FT
+ env: PRD
+ group: httpSource
+ appId: 5032
+ userName: httpSourceUser
+ passWord: httpPassWord
+connectorConfig:
+ connectorName: httpSink
+ address: "http://127.0.0.1:8080"
+ path: "/test"
From 67764902b374243a660101a824dfd5ed7de6d410 Mon Sep 17 00:00:00 2001
From: zaki
Date: Sat, 13 Apr 2024 23:47:30 +0800
Subject: [PATCH 02/26] refactor: Replace okHttpClient with vertx.WebClient
---
.../eventmesh-connector-http/build.gradle | 1 -
.../http/sink/config/SinkConnectorConfig.java | 6 +-
.../sink/connector/HttpSinkConnector.java | 74 +++++++------------
.../src/main/resources/sink-config.yml | 10 ++-
4 files changed, 39 insertions(+), 52 deletions(-)
diff --git a/eventmesh-connectors/eventmesh-connector-http/build.gradle b/eventmesh-connectors/eventmesh-connector-http/build.gradle
index 55a59bd51c..734b2fc622 100644
--- a/eventmesh-connectors/eventmesh-connector-http/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-http/build.gradle
@@ -20,7 +20,6 @@ dependencies {
implementation project(":eventmesh-common")
implementation 'io.cloudevents:cloudevents-http-vertx:2.3.0'
implementation 'io.vertx:vertx-web:4.4.6'
- implementation 'com.squareup.okhttp3:okhttp'
testImplementation "org.apache.httpcomponents:httpclient"
compileOnly 'org.projectlombok:lombok'
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
index 59c55ddec6..ae80bfd9e5 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
@@ -24,7 +24,11 @@ public class SinkConnectorConfig {
private String connectorName;
- private String address;
+ private String host;
+
+ private int port;
private String path;
+
+ private int idleTimeout;
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
index 931fcf9e90..8d74cfea83 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
@@ -25,32 +25,25 @@
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.TimeUnit;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
-import okhttp3.ResponseBody;
@Slf4j
public class HttpSinkConnector implements Sink {
- private OkHttpClient okHttpClient;
-
private HttpSinkConfig httpSinkConfig;
- private String messageSendUrl;
+ private WebClient webClient;
private volatile boolean isRunning = false;
@@ -75,13 +68,13 @@ public void init(ConnectorContext connectorContext) throws Exception {
@SneakyThrows
private void doInit() {
- this.messageSendUrl = this.httpSinkConfig.getConnectorConfig().getAddress() + this.httpSinkConfig.getConnectorConfig().getPath();
- this.okHttpClient = new OkHttpClient.Builder()
- .connectTimeout(60, TimeUnit.SECONDS)
- .readTimeout(60, TimeUnit.SECONDS)
- .writeTimeout(60, TimeUnit.SECONDS)
- .retryOnConnectionFailure(true)
- .build();
+ final Vertx vertx = Vertx.vertx();
+ // TODO Add more configurations
+ WebClientOptions options = new WebClientOptions()
+ .setDefaultHost(this.httpSinkConfig.connectorConfig.getHost())
+ .setDefaultPort(this.httpSinkConfig.connectorConfig.getPort())
+ .setIdleTimeout(this.httpSinkConfig.connectorConfig.getIdleTimeout());
+ this.webClient = WebClient.create(vertx, options);
}
@Override
@@ -96,7 +89,7 @@ public void commit(ConnectRecord record) {
@Override
public String name() {
- return this.httpSinkConfig.getConnectorConfig().getConnectorName();
+ return this.httpSinkConfig.connectorConfig.getConnectorName();
}
@Override
@@ -117,36 +110,25 @@ public void put(List sinkRecords) {
continue;
}
} catch (Exception e) {
- log.error("Failed to sink message via HTTP.", e);
+ log.error("Failed to sink message via HTTP. ", e);
}
sendMessage(sinkRecord);
}
}
- @SneakyThrows
private void sendMessage(ConnectRecord record) {
- // Construct HTTP request
- MediaType mediaType = MediaType.parse("application/json; charset=utf-8");
- String data = new String((byte[]) record.getData(), StandardCharsets.UTF_8);
- RequestBody body = RequestBody.create(mediaType, data);
- Request request = new Request.Builder()
- .url(this.messageSendUrl)
- .post(body)
- .build();
-
- // Send HTTP request
- Response response = okHttpClient.newCall(request).execute();
-
- // Verify HTTP response
- if (!response.isSuccessful()) {
- log.error("server response: {}", ToStringBuilder.reflectionToString(response));
- throw new IOException("Unexpected code " + response.code());
- }
- ResponseBody responseBody = response.body();
- if (responseBody == null) {
- throw new IOException("Response body is null.");
- }
- // TODO Define the response result template
+ this.webClient.post(this.httpSinkConfig.connectorConfig.getPath())
+ .putHeader("Content-Type", "application/json ; charset=utf-8")
+ .sendBuffer(Buffer.buffer((byte[]) record.getData()))
+ .onSuccess(res -> {
+ if (res.statusCode() != 200) {
+ log.error("[HttpSinkConnector] Failed to send message via HTTP. Response: {}", res);
+ }
+ })
+ .onFailure(event -> {
+ // This function is accessed only when an error occurs at the network level
+ log.error("[HttpSinkConnector] Failed to send message via HTTP. Exception: {}", event.getMessage());
+ });
}
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml
index 3ba9ec900a..bd2bbd3ffc 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml
@@ -20,11 +20,13 @@ pubSubConfig:
subject: TopicTest
idc: FT
env: PRD
- group: httpSource
+ group: httpSink
appId: 5032
- userName: httpSourceUser
+ userName: httpSinkUser
passWord: httpPassWord
connectorConfig:
connectorName: httpSink
- address: "http://127.0.0.1:8080"
- path: "/test"
+ host: 127.0.0.1
+ port: 8987
+ path: /test
+ idleTimeout: 5
From 003f0785ca77133e1f9d242438cd8d38c4279cad Mon Sep 17 00:00:00 2001
From: zaki
Date: Sun, 14 Apr 2024 15:32:47 +0800
Subject: [PATCH 03/26] fix: Resolving dependency conflicts
---
.../eventmesh-connector-http/build.gradle | 7 +++--
.../sink/connector/HttpSinkConnector.java | 26 ++++++++++---------
2 files changed, 19 insertions(+), 14 deletions(-)
diff --git a/eventmesh-connectors/eventmesh-connector-http/build.gradle b/eventmesh-connectors/eventmesh-connector-http/build.gradle
index 734b2fc622..313b9dd5b5 100644
--- a/eventmesh-connectors/eventmesh-connector-http/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-http/build.gradle
@@ -18,10 +18,13 @@
dependencies {
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
implementation project(":eventmesh-common")
- implementation 'io.cloudevents:cloudevents-http-vertx:2.3.0'
- implementation 'io.vertx:vertx-web:4.4.6'
+ // implementation 'io.cloudevents:cloudevents-http-vertx:2.3.0'
+ implementation 'io.cloudevents:cloudevents-http-vertx:3.0.0'
+ // implementation 'io.vertx:vertx-web:4.4.6'
+ implementation 'io.vertx:vertx-web:4.3.7'
testImplementation "org.apache.httpcomponents:httpclient"
+ testImplementation 'org.mock-server:mockserver-netty:5.15.0'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
index 8d74cfea83..ef512469d6 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
@@ -28,8 +28,8 @@
import java.util.List;
import java.util.Objects;
+import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Vertx;
-import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
@@ -95,6 +95,7 @@ public String name() {
@Override
public void stop() throws Exception {
this.isRunning = false;
+ this.webClient.close();
}
public boolean isRunning() {
@@ -109,26 +110,27 @@ public void put(List sinkRecords) {
log.warn("ConnectRecord data is null, ignore.");
continue;
}
+ sendMessage(sinkRecord);
} catch (Exception e) {
log.error("Failed to sink message via HTTP. ", e);
}
- sendMessage(sinkRecord);
}
}
private void sendMessage(ConnectRecord record) {
this.webClient.post(this.httpSinkConfig.connectorConfig.getPath())
- .putHeader("Content-Type", "application/json ; charset=utf-8")
- .sendBuffer(Buffer.buffer((byte[]) record.getData()))
- .onSuccess(res -> {
- if (res.statusCode() != 200) {
- log.error("[HttpSinkConnector] Failed to send message via HTTP. Response: {}", res);
+ .putHeader("Content-Type", "application/json; charset=utf-8")
+ .sendJson(record, ar -> {
+ if (ar.succeeded()) {
+ if (ar.result().statusCode() != HttpResponseStatus.OK.code()) {
+ log.error("[HttpSinkConnector] Failed to send message via HTTP. Response: {}", ar.result());
+ } else {
+ log.info("[HttpSinkConnector] Successfully send message via HTTP. ");
+ }
+ } else {
+ // This function is accessed only when an error occurs at the network level
+ log.error("[HttpSinkConnector] Failed to send message via HTTP. Exception: {}", ar.cause().getMessage());
}
- })
- .onFailure(event -> {
- // This function is accessed only when an error occurs at the network level
- log.error("[HttpSinkConnector] Failed to send message via HTTP. Exception: {}", event.getMessage());
});
}
-
}
From cd21ea3b88db55c052374cfb55c9a8d78e8e84cc Mon Sep 17 00:00:00 2001
From: zaki
Date: Sun, 14 Apr 2024 15:34:03 +0800
Subject: [PATCH 04/26] test: Add HttpSinkConnectorTest
---
.../connector/HttpSinkConnectorTest.java | 90 +++++++++++++++++++
.../src/test/resources/sink-config.yml | 32 +++++++
2 files changed, 122 insertions(+)
create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
new file mode 100644
index 0000000000..1547aec2a3
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
@@ -0,0 +1,90 @@
+package org.apache.eventmesh.connector.http.source.connector;
+
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockserver.model.HttpRequest.request;
+
+import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig;
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.connector.HttpSinkConnector;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+
+import io.vertx.core.http.HttpMethod;
+
+public class HttpSinkConnectorTest {
+
+ private HttpSinkConnector sinkConnector;
+ private SinkConnectorConfig sinkConnectorConfig;
+
+ private ClientAndServer mockServer;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ sinkConnector = new HttpSinkConnector();
+ HttpSinkConfig sinkConfig = (HttpSinkConfig) ConfigUtil.parse(sinkConnector.configClass());
+ sinkConnectorConfig = sinkConfig.connectorConfig;
+ sinkConnector.init(sinkConfig);
+ sinkConnector.start();
+ mockServer = ClientAndServer.startClientAndServer(sinkConnectorConfig.getPort());
+ }
+
+ @AfterEach
+ public void stopMockServer() throws Exception {
+ sinkConnector.stop();
+ mockServer.close();
+ }
+
+ @Test
+ void testPut() throws InterruptedException {
+ new MockServerClient(sinkConnectorConfig.getHost(), sinkConnectorConfig.getPort())
+ .when(
+ request()
+ .withMethod("POST")
+ .withPath(sinkConnectorConfig.getPath())
+ )
+ .respond(
+ HttpResponse.response()
+ .withStatusCode(200)
+ );
+
+ final int times = 10;
+ List connectRecords = new ArrayList<>();
+ for (int i = 0; i < times; i++) {
+ RecordPartition partition = new RecordPartition();
+ RecordOffset offset = new RecordOffset();
+ long timestamp = System.currentTimeMillis();
+ ConnectRecord connectRecord = new ConnectRecord(partition, offset,
+ timestamp, "test-http " + i);
+ connectRecords.add(connectRecord);
+ }
+
+ sinkConnector.put(connectRecords);
+ // Sleeps for 3 seconds, waiting for the webClient to finish sending all requests
+ Thread.sleep(3000);
+
+ HttpRequest[] allRequests = mockServer.retrieveRecordedRequests(null);
+ // Determine the total number of requests
+ assertEquals(times, allRequests.length);
+
+ for (int i = 0; i < times; i++) {
+ HttpRequest actualRequest = allRequests[0];
+ // Determine the request method
+ assertEquals(HttpMethod.POST.name(), actualRequest.getMethod().getValue());
+ }
+ }
+
+}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml
new file mode 100644
index 0000000000..bd2bbd3ffc
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TopicTest
+ idc: FT
+ env: PRD
+ group: httpSink
+ appId: 5032
+ userName: httpSinkUser
+ passWord: httpPassWord
+connectorConfig:
+ connectorName: httpSink
+ host: 127.0.0.1
+ port: 8987
+ path: /test
+ idleTimeout: 5
From 8486080cc088cd632e03230d2886bc38930706a5 Mon Sep 17 00:00:00 2001
From: zaki
Date: Sun, 14 Apr 2024 15:56:12 +0800
Subject: [PATCH 05/26] fix: Add License
---
.../source/connector/HttpSinkConnectorTest.java | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
index 1547aec2a3..0f1af6752f 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.eventmesh.connector.http.source.connector;
From f2e3835ffa8b2d39c5b9802070ba3f905fcf0093 Mon Sep 17 00:00:00 2001
From: zaki
Date: Sun, 14 Apr 2024 17:22:24 +0800
Subject: [PATCH 06/26] fix: Solving dependency issues
---
eventmesh-connectors/eventmesh-connector-http/build.gradle | 6 +++---
tools/dependency-check/known-dependencies.txt | 1 +
2 files changed, 4 insertions(+), 3 deletions(-)
diff --git a/eventmesh-connectors/eventmesh-connector-http/build.gradle b/eventmesh-connectors/eventmesh-connector-http/build.gradle
index 313b9dd5b5..f860ed236a 100644
--- a/eventmesh-connectors/eventmesh-connector-http/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-http/build.gradle
@@ -18,10 +18,10 @@
dependencies {
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
implementation project(":eventmesh-common")
- // implementation 'io.cloudevents:cloudevents-http-vertx:2.3.0'
+
implementation 'io.cloudevents:cloudevents-http-vertx:3.0.0'
- // implementation 'io.vertx:vertx-web:4.4.6'
- implementation 'io.vertx:vertx-web:4.3.7'
+ implementation 'io.vertx:vertx-web:4.4.6'
+ implementation 'io.vertx:vertx-web-client:4.4.6'
testImplementation "org.apache.httpcomponents:httpclient"
testImplementation 'org.mock-server:mockserver-netty:5.15.0'
diff --git a/tools/dependency-check/known-dependencies.txt b/tools/dependency-check/known-dependencies.txt
index b30ca5d5ed..bd1ce9fc01 100644
--- a/tools/dependency-check/known-dependencies.txt
+++ b/tools/dependency-check/known-dependencies.txt
@@ -347,6 +347,7 @@ vertx-auth-common-4.4.6.jar
vertx-bridge-common-4.4.6.jar
vertx-core-4.4.6.jar
vertx-web-4.4.6.jar
+vertx-web-client-4.4.6.jar
vertx-web-client-4.0.0.jar
vertx-web-common-4.4.6.jar
xpp3-1.1.4c.jar
From eda1d140941a76d13ed2bb7506e204dc8dc2830f Mon Sep 17 00:00:00 2001
From: zaki
Date: Sun, 14 Apr 2024 18:01:24 +0800
Subject: [PATCH 07/26] fix: License Check
---
tools/dependency-check/known-dependencies.txt | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/tools/dependency-check/known-dependencies.txt b/tools/dependency-check/known-dependencies.txt
index bd1ce9fc01..bc16692bd1 100644
--- a/tools/dependency-check/known-dependencies.txt
+++ b/tools/dependency-check/known-dependencies.txt
@@ -46,7 +46,7 @@ checker-qual-3.12.0.jar
classmate-1.5.1.jar
cloudevents-api-2.4.2.jar
cloudevents-core-2.4.2.jar
-cloudevents-http-vertx-2.3.0.jar
+cloudevents-http-vertx-3.0.0.jar
cloudevents-json-jackson-2.4.2.jar
cloudevents-kafka-2.4.2.jar
cloudevents-protobuf-2.4.2.jar
@@ -348,8 +348,8 @@ vertx-bridge-common-4.4.6.jar
vertx-core-4.4.6.jar
vertx-web-4.4.6.jar
vertx-web-client-4.4.6.jar
-vertx-web-client-4.0.0.jar
vertx-web-common-4.4.6.jar
+vertx-uri-template-4.4.6.jar
xpp3-1.1.4c.jar
xsdlib-2013.6.1.jar
zipkin-2.23.2.jar
From 611c8d681c0342311fd04a95ff29c0888bad908d Mon Sep 17 00:00:00 2001
From: zaki
Date: Sun, 14 Apr 2024 20:17:46 +0800
Subject: [PATCH 08/26] feat: Add HTTPS/SSL support
---
.../connector/http/sink/config/SinkConnectorConfig.java | 2 ++
.../connector/http/sink/connector/HttpSinkConnector.java | 4 +---
.../src/main/resources/sink-config.yml | 1 +
3 files changed, 4 insertions(+), 3 deletions(-)
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
index ae80bfd9e5..4f836c449f 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
@@ -30,5 +30,7 @@ public class SinkConnectorConfig {
private String path;
+ private boolean ssl;
+
private int idleTimeout;
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
index ef512469d6..ad55fc8bd2 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
@@ -17,7 +17,6 @@
package org.apache.eventmesh.connector.http.sink.connector;
-
import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
@@ -33,7 +32,6 @@
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
-
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -47,7 +45,6 @@ public class HttpSinkConnector implements Sink {
private volatile boolean isRunning = false;
-
@Override
public Class extends Config> configClass() {
return HttpSinkConfig.class;
@@ -73,6 +70,7 @@ private void doInit() {
WebClientOptions options = new WebClientOptions()
.setDefaultHost(this.httpSinkConfig.connectorConfig.getHost())
.setDefaultPort(this.httpSinkConfig.connectorConfig.getPort())
+ .setSsl(this.httpSinkConfig.connectorConfig.isSsl())
.setIdleTimeout(this.httpSinkConfig.connectorConfig.getIdleTimeout());
this.webClient = WebClient.create(vertx, options);
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml
index bd2bbd3ffc..260e9079d4 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml
@@ -29,4 +29,5 @@ connectorConfig:
host: 127.0.0.1
port: 8987
path: /test
+ ssl: false
idleTimeout: 5
From 64f28222049e773c7d22ff80602ef67b936e73c2 Mon Sep 17 00:00:00 2001
From: zaki
Date: Mon, 15 Apr 2024 19:18:13 +0800
Subject: [PATCH 09/26] fix: Optimize logging
---
.../http/sink/connector/HttpSinkConnector.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
index ad55fc8bd2..2968b0dd6f 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
@@ -110,7 +110,7 @@ public void put(List sinkRecords) {
}
sendMessage(sinkRecord);
} catch (Exception e) {
- log.error("Failed to sink message via HTTP. ", e);
+ log.error("Failed to sink message via HTTP. Exception: ", e);
}
}
}
@@ -120,14 +120,14 @@ private void sendMessage(ConnectRecord record) {
.putHeader("Content-Type", "application/json; charset=utf-8")
.sendJson(record, ar -> {
if (ar.succeeded()) {
+ log.info("[HttpSinkConnector] Successfully send message via HTTP. Record: timestamp={}, offset={}", record.getTimestamp(),
+ record.getPosition().getOffset());
if (ar.result().statusCode() != HttpResponseStatus.OK.code()) {
- log.error("[HttpSinkConnector] Failed to send message via HTTP. Response: {}", ar.result());
- } else {
- log.info("[HttpSinkConnector] Successfully send message via HTTP. ");
+ log.error("[HttpSinkConnector] Unexpected response received. StatusCode: {}", ar.result().statusCode());
}
} else {
- // This function is accessed only when an error occurs at the network level
- log.error("[HttpSinkConnector] Failed to send message via HTTP. Exception: {}", ar.cause().getMessage());
+ // This branch is only entered if an error occurs at the network layer
+ log.error("[HttpSinkConnector] Failed to send message via HTTP. Exception: ", ar.cause());
}
});
}
From 77ad32f8ed0f08999fa1788f6f7a958a7c2c9010 Mon Sep 17 00:00:00 2001
From: zaki
Date: Tue, 16 Apr 2024 22:23:43 +0800
Subject: [PATCH 10/26] feat: Add webhook functionality
---
.../http/server/HttpConnectServer.java | 2 +-
.../{connector => }/HttpSinkConnector.java | 61 ++-----
.../http/sink/config/HttpWebhookConfig.java | 39 +++++
.../http/sink/config/SinkConnectorConfig.java | 7 +-
.../sink/handle/CommonHttpSinkHandler.java | 100 ++++++++++++
.../http/sink/handle/HttpSinkHandler.java | 43 +++++
.../sink/handle/WebhookHttpSinkHandler.java | 149 ++++++++++++++++++
.../src/main/resources/sink-config.yml | 2 +
.../connector/HttpSinkConnectorTest.java | 5 +-
.../src/test/resources/sink-config.yml | 3 +
10 files changed, 361 insertions(+), 50 deletions(-)
rename eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/{connector => }/HttpSinkConnector.java (55%)
create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpWebhookConfig.java
create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java
create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java
index d7e4ce3a78..8d753d2815 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java
@@ -18,7 +18,7 @@
package org.apache.eventmesh.connector.http.server;
import org.apache.eventmesh.connector.http.config.HttpServerConfig;
-import org.apache.eventmesh.connector.http.sink.connector.HttpSinkConnector;
+import org.apache.eventmesh.connector.http.sink.HttpSinkConnector;
import org.apache.eventmesh.connector.http.source.connector.HttpSourceConnector;
import org.apache.eventmesh.openconnect.Application;
import org.apache.eventmesh.openconnect.util.ConfigUtil;
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
similarity index 55%
rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
rename to eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
index 2968b0dd6f..23b694e891 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
@@ -15,9 +15,12 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.http.sink.connector;
+package org.apache.eventmesh.connector.http.sink;
import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig;
+import org.apache.eventmesh.connector.http.sink.handle.CommonHttpSinkHandler;
+import org.apache.eventmesh.connector.http.sink.handle.HttpSinkHandler;
+import org.apache.eventmesh.connector.http.sink.handle.WebhookHttpSinkHandler;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
@@ -27,11 +30,6 @@
import java.util.List;
import java.util.Objects;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.vertx.core.Vertx;
-import io.vertx.ext.web.client.WebClient;
-import io.vertx.ext.web.client.WebClientOptions;
-
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -41,9 +39,7 @@ public class HttpSinkConnector implements Sink {
private HttpSinkConfig httpSinkConfig;
- private WebClient webClient;
-
- private volatile boolean isRunning = false;
+ private HttpSinkHandler sinkHandler;
@Override
public Class extends Config> configClass() {
@@ -52,7 +48,7 @@ public Class extends Config> configClass() {
@Override
public void init(Config config) throws Exception {
- httpSinkConfig = (HttpSinkConfig) config;
+ this.httpSinkConfig = (HttpSinkConfig) config;
doInit();
}
@@ -65,19 +61,17 @@ public void init(ConnectorContext connectorContext) throws Exception {
@SneakyThrows
private void doInit() {
- final Vertx vertx = Vertx.vertx();
- // TODO Add more configurations
- WebClientOptions options = new WebClientOptions()
- .setDefaultHost(this.httpSinkConfig.connectorConfig.getHost())
- .setDefaultPort(this.httpSinkConfig.connectorConfig.getPort())
- .setSsl(this.httpSinkConfig.connectorConfig.isSsl())
- .setIdleTimeout(this.httpSinkConfig.connectorConfig.getIdleTimeout());
- this.webClient = WebClient.create(vertx, options);
+ // Create different handlers for different configurations
+ if (this.httpSinkConfig.connectorConfig.getWebhookConfig().isActivate()) {
+ this.sinkHandler = new WebhookHttpSinkHandler(this.httpSinkConfig.connectorConfig);
+ } else {
+ this.sinkHandler = new CommonHttpSinkHandler(this.httpSinkConfig.connectorConfig);
+ }
}
@Override
public void start() throws Exception {
- this.isRunning = true;
+ this.sinkHandler.start();
}
@Override
@@ -92,12 +86,7 @@ public String name() {
@Override
public void stop() throws Exception {
- this.isRunning = false;
- this.webClient.close();
- }
-
- public boolean isRunning() {
- return isRunning;
+ this.sinkHandler.stop();
}
@Override
@@ -108,27 +97,11 @@ public void put(List sinkRecords) {
log.warn("ConnectRecord data is null, ignore.");
continue;
}
- sendMessage(sinkRecord);
+ // Handle the ConnectRecord
+ this.sinkHandler.handle(sinkRecord);
} catch (Exception e) {
- log.error("Failed to sink message via HTTP. Exception: ", e);
+ log.error("Failed to sink message via HTTP. ", e);
}
}
}
-
- private void sendMessage(ConnectRecord record) {
- this.webClient.post(this.httpSinkConfig.connectorConfig.getPath())
- .putHeader("Content-Type", "application/json; charset=utf-8")
- .sendJson(record, ar -> {
- if (ar.succeeded()) {
- log.info("[HttpSinkConnector] Successfully send message via HTTP. Record: timestamp={}, offset={}", record.getTimestamp(),
- record.getPosition().getOffset());
- if (ar.result().statusCode() != HttpResponseStatus.OK.code()) {
- log.error("[HttpSinkConnector] Unexpected response received. StatusCode: {}", ar.result().statusCode());
- }
- } else {
- // This branch is only entered if an error occurs at the network layer
- log.error("[HttpSinkConnector] Failed to send message via HTTP. Exception: ", ar.cause());
- }
- });
- }
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpWebhookConfig.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpWebhookConfig.java
new file mode 100644
index 0000000000..0aca6ce0bf
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpWebhookConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.config;
+
+import lombok.Data;
+
+
+@Data
+public class HttpWebhookConfig {
+
+ private boolean activate = false;
+
+ // Path to receive callback data
+ private String callbackPath = "/callback";
+
+ // Path to display/export callback data
+ private String exportPath = "/export";
+
+ private int port;
+
+ // timeunit: ms
+ private int idleTimeout = 5000;
+
+}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
index 4f836c449f..7428cadc0e 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
@@ -30,7 +30,10 @@ public class SinkConnectorConfig {
private String path;
- private boolean ssl;
+ private boolean ssl = false;
- private int idleTimeout;
+ // timeunit: ms
+ private int idleTimeout = 5000;
+
+ private HttpWebhookConfig webhookConfig = new HttpWebhookConfig();
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
new file mode 100644
index 0000000000..8b68097d7d
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handle;
+
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Common HttpSinkHandler to handle ConnectRecord over HTTP/HTTPS
+ */
+@Slf4j
+public class CommonHttpSinkHandler implements HttpSinkHandler {
+
+ private final SinkConnectorConfig connectorConfig;
+
+ private WebClient webClient;
+
+ public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+ this.connectorConfig = sinkConnectorConfig;
+ }
+
+ @Override
+ public void start() {
+ // Create WebClient
+ doInitWebClient();
+ }
+
+ private void doInitWebClient() {
+ final Vertx vertx = Vertx.vertx();
+ // TODO add more configurations
+ WebClientOptions options = new WebClientOptions()
+ .setDefaultHost(this.connectorConfig.getHost())
+ .setDefaultPort(this.connectorConfig.getPort())
+ .setSsl(this.connectorConfig.isSsl())
+ .setIdleTimeout(this.connectorConfig.getIdleTimeout())
+ .setIdleTimeoutUnit(TimeUnit.MILLISECONDS);
+
+ this.webClient = WebClient.create(vertx, options);
+ }
+
+
+ @Override
+ public void handle(ConnectRecord record) {
+ this.webClient.post(this.connectorConfig.getPath())
+ .putHeader("Content-Type", "application/json; charset=utf-8")
+ .sendJson(record)
+ .onComplete(ar -> {
+ Long timestamp = record.getTimestamp();
+ Map offset = record.getPosition().getOffset().getOffset();
+ if (ar.succeeded()) {
+ log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, offset);
+ // Determine whether the status code is 200
+ if (ar.result().statusCode() != HttpResponseStatus.OK.code()) {
+ log.error("Unexpected response received. Record: timestamp={}, offset={}. Response: statusCode={}",
+ timestamp,
+ offset,
+ ar.result().statusCode()
+ );
+ }
+ } else {
+ // This branch is only entered if an error occurs at the network layer
+ log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, offset, ar.cause());
+ }
+ });
+ }
+
+ @Override
+ public void stop() {
+ if (this.webClient != null) {
+ this.webClient.close();
+ } else {
+ log.warn("WebClient is null, ignore.");
+ }
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java
new file mode 100644
index 0000000000..89bf4963e4
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handle;
+
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+/**
+ * Any class that needs to process ConnectRecord via HTTP needs to implement this interface.
+ */
+public interface HttpSinkHandler {
+
+ /**
+ * start the handler
+ */
+ void start();
+
+ /**
+ * Handle the ConnectRecord.
+ *
+ * @param record the ConnectRecord to handle
+ */
+ void handle(ConnectRecord record);
+
+ /**
+ * stop the handler
+ */
+ void stop();
+}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
new file mode 100644
index 0000000000..19583576ed
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handle;
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.handler.LoggerHandler;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * HttpSinkHandler with webhook functionality
+ */
+@Slf4j
+public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
+
+ private final HttpWebhookConfig webhookConfig;
+
+ // store the callback data
+ private final BlockingQueue
*
*
Implementing classes should ensure thread safety and handle HTTP/HTTPS communication efficiently.
- * The {@link #start()} method initializes any necessary resources for HTTP/HTTPS communication.
- * The {@link #multiHandle(ConnectRecord)} method processes a ConnectRecord multiple times by sending it over HTTP or HTTPS.
- * The {@link #handle(URI, HttpConnectRecord)} method processes a single ConnectRecord by sending it over HTTP or HTTPS to the specified URL.
- * The {@link #stop()} method releases any resources used for HTTP/HTTPS communication.
+ * The {@link #start()} method initializes any necessary resources for HTTP/HTTPS communication. The {@link #handle(ConnectRecord)} method processes a
+ * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, HttpConnectRecord)} method processes HttpConnectRecord on specified URL
+ * while returning its own processing logic {@link #stop()} method releases any resources used for HTTP/HTTPS communication.
*
- *
It's recommended to handle exceptions gracefully within the {@link #handle(URI, HttpConnectRecord)} method
+ *
It's recommended to handle exceptions gracefully within the {@link #deliver(URI, HttpConnectRecord)} method
* to prevent message loss or processing interruptions.
*/
public interface HttpSinkHandler {
@@ -51,19 +50,21 @@ public interface HttpSinkHandler {
void start();
/**
- * Processes the ConnectRecord multiple times.
+ * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
*
- * @param record the ConnectRecord to handle
+ * @param record the ConnectRecord to process
*/
- void multiHandle(ConnectRecord record);
+ void handle(ConnectRecord record);
+
/**
- * Processes the ConnectRecord once.
+ * Processes HttpConnectRecord on specified URL while returning its own processing logic
*
- * @param url the URL to send the ConnectRecord to
- * @param httpConnectRecord the ConnectRecord to handle
+ * @param url URI to which the HttpConnectRecord should be sent
+ * @param httpConnectRecord HttpConnectRecord to process
+ * @return processing chain
*/
- Future> handle(URI url, HttpConnectRecord httpConnectRecord);
+ Future> deliver(URI url, HttpConnectRecord httpConnectRecord);
/**
* Cleans up and releases resources used by the HTTP/HTTPS handler. This method should be called when the handler is no longer needed.
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
index 22a19d05df..5f66f6430f 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
@@ -20,35 +20,40 @@
import org.apache.eventmesh.connector.http.sink.config.HttpRetryConfig;
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata;
+import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata.HttpExportMetadataBuilder;
+import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord;
import org.apache.eventmesh.connector.http.util.HttpUtils;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import java.net.ConnectException;
import java.net.URI;
import java.time.Duration;
+import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.UUID;
import java.util.stream.Collectors;
-import io.github.resilience4j.retry.Retry;
-import io.github.resilience4j.retry.RetryConfig;
-import io.github.resilience4j.retry.RetryRegistry;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;
import lombok.extern.slf4j.Slf4j;
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+import dev.failsafe.RetryPolicyBuilder;
+import dev.failsafe.event.ExecutionEvent;
+
+
@Slf4j
public class RetryHttpSinkHandler implements HttpSinkHandler {
private final SinkConnectorConfig connectorConfig;
- private Retry retry;
-
- private ScheduledExecutorService scheduler;
+ // Retry policy builder
+ private RetryPolicyBuilder> retryPolicyBuilder;
private final List urls;
@@ -71,28 +76,12 @@ public RetryHttpSinkHandler(SinkConnectorConfig connectorConfig, HttpSinkHandler
private void initRetry() {
HttpRetryConfig httpRetryConfig = this.connectorConfig.getRetryConfig();
- // Create a custom RetryConfig
- RetryConfig retryConfig = RetryConfig.>custom()
- .maxAttempts(httpRetryConfig.getMaxAttempts())
- .waitDuration(Duration.ofMillis(httpRetryConfig.getInterval()))
- .retryOnException(throwable -> throwable instanceof ConnectException)
- .retryOnResult(response -> httpRetryConfig.isRetryAll() && !HttpUtils.is2xxSuccessful(response.statusCode()))
- .failAfterMaxAttempts(true)
- .build();
-
- // Create a RetryRegistry with a custom global configuration
- RetryRegistry retryRegistry = RetryRegistry.of(retryConfig);
-
- // Get or create a Retry from the registry
- this.retry = retryRegistry.retry("retryHttpSinkHandler");
-
- // Create a ScheduledExecutorService with the number of threads equal to the maximum connection pool size
- this.scheduler = Executors.newScheduledThreadPool(this.connectorConfig.getMaxConnectionPoolSize());
-
- // register event listeners
- retry.getEventPublisher()
- .onSuccess(event -> log.info(event.toString()))
- .onError(event -> log.error(event.toString()));
+
+ this.retryPolicyBuilder = RetryPolicy.>builder()
+ .handleIf(e -> e instanceof ConnectException)
+ .handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode()))
+ .withMaxRetries(httpRetryConfig.getMaxRetries())
+ .withDelay(Duration.ofMillis(httpRetryConfig.getInterval()));
}
@@ -104,13 +93,14 @@ public void start() {
sinkHandler.start();
}
+
/**
- * Handles the ConnectRecord by sending it to all configured URLs using the WebClient.
+ * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
*
- * @param record the ConnectRecord to handle
+ * @param record the ConnectRecord to process
*/
@Override
- public void multiHandle(ConnectRecord record) {
+ public void handle(ConnectRecord record) {
for (URI url : this.urls) {
// convert ConnectRecord to HttpConnectRecord
String type = String.format("%s.%s.%s",
@@ -118,32 +108,107 @@ public void multiHandle(ConnectRecord record) {
this.connectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common");
HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
// handle the HttpConnectRecord
- handle(url, httpConnectRecord);
+ deliver(url, httpConnectRecord);
}
}
+
/**
- * Handles the HttpConnectRecord by sending it to the specified URL using the WebClient. If the request fails, it will be retried according to the
- * RetryConfig.
+ * Processes HttpConnectRecord on specified URL while returning its own processing logic This method provides the retry power to process the
+ * HttpConnectRecord
*
- * @param url the URL to send the HttpConnectRecord to
- * @param httpConnectRecord the HttpConnectRecord to send
- * @return a Future representing the result of the HTTP request
+ * @param url URI to which the HttpConnectRecord should be sent
+ * @param httpConnectRecord HttpConnectRecord to process
+ * @return processing chain
*/
@Override
- public Future> handle(URI url, HttpConnectRecord httpConnectRecord) {
- this.retry.executeCompletionStage(scheduler, () ->
- this.sinkHandler.handle(url, httpConnectRecord).toCompletionStage());
+ public Future> deliver(URI url, HttpConnectRecord httpConnectRecord) {
+ // Only webhook mode needs to use the firstTryId
+ String firstTryId = UUID.randomUUID().toString();
+
+ // Build the retry policy
+ RetryPolicy> retryPolicy = retryPolicyBuilder
+ .onSuccess(e -> {
+ if (connectorConfig.getWebhookConfig().isActivate()) {
+ // convert the result to an HttpExportRecord
+ HttpExportRecord exportRecord = covertToExportRecord(e, e.getResult(), e.getException(), url, firstTryId);
+ // add the data to the queue
+ ((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord);
+ }
+ })
+ .onRetry(e -> {
+ if (log.isDebugEnabled()) {
+ log.warn("Retrying the request to {} for the {} time. HttpConnectRecord= {}", url, e.getAttemptCount(), httpConnectRecord);
+ } else {
+ log.warn("Retrying the request to {} for the {} time.", url, e.getAttemptCount());
+ }
+ if (connectorConfig.getWebhookConfig().isActivate()) {
+ HttpExportRecord exportRecord = covertToExportRecord(e, e.getLastResult(), e.getLastException(), url, firstTryId);
+ ((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord);
+ }
+ })
+ .onFailure(e -> {
+ if (log.isDebugEnabled()) {
+ log.error("Failed to send the request to {} after {} attempts. HttpConnectRecord= {}", url, e.getAttemptCount(),
+ httpConnectRecord, e.getException());
+ } else {
+ log.error("Failed to send the request to {} after {} attempts.", url, e.getAttemptCount(), e.getException());
+ }
+ if (connectorConfig.getWebhookConfig().isActivate()) {
+ HttpExportRecord exportRecord = covertToExportRecord(e, e.getResult(), e.getException(), url, firstTryId);
+ ((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord);
+ }
+ }).build();
+
+ // Handle the HttpConnectRecord with retry
+ Failsafe.with(retryPolicy)
+ .getStageAsync(() -> sinkHandler.deliver(url, httpConnectRecord).toCompletionStage());
+
return null;
}
+ /**
+ * Converts the ExecutionCompletedEvent to an HttpExportRecord.
+ *
+ * @param event the ExecutionCompletedEvent to convert
+ * @param response the response of the request, may be null
+ * @param e the exception thrown during the request, may be null
+ * @param url the URL the request was sent to
+ * @param firstTryId the UUID of the first try
+ * @return the converted HttpExportRecord
+ */
+ private HttpExportRecord covertToExportRecord(ExecutionEvent event, HttpResponse response, Throwable e, URI url, String firstTryId) {
+ HttpExportMetadataBuilder builder = HttpExportMetadata.builder()
+ .url(url.toString())
+ .receivedTime(LocalDateTime.now())
+ .retryNum(event.getAttemptCount() - 1);
+
+ if (event.getAttemptCount() == 1) {
+ builder.retriedBy(null)
+ .uuid(firstTryId);
+ } else {
+ builder.retriedBy(firstTryId)
+ .uuid(UUID.randomUUID().toString());
+ }
+
+ if (response != null) {
+ // record the response
+ builder.code(response.statusCode())
+ .message(response.statusMessage());
+ } else {
+ // record the exception
+ builder.code(-1)
+ .message(e.getMessage());
+ }
+
+ return new HttpExportRecord(builder.build(), response == null ? null : response.bodyAsString());
+ }
+
/**
* Cleans up and releases resources used by the HTTP/HTTPS handler.
*/
@Override
public void stop() {
sinkHandler.stop();
- // Shutdown the scheduler
- scheduler.shutdown();
}
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
index 8e4b423f6f..ed6be468a2 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
@@ -17,21 +17,32 @@
package org.apache.eventmesh.connector.http.sink.handle;
-
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
-import org.apache.eventmesh.connector.http.util.HttpUtils;
+import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata;
+import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata.HttpExportMetadataBuilder;
+import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord;
+import org.apache.eventmesh.connector.http.sink.data.HttpExportRecordPage;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.commons.lang3.StringUtils;
+
import java.net.URI;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
@@ -42,8 +53,10 @@
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.handler.LoggerHandler;
-import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
@@ -53,6 +66,8 @@
@Slf4j
public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
+ private final SinkConnectorConfig sinkConnectorConfig;
+
// the configuration for webhook
private final HttpWebhookConfig webhookConfig;
@@ -60,12 +75,21 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
private HttpServer exportServer;
// store the received data, when webhook is enabled
- private final BlockingQueue receivedDataQueue;
+ private final ConcurrentLinkedQueue receivedDataQueue;
+
+ // the maximum queue size
+ private final int maxQueueSize;
+
+ // the current queue size
+ private final AtomicInteger currentQueueSize;
public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
super(sinkConnectorConfig);
+ this.sinkConnectorConfig = sinkConnectorConfig;
this.webhookConfig = sinkConnectorConfig.getWebhookConfig();
- this.receivedDataQueue = new LinkedBlockingQueue<>();
+ this.maxQueueSize = this.webhookConfig.getMaxStorageSize();
+ this.currentQueueSize = new AtomicInteger(0);
+ this.receivedDataQueue = new ConcurrentLinkedQueue<>();
// init the export server
doInitExportServer();
}
@@ -84,33 +108,74 @@ private void doInitExportServer() {
.method(HttpMethod.GET)
.produces("application/json")
.handler(ctx -> {
- // get received data
- Object data = this.receivedDataQueue.poll();
- if (data != null) {
+ // Validate the request parameters
+ MultiMap params = ctx.request().params();
+ String pageNumStr = params.get(ParamEnum.PAGE_NUM.getValue());
+ String pageSizeStr = params.get(ParamEnum.PAGE_SIZE.getValue());
+ String type = params.get(ParamEnum.TYPE.getValue());
+
+ // 1. type must be "poll" or "peek" or null
+ // 2. if type is "peek", pageNum must be greater than 0
+ // 3. pageSize must be greater than 0
+ if ((type != null && !Objects.equals(type, TypeEnum.PEEK.getValue()) && !Objects.equals(type, TypeEnum.POLL.getValue()))
+ || (Objects.equals(type, TypeEnum.PEEK.getValue()) && (StringUtils.isBlank(pageNumStr) || Integer.parseInt(pageNumStr) < 1))
+ || (StringUtils.isBlank(pageSizeStr) || Integer.parseInt(pageSizeStr) < 1)) {
- // export the received data
+ // Return 400 Bad Request if the request parameters are invalid
ctx.response()
.putHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8")
- .setStatusCode(HttpResponseStatus.OK.code())
- .send(JSONObject.of("data", data).toJSONString());
- if (log.isDebugEnabled()) {
- log.debug("Succeed to export callback data. Data: {}", data);
- } else {
- log.info("Succeed to export callback data.");
- }
- } else {
- // no data to export
+ .setStatusCode(HttpResponseStatus.BAD_REQUEST.code())
+ .end();
+ log.info("Invalid request parameters. pageNum: {}, pageSize: {}, type: {}", pageNumStr, pageSizeStr, type);
+ return;
+ }
+
+ // Parse the request parameters
+ if (type == null) {
+ type = TypeEnum.PEEK.getValue();
+ }
+ int pageNum = StringUtils.isBlank(pageNumStr) ? 1 : Integer.parseInt(pageNumStr);
+ int pageSize = Integer.parseInt(pageSizeStr);
+
+ if (currentQueueSize.get() == 0) {
ctx.response()
.putHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8")
.setStatusCode(HttpResponseStatus.NO_CONTENT.code())
.end();
log.info("No callback data to export.");
+ return;
+ }
+
+ // Get the received data
+ List exportRecords;
+ if (Objects.equals(type, TypeEnum.POLL.getValue())) {
+ // If the type is poll, only the first page of data is exported and removed
+ exportRecords = getDataFromQueue(0, pageSize, true);
+ } else {
+ // If the type is peek, the specified page of data is exported without removing
+ int startIndex = (pageNum - 1) * pageSize;
+ int endIndex = startIndex + pageSize;
+ exportRecords = getDataFromQueue(startIndex, endIndex, false);
+ }
+
+ // Create HttpExportRecordPage
+ HttpExportRecordPage page = new HttpExportRecordPage(pageNum, exportRecords.size(), exportRecords);
+
+ // export the received data
+ ctx.response()
+ .putHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8")
+ .setStatusCode(HttpResponseStatus.OK.code())
+ .send(JSON.toJSONString(page, JSONWriter.Feature.WriteMapNullValue));
+ if (log.isDebugEnabled()) {
+ log.debug("Succeed to export callback data. Data: {}", page);
+ } else {
+ log.info("Succeed to export callback data.");
}
});
// create the export server
this.exportServer = vertx.createHttpServer(new HttpServerOptions()
.setPort(this.webhookConfig.getPort())
- .setIdleTimeout(this.webhookConfig.getIdleTimeout())
+ .setIdleTimeout(this.webhookConfig.getServerIdleTimeout())
.setIdleTimeoutUnit(TimeUnit.MILLISECONDS)).requestHandler(router);
}
@@ -129,66 +194,118 @@ public void start() {
}
/**
- * Processes the ConnectRecord multiple times by sending it over HTTP or HTTPS to all configured URLs.
+ * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
*
- * @param record the ConnectRecord to handle
+ * @param record the ConnectRecord to process
*/
@Override
- public void multiHandle(ConnectRecord record) {
+ public void handle(ConnectRecord record) {
for (URI url : super.getUrls()) {
// convert ConnectRecord to HttpConnectRecord
String type = String.format("%s.%s.%s", this.getConnectorConfig().getConnectorName(), url.getScheme(), "webhook");
HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
// handle the HttpConnectRecord
- handle(url, httpConnectRecord);
+ deliver(url, httpConnectRecord);
}
}
+
/**
- * Processes the ConnectRecord once by sending it over HTTP or HTTPS to the specified URL. If the status code is 2xx, the received data will be
- * stored in the queue.
+ * Processes HttpConnectRecord on specified URL while returning its own processing logic This method sends the HttpConnectRecord to the specified
+ * URL by super class method and stores the received data.
*
- * @param url the URL to send the ConnectRecord to
- * @param httpConnectRecord the ConnectRecord to handle
- * @return the Future of the HTTP request
+ * @param url URI to which the HttpConnectRecord should be sent
+ * @param httpConnectRecord HttpConnectRecord to process
+ * @return processing chain
*/
@Override
- public Future> handle(URI url, HttpConnectRecord httpConnectRecord) {
+ public Future> deliver(URI url, HttpConnectRecord httpConnectRecord) {
// send the request
- Future> responseFuture = super.handle(url, httpConnectRecord);
+ Future> responseFuture = super.deliver(url, httpConnectRecord);
// store the received data
- return responseFuture.onSuccess(res -> {
- // Determine whether the status code is 2xx
- if (!HttpUtils.is2xxSuccessful(res.statusCode())) {
+ return responseFuture.onComplete(arr -> {
+ // If open retry, return directly and handled by RetryHttpSinkHandler
+ if (sinkConnectorConfig.getRetryConfig().getMaxRetries() > 0) {
return;
}
- // Get the received data
- String receivedData = res.bodyAsString();
- if (receivedData.isEmpty()) {
- log.warn("Received data is empty.");
- return;
+ // create ExportMetadataBuilder
+ HttpExportMetadataBuilder builder = HttpExportMetadata.builder()
+ .url(url.toString())
+ .receivedTime(LocalDateTime.now())
+ .retriedBy(null)
+ .uuid(UUID.randomUUID().toString())
+ .retryNum(0);
+
+ if (arr.succeeded()) {
+ HttpResponse response = arr.result();
+ builder.code(response.statusCode())
+ .message(response.statusMessage());
+ } else {
+ builder.code(-1)
+ .message(arr.cause().getMessage());
}
- // If the queue is full, remove the oldest element
- if (receivedDataQueue.size() == Integer.MAX_VALUE) {
- Object removedData = receivedDataQueue.poll();
- if (log.isDebugEnabled()) {
- log.debug("The queue is full, remove the oldest element: {}", removedData);
- } else {
- log.info("The queue is full, remove the oldest element");
- }
+ // create ExportRecord
+ HttpExportRecord exportRecord = new HttpExportRecord(builder.build(), arr.succeeded() ? arr.result().bodyAsString() : null);
+ // add the data to the queue
+ addDataToQueue(exportRecord);
+ });
+ }
+
+
+ /**
+ * Adds the received data to the queue.
+ *
+ * @param exportRecord the received data to add to the queue
+ */
+ public void addDataToQueue(HttpExportRecord exportRecord) {
+ // If the current queue size is greater than or equal to the maximum queue size, remove the oldest element
+ if (currentQueueSize.get() >= maxQueueSize) {
+ Object removedData = receivedDataQueue.poll();
+ if (log.isDebugEnabled()) {
+ log.debug("The queue is full, remove the oldest element: {}", removedData);
+ } else {
+ log.info("The queue is full, remove the oldest element");
}
- // Try to put the received data into the queue
- if (receivedDataQueue.offer(receivedData)) {
- if (log.isDebugEnabled()) {
- log.debug("Successfully put the received data into the queue: {}", receivedData);
- } else {
- log.info("Successfully put the received data into the queue");
- }
+ currentQueueSize.decrementAndGet();
+ }
+ // Try to put the received data into the queue
+ if (receivedDataQueue.offer(exportRecord)) {
+ currentQueueSize.incrementAndGet();
+ if (log.isDebugEnabled()) {
+ log.debug("Successfully put the received data into the queue: {}", exportRecord);
} else {
- log.error("Failed to put the received data into the queue: {}", receivedData);
+ log.info("Successfully put the received data into the queue");
}
+ } else {
+ log.error("Failed to put the received data into the queue: {}", exportRecord);
+ }
+ }
- });
+ /**
+ * Gets the received data from the queue.
+ *
+ * @param startIndex the start index of the data to get
+ * @param endIndex the end index of the data to get
+ * @param removed whether to remove the data from the queue
+ * @return the received data
+ */
+ private List getDataFromQueue(int startIndex, int endIndex, boolean removed) {
+ Iterator iterator = receivedDataQueue.iterator();
+
+ List pageItems = new ArrayList<>(endIndex - startIndex);
+ int count = 0;
+ while (iterator.hasNext() && count < endIndex) {
+ HttpExportRecord item = iterator.next();
+ if (count >= startIndex) {
+ pageItems.add(item);
+ if (removed) {
+ iterator.remove();
+ currentQueueSize.decrementAndGet();
+ }
+ }
+ count++;
+ }
+ return pageItems;
}
/**
@@ -208,4 +325,33 @@ public void stop() {
log.warn("Callback server is null, ignore.");
}
}
-}
+
+
+ @Getter
+ public enum ParamEnum {
+ PAGE_NUM("pageNum"),
+ PAGE_SIZE("pageSize"),
+ TYPE("type");
+
+ private final String value;
+
+ ParamEnum(String value) {
+ this.value = value;
+ }
+
+ }
+
+
+ @Getter
+ public enum TypeEnum {
+ POLL("poll"),
+ PEEK("peek");
+
+ private final String value;
+
+ TypeEnum(String value) {
+ this.value = value;
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml
index a3eedb6960..f740cf7cd1 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml
@@ -35,11 +35,12 @@ connectorConfig:
connectionTimeout: 5000 # timeunit: ms, recommended scope: 5 - 10s
maxConnectionPoolSize: 5
retryConfig:
- maxAttempts: 3
+ maxRetries: 2
interval: 1000
- retryAll: false
+ retryOnNonSuccess: false
webhookConfig:
activate: false
exportPath: /export
port: 8988
- idleTimeout: 5000
+ serverIdleTimeout: 5000
+ maxStorageSize: 5000
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
index d32a11e2f2..87ba96f76c 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
@@ -29,6 +29,7 @@
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
@@ -51,6 +52,7 @@
import io.vertx.core.http.HttpMethod;
import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
public class HttpSinkConnectorTest {
@@ -129,17 +131,31 @@ void testPut() throws Exception {
// verify response
HttpWebhookConfig webhookConfig = sinkConfig.connectorConfig.getWebhookConfig();
- String url = "http://" + severUri.getHost() + ":" + webhookConfig.getPort() + webhookConfig.getExportPath();
+ URI uri = new URIBuilder()
+ .setScheme("http")
+ .setHost(severUri.getHost())
+ .setPort(webhookConfig.getPort())
+ .setPath(webhookConfig.getExportPath())
+ .addParameter("pageNum", "1")
+ .addParameter("pageSize", "10")
+ .addParameter("type", "poll")
+ .build();
+
CloseableHttpClient httpClient = HttpClients.createDefault();
+ HttpGet httpGet = new HttpGet(uri);
+ httpGet.setHeader("Content-Type", "application/json");
+ CloseableHttpResponse response = httpClient.execute(httpGet);
+ String body = EntityUtils.toString(response.getEntity());
+ assert body != null;
+ JSONArray pageItems = JSON.parseObject(body).getJSONArray("pageItems");
+ assert pageItems != null && pageItems.size() == times;
for (int i = 0; i < times; i++) {
- HttpGet httpGet = new HttpGet(url);
- httpGet.setHeader("Content-Type", "application/json");
- CloseableHttpResponse response = httpClient.execute(httpGet);
- assert response.getEntity() != null;
- String responseBody = EntityUtils.toString(response.getEntity());
- JSONObject jsonObject = JSON.parseObject(responseBody);
- assert jsonObject.get("data") != null;
+ JSONObject pageItem = pageItems.getJSONObject(i);
+ assert pageItem != null;
+ assert pageItem.getJSONObject("data") != null;
+ assert pageItem.getJSONObject("metadata") != null;
}
+
httpClient.close();
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml
index 189cd5faf4..149ad7681b 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml
@@ -35,11 +35,12 @@ connectorConfig:
connectionTimeout: 5000 # timeunit: ms, recommended scope: 5 - 10s
maxConnectionPoolSize: 10
retryConfig:
- maxAttempts: 3
+ maxRetries: 2
interval: 1000
- retryAll: true
+ retryOnNonSuccess: true
webhookConfig:
activate: true
exportPath: /export
port: 8988
- idleTimeout: 5000
\ No newline at end of file
+ serverIdleTimeout: 5000
+ maxStorageSize: 5000
\ No newline at end of file
diff --git a/tools/dependency-check/known-dependencies.txt b/tools/dependency-check/known-dependencies.txt
index 3982543ff7..6922791cea 100644
--- a/tools/dependency-check/known-dependencies.txt
+++ b/tools/dependency-check/known-dependencies.txt
@@ -77,6 +77,7 @@ endpoint-util-0.0.7.jar
endpoints-spi-2.20.29.jar
error_prone_annotations-2.9.0.jar
eventstream-1.0.1.jar
+failsafe-3.3.2.jar
failureaccess-1.0.1.jar
fastjson-1.2.69_noneautotype.jar
fastjson2-2.0.48.jar
@@ -297,8 +298,6 @@ reactor-core-3.4.13.jar
redisson-3.17.3.jar
regions-2.20.29.jar
relaxngDatatype-20020414.jar
-resilience4j-retry-1.7.1.jar
-resilience4j-core-1.7.1.jar
rocketmq-acl-4.9.5.jar
rocketmq-broker-4.9.5.jar
rocketmq-client-4.9.5.jar
@@ -345,8 +344,6 @@ tomcat-embed-el-9.0.56.jar
txw2-2.3.1.jar
utils-2.20.29.jar
validation-api-1.1.0.Final.jar
-vavr-0.10.2.jar
-vavr-match-0.10.2.jar
vertx-auth-common-4.4.6.jar
vertx-bridge-common-4.4.6.jar
vertx-core-4.4.6.jar
From 33b14e45dada174469e59ae9f782427f1647f482 Mon Sep 17 00:00:00 2001
From: zaki
Date: Mon, 29 Apr 2024 02:37:59 +0800
Subject: [PATCH 19/26] fix: fix License Check
---
eventmesh-connectors/eventmesh-connector-http/build.gradle | 1 -
1 file changed, 1 deletion(-)
diff --git a/eventmesh-connectors/eventmesh-connector-http/build.gradle b/eventmesh-connectors/eventmesh-connector-http/build.gradle
index b860f664e1..cee0c0623f 100644
--- a/eventmesh-connectors/eventmesh-connector-http/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-http/build.gradle
@@ -22,7 +22,6 @@ dependencies {
implementation 'io.cloudevents:cloudevents-http-vertx:3.0.0'
implementation 'io.vertx:vertx-web:4.4.6'
implementation 'io.vertx:vertx-web-client:4.4.6'
- implementation 'io.vertx:vertx-web-validation:4.4.6'
implementation 'dev.failsafe:failsafe:3.3.2'
testImplementation "org.apache.httpcomponents:httpclient"
From 4af7455c9b09d4a1771fa7580b7406a61160ea74 Mon Sep 17 00:00:00 2001
From: zaki
Date: Mon, 29 Apr 2024 20:11:28 +0800
Subject: [PATCH 20/26] fix: update something
---
.../http/sink/HttpSinkConnector.java | 8 +--
.../http/sink/data/HttpConnectRecord.java | 11 +++-
.../sink/handle/RetryHttpSinkHandler.java | 55 ++++++++++---------
.../sink/handle/WebhookHttpSinkHandler.java | 9 +--
4 files changed, 44 insertions(+), 39 deletions(-)
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
index c5bc3dff3d..23d09fa141 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
@@ -75,14 +75,14 @@ private void doInit() {
}
int maxRetries = this.httpSinkConfig.connectorConfig.getRetryConfig().getMaxRetries();
- if (maxRetries < 0) {
- throw new IllegalArgumentException("Max retries must be greater than or equal to 0.");
- } else if (maxRetries == 0) {
+ if (maxRetries == 0) {
// Use the original sink handler
this.sinkHandler = nonRetryHandler;
- } else {
+ } else if (maxRetries > 0) {
// Wrap the sink handler with a retry handler
this.sinkHandler = new RetryHttpSinkHandler(this.httpSinkConfig.connectorConfig, nonRetryHandler);
+ } else {
+ throw new IllegalArgumentException("Max retries must be greater than or equal to 0.");
}
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
index a388e301c7..79d34a80b4 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
@@ -20,6 +20,7 @@
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import java.time.LocalDateTime;
+import java.util.UUID;
import lombok.Builder;
import lombok.Data;
@@ -33,7 +34,11 @@ public class HttpConnectRecord {
private String type;
- private String timestamp;
+ private String time;
+
+ private String uuid;
+
+ private String eventId;
private ConnectRecord data;
@@ -46,7 +51,9 @@ public class HttpConnectRecord {
public static HttpConnectRecord convertConnectRecord(ConnectRecord record, String type) {
return HttpConnectRecord.builder()
.type(type)
- .timestamp(LocalDateTime.now().toString())
+ .time(LocalDateTime.now().toString())
+ .uuid(UUID.randomUUID().toString())
+ .eventId(type + "-" + record.getTimestamp())
.data(record)
.build();
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
index 5f66f6430f..fa7130962a 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
@@ -123,39 +123,43 @@ public void handle(ConnectRecord record) {
*/
@Override
public Future> deliver(URI url, HttpConnectRecord httpConnectRecord) {
- // Only webhook mode needs to use the firstTryId
- String firstTryId = UUID.randomUUID().toString();
+ // Only webhook mode needs to use the UUID to identify the request
+ String id = httpConnectRecord.getUuid();
// Build the retry policy
RetryPolicy> retryPolicy = retryPolicyBuilder
- .onSuccess(e -> {
+ .onSuccess(event -> {
if (connectorConfig.getWebhookConfig().isActivate()) {
// convert the result to an HttpExportRecord
- HttpExportRecord exportRecord = covertToExportRecord(e, e.getResult(), e.getException(), url, firstTryId);
+ HttpExportRecord exportRecord = covertToExportRecord(httpConnectRecord, event, event.getResult(), event.getException(), url, id);
// add the data to the queue
((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord);
}
})
- .onRetry(e -> {
+ .onRetry(event -> {
if (log.isDebugEnabled()) {
- log.warn("Retrying the request to {} for the {} time. HttpConnectRecord= {}", url, e.getAttemptCount(), httpConnectRecord);
+ log.warn("Retrying the request to {} for the {} time. HttpConnectRecord= {}", url, event.getAttemptCount(), httpConnectRecord);
} else {
- log.warn("Retrying the request to {} for the {} time.", url, e.getAttemptCount());
+ log.warn("Retrying the request to {} for the {} time.", url, event.getAttemptCount());
}
if (connectorConfig.getWebhookConfig().isActivate()) {
- HttpExportRecord exportRecord = covertToExportRecord(e, e.getLastResult(), e.getLastException(), url, firstTryId);
+ HttpExportRecord exportRecord =
+ covertToExportRecord(httpConnectRecord, event, event.getLastResult(), event.getLastException(), url, id);
((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord);
}
+ // update the HttpConnectRecord
+ httpConnectRecord.setTime(LocalDateTime.now().toString());
+ httpConnectRecord.setUuid(UUID.randomUUID().toString());
})
- .onFailure(e -> {
+ .onFailure(event -> {
if (log.isDebugEnabled()) {
- log.error("Failed to send the request to {} after {} attempts. HttpConnectRecord= {}", url, e.getAttemptCount(),
- httpConnectRecord, e.getException());
+ log.error("Failed to send the request to {} after {} attempts. HttpConnectRecord= {}", url, event.getAttemptCount(),
+ httpConnectRecord, event.getException());
} else {
- log.error("Failed to send the request to {} after {} attempts.", url, e.getAttemptCount(), e.getException());
+ log.error("Failed to send the request to {} after {} attempts.", url, event.getAttemptCount(), event.getException());
}
if (connectorConfig.getWebhookConfig().isActivate()) {
- HttpExportRecord exportRecord = covertToExportRecord(e, e.getResult(), e.getException(), url, firstTryId);
+ HttpExportRecord exportRecord = covertToExportRecord(httpConnectRecord, event, event.getResult(), event.getException(), url, id);
((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord);
}
}).build();
@@ -170,25 +174,24 @@ public Future> deliver(URI url, HttpConnectRecord httpConne
/**
* Converts the ExecutionCompletedEvent to an HttpExportRecord.
*
- * @param event the ExecutionCompletedEvent to convert
- * @param response the response of the request, may be null
- * @param e the exception thrown during the request, may be null
- * @param url the URL the request was sent to
- * @param firstTryId the UUID of the first try
+ * @param httpConnectRecord HttpConnectRecord
+ * @param event ExecutionEvent
+ * @param response the response of the request, may be null
+ * @param e the exception thrown during the request, may be null
+ * @param url the URL the request was sent to
+ * @param id UUID
* @return the converted HttpExportRecord
*/
- private HttpExportRecord covertToExportRecord(ExecutionEvent event, HttpResponse response, Throwable e, URI url, String firstTryId) {
+ private HttpExportRecord covertToExportRecord(HttpConnectRecord httpConnectRecord, ExecutionEvent event, HttpResponse response,
+ Throwable e, URI url, String id) {
HttpExportMetadataBuilder builder = HttpExportMetadata.builder()
.url(url.toString())
.receivedTime(LocalDateTime.now())
- .retryNum(event.getAttemptCount() - 1);
+ .retryNum(event.getAttemptCount() - 1)
+ .uuid(httpConnectRecord.getUuid());
- if (event.getAttemptCount() == 1) {
- builder.retriedBy(null)
- .uuid(firstTryId);
- } else {
- builder.retriedBy(firstTryId)
- .uuid(UUID.randomUUID().toString());
+ if (event.getAttemptCount() > 1) {
+ builder.retriedBy(id);
}
if (response != null) {
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
index ed6be468a2..860fdd9256 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
@@ -35,7 +35,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
-import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -233,7 +232,7 @@ public Future> deliver(URI url, HttpConnectRecord httpConne
.url(url.toString())
.receivedTime(LocalDateTime.now())
.retriedBy(null)
- .uuid(UUID.randomUUID().toString())
+ .uuid(httpConnectRecord.getUuid())
.retryNum(0);
if (arr.succeeded()) {
@@ -271,11 +270,7 @@ public void addDataToQueue(HttpExportRecord exportRecord) {
// Try to put the received data into the queue
if (receivedDataQueue.offer(exportRecord)) {
currentQueueSize.incrementAndGet();
- if (log.isDebugEnabled()) {
- log.debug("Successfully put the received data into the queue: {}", exportRecord);
- } else {
- log.info("Successfully put the received data into the queue");
- }
+ log.debug("Successfully put the received data into the queue: {}", exportRecord);
} else {
log.error("Failed to put the received data into the queue: {}", exportRecord);
}
From 5e2bf5254194bae8c5a29a38fff36cf9dd6d9d2f Mon Sep 17 00:00:00 2001
From: zaki
Date: Mon, 29 Apr 2024 20:31:26 +0800
Subject: [PATCH 21/26] fix: fix ci
---
.../sink/handle/RetryHttpSinkHandler.java | 27 ++++++-------------
.../sink/handle/WebhookHttpSinkHandler.java | 20 ++++++--------
2 files changed, 16 insertions(+), 31 deletions(-)
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
index fa7130962a..06700261d5 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
@@ -21,7 +21,6 @@
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata;
-import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata.HttpExportMetadataBuilder;
import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord;
import org.apache.eventmesh.connector.http.util.HttpUtils;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
@@ -184,27 +183,17 @@ public Future> deliver(URI url, HttpConnectRecord httpConne
*/
private HttpExportRecord covertToExportRecord(HttpConnectRecord httpConnectRecord, ExecutionEvent event, HttpResponse response,
Throwable e, URI url, String id) {
- HttpExportMetadataBuilder builder = HttpExportMetadata.builder()
+
+ HttpExportMetadata httpExportMetadata = HttpExportMetadata.builder()
.url(url.toString())
+ .code(response != null ? response.statusCode() : -1)
+ .message(response != null ? response.statusMessage() : e.getMessage())
.receivedTime(LocalDateTime.now())
- .retryNum(event.getAttemptCount() - 1)
- .uuid(httpConnectRecord.getUuid());
-
- if (event.getAttemptCount() > 1) {
- builder.retriedBy(id);
- }
-
- if (response != null) {
- // record the response
- builder.code(response.statusCode())
- .message(response.statusMessage());
- } else {
- // record the exception
- builder.code(-1)
- .message(e.getMessage());
- }
+ .uuid(httpConnectRecord.getUuid())
+ .retriedBy(event.getAttemptCount() > 1 ? id : null)
+ .retryNum(event.getAttemptCount() - 1).build();
- return new HttpExportRecord(builder.build(), response == null ? null : response.bodyAsString());
+ return new HttpExportRecord(httpExportMetadata, response == null ? null : response.bodyAsString());
}
/**
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
index 860fdd9256..e07683fcfa 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
@@ -22,7 +22,6 @@
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata;
-import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata.HttpExportMetadataBuilder;
import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord;
import org.apache.eventmesh.connector.http.sink.data.HttpExportRecordPage;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
@@ -228,23 +227,20 @@ public Future> deliver(URI url, HttpConnectRecord httpConne
return;
}
// create ExportMetadataBuilder
- HttpExportMetadataBuilder builder = HttpExportMetadata.builder()
+ HttpResponse response = arr.succeeded() ? arr.result() : null;
+
+ HttpExportMetadata httpExportMetadata = HttpExportMetadata.builder()
.url(url.toString())
+ .code(response != null ? response.statusCode() : -1)
+ .message(response != null ? response.statusMessage() : arr.cause().getMessage())
.receivedTime(LocalDateTime.now())
.retriedBy(null)
.uuid(httpConnectRecord.getUuid())
- .retryNum(0);
+ .retryNum(0)
+ .build();
- if (arr.succeeded()) {
- HttpResponse response = arr.result();
- builder.code(response.statusCode())
- .message(response.statusMessage());
- } else {
- builder.code(-1)
- .message(arr.cause().getMessage());
- }
// create ExportRecord
- HttpExportRecord exportRecord = new HttpExportRecord(builder.build(), arr.succeeded() ? arr.result().bodyAsString() : null);
+ HttpExportRecord exportRecord = new HttpExportRecord(httpExportMetadata, arr.succeeded() ? arr.result().bodyAsString() : null);
// add the data to the queue
addDataToQueue(exportRecord);
});
From 3dcc5a978bfc05315de31bb28b799c49c9ff9a54 Mon Sep 17 00:00:00 2001
From: zaki
Date: Mon, 29 Apr 2024 21:56:28 +0800
Subject: [PATCH 22/26] fix: update something
---
.../connector/http/sink/data/HttpConnectRecord.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
index 79d34a80b4..b1e19df5f6 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
@@ -20,6 +20,7 @@
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import java.time.LocalDateTime;
+import java.util.Map;
import java.util.UUID;
import lombok.Builder;
@@ -49,11 +50,16 @@ public class HttpConnectRecord {
* @return the converted HttpConnectRecord
*/
public static HttpConnectRecord convertConnectRecord(ConnectRecord record, String type) {
+ Map map = record.getPosition().getOffset().getOffset();
+ String offset = "0";
+ if (!map.isEmpty()) {
+ offset = map.values().iterator().next().toString();
+ }
return HttpConnectRecord.builder()
.type(type)
.time(LocalDateTime.now().toString())
.uuid(UUID.randomUUID().toString())
- .eventId(type + "-" + record.getTimestamp())
+ .eventId(type + "-" + offset)
.data(record)
.build();
}
From 6f3b36162f964120099ffd63ff1bec9b96f87b77 Mon Sep 17 00:00:00 2001
From: zaki
Date: Mon, 29 Apr 2024 23:17:24 +0800
Subject: [PATCH 23/26] fix: Optimized naming
---
.../connector/http/sink/data/HttpConnectRecord.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
index b1e19df5f6..6274db1f7c 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
@@ -50,10 +50,10 @@ public class HttpConnectRecord {
* @return the converted HttpConnectRecord
*/
public static HttpConnectRecord convertConnectRecord(ConnectRecord record, String type) {
- Map map = record.getPosition().getOffset().getOffset();
+ Map offsets = record.getPosition().getOffset().getOffset();
String offset = "0";
- if (!map.isEmpty()) {
- offset = map.values().iterator().next().toString();
+ if (!offsets.isEmpty()) {
+ offset = offsets.values().iterator().next().toString();
}
return HttpConnectRecord.builder()
.type(type)
From 71c12e841f0d0a559d9e0369bb5d01815c5cc21c Mon Sep 17 00:00:00 2001
From: zaki
Date: Tue, 30 Apr 2024 01:13:03 +0800
Subject: [PATCH 24/26] fix: fix ci
---
.../http/sink/data/HttpConnectRecord.java | 6 +-
.../connector/HttpSinkConnectorTest.java | 66 +++++++++----------
.../src/test/resources/sink-config.yml | 2 +-
tools/dependency-check/known-dependencies.txt | 2 +
4 files changed, 36 insertions(+), 40 deletions(-)
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
index 6274db1f7c..1bfd223079 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
@@ -50,10 +50,10 @@ public class HttpConnectRecord {
* @return the converted HttpConnectRecord
*/
public static HttpConnectRecord convertConnectRecord(ConnectRecord record, String type) {
- Map offsets = record.getPosition().getOffset().getOffset();
+ Map offsetMap = record.getPosition().getOffset().getOffset();
String offset = "0";
- if (!offsets.isEmpty()) {
- offset = offsets.values().iterator().next().toString();
+ if (!offsetMap.isEmpty()) {
+ offset = offsetMap.values().iterator().next().toString();
}
return HttpConnectRecord.builder()
.type(type)
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
index 87ba96f76c..be6157daea 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
@@ -21,18 +21,11 @@
import org.apache.eventmesh.connector.http.sink.HttpSinkConnector;
import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig;
-import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
import org.apache.eventmesh.openconnect.util.ConfigUtil;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
import java.net.URI;
import java.util.ArrayList;
@@ -52,7 +45,6 @@
import io.vertx.core.http.HttpMethod;
import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
public class HttpSinkConnectorTest {
@@ -129,34 +121,36 @@ void testPut() throws Exception {
.withPath(severUri.getPath()),
VerificationTimes.exactly(times));
- // verify response
- HttpWebhookConfig webhookConfig = sinkConfig.connectorConfig.getWebhookConfig();
- URI uri = new URIBuilder()
- .setScheme("http")
- .setHost(severUri.getHost())
- .setPort(webhookConfig.getPort())
- .setPath(webhookConfig.getExportPath())
- .addParameter("pageNum", "1")
- .addParameter("pageSize", "10")
- .addParameter("type", "poll")
- .build();
-
- CloseableHttpClient httpClient = HttpClients.createDefault();
- HttpGet httpGet = new HttpGet(uri);
- httpGet.setHeader("Content-Type", "application/json");
- CloseableHttpResponse response = httpClient.execute(httpGet);
- String body = EntityUtils.toString(response.getEntity());
- assert body != null;
- JSONArray pageItems = JSON.parseObject(body).getJSONArray("pageItems");
- assert pageItems != null && pageItems.size() == times;
- for (int i = 0; i < times; i++) {
- JSONObject pageItem = pageItems.getJSONObject(i);
- assert pageItem != null;
- assert pageItem.getJSONObject("data") != null;
- assert pageItem.getJSONObject("metadata") != null;
- }
-
- httpClient.close();
+ // The following code is only required in webhook mode
+
+// // verify response
+// HttpWebhookConfig webhookConfig = sinkConfig.connectorConfig.getWebhookConfig();
+// URI uri = new URIBuilder()
+// .setScheme("http")
+// .setHost(severUri.getHost())
+// .setPort(webhookConfig.getPort())
+// .setPath(webhookConfig.getExportPath())
+// .addParameter("pageNum", "1")
+// .addParameter("pageSize", "10")
+// .addParameter("type", "poll")
+// .build();
+//
+// CloseableHttpClient httpClient = HttpClients.createDefault();
+// HttpGet httpGet = new HttpGet(uri);
+// httpGet.setHeader("Content-Type", "application/json");
+// CloseableHttpResponse response = httpClient.execute(httpGet);
+// String body = EntityUtils.toString(response.getEntity());
+// assert body != null;
+// JSONArray pageItems = JSON.parseObject(body).getJSONArray("pageItems");
+// assert pageItems != null && pageItems.size() == times;
+// for (int i = 0; i < times; i++) {
+// JSONObject pageItem = pageItems.getJSONObject(i);
+// assert pageItem != null;
+// assert pageItem.getJSONObject("data") != null;
+// assert pageItem.getJSONObject("metadata") != null;
+// }
+//
+// httpClient.close();
}
private ConnectRecord createConnectRecord() {
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml
index 149ad7681b..8f6623f298 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml
@@ -39,7 +39,7 @@ connectorConfig:
interval: 1000
retryOnNonSuccess: true
webhookConfig:
- activate: true
+ activate: false
exportPath: /export
port: 8988
serverIdleTimeout: 5000
diff --git a/tools/dependency-check/known-dependencies.txt b/tools/dependency-check/known-dependencies.txt
index 6922791cea..7d6e4a4606 100644
--- a/tools/dependency-check/known-dependencies.txt
+++ b/tools/dependency-check/known-dependencies.txt
@@ -46,6 +46,7 @@ checker-qual-3.12.0.jar
classmate-1.5.1.jar
cloudevents-api-2.4.2.jar
cloudevents-core-2.4.2.jar
+cloudevents-http-vertx-2.3.0.jar
cloudevents-http-vertx-3.0.0.jar
cloudevents-json-jackson-2.4.2.jar
cloudevents-kafka-2.4.2.jar
@@ -348,6 +349,7 @@ vertx-auth-common-4.4.6.jar
vertx-bridge-common-4.4.6.jar
vertx-core-4.4.6.jar
vertx-web-4.4.6.jar
+vertx-web-client-4.0.0.jar
vertx-web-client-4.4.6.jar
vertx-web-common-4.4.6.jar
vertx-uri-template-4.4.6.jar
From 6732e53d09b12ded8483638540e9df0cac68173a Mon Sep 17 00:00:00 2001
From: zaki
Date: Tue, 30 Apr 2024 01:19:41 +0800
Subject: [PATCH 25/26] fix: fix style check error
---
.../connector/HttpSinkConnectorTest.java | 64 ++++++++++---------
1 file changed, 33 insertions(+), 31 deletions(-)
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
index be6157daea..fe8f9f9109 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
@@ -26,7 +26,6 @@
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
import org.apache.eventmesh.openconnect.util.ConfigUtil;
-
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@@ -121,36 +120,39 @@ void testPut() throws Exception {
.withPath(severUri.getPath()),
VerificationTimes.exactly(times));
- // The following code is only required in webhook mode
-
-// // verify response
-// HttpWebhookConfig webhookConfig = sinkConfig.connectorConfig.getWebhookConfig();
-// URI uri = new URIBuilder()
-// .setScheme("http")
-// .setHost(severUri.getHost())
-// .setPort(webhookConfig.getPort())
-// .setPath(webhookConfig.getExportPath())
-// .addParameter("pageNum", "1")
-// .addParameter("pageSize", "10")
-// .addParameter("type", "poll")
-// .build();
-//
-// CloseableHttpClient httpClient = HttpClients.createDefault();
-// HttpGet httpGet = new HttpGet(uri);
-// httpGet.setHeader("Content-Type", "application/json");
-// CloseableHttpResponse response = httpClient.execute(httpGet);
-// String body = EntityUtils.toString(response.getEntity());
-// assert body != null;
-// JSONArray pageItems = JSON.parseObject(body).getJSONArray("pageItems");
-// assert pageItems != null && pageItems.size() == times;
-// for (int i = 0; i < times; i++) {
-// JSONObject pageItem = pageItems.getJSONObject(i);
-// assert pageItem != null;
-// assert pageItem.getJSONObject("data") != null;
-// assert pageItem.getJSONObject("metadata") != null;
-// }
-//
-// httpClient.close();
+ /*
+ **The following code is only required in webhook mode**
+
+ // verify response
+ HttpWebhookConfig webhookConfig = sinkConfig.connectorConfig.getWebhookConfig();
+ URI uri = new URIBuilder()
+ .setScheme("http")
+ .setHost(severUri.getHost())
+ .setPort(webhookConfig.getPort())
+ .setPath(webhookConfig.getExportPath())
+ .addParameter("pageNum", "1")
+ .addParameter("pageSize", "10")
+ .addParameter("type", "poll")
+ .build();
+
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ HttpGet httpGet = new HttpGet(uri);
+ httpGet.setHeader("Content-Type", "application/json");
+ CloseableHttpResponse response = httpClient.execute(httpGet);
+ String body = EntityUtils.toString(response.getEntity());
+ assert body != null;
+ JSONArray pageItems = JSON.parseObject(body).getJSONArray("pageItems");
+ assert pageItems != null && pageItems.size() == times;
+ for (int i = 0; i < times; i++) {
+ JSONObject pageItem = pageItems.getJSONObject(i);
+ assert pageItem != null;
+ assert pageItem.getJSONObject("data") != null;
+ assert pageItem.getJSONObject("metadata") != null;
+ }
+
+ httpClient.close();
+
+ */
}
private ConnectRecord createConnectRecord() {
From 2f2e81e4176f120c58414b4a2ab5bf01473cb186 Mon Sep 17 00:00:00 2001
From: zaki
Date: Tue, 30 Apr 2024 22:19:06 +0800
Subject: [PATCH 26/26] test: update HttpSinkConnectorTest
---
.../eventmesh-connector-http/build.gradle | 1 +
.../connector/HttpSinkConnectorTest.java | 86 ++++++++++---------
.../src/test/resources/sink-config.yml | 2 +-
3 files changed, 48 insertions(+), 41 deletions(-)
diff --git a/eventmesh-connectors/eventmesh-connector-http/build.gradle b/eventmesh-connectors/eventmesh-connector-http/build.gradle
index cee0c0623f..786ac4518d 100644
--- a/eventmesh-connectors/eventmesh-connector-http/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-http/build.gradle
@@ -26,6 +26,7 @@ dependencies {
testImplementation "org.apache.httpcomponents:httpclient"
testImplementation 'org.mock-server:mockserver-netty:5.15.0'
+ testImplementation 'com.squareup.okhttp3:okhttp:4.12.0'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
index fe8f9f9109..738df6430b 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
@@ -21,6 +21,7 @@
import org.apache.eventmesh.connector.http.sink.HttpSinkConnector;
import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig;
+import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
@@ -34,18 +35,21 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.mockserver.client.MockServerClient;
import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
import org.mockserver.model.MediaType;
-import org.mockserver.verify.VerificationTimes;
-
-import io.vertx.core.http.HttpMethod;
import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+
public class HttpSinkConnectorTest {
private HttpSinkConnector sinkConnector;
@@ -68,8 +72,7 @@ void before() throws Exception {
this.severUri = URI.create(sinkConfig.connectorConfig.getUrls()[0]);
// start mockServer
mockServer = ClientAndServer.startClientAndServer(severUri.getPort());
- // mockServer response
- new MockServerClient(severUri.getHost(), severUri.getPort())
+ mockServer.reset()
.when(
request()
.withMethod("POST")
@@ -113,46 +116,49 @@ void testPut() throws Exception {
Thread.sleep(5000);
// verify request
- new MockServerClient(severUri.getHost(), severUri.getPort())
- .verify(
- HttpRequest.request()
- .withMethod(HttpMethod.POST.name())
- .withPath(severUri.getPath()),
- VerificationTimes.exactly(times));
-
- /*
- **The following code is only required in webhook mode**
+ HttpRequest[] recordedRequests = mockServer.retrieveRecordedRequests(null);
+ assert recordedRequests.length == times;
// verify response
HttpWebhookConfig webhookConfig = sinkConfig.connectorConfig.getWebhookConfig();
- URI uri = new URIBuilder()
- .setScheme("http")
- .setHost(severUri.getHost())
- .setPort(webhookConfig.getPort())
- .setPath(webhookConfig.getExportPath())
- .addParameter("pageNum", "1")
- .addParameter("pageSize", "10")
- .addParameter("type", "poll")
+ String url = new HttpUrl.Builder()
+ .scheme("http")
+ .host(severUri.getHost())
+ .port(webhookConfig.getPort())
+ .addPathSegments(webhookConfig.getExportPath())
+ .addQueryParameter("pageNum", "1")
+ .addQueryParameter("pageSize", "10")
+ .addQueryParameter("type", "poll")
+ .build().toString();
+
+ // build request
+ Request request = new Request.Builder()
+ .url(url)
+ .addHeader("Content-Type", "application/json")
.build();
- CloseableHttpClient httpClient = HttpClients.createDefault();
- HttpGet httpGet = new HttpGet(uri);
- httpGet.setHeader("Content-Type", "application/json");
- CloseableHttpResponse response = httpClient.execute(httpGet);
- String body = EntityUtils.toString(response.getEntity());
- assert body != null;
- JSONArray pageItems = JSON.parseObject(body).getJSONArray("pageItems");
- assert pageItems != null && pageItems.size() == times;
- for (int i = 0; i < times; i++) {
- JSONObject pageItem = pageItems.getJSONObject(i);
- assert pageItem != null;
- assert pageItem.getJSONObject("data") != null;
- assert pageItem.getJSONObject("metadata") != null;
+ OkHttpClient client = new OkHttpClient();
+ try (Response response = client.newCall(request).execute()) {
+ // check response code
+ if (!response.isSuccessful()) {
+ throw new RuntimeException("Unexpected response code: " + response);
+ }
+ // check response body
+ ResponseBody responseBody = response.body();
+ if (responseBody != null) {
+ JSONObject jsonObject = JSON.parseObject(responseBody.string());
+ JSONArray pageItems = jsonObject.getJSONArray("pageItems");
+
+ assert pageItems != null && pageItems.size() == times;
+
+ for (int i = 0; i < times; i++) {
+ JSONObject pageItem = pageItems.getJSONObject(i);
+ assert pageItem != null;
+ assert pageItem.getJSONObject("data") != null;
+ assert pageItem.getJSONObject("metadata") != null;
+ }
+ }
}
-
- httpClient.close();
-
- */
}
private ConnectRecord createConnectRecord() {
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml
index 8f6623f298..149ad7681b 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml
@@ -39,7 +39,7 @@ connectorConfig:
interval: 1000
retryOnNonSuccess: true
webhookConfig:
- activate: false
+ activate: true
exportPath: /export
port: 8988
serverIdleTimeout: 5000