Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import static java.util.Collections.emptyMap;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.CHECK_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.CLASSIFIER_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
Expand Down Expand Up @@ -522,6 +523,10 @@ public static boolean isRegistry(URL url) {
|| (url.getProtocol() != null && url.getProtocol().endsWith("-registry-protocol"));
}

public static boolean isCheck(URL url) {
return url.getParameter(CHECK_KEY, true) && url.getPort() != 0;
}

/**
* The specified {@link URL} is service discovery registry type or not
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ public void run(Timeout timeout) throws Exception {
logger.info(taskName + " : " + url);
}
try {
if (!registry.isAvailable()) {
throw new IllegalStateException("Registry is not available.");
}
doRetry(url, registry, timeout);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.RegistryFactory;
import org.apache.dubbo.registry.RegistryService;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ScopeModelAware;

import static org.apache.dubbo.common.constants.CommonConstants.CHECK_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_FAILED_CREATE_INSTANCE;
Expand Down Expand Up @@ -74,7 +74,7 @@ public Registry getRegistry(URL url) {

String key = createRegistryCacheKey(url);
Registry registry = null;
boolean check = url.getParameter(CHECK_KEY, true) && url.getPort() != 0;
boolean check = UrlUtils.isCheck(url);

// Lock the registry access process to ensure a single instance of the registry
registryManager.getRegistryLock().lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,14 @@ public class NacosConnectionManager {

private final boolean check;

private final Properties nacosProperties;

public NacosConnectionManager(URL connectionURL, boolean check, int retryTimes, int sleepMsBetweenRetries) {
this.connectionURL = connectionURL;
this.check = check;
this.retryTimes = retryTimes;
this.sleepMsBetweenRetries = sleepMsBetweenRetries;
this.nacosProperties = buildNacosProperties(this.connectionURL);
// create default one
this.namingServiceList.add(createNamingService());
}
Expand All @@ -78,6 +81,7 @@ protected NacosConnectionManager(NamingService namingService) {
this.retryTimes = 0;
this.sleepMsBetweenRetries = 0;
this.check = false;
this.nacosProperties = null;
// create default one
this.namingServiceList.add(namingService);
}
Expand Down Expand Up @@ -116,7 +120,6 @@ public synchronized void shutdownAll() {
* @return {@link NamingService}
*/
protected NamingService createNamingService() {
Properties nacosProperties = buildNacosProperties(this.connectionURL);
NamingService namingService = null;
try {
for (int i = 0; i < retryTimes + 1; i++) {
Expand Down Expand Up @@ -177,6 +180,9 @@ private boolean testNamingService(NamingService namingService) {

private Properties buildNacosProperties(URL url) {
Properties properties = new Properties();
if (StringUtils.isEmpty(url.getHost())) {
return properties;
}
setServerAddr(url, properties);
setProperties(url, properties);
return properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.nacos.NacosConnectionManager;
Expand Down Expand Up @@ -114,6 +115,9 @@ public static NacosNamingServiceWrapper createNamingService(URL connectionURL) {
boolean check = connectionURL.getParameter(NACOS_CHECK_KEY, true);
int retryTimes = connectionURL.getPositiveParameter(NACOS_RETRY_KEY, 10);
int sleepMsBetweenRetries = connectionURL.getPositiveParameter(NACOS_RETRY_WAIT_KEY, 10);
if (check && !UrlUtils.isCheck(connectionURL)) {
check = false;
}
NacosConnectionManager nacosConnectionManager =
new NacosConnectionManager(connectionURL, check, retryTimes, sleepMsBetweenRetries);
return new NacosNamingServiceWrapper(nacosConnectionManager, retryTimes, sleepMsBetweenRetries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.zookeeper.ZookeeperInstance;
Expand Down Expand Up @@ -117,7 +118,8 @@ public List<ACL> getAclForPath(String path) {
throw new IllegalStateException("zookeeper client initialization failed");
}

if (!curatorFramework.getZookeeperClient().isConnected()) {
boolean check = UrlUtils.isCheck(connectionURL);
if (check && !curatorFramework.getZookeeperClient().isConnected()) {
throw new IllegalStateException("failed to connect to zookeeper server");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -50,6 +51,7 @@
import org.mockito.internal.util.collections.Sets;

import static java.util.Arrays.asList;
import static org.apache.dubbo.common.constants.CommonConstants.CHECK_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -84,23 +86,20 @@ class ZookeeperServiceDiscoveryTest {
private ServiceDiscovery mockServiceDiscovery;
private ServiceCacheBuilder mockServiceCacheBuilder;
private ServiceCache mockServiceCache;
private static final CuratorFrameworkFactory.Builder spyBuilder = spy(CuratorFrameworkFactory.builder());

@BeforeAll
public static void beforeAll() {
zookeeperConnectionAddress1 = "zookeeper://localhost:" + "2181";
}

@BeforeEach
public void init() throws Exception {
// mock begin
// create mock bean begin
CuratorFrameworkFactory.Builder realBuilder = CuratorFrameworkFactory.builder();
CuratorFrameworkFactory.Builder spyBuilder = spy(realBuilder);
curatorFrameworkFactoryMockedStatic = mockStatic(CuratorFrameworkFactory.class);
curatorFrameworkFactoryMockedStatic
.when(CuratorFrameworkFactory::builder)
.thenReturn(spyBuilder);
serviceDiscoveryBuilderMockedStatic = mockStatic(ServiceDiscoveryBuilder.class);
}

@BeforeEach
public void init() throws Exception {
mockServiceDiscoveryBuilder = mock(ServiceDiscoveryBuilder.class);
mockServiceDiscovery = mock(ServiceDiscovery.class);
mockServiceCacheBuilder = mock(ServiceCacheBuilder.class);
Expand Down Expand Up @@ -200,6 +199,32 @@ public void onEvent(ServiceInstancesChangedEvent event) {
assertTrue(serviceInstances.isEmpty());
}

@Test
void testRegistryCheckConnectDefault() {
when(mockCuratorZookeeperClient.isConnected()).thenReturn(false);

URL registryUrl = URL.valueOf(zookeeperConnectionAddress1);
ApplicationModel applicationModel = ApplicationModel.defaultModel();
registryUrl.setScopeModel(applicationModel);

Assertions.assertThrowsExactly(IllegalStateException.class, () -> {
new ZookeeperServiceDiscovery(applicationModel, registryUrl);
});
}

@Test
void testRegistryNotCheckConnect() {
when(mockCuratorZookeeperClient.isConnected()).thenReturn(false);

URL registryUrl = URL.valueOf(zookeeperConnectionAddress1).addParameter(CHECK_KEY, false);
ApplicationModel applicationModel = ApplicationModel.defaultModel();
registryUrl.setScopeModel(applicationModel);

Assertions.assertDoesNotThrow(() -> {
new ZookeeperServiceDiscovery(applicationModel, registryUrl);
});
}

@AfterAll
public static void afterAll() throws Exception {
if (curatorFrameworkFactoryMockedStatic != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import static org.apache.dubbo.common.constants.CommonConstants.CHECK_KEY;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.EXPORTED_SERVICES_REVISION_PROPERTY_NAME;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.METADATA_STORAGE_TYPE_PROPERTY_NAME;
import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.ROOT_PATH;
Expand Down Expand Up @@ -109,6 +110,25 @@ void testBuildServiceDiscovery() throws Exception {
curatorFramework.getZookeeperClient().close();
}

@Test
void testBuildCuratorFrameworkCheckConnectDefault() {
when(mockCuratorZookeeperClient.isConnected()).thenReturn(false);
Assertions.assertThrowsExactly(IllegalStateException.class, () -> {
CuratorFramework curatorFramework = CuratorFrameworkUtils.buildCuratorFramework(registryUrl, null);
curatorFramework.getZookeeperClient().close();
});
}

@Test
void testBuildCuratorFrameworkNotCheckConnect() {
when(mockCuratorZookeeperClient.isConnected()).thenReturn(false);
URL url = registryUrl.addParameter(CHECK_KEY, false);
Assertions.assertDoesNotThrow(() -> {
CuratorFramework curatorFramework = CuratorFrameworkUtils.buildCuratorFramework(url, null);
curatorFramework.getZookeeperClient().close();
});
}

@Test
void testBuild() {
ServiceInstance dubboServiceInstance =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.common.config.configcenter.ConfigItem;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.UrlUtils;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -96,8 +97,8 @@ public List<ACL> getAclForPath(String path) {
client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
client.start();
boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);

if (!connected) {
boolean check = UrlUtils.isCheck(url);
if (check && !connected) {
// close CuratorFramework to stop re-connection.
client.close();
IllegalStateException illegalStateException =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,55 @@

import org.apache.dubbo.common.URL;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.listen.StandardListenerManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;

import static org.apache.dubbo.common.constants.CommonConstants.CHECK_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsNot.not;
import static org.hamcrest.core.IsNull.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstructionWithAnswer;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

class Curator5ZookeeperClientManagerTest {
private ZookeeperClient zookeeperClient;
private static MockedConstruction<Curator5ZookeeperClient> mockedCurator5ZookeeperClientConstruction;
private static String zookeeperConnectionAddress1;
private static URL zookeeperUrl;
private static MockedStatic<CuratorFrameworkFactory> curatorFrameworkFactoryMockedStatic;
private static CuratorFramework mockCuratorFramework;

@BeforeAll
public static void beforeAll() {
zookeeperConnectionAddress1 = "zookeeper://127.0.0.1:2181";
Curator5ZookeeperClient mockCurator5ZookeeperClient = mock(Curator5ZookeeperClient.class);
mockedCurator5ZookeeperClientConstruction =
mockConstructionWithAnswer(Curator5ZookeeperClient.class, invocationOnMock -> invocationOnMock
.getMethod()
.invoke(mockCurator5ZookeeperClient, invocationOnMock.getArguments()));
String zookeeperConnectionAddress1 = "zookeeper://127.0.0.1:2181";
zookeeperUrl = URL.valueOf(zookeeperConnectionAddress1 + "/service");

CuratorFrameworkFactory.Builder realBuilder = CuratorFrameworkFactory.builder();
CuratorFrameworkFactory.Builder spyBuilder = spy(realBuilder);

curatorFrameworkFactoryMockedStatic = mockStatic(CuratorFrameworkFactory.class);
curatorFrameworkFactoryMockedStatic
.when(CuratorFrameworkFactory::builder)
.thenReturn(spyBuilder);
mockCuratorFramework = mock(CuratorFramework.class);
doReturn(mockCuratorFramework).when(spyBuilder).build();
}

@BeforeEach
public void setUp() {
zookeeperClient = new ZookeeperClientManager().connect(URL.valueOf(zookeeperConnectionAddress1 + "/service"));
public void setUp() throws InterruptedException {
when(mockCuratorFramework.blockUntilConnected(anyInt(), any())).thenReturn(true);
when(mockCuratorFramework.getConnectionStateListenable()).thenReturn(StandardListenerManager.standard());
zookeeperClient = new ZookeeperClientManager().connect(zookeeperUrl);
}

@Test
Expand All @@ -56,8 +75,29 @@ void testZookeeperClient() {
zookeeperClient.close();
}

@Test
void testRegistryCheckConnectDefault() throws InterruptedException {
when(mockCuratorFramework.blockUntilConnected(anyInt(), any())).thenReturn(false);

ZookeeperClientManager zookeeperClientManager = new ZookeeperClientManager();
Assertions.assertThrowsExactly(IllegalStateException.class, () -> {
zookeeperClientManager.connect(zookeeperUrl);
});
}

@Test
void testRegistryNotCheckConnect() throws InterruptedException {
when(mockCuratorFramework.blockUntilConnected(anyInt(), any())).thenReturn(false);

URL url = zookeeperUrl.addParameter(CHECK_KEY, false);
ZookeeperClientManager zookeeperClientManager = new ZookeeperClientManager();
Assertions.assertDoesNotThrow(() -> {
zookeeperClientManager.connect(url);
});
}

@AfterAll
public static void afterAll() {
mockedCurator5ZookeeperClientConstruction.close();
curatorFrameworkFactoryMockedStatic.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.mockito.MockedStatic;
import org.mockito.stubbing.Answer;

import static org.apache.dubbo.common.constants.CommonConstants.CHECK_KEY;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
Expand Down Expand Up @@ -216,6 +217,16 @@ void testWithInvalidServer() throws InterruptedException {
});
}

@Test
void testWithInvalidServerWithoutCheck() throws InterruptedException {
when(mockCuratorFramework.blockUntilConnected(anyInt(), any())).thenReturn(false);
URL url = URL.valueOf("zookeeper://127.0.0.1:1/service").addParameter(CHECK_KEY, false);
Assertions.assertDoesNotThrow(() -> {
curatorClient = new Curator5ZookeeperClient(url);
curatorClient.create("/testPath", true, true);
});
}

@Test
void testRemoveChildrenListener() throws Exception {
ChildListener childListener = mock(ChildListener.class);
Expand Down
Loading