Skip to content

Commit

Permalink
Merge branch 'master' into pil0txia_enhance_4697
Browse files Browse the repository at this point in the history
# Conflicts:
#	eventmesh-common/src/main/java/org/apache/eventmesh/common/file/WatchFileTask.java
#	eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
  • Loading branch information
Pil0tXia committed Jan 11, 2024
2 parents 56b3d16 + 21731e8 commit a88e2f7
Show file tree
Hide file tree
Showing 28 changed files with 283 additions and 151 deletions.
20 changes: 20 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
*.sh text eol=lf
gradlew text eol=lf
*.{cmd,[cC][mM][dD]} text eol=crlf
*.{bat,[bB][aA][tT]} text eol=crlf
14 changes: 4 additions & 10 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,8 @@ jobs:
git submodule update
make -C ./eventmesh-sdks/eventmesh-sdk-c
- name: Cache Gradle packages
uses: actions/cache@v3
with:
path: |
~/.gradle/caches
~/.gradle/wrapper
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
restore-keys: ${{ runner.os }}-gradle-
- name: Setup Gradle
uses: gradle/gradle-build-action@v2

- name: Set up JDK 11
uses: actions/setup-java@v3
Expand All @@ -82,7 +76,7 @@ jobs:

# https://docs.gradle.org/current/userguide/performance.html
- name: Build
run: ./gradlew clean build jar dist jacocoTestReport -x spotlessJava -x generateGrammarSource --parallel --daemon
run: ./gradlew clean build dist jacocoTestReport -x spotlessJava -x generateGrammarSource --parallel --daemon
env:
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}

Expand Down Expand Up @@ -112,4 +106,4 @@ jobs:

