diff --git a/build.gradle b/build.gradle index ec8bc4a458..98619ed846 100644 --- a/build.gradle +++ b/build.gradle @@ -574,6 +574,9 @@ subprojects { dependency "software.amazon.awssdk:s3:2.20.29" dependency "com.github.rholder:guava-retrying:2.0.0" + dependency "org.mybatis.spring.boot:mybatis-spring-boot-starter:2.3.2" + dependency "com.alibaba:druid-spring-boot-starter:1.2.22" + dependency "org.springframework.boot:spring-boot-starter-jetty:2.7.10" } } } diff --git a/eventmesh-admin-server/.gitignore b/eventmesh-admin-server/.gitignore new file mode 100644 index 0000000000..b63da4551b --- /dev/null +++ b/eventmesh-admin-server/.gitignore @@ -0,0 +1,42 @@ +.gradle +build/ +!gradle/wrapper/gradle-wrapper.jar +!**/src/main/**/build/ +!**/src/test/**/build/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr +out/ +!**/src/main/**/out/ +!**/src/test/**/out/ + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache +bin/ +!**/src/main/**/bin/ +!**/src/test/**/bin/ + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/eventmesh-admin-server/build.gradle b/eventmesh-admin-server/build.gradle new file mode 100644 index 0000000000..63ed5a4568 --- /dev/null +++ b/eventmesh-admin-server/build.gradle @@ -0,0 +1,16 @@ +dependencies { + implementation project(":eventmesh-spi") + implementation project(":eventmesh-common") + implementation "com.alibaba.nacos:nacos-client" + implementation ("org.springframework.boot:spring-boot-starter-web") { + exclude group: "org.springframework.boot" ,module: "spring-boot-starter-tomcat" + } + implementation 'org.springframework.boot:spring-boot-starter-jetty' + + implementation "org.mybatis.spring.boot:mybatis-spring-boot-starter" + // https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter + implementation "com.alibaba:druid-spring-boot-starter" + compileOnly 'org.projectlombok:lombok' + annotationProcessor 'org.projectlombok:lombok' +} + diff --git a/eventmesh-admin-server/gradle.properties b/eventmesh-admin-server/gradle.properties new file mode 100644 index 0000000000..a9fd83fea0 --- /dev/null +++ b/eventmesh-admin-server/gradle.properties @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java new file mode 100644 index 0000000000..1090f7b592 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java @@ -0,0 +1,24 @@ +package com.apache.eventmesh.admin.server; + +import org.apache.eventmesh.common.utils.PagedList; + +import com.apache.eventmesh.admin.server.task.Task; + +public interface Admin extends ComponentLifeCycle{ + /** + * support for web or ops + **/ + boolean createOrUpdateTask(Task task); + boolean deleteTask(Long id); + Task getTask(Long id); + // paged list + PagedList getTaskPaged(Task task); + + /** + * support for task + */ + void reportHeartbeat(HeartBeat heartBeat); + + + +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminException.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminException.java new file mode 100644 index 0000000000..eca5eeb0df --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminException.java @@ -0,0 +1,11 @@ +package com.apache.eventmesh.admin.server; + +public class AdminException extends RuntimeException { + public AdminException(String message) { + super(message); + } + + public AdminException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java new file mode 100644 index 0000000000..a00182361a --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java @@ -0,0 +1,57 @@ +package com.apache.eventmesh.admin.server; + +import com.apache.eventmesh.admin.server.registry.EventMeshAdminServerRegisterInfo; +import com.apache.eventmesh.admin.server.registry.RegistryService; +import org.apache.eventmesh.common.utils.PagedList; + +import com.apache.eventmesh.admin.server.task.Task; + +public class AdminServer implements Admin { + + private RegistryService registryService; + + private EventMeshAdminServerRegisterInfo registerInfo; + + public AdminServer(RegistryService registryService, EventMeshAdminServerRegisterInfo registerInfo) { + this.registryService = registryService; + this.registerInfo = registerInfo; + } + + public static final String ConfigurationKey = "admin-server"; + @Override + public boolean createOrUpdateTask(Task task) { + return false; + } + + @Override + public boolean deleteTask(Long id) { + return false; + } + + @Override + public Task getTask(Long id) { + return null; + } + + @Override + public PagedList getTaskPaged(Task task) { + return null; + } + + @Override + public void reportHeartbeat(HeartBeat heartBeat) { + + } + + @Override + public void start() { + + registryService.register(registerInfo); + } + + @Override + public void destroy() { + registryService.unRegister(registerInfo); + registryService.shutdown(); + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ComponentLifeCycle.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ComponentLifeCycle.java new file mode 100644 index 0000000000..76abd005be --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ComponentLifeCycle.java @@ -0,0 +1,6 @@ +package com.apache.eventmesh.admin.server; + +public interface ComponentLifeCycle { + void start(); + void destroy(); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/HeartBeat.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/HeartBeat.java new file mode 100644 index 0000000000..b8a28c4bd3 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/HeartBeat.java @@ -0,0 +1,12 @@ +package com.apache.eventmesh.admin.server; + +import com.apache.eventmesh.admin.server.task.JobState; +import com.apache.eventmesh.admin.server.task.Position; + +public class HeartBeat { + private String address; + private String reportedTimeStamp; + private String jobID; + private Position position; + private JobState state; +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/AbstractRegistryListener.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/AbstractRegistryListener.java new file mode 100644 index 0000000000..cdcc169790 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/AbstractRegistryListener.java @@ -0,0 +1,14 @@ +package com.apache.eventmesh.admin.server.registry; + +public abstract class AbstractRegistryListener implements RegistryListener { + protected abstract boolean checkType(Object data); + @Override + @SuppressWarnings("unchecked") + public void onChange(Object data) { + if (!checkType(data)) { + return; + } + process((T)data); + } + protected abstract void process(T data); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerConfiguration.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerConfiguration.java new file mode 100644 index 0000000000..dc436b28de --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerConfiguration.java @@ -0,0 +1,32 @@ +package com.apache.eventmesh.admin.server.registry; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import org.apache.eventmesh.common.config.CommonConfiguration; +import org.apache.eventmesh.common.config.Config; +import org.apache.eventmesh.common.config.ConfigFiled; + +@Data +@NoArgsConstructor +@EqualsAndHashCode(callSuper = true) +@Config(prefix = "eventMesh.admin") +public class EventMeshAdminServerConfiguration extends CommonConfiguration { + @ConfigFiled(field = "server.http.port") + private int eventMeshHttpServerPort = 10000; + + @ConfigFiled(field = "server.gRPC.port") + private int eventMeshGrpcServerPort = 10000; + + @ConfigFiled(field = "registry.plugin.server-addr", notEmpty = true) + private String registryCenterAddr = ""; + + @ConfigFiled(field = "registry.plugin.type", notEmpty = true) + private String eventMeshRegistryPluginType = "nacos"; + + @ConfigFiled(field = "registry.plugin.username") + private String eventMeshRegistryPluginUsername = ""; + + @ConfigFiled(field = "registry.plugin.password") + private String eventMeshRegistryPluginPassword = ""; +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerRegisterInfo.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerRegisterInfo.java new file mode 100644 index 0000000000..c51ae64178 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerRegisterInfo.java @@ -0,0 +1,14 @@ +package com.apache.eventmesh.admin.server.registry; + +import lombok.Data; + +import java.util.Map; + +@Data +public class EventMeshAdminServerRegisterInfo { + private String eventMeshClusterName; + private String eventMeshName; + private String address; + + private Map metadata; +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosDiscoveryService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosDiscoveryService.java new file mode 100644 index 0000000000..cd4fb11032 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosDiscoveryService.java @@ -0,0 +1,197 @@ +package com.apache.eventmesh.admin.server.registry; + +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.api.naming.listener.Event; +import com.alibaba.nacos.api.naming.listener.EventListener; +import com.alibaba.nacos.api.naming.pojo.Instance; +import com.alibaba.nacos.api.naming.pojo.ServiceInfo; +import com.alibaba.nacos.client.naming.utils.UtilAndComs; +import com.apache.eventmesh.admin.server.AdminException; +import com.apache.eventmesh.admin.server.AdminServer; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.config.CommonConfiguration; +import org.apache.eventmesh.common.config.ConfigService; +import org.apache.eventmesh.common.utils.ConfigurationContextUtil; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +@Slf4j +public class NacosDiscoveryService implements RegistryService { + private final AtomicBoolean initFlag = new AtomicBoolean(false); + + private EventMeshAdminServerConfiguration adminConf; + + private NacosRegistryConfiguration nacosConf; + + private NamingService namingService; + + private final Map> listeners = new HashMap<>(); + + private final Lock lock = new ReentrantLock(); + private static final String GROUP_NAME = "admin"; + + @Override + public void init() throws AdminException { + if (!initFlag.compareAndSet(false, true)) { + return; + } + CommonConfiguration configuration = ConfigurationContextUtil.get(AdminServer.ConfigurationKey); + if (!(configuration instanceof EventMeshAdminServerConfiguration)) { + throw new AdminException("registry config instance is null or not match type"); + } + + adminConf = (EventMeshAdminServerConfiguration)configuration; + NacosRegistryConfiguration nacosConf = ConfigService.getInstance().buildConfigInstance(NacosRegistryConfiguration.class); + if (nacosConf != null) { + this.nacosConf = nacosConf; + } + Properties properties = buildProperties(); + // registry + try { + this.namingService = NacosFactory.createNamingService(properties); + } catch (NacosException e) { + log.error("[NacosRegistryService][start] error", e); + throw new AdminException(e.getMessage()); + } + } + + private Properties buildProperties() { + Properties properties = new Properties(); + properties.setProperty(PropertyKeyConst.SERVER_ADDR, adminConf.getRegistryCenterAddr()); + properties.setProperty(PropertyKeyConst.USERNAME, adminConf.getEventMeshRegistryPluginUsername()); + properties.setProperty(PropertyKeyConst.PASSWORD, adminConf.getEventMeshRegistryPluginPassword()); + if (nacosConf == null) { + return properties; + } + String endpoint = nacosConf.getEndpoint(); + if (Objects.nonNull(endpoint) && endpoint.contains(":")) { + int index = endpoint.indexOf(":"); + properties.put(PropertyKeyConst.ENDPOINT, endpoint.substring(0, index)); + properties.put(PropertyKeyConst.ENDPOINT_PORT, endpoint.substring(index + 1)); + } else { + Optional.ofNullable(endpoint).ifPresent(value -> properties.put(PropertyKeyConst.ENDPOINT, endpoint)); + String endpointPort = nacosConf.getEndpointPort(); + Optional.ofNullable(endpointPort).ifPresent(value -> properties.put(PropertyKeyConst.ENDPOINT_PORT, endpointPort)); + } + String accessKey = nacosConf.getAccessKey(); + Optional.ofNullable(accessKey).ifPresent(value -> properties.put(PropertyKeyConst.ACCESS_KEY, accessKey)); + String secretKey = nacosConf.getSecretKey(); + Optional.ofNullable(secretKey).ifPresent(value -> properties.put(PropertyKeyConst.SECRET_KEY, secretKey)); + String clusterName = nacosConf.getClusterName(); + Optional.ofNullable(clusterName).ifPresent(value -> properties.put(PropertyKeyConst.CLUSTER_NAME, clusterName)); + String logFileName = nacosConf.getLogFileName(); + Optional.ofNullable(logFileName).ifPresent(value -> properties.put(UtilAndComs.NACOS_NAMING_LOG_NAME, logFileName)); + String logLevel = nacosConf.getLogLevel(); + Optional.ofNullable(logLevel).ifPresent(value -> properties.put(UtilAndComs.NACOS_NAMING_LOG_LEVEL, logLevel)); + Integer pollingThreadCount = nacosConf.getPollingThreadCount(); + Optional.ofNullable(pollingThreadCount).ifPresent(value -> properties.put(PropertyKeyConst.NAMING_POLLING_THREAD_COUNT, pollingThreadCount)); + String namespace = nacosConf.getNamespace(); + Optional.ofNullable(namespace).ifPresent(value -> properties.put(PropertyKeyConst.NAMESPACE, namespace)); + return properties; + } + + @Override + public void shutdown() throws AdminException { + if (this.namingService != null) { + try { + namingService.shutDown(); + } catch (NacosException e) { + log.warn("shutdown nacos naming service fail", e); + } + } + } + + @Override + public void subscribe(RegistryListener listener, String serviceName) { + lock.lock(); + try { + ServiceInfo serviceInfo = ServiceInfo.fromKey(serviceName); + Map eventListenerMap = listeners.computeIfAbsent(serviceName, k -> new HashMap<>()); + if (eventListenerMap.containsKey(listener)) { + log.warn("already use same listener subscribe service name {}" ,serviceName); + return; + } + EventListener eventListener = listener::onChange; + List clusters ; + if (serviceInfo.getClusters() == null || serviceInfo.getClusters().isEmpty()) { + clusters = new ArrayList<>(); + } else { + clusters = Arrays.stream(serviceInfo.getClusters().split(",")).collect(Collectors.toList()); + } + namingService.subscribe(serviceInfo.getName(),serviceInfo.getGroupName(), clusters, eventListener); + eventListenerMap.put(listener, eventListener); + } catch (Exception e) { + log.error("subscribe service name {} fail", serviceName, e); + } finally { + lock.unlock(); + } + } + + @Override + public void unsubscribe(RegistryListener registryListener, String serviceName) { + lock.lock(); + try { + ServiceInfo serviceInfo = ServiceInfo.fromKey(serviceName); + Map map = listeners.get(serviceName); + if (map == null) { + return; + } + List clusters ; + if (serviceInfo.getClusters() == null || serviceInfo.getClusters().isEmpty()) { + clusters = new ArrayList<>(); + } else { + clusters = Arrays.stream(serviceInfo.getClusters().split(",")).collect(Collectors.toList()); + } + EventListener eventListener = map.get(registryListener); + namingService.unsubscribe(serviceInfo.getName(), serviceInfo.getGroupName(), clusters, eventListener); + map.remove(registryListener); + } catch (Exception e) { + log.error("unsubscribe service name {} fail", serviceName, e); + } finally { + lock.unlock(); + } + } + + @Override + public boolean register(EventMeshAdminServerRegisterInfo eventMeshRegisterInfo) throws AdminException { + try { + String[] ipPort = eventMeshRegisterInfo.getAddress().split(":"); + if (ipPort.length < 2) { + return false; + } + Instance instance = new Instance(); + instance.setClusterName(eventMeshRegisterInfo.getEventMeshClusterName()); + instance.setEnabled(true); + instance.setEphemeral(true); + instance.setHealthy(true); + instance.setWeight(1.0); + instance.setIp(ipPort[0]); + instance.setPort(Integer.parseInt(ipPort[1])); + instance.setMetadata(eventMeshRegisterInfo.getMetadata()); + namingService.registerInstance(eventMeshRegisterInfo.getEventMeshName(), GROUP_NAME, instance); + return true; + } catch (Exception e) { + log.error("register instance service {} group {} cluster {} fail", eventMeshRegisterInfo.getEventMeshName(), GROUP_NAME, eventMeshRegisterInfo.getEventMeshClusterName(), e); + return false; + } + } + + @Override + public boolean unRegister(EventMeshAdminServerRegisterInfo eventMeshRegisterInfo) throws AdminException { + try { + namingService.registerInstance(eventMeshRegisterInfo.getEventMeshName(), GROUP_NAME, new Instance()); + return true; + } catch (Exception e) { + log.error("register instance service {} group {} cluster {} fail", eventMeshRegisterInfo.getEventMeshName(), GROUP_NAME, eventMeshRegisterInfo.getEventMeshClusterName(), e); + return false; + } + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosRegistryConfiguration.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosRegistryConfiguration.java new file mode 100644 index 0000000000..45932e9fd4 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosRegistryConfiguration.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apache.eventmesh.admin.server.registry; + +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.client.naming.utils.UtilAndComs; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.eventmesh.common.config.Config; +import org.apache.eventmesh.common.config.ConfigFiled; + +@Data +@NoArgsConstructor +@Config(prefix = "eventMesh.registry.nacos") +public class NacosRegistryConfiguration { + + @ConfigFiled(field = PropertyKeyConst.ENDPOINT) + private String endpoint; + + @ConfigFiled(field = PropertyKeyConst.ENDPOINT_PORT) + private String endpointPort; + + @ConfigFiled(field = PropertyKeyConst.ACCESS_KEY) + private String accessKey; + + @ConfigFiled(field = PropertyKeyConst.SECRET_KEY) + private String secretKey; + + @ConfigFiled(field = PropertyKeyConst.CLUSTER_NAME) + private String clusterName; + + @ConfigFiled(field = PropertyKeyConst.NAMESPACE) + private String namespace; + + @ConfigFiled(field = PropertyKeyConst.NAMING_POLLING_THREAD_COUNT) + private Integer pollingThreadCount = Runtime.getRuntime().availableProcessors() / 2 + 1; + + @ConfigFiled(field = UtilAndComs.NACOS_NAMING_LOG_NAME) + private String logFileName; + + @ConfigFiled(field = UtilAndComs.NACOS_NAMING_LOG_LEVEL) + private String logLevel; + +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/Registry.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/Registry.java new file mode 100644 index 0000000000..771b45f2e8 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/Registry.java @@ -0,0 +1,70 @@ +package com.apache.eventmesh.admin.server.registry; + +import com.apache.eventmesh.admin.server.AdminException; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.spi.EventMeshExtensionFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +@Slf4j +public class Registry implements RegistryService { + private static final Map META_CACHE = new HashMap<>(16); + private RegistryService registryService; + + private final AtomicBoolean initFlag = new AtomicBoolean(false); + private final AtomicBoolean shutdownFlag = new AtomicBoolean(false); + + public static Registry getInstance(String registryPluginType) { + return META_CACHE.computeIfAbsent(registryPluginType, Registry::registryBuilder); + } + + private static Registry registryBuilder(String registryPluginType) { + RegistryService registryServiceExt = EventMeshExtensionFactory.getExtension(RegistryService.class, registryPluginType); + if (registryServiceExt == null) { + String errorMsg = "can't load the metaService plugin, please check."; + log.error(errorMsg); + throw new RuntimeException(errorMsg); + } + Registry metaStorage = new Registry(); + metaStorage.registryService = registryServiceExt; + + return metaStorage; + } + + @Override + public void init() throws AdminException { + if (initFlag.compareAndSet(false, true)) { + return; + } + this.registryService.init(); + } + + @Override + public void shutdown() throws AdminException { + if (shutdownFlag.compareAndSet(false, true)) { + this.registryService.shutdown(); + } + } + + @Override + public void subscribe(RegistryListener registryListener, String serviceName) { + this.registryService.subscribe(registryListener, serviceName); + } + + @Override + public void unsubscribe(RegistryListener registryListener, String serviceName) { + this.registryService.unsubscribe(registryListener, serviceName); + } + + @Override + public boolean register(EventMeshAdminServerRegisterInfo eventMeshRegisterInfo) throws AdminException { + return this.registryService.register(eventMeshRegisterInfo); + } + + @Override + public boolean unRegister(EventMeshAdminServerRegisterInfo eventMeshUnRegisterInfo) throws AdminException { + return this.registryService.unRegister(eventMeshUnRegisterInfo); + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryListener.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryListener.java new file mode 100644 index 0000000000..2d339497f5 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryListener.java @@ -0,0 +1,5 @@ +package com.apache.eventmesh.admin.server.registry; + +public interface RegistryListener { + void onChange(Object data); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryService.java new file mode 100644 index 0000000000..0cddd009a0 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryService.java @@ -0,0 +1,20 @@ +package com.apache.eventmesh.admin.server.registry; + +import com.apache.eventmesh.admin.server.AdminException; +import org.apache.eventmesh.spi.EventMeshExtensionType; +import org.apache.eventmesh.spi.EventMeshSPI; + +@EventMeshSPI(eventMeshExtensionType = EventMeshExtensionType.REGISTRY) +public interface RegistryService { + void init() throws AdminException; + + void shutdown() throws AdminException; + + void subscribe(RegistryListener registryListener, String serviceName); + + void unsubscribe(RegistryListener registryListener, String serviceName); + + boolean register(EventMeshAdminServerRegisterInfo eventMeshRegisterInfo) throws AdminException; + + boolean unRegister(EventMeshAdminServerRegisterInfo eventMeshUnRegisterInfo) throws AdminException; +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Job.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Job.java new file mode 100644 index 0000000000..1fe5b08976 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Job.java @@ -0,0 +1,8 @@ +package com.apache.eventmesh.admin.server.task; + +public class Job { + private long id; + private long taskID; + private JobType type; + private JobState state; +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobState.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobState.java new file mode 100644 index 0000000000..845d91c4a3 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobState.java @@ -0,0 +1,10 @@ +package com.apache.eventmesh.admin.server.task; + +public enum JobState { + INIT, + STARaTED, + PAUSE, + COMPLETE, + DELETE, + FAIL +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobType.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobType.java new file mode 100644 index 0000000000..b694803988 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobType.java @@ -0,0 +1,7 @@ +package com.apache.eventmesh.admin.server.task; + +public enum JobType { + FULL, + INCREASE, + STRUCT_SYNC +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Position.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Position.java new file mode 100644 index 0000000000..491f796a9b --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Position.java @@ -0,0 +1,5 @@ +package com.apache.eventmesh.admin.server.task; + +public class Position { + +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Task.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Task.java new file mode 100644 index 0000000000..4f6cb7cfe9 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Task.java @@ -0,0 +1,17 @@ +package com.apache.eventmesh.admin.server.task; + +// task : job = 1 : m +public class Task { + private long id; + private String name; + private String desc; + private String uid; + private String sourceUser; + private String sourcePasswd; + private String targetUser; + private String targetPasswd; + private int sourceType; + private int targetType; + + +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/Request.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/Request.java new file mode 100644 index 0000000000..d36c292d6f --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/Request.java @@ -0,0 +1,22 @@ +package com.apache.eventmesh.admin.server.web; + +public class Request { + private String uid; + private T data; + + public String getUid() { + return uid; + } + + public void setUid(String uid) { + this.uid = uid; + } + + public T getData() { + return data; + } + + public void setData(T data) { + this.data = data; + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/Response.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/Response.java new file mode 100644 index 0000000000..4502ad792b --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/Response.java @@ -0,0 +1,33 @@ +package com.apache.eventmesh.admin.server.web; + +public class Response { + private boolean success; + + private String desc; + + private T data; + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public String getDesc() { + return desc; + } + + public void setDesc(String desc) { + this.desc = desc; + } + + public T getData() { + return data; + } + + public void setData(T data) { + this.data = data; + } +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/ServerController.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/ServerController.java new file mode 100644 index 0000000000..6f6e5fc7cb --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/ServerController.java @@ -0,0 +1,9 @@ +package com.apache.eventmesh.admin.server.web; + +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/eventmesh/admin") +public class ServerController { +} diff --git a/eventmesh-admin-server/src/main/resources/META-INF.eventmesh/com.apache.eventmesh.admin.server.registry.RegistryService b/eventmesh-admin-server/src/main/resources/META-INF.eventmesh/com.apache.eventmesh.admin.server.registry.RegistryService new file mode 100644 index 0000000000..656fec8f37 --- /dev/null +++ b/eventmesh-admin-server/src/main/resources/META-INF.eventmesh/com.apache.eventmesh.admin.server.registry.RegistryService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +nacos=com.apache.eventmesh.admin.server.registry.NacosDiscoveryService \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/resources/application.yaml b/eventmesh-admin-server/src/main/resources/application.yaml new file mode 100644 index 0000000000..aa72432b64 --- /dev/null +++ b/eventmesh-admin-server/src/main/resources/application.yaml @@ -0,0 +1,8 @@ +spring: + datasource: + url: jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false + username: sodafang + password: asdfasdf + driver-class-name: com.mysql.cj.jdbc.Driver +mybatis: + mapper-locations: classpath:mapper/*.xml \ No newline at end of file diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/PagedList.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/PagedList.java new file mode 100644 index 0000000000..0046764ada --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/PagedList.java @@ -0,0 +1,52 @@ +package org.apache.eventmesh.common.utils; + +import java.util.List; + +public class PagedList { + private int totalSize; + private int totalPage; + private int size; + private int page; + + private List data; + + public int getTotalSize() { + return totalSize; + } + + public void setTotalSize(int totalSize) { + this.totalSize = totalSize; + } + + public int getTotalPage() { + return totalPage; + } + + public void setTotalPage(int totalPage) { + this.totalPage = totalPage; + } + + public int getSize() { + return size; + } + + public void setSize(int size) { + this.size = size; + } + + public int getPage() { + return page; + } + + public void setPage(int page) { + this.page = page; + } + + public List getData() { + return data; + } + + public void setData(List data) { + this.data = data; + } +} diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionType.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionType.java index f76379f9e8..8de4e1ecfd 100644 --- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionType.java +++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionType.java @@ -26,6 +26,7 @@ public enum EventMeshExtensionType { CONNECTOR("connector"), STORAGE("storage"), META("metaStorage"), + REGISTRY("registryCenter"), SECURITY("security"), PROTOCOL("protocol"), METRICS("metrics"), diff --git a/settings.gradle b/settings.gradle index 645e6fb365..6162f91f7a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -126,3 +126,4 @@ include 'eventmesh-webhook:eventmesh-webhook-receive' include 'eventmesh-retry' include 'eventmesh-retry:eventmesh-retry-api' include 'eventmesh-retry:eventmesh-retry-rocketmq' +include 'eventmesh-admin-server'