diff --git a/eventmesh-connectors/eventmesh-connector-http/build.gradle b/eventmesh-connectors/eventmesh-connector-http/build.gradle index 734b2fc622..786ac4518d 100644 --- a/eventmesh-connectors/eventmesh-connector-http/build.gradle +++ b/eventmesh-connectors/eventmesh-connector-http/build.gradle @@ -18,10 +18,15 @@ dependencies { api project(":eventmesh-openconnect:eventmesh-openconnect-java") implementation project(":eventmesh-common") - implementation 'io.cloudevents:cloudevents-http-vertx:2.3.0' + + implementation 'io.cloudevents:cloudevents-http-vertx:3.0.0' implementation 'io.vertx:vertx-web:4.4.6' + implementation 'io.vertx:vertx-web-client:4.4.6' + implementation 'dev.failsafe:failsafe:3.3.2' testImplementation "org.apache.httpcomponents:httpclient" + testImplementation 'org.mock-server:mockserver-netty:5.15.0' + testImplementation 'com.squareup.okhttp3:okhttp:4.12.0' compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' } \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java index bd94fed126..8d753d2815 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java @@ -18,6 +18,7 @@ package org.apache.eventmesh.connector.http.server; import org.apache.eventmesh.connector.http.config.HttpServerConfig; +import org.apache.eventmesh.connector.http.sink.HttpSinkConnector; import org.apache.eventmesh.connector.http.source.connector.HttpSourceConnector; import org.apache.eventmesh.openconnect.Application; import org.apache.eventmesh.openconnect.util.ConfigUtil; @@ -33,7 +34,8 @@ public static void main(String[] args) throws Exception { } if (serverConfig.isSinkEnable()) { - // TODO support sink connector + Application httpSinkApp = new Application(); + httpSinkApp.run(HttpSinkConnector.class); } } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java new file mode 100644 index 0000000000..23d09fa141 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink; + +import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig; +import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.connector.http.sink.handle.CommonHttpSinkHandler; +import org.apache.eventmesh.connector.http.sink.handle.HttpSinkHandler; +import org.apache.eventmesh.connector.http.sink.handle.RetryHttpSinkHandler; +import org.apache.eventmesh.connector.http.sink.handle.WebhookHttpSinkHandler; +import org.apache.eventmesh.openconnect.api.config.Config; +import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; +import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; +import org.apache.eventmesh.openconnect.api.sink.Sink; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.util.List; +import java.util.Objects; + +import lombok.Getter; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class HttpSinkConnector implements Sink { + + private HttpSinkConfig httpSinkConfig; + + @Getter + private HttpSinkHandler sinkHandler; + + @Override + public Class configClass() { + return HttpSinkConfig.class; + } + + @Override + public void init(Config config) throws Exception { + this.httpSinkConfig = (HttpSinkConfig) config; + doInit(); + } + + @Override + public void init(ConnectorContext connectorContext) throws Exception { + SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext; + this.httpSinkConfig = (HttpSinkConfig) sinkConnectorContext.getSinkConfig(); + doInit(); + } + + @SneakyThrows + private void doInit() { + // Fill default values if absent + SinkConnectorConfig.populateFieldsWithDefaults(this.httpSinkConfig.connectorConfig); + // Create different handlers for different configurations + HttpSinkHandler nonRetryHandler; + if (this.httpSinkConfig.connectorConfig.getWebhookConfig().isActivate()) { + nonRetryHandler = new WebhookHttpSinkHandler(this.httpSinkConfig.connectorConfig); + } else { + nonRetryHandler = new CommonHttpSinkHandler(this.httpSinkConfig.connectorConfig); + } + + int maxRetries = this.httpSinkConfig.connectorConfig.getRetryConfig().getMaxRetries(); + if (maxRetries == 0) { + // Use the original sink handler + this.sinkHandler = nonRetryHandler; + } else if (maxRetries > 0) { + // Wrap the sink handler with a retry handler + this.sinkHandler = new RetryHttpSinkHandler(this.httpSinkConfig.connectorConfig, nonRetryHandler); + } else { + throw new IllegalArgumentException("Max retries must be greater than or equal to 0."); + } + } + + @Override + public void start() throws Exception { + this.sinkHandler.start(); + } + + @Override + public void commit(ConnectRecord record) { + + } + + @Override + public String name() { + return this.httpSinkConfig.connectorConfig.getConnectorName(); + } + + @Override + public void stop() throws Exception { + this.sinkHandler.stop(); + } + + @Override + public void put(List sinkRecords) { + for (ConnectRecord sinkRecord : sinkRecords) { + try { + if (Objects.isNull(sinkRecord)) { + log.warn("ConnectRecord data is null, ignore."); + continue; + } + // Handle the ConnectRecord + this.sinkHandler.handle(sinkRecord); + } catch (Exception e) { + log.error("Failed to sink message via HTTP. ", e); + } + } + } +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java new file mode 100644 index 0000000000..cf6d5adcb7 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.config; + +import lombok.Data; + +@Data +public class HttpRetryConfig { + // maximum number of retries, default 3, minimum 0 + private int maxRetries = 3; + + // retry interval, default 2000ms + private int interval = 2000; + + // Default value is false, indicating that only requests with network-level errors will be retried. + // If set to true, all failed requests will be retried, including network-level errors and non-2xx responses. + private boolean retryOnNonSuccess = false; +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpSinkConfig.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpSinkConfig.java new file mode 100644 index 0000000000..3dd0c2b6a5 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpSinkConfig.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.config; + +import org.apache.eventmesh.openconnect.api.config.SinkConfig; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class HttpSinkConfig extends SinkConfig { + + public SinkConnectorConfig connectorConfig; +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpWebhookConfig.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpWebhookConfig.java new file mode 100644 index 0000000000..f15bac4568 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpWebhookConfig.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.config; + +import lombok.Data; + +@Data +public class HttpWebhookConfig { + + private boolean activate = false; + + // Path to display/export callback data + private String exportPath = "/export"; + + private int port; + + // timeunit: ms + private int serverIdleTimeout = 5000; + + // max size of the storage queue + private int maxStorageSize = 5000; +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java new file mode 100644 index 0000000000..9bb338cceb --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.config; + +import io.vertx.core.http.HttpClientOptions; + +import lombok.Data; + +@Data +public class SinkConnectorConfig { + + private String connectorName; + + private String[] urls; + + // keepAlive, default true + private boolean keepAlive = HttpClientOptions.DEFAULT_KEEP_ALIVE; + + // timeunit: ms, default 60000ms + private int keepAliveTimeout = HttpClientOptions.DEFAULT_KEEP_ALIVE_TIMEOUT * 1000; // Keep units consistent + + // timeunit: ms, default 5000ms, recommended scope: 5000ms - 10000ms + private int connectionTimeout = 5000; + + // timeunit: ms, default 5000ms + private int idleTimeout; + + // maximum number of HTTP/1 connections a client will pool, default 5 + private int maxConnectionPoolSize = HttpClientOptions.DEFAULT_MAX_POOL_SIZE; + + // retry config + private HttpRetryConfig retryConfig = new HttpRetryConfig(); + + // webhook config + private HttpWebhookConfig webhookConfig = new HttpWebhookConfig(); + + + /** + * Fill default values if absent (When there are multiple default values for a field) + * + * @param config SinkConnectorConfig + */ + public static void populateFieldsWithDefaults(SinkConnectorConfig config) { + /* + * set default values for idleTimeout + * recommended scope: common(5s - 10s), webhook(15s - 30s) + */ + final int commonHttpIdleTimeout = 5000; + final int webhookHttpIdleTimeout = 15000; + + // Set default values for idleTimeout + if (config.getIdleTimeout() == 0) { + int idleTimeout = config.webhookConfig.isActivate() ? webhookHttpIdleTimeout : commonHttpIdleTimeout; + config.setIdleTimeout(idleTimeout); + } + + } +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java new file mode 100644 index 0000000000..1bfd223079 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.data; + +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.time.LocalDateTime; +import java.util.Map; +import java.util.UUID; + +import lombok.Builder; +import lombok.Data; + +/** + * a special ConnectRecord for HttpSinkConnector + */ +@Data +@Builder +public class HttpConnectRecord { + + private String type; + + private String time; + + private String uuid; + + private String eventId; + + private ConnectRecord data; + + /** + * Convert ConnectRecord to HttpConnectRecord + * + * @param record the ConnectRecord to convert + * @return the converted HttpConnectRecord + */ + public static HttpConnectRecord convertConnectRecord(ConnectRecord record, String type) { + Map offsetMap = record.getPosition().getOffset().getOffset(); + String offset = "0"; + if (!offsetMap.isEmpty()) { + offset = offsetMap.values().iterator().next().toString(); + } + return HttpConnectRecord.builder() + .type(type) + .time(LocalDateTime.now().toString()) + .uuid(UUID.randomUUID().toString()) + .eventId(type + "-" + offset) + .data(record) + .build(); + } + +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java new file mode 100644 index 0000000000..848012f152 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.data; + +import java.time.LocalDateTime; + +import lombok.Builder; +import lombok.Data; + +/** + * Metadata for an HTTP export operation. + */ +@Data +@Builder +public class HttpExportMetadata { + private String url; + + private int code; + + private String message; + + private LocalDateTime receivedTime; + + private String uuid; + + private String retriedBy; + + private int retryNum; +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecord.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecord.java new file mode 100644 index 0000000000..b6382aee7a --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecord.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.data; + +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * Represents an HTTP export record containing metadata and data to be exported. + */ +@Data +@AllArgsConstructor +public class HttpExportRecord { + + private HttpExportMetadata metadata; + + private Object data; +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecordPage.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecordPage.java new file mode 100644 index 0000000000..5c44eb3b7f --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecordPage.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.data; + +import java.util.List; + +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * Represents a page of HTTP export records. + */ +@Data +@AllArgsConstructor +public class HttpExportRecordPage { + + private int pageNum; + + private int pageSize; + + private List pageItems; + +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java new file mode 100644 index 0000000000..e21046c4d2 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.handle; + +import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; +import org.apache.eventmesh.connector.http.util.HttpUtils; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.vertx.core.Future; +import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpHeaders; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * Common HTTP/HTTPS Sink Handler implementation to handle ConnectRecords by sending them over HTTP or HTTPS to configured URLs. + * + *

This handler initializes a WebClient for making HTTP requests based on the provided SinkConnectorConfig. + * It handles processing ConnectRecords by converting them to HttpConnectRecord and sending them asynchronously to each configured URL using the + * WebClient.

+ * + *

The handler uses Vert.x's WebClient to perform HTTP/HTTPS requests. It initializes the WebClient in the {@link #start()} + * method and closes it in the {@link #stop()} method to manage resources efficiently.

+ * + *

Each ConnectRecord is processed and sent to all configured URLs concurrently using asynchronous HTTP requests.

+ */ +@Slf4j +@Getter +public class CommonHttpSinkHandler implements HttpSinkHandler { + + private final SinkConnectorConfig connectorConfig; + + private final List urls; + + private WebClient webClient; + + + public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { + this.connectorConfig = sinkConnectorConfig; + // Initialize URLs + String[] urlStrings = sinkConnectorConfig.getUrls(); + this.urls = Arrays.stream(urlStrings) + .map(URI::create) + .collect(Collectors.toList()); + } + + /** + * Initializes the WebClient for making HTTP requests based on the provided SinkConnectorConfig. + */ + @Override + public void start() { + // Create WebClient + doInitWebClient(); + } + + /** + * Initializes the WebClient with the provided configuration options. + */ + private void doInitWebClient() { + final Vertx vertx = Vertx.vertx(); + WebClientOptions options = new WebClientOptions() + .setKeepAlive(this.connectorConfig.isKeepAlive()) + .setKeepAliveTimeout(this.connectorConfig.getKeepAliveTimeout() / 1000) + .setIdleTimeout(this.connectorConfig.getIdleTimeout()) + .setIdleTimeoutUnit(TimeUnit.MILLISECONDS) + .setConnectTimeout(this.connectorConfig.getConnectionTimeout()) + .setMaxPoolSize(this.connectorConfig.getMaxConnectionPoolSize()); + this.webClient = WebClient.create(vertx, options); + } + + /** + * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed. + * + * @param record the ConnectRecord to process + */ + @Override + public void handle(ConnectRecord record) { + for (URI url : this.urls) { + // convert ConnectRecord to HttpConnectRecord + String type = String.format("%s.%s.%s", connectorConfig.getConnectorName(), url.getScheme(), "common"); + HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type); + deliver(url, httpConnectRecord); + } + } + + + /** + * Processes HttpConnectRecord on specified URL while returning its own processing logic. + * This method sends the HttpConnectRecord to the specified URL using the WebClient. + * + * @param url URI to which the HttpConnectRecord should be sent + * @param httpConnectRecord HttpConnectRecord to process + * @return processing chain + */ + @Override + public Future> deliver(URI url, HttpConnectRecord httpConnectRecord) { + // create headers + MultiMap headers = HttpHeaders.headers() + .set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8") + .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8"); + + // get timestamp and offset + Long timestamp = httpConnectRecord.getData().getTimestamp(); + Map offset = httpConnectRecord.getData().getPosition().getOffset().getOffset(); + + // send the request + return this.webClient.post(url.getPath()) + .host(url.getHost()) + .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort()) + .putHeaders(headers) + .ssl(Objects.equals(url.getScheme(), "https")) + .sendJson(httpConnectRecord) + .onSuccess(res -> { + log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, offset); + // log the response + if (HttpUtils.is2xxSuccessful(res.statusCode())) { + if (log.isDebugEnabled()) { + log.debug("Received successful response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}", + res.statusCode(), timestamp, offset, res.bodyAsString()); + } else { + log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset); + } + } else { + if (log.isDebugEnabled()) { + log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}", + res.statusCode(), timestamp, offset, res.bodyAsString()); + } else { + log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset); + } + } + + }) + .onFailure(err -> log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, offset, err)); + } + + + /** + * Cleans up and releases resources used by the HTTP/HTTPS handler. + */ + @Override + public void stop() { + if (this.webClient != null) { + this.webClient.close(); + } else { + log.warn("WebClient is null, ignore."); + } + } + + +} \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java new file mode 100644 index 0000000000..09fd66a762 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.handle; + +import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.net.URI; + +import io.vertx.core.Future; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpResponse; + +/** + * Interface for handling ConnectRecords via HTTP or HTTPS. Classes implementing this interface are responsible for processing ConnectRecords by + * sending them over HTTP or HTTPS, with additional support for handling multiple requests and asynchronous processing. + * + *

Any class that needs to process ConnectRecords via HTTP or HTTPS should implement this interface. + * Implementing classes must provide implementations for the {@link #start()}, {@link #handle(ConnectRecord)}, + * {@link #deliver(URI, HttpConnectRecord)}, and {@link #stop()} methods.

+ * + *

Implementing classes should ensure thread safety and handle HTTP/HTTPS communication efficiently. + * The {@link #start()} method initializes any necessary resources for HTTP/HTTPS communication. The {@link #handle(ConnectRecord)} method processes a + * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, HttpConnectRecord)} method processes HttpConnectRecord on specified URL + * while returning its own processing logic {@link #stop()} method releases any resources used for HTTP/HTTPS communication.

+ * + *

It's recommended to handle exceptions gracefully within the {@link #deliver(URI, HttpConnectRecord)} method + * to prevent message loss or processing interruptions.

+ */ +public interface HttpSinkHandler { + + /** + * Initializes the HTTP/HTTPS handler. This method should be called before using the handler. + */ + void start(); + + /** + * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed. + * + * @param record the ConnectRecord to process + */ + void handle(ConnectRecord record); + + + /** + * Processes HttpConnectRecord on specified URL while returning its own processing logic + * + * @param url URI to which the HttpConnectRecord should be sent + * @param httpConnectRecord HttpConnectRecord to process + * @return processing chain + */ + Future> deliver(URI url, HttpConnectRecord httpConnectRecord); + + /** + * Cleans up and releases resources used by the HTTP/HTTPS handler. This method should be called when the handler is no longer needed. + */ + void stop(); +} + diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java new file mode 100644 index 0000000000..06700261d5 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.handle; + +import org.apache.eventmesh.connector.http.sink.config.HttpRetryConfig; +import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; +import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata; +import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord; +import org.apache.eventmesh.connector.http.util.HttpUtils; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.net.ConnectException; +import java.net.URI; +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +import io.vertx.core.Future; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpResponse; + +import lombok.extern.slf4j.Slf4j; + +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import dev.failsafe.RetryPolicyBuilder; +import dev.failsafe.event.ExecutionEvent; + + +@Slf4j +public class RetryHttpSinkHandler implements HttpSinkHandler { + + private final SinkConnectorConfig connectorConfig; + + // Retry policy builder + private RetryPolicyBuilder> retryPolicyBuilder; + + private final List urls; + + private final HttpSinkHandler sinkHandler; + + + public RetryHttpSinkHandler(SinkConnectorConfig connectorConfig, HttpSinkHandler sinkHandler) { + this.connectorConfig = connectorConfig; + this.sinkHandler = sinkHandler; + + // Initialize retry + initRetry(); + + // Initialize URLs + String[] urlStrings = connectorConfig.getUrls(); + this.urls = Arrays.stream(urlStrings) + .map(URI::create) + .collect(Collectors.toList()); + } + + private void initRetry() { + HttpRetryConfig httpRetryConfig = this.connectorConfig.getRetryConfig(); + + this.retryPolicyBuilder = RetryPolicy.>builder() + .handleIf(e -> e instanceof ConnectException) + .handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode())) + .withMaxRetries(httpRetryConfig.getMaxRetries()) + .withDelay(Duration.ofMillis(httpRetryConfig.getInterval())); + } + + + /** + * Initializes the WebClient for making HTTP requests based on the provided SinkConnectorConfig. + */ + @Override + public void start() { + sinkHandler.start(); + } + + + /** + * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed. + * + * @param record the ConnectRecord to process + */ + @Override + public void handle(ConnectRecord record) { + for (URI url : this.urls) { + // convert ConnectRecord to HttpConnectRecord + String type = String.format("%s.%s.%s", + this.connectorConfig.getConnectorName(), url.getScheme(), + this.connectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common"); + HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type); + // handle the HttpConnectRecord + deliver(url, httpConnectRecord); + } + } + + + /** + * Processes HttpConnectRecord on specified URL while returning its own processing logic This method provides the retry power to process the + * HttpConnectRecord + * + * @param url URI to which the HttpConnectRecord should be sent + * @param httpConnectRecord HttpConnectRecord to process + * @return processing chain + */ + @Override + public Future> deliver(URI url, HttpConnectRecord httpConnectRecord) { + // Only webhook mode needs to use the UUID to identify the request + String id = httpConnectRecord.getUuid(); + + // Build the retry policy + RetryPolicy> retryPolicy = retryPolicyBuilder + .onSuccess(event -> { + if (connectorConfig.getWebhookConfig().isActivate()) { + // convert the result to an HttpExportRecord + HttpExportRecord exportRecord = covertToExportRecord(httpConnectRecord, event, event.getResult(), event.getException(), url, id); + // add the data to the queue + ((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord); + } + }) + .onRetry(event -> { + if (log.isDebugEnabled()) { + log.warn("Retrying the request to {} for the {} time. HttpConnectRecord= {}", url, event.getAttemptCount(), httpConnectRecord); + } else { + log.warn("Retrying the request to {} for the {} time.", url, event.getAttemptCount()); + } + if (connectorConfig.getWebhookConfig().isActivate()) { + HttpExportRecord exportRecord = + covertToExportRecord(httpConnectRecord, event, event.getLastResult(), event.getLastException(), url, id); + ((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord); + } + // update the HttpConnectRecord + httpConnectRecord.setTime(LocalDateTime.now().toString()); + httpConnectRecord.setUuid(UUID.randomUUID().toString()); + }) + .onFailure(event -> { + if (log.isDebugEnabled()) { + log.error("Failed to send the request to {} after {} attempts. HttpConnectRecord= {}", url, event.getAttemptCount(), + httpConnectRecord, event.getException()); + } else { + log.error("Failed to send the request to {} after {} attempts.", url, event.getAttemptCount(), event.getException()); + } + if (connectorConfig.getWebhookConfig().isActivate()) { + HttpExportRecord exportRecord = covertToExportRecord(httpConnectRecord, event, event.getResult(), event.getException(), url, id); + ((WebhookHttpSinkHandler) sinkHandler).addDataToQueue(exportRecord); + } + }).build(); + + // Handle the HttpConnectRecord with retry + Failsafe.with(retryPolicy) + .getStageAsync(() -> sinkHandler.deliver(url, httpConnectRecord).toCompletionStage()); + + return null; + } + + /** + * Converts the ExecutionCompletedEvent to an HttpExportRecord. + * + * @param httpConnectRecord HttpConnectRecord + * @param event ExecutionEvent + * @param response the response of the request, may be null + * @param e the exception thrown during the request, may be null + * @param url the URL the request was sent to + * @param id UUID + * @return the converted HttpExportRecord + */ + private HttpExportRecord covertToExportRecord(HttpConnectRecord httpConnectRecord, ExecutionEvent event, HttpResponse response, + Throwable e, URI url, String id) { + + HttpExportMetadata httpExportMetadata = HttpExportMetadata.builder() + .url(url.toString()) + .code(response != null ? response.statusCode() : -1) + .message(response != null ? response.statusMessage() : e.getMessage()) + .receivedTime(LocalDateTime.now()) + .uuid(httpConnectRecord.getUuid()) + .retriedBy(event.getAttemptCount() > 1 ? id : null) + .retryNum(event.getAttemptCount() - 1).build(); + + return new HttpExportRecord(httpExportMetadata, response == null ? null : response.bodyAsString()); + } + + /** + * Cleans up and releases resources used by the HTTP/HTTPS handler. + */ + @Override + public void stop() { + sinkHandler.stop(); + } +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java new file mode 100644 index 0000000000..e07683fcfa --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.handle; + +import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig; +import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; +import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata; +import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord; +import org.apache.eventmesh.connector.http.sink.data.HttpExportRecordPage; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import org.apache.commons.lang3.StringUtils; + +import java.net.URI; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Future; +import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.ext.web.handler.LoggerHandler; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONWriter; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * Extends CommonHttpSinkHandler to provide additional functionality for handling webhook features, including sending requests to callback servers, + * allowing longer response wait times, storing responses returned from callback servers, and exposing received data through an HTTP service. + */ +@Slf4j +public class WebhookHttpSinkHandler extends CommonHttpSinkHandler { + + private final SinkConnectorConfig sinkConnectorConfig; + + // the configuration for webhook + private final HttpWebhookConfig webhookConfig; + + // the server for exporting the received data + private HttpServer exportServer; + + // store the received data, when webhook is enabled + private final ConcurrentLinkedQueue receivedDataQueue; + + // the maximum queue size + private final int maxQueueSize; + + // the current queue size + private final AtomicInteger currentQueueSize; + + public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { + super(sinkConnectorConfig); + this.sinkConnectorConfig = sinkConnectorConfig; + this.webhookConfig = sinkConnectorConfig.getWebhookConfig(); + this.maxQueueSize = this.webhookConfig.getMaxStorageSize(); + this.currentQueueSize = new AtomicInteger(0); + this.receivedDataQueue = new ConcurrentLinkedQueue<>(); + // init the export server + doInitExportServer(); + } + + /** + * Initialize the server for exporting the received data + */ + private void doInitExportServer() { + final Vertx vertx = Vertx.vertx(); + final Router router = Router.router(vertx); + // add logger handler + router.route().handler(LoggerHandler.create()); + // add export handler + router.route() + .path(this.webhookConfig.getExportPath()) + .method(HttpMethod.GET) + .produces("application/json") + .handler(ctx -> { + // Validate the request parameters + MultiMap params = ctx.request().params(); + String pageNumStr = params.get(ParamEnum.PAGE_NUM.getValue()); + String pageSizeStr = params.get(ParamEnum.PAGE_SIZE.getValue()); + String type = params.get(ParamEnum.TYPE.getValue()); + + // 1. type must be "poll" or "peek" or null + // 2. if type is "peek", pageNum must be greater than 0 + // 3. pageSize must be greater than 0 + if ((type != null && !Objects.equals(type, TypeEnum.PEEK.getValue()) && !Objects.equals(type, TypeEnum.POLL.getValue())) + || (Objects.equals(type, TypeEnum.PEEK.getValue()) && (StringUtils.isBlank(pageNumStr) || Integer.parseInt(pageNumStr) < 1)) + || (StringUtils.isBlank(pageSizeStr) || Integer.parseInt(pageSizeStr) < 1)) { + + // Return 400 Bad Request if the request parameters are invalid + ctx.response() + .putHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8") + .setStatusCode(HttpResponseStatus.BAD_REQUEST.code()) + .end(); + log.info("Invalid request parameters. pageNum: {}, pageSize: {}, type: {}", pageNumStr, pageSizeStr, type); + return; + } + + // Parse the request parameters + if (type == null) { + type = TypeEnum.PEEK.getValue(); + } + int pageNum = StringUtils.isBlank(pageNumStr) ? 1 : Integer.parseInt(pageNumStr); + int pageSize = Integer.parseInt(pageSizeStr); + + if (currentQueueSize.get() == 0) { + ctx.response() + .putHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8") + .setStatusCode(HttpResponseStatus.NO_CONTENT.code()) + .end(); + log.info("No callback data to export."); + return; + } + + // Get the received data + List exportRecords; + if (Objects.equals(type, TypeEnum.POLL.getValue())) { + // If the type is poll, only the first page of data is exported and removed + exportRecords = getDataFromQueue(0, pageSize, true); + } else { + // If the type is peek, the specified page of data is exported without removing + int startIndex = (pageNum - 1) * pageSize; + int endIndex = startIndex + pageSize; + exportRecords = getDataFromQueue(startIndex, endIndex, false); + } + + // Create HttpExportRecordPage + HttpExportRecordPage page = new HttpExportRecordPage(pageNum, exportRecords.size(), exportRecords); + + // export the received data + ctx.response() + .putHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8") + .setStatusCode(HttpResponseStatus.OK.code()) + .send(JSON.toJSONString(page, JSONWriter.Feature.WriteMapNullValue)); + if (log.isDebugEnabled()) { + log.debug("Succeed to export callback data. Data: {}", page); + } else { + log.info("Succeed to export callback data."); + } + }); + // create the export server + this.exportServer = vertx.createHttpServer(new HttpServerOptions() + .setPort(this.webhookConfig.getPort()) + .setIdleTimeout(this.webhookConfig.getServerIdleTimeout()) + .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)).requestHandler(router); + } + + /** + * Starts the HTTP/HTTPS handler by creating a WebClient with configured options and starting the export server. + */ + @Override + public void start() { + // start the webclient + super.start(); + // start the export server + Throwable t = this.exportServer.listen().cause(); + if (t != null) { + throw new EventMeshException("Failed to start Vertx server. ", t); + } + } + + /** + * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed. + * + * @param record the ConnectRecord to process + */ + @Override + public void handle(ConnectRecord record) { + for (URI url : super.getUrls()) { + // convert ConnectRecord to HttpConnectRecord + String type = String.format("%s.%s.%s", this.getConnectorConfig().getConnectorName(), url.getScheme(), "webhook"); + HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type); + // handle the HttpConnectRecord + deliver(url, httpConnectRecord); + } + } + + + /** + * Processes HttpConnectRecord on specified URL while returning its own processing logic This method sends the HttpConnectRecord to the specified + * URL by super class method and stores the received data. + * + * @param url URI to which the HttpConnectRecord should be sent + * @param httpConnectRecord HttpConnectRecord to process + * @return processing chain + */ + @Override + public Future> deliver(URI url, HttpConnectRecord httpConnectRecord) { + // send the request + Future> responseFuture = super.deliver(url, httpConnectRecord); + // store the received data + return responseFuture.onComplete(arr -> { + // If open retry, return directly and handled by RetryHttpSinkHandler + if (sinkConnectorConfig.getRetryConfig().getMaxRetries() > 0) { + return; + } + // create ExportMetadataBuilder + HttpResponse response = arr.succeeded() ? arr.result() : null; + + HttpExportMetadata httpExportMetadata = HttpExportMetadata.builder() + .url(url.toString()) + .code(response != null ? response.statusCode() : -1) + .message(response != null ? response.statusMessage() : arr.cause().getMessage()) + .receivedTime(LocalDateTime.now()) + .retriedBy(null) + .uuid(httpConnectRecord.getUuid()) + .retryNum(0) + .build(); + + // create ExportRecord + HttpExportRecord exportRecord = new HttpExportRecord(httpExportMetadata, arr.succeeded() ? arr.result().bodyAsString() : null); + // add the data to the queue + addDataToQueue(exportRecord); + }); + } + + + /** + * Adds the received data to the queue. + * + * @param exportRecord the received data to add to the queue + */ + public void addDataToQueue(HttpExportRecord exportRecord) { + // If the current queue size is greater than or equal to the maximum queue size, remove the oldest element + if (currentQueueSize.get() >= maxQueueSize) { + Object removedData = receivedDataQueue.poll(); + if (log.isDebugEnabled()) { + log.debug("The queue is full, remove the oldest element: {}", removedData); + } else { + log.info("The queue is full, remove the oldest element"); + } + currentQueueSize.decrementAndGet(); + } + // Try to put the received data into the queue + if (receivedDataQueue.offer(exportRecord)) { + currentQueueSize.incrementAndGet(); + log.debug("Successfully put the received data into the queue: {}", exportRecord); + } else { + log.error("Failed to put the received data into the queue: {}", exportRecord); + } + } + + /** + * Gets the received data from the queue. + * + * @param startIndex the start index of the data to get + * @param endIndex the end index of the data to get + * @param removed whether to remove the data from the queue + * @return the received data + */ + private List getDataFromQueue(int startIndex, int endIndex, boolean removed) { + Iterator iterator = receivedDataQueue.iterator(); + + List pageItems = new ArrayList<>(endIndex - startIndex); + int count = 0; + while (iterator.hasNext() && count < endIndex) { + HttpExportRecord item = iterator.next(); + if (count >= startIndex) { + pageItems.add(item); + if (removed) { + iterator.remove(); + currentQueueSize.decrementAndGet(); + } + } + count++; + } + return pageItems; + } + + /** + * Cleans up and releases resources used by the HTTP/HTTPS handler. + */ + @Override + public void stop() { + // stop the webclient + super.stop(); + // stop the export server + if (this.exportServer != null) { + Throwable t = this.exportServer.close().cause(); + if (t != null) { + throw new EventMeshException("Failed to stop Vertx server. ", t); + } + } else { + log.warn("Callback server is null, ignore."); + } + } + + + @Getter + public enum ParamEnum { + PAGE_NUM("pageNum"), + PAGE_SIZE("pageSize"), + TYPE("type"); + + private final String value; + + ParamEnum(String value) { + this.value = value; + } + + } + + + @Getter + public enum TypeEnum { + POLL("poll"), + PEEK("peek"); + + private final String value; + + TypeEnum(String value) { + this.value = value; + } + + } +} \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/util/HttpUtils.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/util/HttpUtils.java new file mode 100644 index 0000000000..79f9fd120d --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/util/HttpUtils.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.util; + +public class HttpUtils { + + /** + * Checks if the status code represents a successful response (2xx). + * + * @param statusCode the HTTP status code to check + * @return true if the status code is 2xx, false otherwise + */ + public static boolean is2xxSuccessful(int statusCode) { + int seriesCode = statusCode / 100; + return seriesCode == 2; + } +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml new file mode 100644 index 0000000000..f740cf7cd1 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml @@ -0,0 +1,46 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pubSubConfig: + meshAddress: 127.0.0.1:10000 + subject: TopicTest + idc: FT + env: PRD + group: httpSink + appId: 5032 + userName: httpSinkUser + passWord: httpPassWord +connectorConfig: + connectorName: httpSink + urls: + - http://127.0.0.1:8987/test + ssl: false + keepAlive: true + keepAliveTimeout: 60000 + idleTimeout: 5000 # timeunit: ms, recommended scope: common(5s - 10s), webhook(15s - 60s) + connectionTimeout: 5000 # timeunit: ms, recommended scope: 5 - 10s + maxConnectionPoolSize: 5 + retryConfig: + maxRetries: 2 + interval: 1000 + retryOnNonSuccess: false + webhookConfig: + activate: false + exportPath: /export + port: 8988 + serverIdleTimeout: 5000 + maxStorageSize: 5000 diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java new file mode 100644 index 0000000000..738df6430b --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.source.connector; + +import static org.mockserver.model.HttpRequest.request; + +import org.apache.eventmesh.connector.http.sink.HttpSinkConnector; +import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig; +import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition; +import org.apache.eventmesh.openconnect.util.ConfigUtil; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.model.HttpRequest; +import org.mockserver.model.HttpResponse; +import org.mockserver.model.MediaType; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; + +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; + +public class HttpSinkConnectorTest { + + private HttpSinkConnector sinkConnector; + + private HttpSinkConfig sinkConfig; + + private URI severUri; + + private ClientAndServer mockServer; + + + @BeforeEach + void before() throws Exception { + // init sinkConnector + this.sinkConnector = new HttpSinkConnector(); + this.sinkConfig = (HttpSinkConfig) ConfigUtil.parse(sinkConnector.configClass()); + this.sinkConnector.init(this.sinkConfig); + this.sinkConnector.start(); + + this.severUri = URI.create(sinkConfig.connectorConfig.getUrls()[0]); + // start mockServer + mockServer = ClientAndServer.startClientAndServer(severUri.getPort()); + mockServer.reset() + .when( + request() + .withMethod("POST") + .withPath(severUri.getPath()) + ) + .respond( + httpRequest -> { + JSONObject requestBody = JSON.parseObject(httpRequest.getBodyAsString()); + return HttpResponse.response() + .withContentType(MediaType.APPLICATION_JSON) + .withStatusCode(200) + .withBody(new JSONObject() + .fluentPut("code", 0) + .fluentPut("message", "success") + .fluentPut("data", requestBody.getJSONObject("data").get("data")) + .toJSONString() + ); // .withDelay(TimeUnit.SECONDS, 10); + } + ); + } + + @AfterEach + void after() throws Exception { + this.sinkConnector.stop(); + this.mockServer.close(); + } + + @Test + void testPut() throws Exception { + // Create a list of ConnectRecord + final int times = 10; + List connectRecords = new ArrayList<>(); + for (int i = 0; i < times; i++) { + ConnectRecord record = createConnectRecord(); + connectRecords.add(record); + } + // Put ConnectRecord + sinkConnector.put(connectRecords); + + // sleep 5s + Thread.sleep(5000); + + // verify request + HttpRequest[] recordedRequests = mockServer.retrieveRecordedRequests(null); + assert recordedRequests.length == times; + + // verify response + HttpWebhookConfig webhookConfig = sinkConfig.connectorConfig.getWebhookConfig(); + String url = new HttpUrl.Builder() + .scheme("http") + .host(severUri.getHost()) + .port(webhookConfig.getPort()) + .addPathSegments(webhookConfig.getExportPath()) + .addQueryParameter("pageNum", "1") + .addQueryParameter("pageSize", "10") + .addQueryParameter("type", "poll") + .build().toString(); + + // build request + Request request = new Request.Builder() + .url(url) + .addHeader("Content-Type", "application/json") + .build(); + + OkHttpClient client = new OkHttpClient(); + try (Response response = client.newCall(request).execute()) { + // check response code + if (!response.isSuccessful()) { + throw new RuntimeException("Unexpected response code: " + response); + } + // check response body + ResponseBody responseBody = response.body(); + if (responseBody != null) { + JSONObject jsonObject = JSON.parseObject(responseBody.string()); + JSONArray pageItems = jsonObject.getJSONArray("pageItems"); + + assert pageItems != null && pageItems.size() == times; + + for (int i = 0; i < times; i++) { + JSONObject pageItem = pageItems.getJSONObject(i); + assert pageItem != null; + assert pageItem.getJSONObject("data") != null; + assert pageItem.getJSONObject("metadata") != null; + } + } + } + } + + private ConnectRecord createConnectRecord() { + RecordPartition partition = new RecordPartition(); + RecordOffset offset = new RecordOffset(); + long timestamp = System.currentTimeMillis(); + return new ConnectRecord(partition, offset, timestamp, UUID.randomUUID().toString()); + } +} diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml new file mode 100644 index 0000000000..149ad7681b --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/sink-config.yml @@ -0,0 +1,46 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pubSubConfig: + meshAddress: 127.0.0.1:10000 + subject: TopicTest + idc: FT + env: PRD + group: httpSink + appId: 5032 + userName: httpSinkUser + passWord: httpPassWord +connectorConfig: + connectorName: httpSink + urls: + - http://127.0.0.1:8987/test + ssl: false + keepAlive: true + keepAliveTimeout: 60000 + idleTimeout: 15000 # timeunit: ms, recommended scope: common(5s - 10s), webhook(15s - 60s) + connectionTimeout: 5000 # timeunit: ms, recommended scope: 5 - 10s + maxConnectionPoolSize: 10 + retryConfig: + maxRetries: 2 + interval: 1000 + retryOnNonSuccess: true + webhookConfig: + activate: true + exportPath: /export + port: 8988 + serverIdleTimeout: 5000 + maxStorageSize: 5000 \ No newline at end of file diff --git a/tools/dependency-check/known-dependencies.txt b/tools/dependency-check/known-dependencies.txt index b30ca5d5ed..7d6e4a4606 100644 --- a/tools/dependency-check/known-dependencies.txt +++ b/tools/dependency-check/known-dependencies.txt @@ -47,6 +47,7 @@ classmate-1.5.1.jar cloudevents-api-2.4.2.jar cloudevents-core-2.4.2.jar cloudevents-http-vertx-2.3.0.jar +cloudevents-http-vertx-3.0.0.jar cloudevents-json-jackson-2.4.2.jar cloudevents-kafka-2.4.2.jar cloudevents-protobuf-2.4.2.jar @@ -77,6 +78,7 @@ endpoint-util-0.0.7.jar endpoints-spi-2.20.29.jar error_prone_annotations-2.9.0.jar eventstream-1.0.1.jar +failsafe-3.3.2.jar failureaccess-1.0.1.jar fastjson-1.2.69_noneautotype.jar fastjson2-2.0.48.jar @@ -348,7 +350,9 @@ vertx-bridge-common-4.4.6.jar vertx-core-4.4.6.jar vertx-web-4.4.6.jar vertx-web-client-4.0.0.jar +vertx-web-client-4.4.6.jar vertx-web-common-4.4.6.jar +vertx-uri-template-4.4.6.jar xpp3-1.1.4c.jar xsdlib-2013.6.1.jar zipkin-2.23.2.jar