- name: Check third party dependencies
run: |
./gradlew clean jar dist -x spotlessJava -x test -x checkstyleMain -x javaDoc && ./gradlew installPlugin && ./gradlew tar && sh tools/dependency-check/check-dependencies.sh && echo "Thirty party dependencies check success"
./gradlew clean dist -x spotlessJava -x test -x checkstyleMain -x javaDoc && ./gradlew installPlugin && ./gradlew tar && sh tools/dependency-check/check-dependencies.sh && echo "Thirty party dependencies check success"
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ subprojects {
licenses {
license {
name = 'The Apache License, Version 2.0'
url = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
url = 'https://www.apache.org/licenses/LICENSE-2.0.txt'
}
}
developers {
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile_jdk11
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
FROM openjdk:11-jdk as builder
WORKDIR /build
COPY . .
RUN ./gradlew clean build jar dist --parallel --daemon
RUN ./gradlew clean build dist --parallel --daemon
RUN ./gradlew installPlugin

FROM openjdk:11-jdk
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile_jdk8
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ RUN ./gradlew clean generateGrammarSource --parallel --daemon
FROM openjdk:8-jdk as builder_8
WORKDIR /build
COPY --from=builder_11 /build ./
RUN ./gradlew clean build jar dist -x spotlessJava -x generateGrammarSource --parallel --daemon
RUN ./gradlew clean build dist -x spotlessJava -x generateGrammarSource --parallel --daemon
RUN ./gradlew installPlugin

FROM openjdk:8-jdk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,21 @@ public WatchFileTask(String directoryPath) {
throw new IllegalArgumentException("must be a file directory : " + directoryPath);
}

try (WatchService watchService = FILE_SYSTEM.newWatchService()) {
this.watchService = watchService;
try {
this.watchService = FILE_SYSTEM.newWatchService();
} catch (IOException ex) {
throw new RuntimeException("WatchService initialization fail", ex);
}

try {
path.register(this.watchService, StandardWatchEventKinds.OVERFLOW, StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
} catch (Exception ex) {
} catch (IOException ex) {
try {
this.watchService.close();
} catch (IOException e) {
ex.addSuppressed(e);
}
throw new UnsupportedOperationException("WatchService registry fail", ex);
}
}
Expand All @@ -70,6 +80,11 @@ public void addFileChangeListener(FileChangeListener fileChangeListener) {

public void shutdown() {
watch = false;
try {
this.watchService.close();
} catch (IOException e) {
throw new RuntimeException("Unable to close WatchService", e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,39 +31,28 @@
import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.ReplayingDecoder;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Codec extends ByteToMessageCodec<Package> {
public class Codec {

private static final int FRAME_MAX_LENGTH = 1024 * 1024 * 4;

private static final byte[] CONSTANT_MAGIC_FLAG = serializeBytes("EventMesh");
private static final byte[] VERSION = serializeBytes("0000");

private Encoder encoder = new Encoder();
private Decoder decoder = new Decoder();
private static final int PREFIX_LENGTH = CONSTANT_MAGIC_FLAG.length + VERSION.length; //13

@Override
protected void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws Exception {
encoder.encode(ctx, pkg, out);
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
decoder.decode(ctx, in, out);
}
private static final int PACKAGE_BYTES_FIELD_LENGTH = 4;

public static class Encoder extends MessageToByteEncoder<Package> {

Expand All @@ -86,7 +75,7 @@ public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws E
int headerLength = ArrayUtils.getLength(headerData);
int bodyLength = ArrayUtils.getLength(bodyData);

final int length = CONSTANT_MAGIC_FLAG.length + VERSION.length + headerLength + bodyLength;
final int length = PREFIX_LENGTH + headerLength + bodyLength;

if (length > FRAME_MAX_LENGTH) {
throw new IllegalArgumentException("message size is exceed limit!");
Expand All @@ -113,31 +102,62 @@ public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws E
}
}

public static class Decoder extends ReplayingDecoder<Package> {
public static class Decoder extends LengthFieldBasedFrameDecoder {

public Decoder() {
/**
* lengthAdjustment value = -9 explain:
* Header + Body, Format:
* <pre>
* ┌───────────────┬─────────────┬──────────────────┬──────────────────┬──────────────────┬─────────────────┐
* │ MAGIC_FLAG │ VERSION │ package length │ Header length │ Header │ body │
* │ (9bytes) │ (4bytes) │ (4bytes) │ (4bytes) │ (header bytes) │ (body bytes) │
* └───────────────┴─────────────┴──────────────────┴──────────────────┴──────────────────┴─────────────────┘
* </pre>
* package length = MAGIC_FLAG + VERSION + Header length + Body length,Currently,
* adding MAGIC_FLAG + VERSION + package length field (4 bytes) actually adds 17 bytes.
* However, the value of the package length field is only reduced by the four bytes of
* the package length field itself and the four bytes of the header length field.
* Therefore, the compensation value to be added to the length field value is -9,
* which means subtracting the extra 9 bytes.
* Refer to the encoding in the {@link Encoder}
*/
super(FRAME_MAX_LENGTH, PREFIX_LENGTH, PACKAGE_BYTES_FIELD_LENGTH, -9, 0);
}

@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {

ByteBuf target = null;

try {
if (null == in) {
return;
target = (ByteBuf) super.decode(ctx, in);
if (null == target) {
return null;
}

byte[] flagBytes = parseFlag(in);
byte[] versionBytes = parseVersion(in);
byte[] flagBytes = parseFlag(target);
byte[] versionBytes = parseVersion(target);
validateFlag(flagBytes, versionBytes, ctx);

final int length = in.readInt();
final int headerLength = in.readInt();
final int bodyLength = length - CONSTANT_MAGIC_FLAG.length - VERSION.length - headerLength;
Header header = parseHeader(in, headerLength);
Object body = parseBody(in, header, bodyLength);
final int length = target.readInt();
final int headerLength = target.readInt();
final int bodyLength = length - PREFIX_LENGTH - headerLength;
Header header = parseHeader(target, headerLength);
Object body = parseBody(target, header, bodyLength);

Package pkg = new Package(header, body);
out.add(pkg);
} catch (Exception e) {
log.error("decode error| received data: {}.", deserializeBytes(in.array()), e);
throw e;
return pkg;

} catch (Exception ex) {
log.error("decode error", ex);
ctx.channel().close();
} finally {
if (target != null) {
target.release();
}
}

return null;
}

private byte[] parseFlag(ByteBuf in) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,19 @@ public static String getLocalAddress() {
List<String> list = Arrays.asList(priority.split("<"));
ArrayList<String> preferList = new ArrayList<>(list);
NetworkInterface preferNetworkInterface = null;
boolean isInterfacePreferred = false;

try {
Enumeration<NetworkInterface> enumeration1 = NetworkInterface.getNetworkInterfaces();
while (enumeration1.hasMoreElements()) {
final NetworkInterface networkInterface = enumeration1.nextElement();
String interfaceName = networkInterface.getName();
if (!isInterfacePreferred && preferList.contains(interfaceName)) {
isInterfacePreferred = true;
}
if (preferNetworkInterface == null) {
preferNetworkInterface = networkInterface;
} else if (preferList.indexOf(networkInterface.getName()) // get the networkInterface that has higher priority
} else if (preferList.indexOf(interfaceName) // get the networkInterface that has higher priority
> preferList.indexOf(preferNetworkInterface.getName())) {
preferNetworkInterface = networkInterface;
}
Expand All @@ -76,7 +81,7 @@ public static String getLocalAddress() {
ArrayList<String> ipv4Result = new ArrayList<String>();
ArrayList<String> ipv6Result = new ArrayList<String>();

if (preferNetworkInterface != null) {
if (preferNetworkInterface != null && isInterfacePreferred) {
final Enumeration<InetAddress> en = preferNetworkInterface.getInetAddresses();
getIpResult(ipv4Result, ipv6Result, en);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@

package org.apache.eventmesh.common.file;

import org.apache.eventmesh.common.utils.ThreadUtils;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;

public class WatchFileManagerTest {

Expand All @@ -40,31 +38,32 @@ public class WatchFileManagerTest {
@Test
public void testWatchFile() throws IOException, InterruptedException {
String file = WatchFileManagerTest.class.getResource("/configuration.properties").getFile();
File f = new File(file);
File configFile = new File(file);
File tempConfigFile = new File(tempConfigDir, "configuration.properties");
Files.copy(f.toPath(), tempConfigFile.toPath());

final FileChangeListener fileChangeListener = new FileChangeListener() {
Files.copy(configFile.toPath(), tempConfigFile.toPath());

@Override
public void onChanged(FileChangeContext changeContext) {
Assertions.assertEquals(tempConfigFile.getName(), changeContext.getFileName());
Assertions.assertEquals(tempConfigFile.getParent(), changeContext.getDirectoryPath());
}
final FileChangeListener mockFileChangeListener = Mockito.mock(FileChangeListener.class);
Mockito.when(mockFileChangeListener.support(
Mockito.argThat(isFileUnderTest(tempConfigFile.getParent(), tempConfigFile.getName())))
).thenReturn(true);

@Override
public boolean support(FileChangeContext changeContext) {
return changeContext.getWatchEvent().context().toString().contains(tempConfigFile.getName());
}
};
WatchFileManager.registerFileChangeListener(tempConfigFile.getParent(), fileChangeListener);
WatchFileManager.registerFileChangeListener(tempConfigFile.getParent(), mockFileChangeListener);

Properties properties = new Properties();
properties.load(new BufferedReader(new FileReader(tempConfigFile)));
properties.setProperty("eventMesh.server.newAdd", "newAdd");
FileWriter fw = new FileWriter(tempConfigFile);
properties.store(fw, "newAdd");
try (BufferedReader bufferedReader = new BufferedReader(new FileReader(tempConfigFile))) {
properties.load(bufferedReader);
}

try (FileWriter fw = new FileWriter(tempConfigFile)) {
properties.setProperty("eventMesh.server.newAdd", "newAdd");
properties.store(fw, "newAdd");
}

Mockito.verify(mockFileChangeListener, Mockito.timeout(15_000).atLeastOnce())
.onChanged(Mockito.argThat(isFileUnderTest(tempConfigFile.getParent(), tempConfigFile.getName())));
}

ThreadUtils.sleep(500, TimeUnit.MILLISECONDS);
private ArgumentMatcher<FileChangeContext> isFileUnderTest(String directoryPath, String fileName) {
return argument -> argument.getDirectoryPath().equals(directoryPath) && argument.getFileName().equals(fileName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;

import java.util.ArrayList;
import org.apache.eventmesh.common.protocol.tcp.codec.Codec.Decoder;
import org.apache.eventmesh.common.protocol.tcp.codec.Codec.Encoder;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand All @@ -37,14 +37,12 @@ public void testCodec() throws Exception {
header.setCmd(Command.HELLO_REQUEST);
Package testP = new Package(header);
testP.setBody(new Object());
Codec.Encoder ce = new Codec.Encoder();
Encoder ce = new Codec.Encoder();
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer();
ce.encode(null, testP, buf);
Codec.Decoder cd = new Codec.Decoder();
ArrayList<Object> result = new ArrayList<>();
cd.decode(null, buf, result);
Assertions.assertNotNull(result.get(0));
Assertions.assertEquals(testP.getHeader(), ((Package) result.get(0)).getHeader());
Decoder cd = new Codec.Decoder();
final Package decode = (Package) cd.decode(null, buf);
Assertions.assertEquals(testP.getHeader(), decode.getHeader());
}

}
1 change: 1 addition & 0 deletions eventmesh-connectors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Connector

A connector is an image or instance that interacts with a specific external service or underlying data source (e.g., Databases) on behalf of user applications. A connector is either a Source or a Sink.

Connector runs as a standalone service by `main()`.

## Source
Expand Down
Loading

0 comments on commit a88e2f7

Please sign in to comment.