Skip to content

Commit a0bef77

Browse files
author
Deng Ziwu
committed
add_be_host_mapping_list_config
Signed-off-by: Deng Ziwu <[email protected]>
1 parent 846f1e4 commit a0bef77

File tree

4 files changed

+54
-8
lines changed

4 files changed

+54
-8
lines changed

src/main/java/com/starrocks/connector/spark/cfg/ConfigurationOptions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ public interface ConfigurationOptions {
2323
// starrocks fe node address
2424
String STARROCKS_FENODES = "starrocks.fenodes";
2525

26+
String STARROCKS_BE_HOST_MAPPING_LIST = "starrocks.be.host.mapping.list";
27+
2628
String STARROCKS_DEFAULT_CLUSTER = "default_cluster";
2729

2830
String STARROCKS_TIMEZONE = "starrocks.timezone";

src/main/java/com/starrocks/connector/spark/serialization/Routing.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,15 @@
2121

2222
import static com.starrocks.connector.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
2323

24+
import com.starrocks.connector.spark.cfg.ConfigurationOptions;
25+
import com.starrocks.connector.spark.cfg.Settings;
2426
import com.starrocks.connector.spark.exception.IllegalArgumentException;
2527
import org.slf4j.Logger;
2628
import org.slf4j.LoggerFactory;
2729

30+
import java.util.HashMap;
31+
import java.util.Map;
32+
2833
/**
2934
* present an StarRocks BE address.
3035
*/
@@ -34,12 +39,30 @@ public class Routing {
3439
private String host;
3540
private int port;
3641

37-
public Routing(String routing) throws IllegalArgumentException {
38-
parseRouting(routing);
42+
public Routing(String routing, Settings settings) throws IllegalArgumentException {
43+
parseRouting(routing, settings);
3944
}
4045

41-
private void parseRouting(String routing) throws IllegalArgumentException {
46+
private void parseRouting(String routing, Settings settings) throws IllegalArgumentException {
4247
logger.debug("Parse StarRocks BE address: '{}'.", routing);
48+
String beHostMappingList = settings.getProperty(ConfigurationOptions.STARROCKS_BE_HOST_MAPPING_LIST, "");
49+
if (beHostMappingList.length() > 0) {
50+
String list = beHostMappingList;
51+
Map<String, String> mappingMap = new HashMap<>();
52+
String[] beHostMappingInfos = list.split(";");
53+
for (String beHostMappingInfo : beHostMappingInfos) {
54+
String[] mapping = beHostMappingInfo.split(",");
55+
mappingMap.put(mapping[1].trim(), mapping[0].trim());
56+
}
57+
if (!mappingMap.containsKey(routing)) {
58+
throw new RuntimeException("Not find be node info from the be port mappping list");
59+
}
60+
routing = mappingMap.get(routing);
61+
logger.info("query data from be by using be-hostname {}", routing);
62+
} else {
63+
logger.info("query data from be by using be-ip {}", routing);
64+
}
65+
4366
String[] hostPort = routing.split(":");
4467
if (hostPort.length != 2) {
4568
logger.error("Format of StarRocks BE address '{}' is illegal.", routing);

src/main/scala/com/starrocks/connector/spark/rdd/ScalaValueReader.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
4848
protected val logger = Logger.getLogger(classOf[ScalaValueReader])
4949

5050
protected val timeZone = ZoneId.of(settings.getProperty(STARROCKS_TIMEZONE, ZoneId.systemDefault.toString))
51-
protected val client = new BackendClient(new Routing(partition.getBeAddress), settings)
51+
protected val client = new BackendClient(new Routing(partition.getBeAddress, settings), settings)
5252
protected var offset = 0
5353
protected var eos: AtomicBoolean = new AtomicBoolean(false)
5454
protected var rowBatch: RowBatch = _

src/test/java/com/starrocks/connector/spark/serialization/TestRouting.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121

2222
import static org.hamcrest.core.StringStartsWith.startsWith;
2323

24+
import com.starrocks.connector.spark.cfg.ConfigurationOptions;
25+
import com.starrocks.connector.spark.cfg.PropertiesSettings;
26+
import com.starrocks.connector.spark.cfg.Settings;
2427
import com.starrocks.connector.spark.exception.IllegalArgumentException;
2528

2629
import org.junit.Assert;
@@ -32,18 +35,36 @@ public class TestRouting {
3235
@Rule
3336
public ExpectedException thrown = ExpectedException.none();
3437

38+
3539
@Test
36-
public void testRouting() throws Exception {
37-
Routing r1 = new Routing("10.11.12.13:1234");
40+
public void testRoutingNoBeMappingList() throws Exception {
41+
Settings settings = new PropertiesSettings();
42+
Routing r1 = new Routing("10.11.12.13:1234", settings);
3843
Assert.assertEquals("10.11.12.13", r1.getHost());
3944
Assert.assertEquals(1234, r1.getPort());
4045

4146
thrown.expect(IllegalArgumentException.class);
4247
thrown.expectMessage(startsWith("argument "));
43-
new Routing("10.11.12.13:wxyz");
48+
new Routing("10.11.12.13:wxyz", settings);
4449

4550
thrown.expect(IllegalArgumentException.class);
4651
thrown.expectMessage(startsWith("Parse "));
47-
new Routing("10.11.12.13");
52+
new Routing("10.11.12.13", settings);
53+
}
54+
55+
@Test
56+
public void testRoutingBeMappingList() throws Exception {
57+
Settings settings = new PropertiesSettings();
58+
String mappingList = "20.11.12.13:6666,10.11.12.13:1234;21.11.12.13:5555,11.11.12.13:1234";
59+
settings.setProperty(ConfigurationOptions.STARROCKS_BE_HOST_MAPPING_LIST, mappingList);
60+
61+
Routing r1 = new Routing("10.11.12.13:1234", settings);
62+
Assert.assertEquals("20.11.12.13", r1.getHost());
63+
Assert.assertEquals(6666, r1.getPort());
64+
65+
Routing r2 = new Routing("11.11.12.13:1234", settings);
66+
Assert.assertEquals("21.11.12.13", r2.getHost());
67+
Assert.assertEquals(5555, r2.getPort());
68+
4869
}
4970
}

0 commit comments

Comments
 (0)