Skip to content

Commit 979e8ff

Browse files
authored
Merge branch 'apache:master' into master
2 parents ae58773 + d9c3fc9 commit 979e8ff

File tree

77 files changed

+5312
-203
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+5312
-203
lines changed

external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ private static void makeMqttPluginDir() throws IOException {
7979
}
8080

8181
private static void buildMqttPluginMap() throws IOException {
82-
ServiceLoader<PayloadFormatter> payloadFormatters = ServiceLoader.load(PayloadFormatter.class);
82+
ServiceLoader<PayloadFormatter> payloadFormatters =
83+
ServiceLoader.load(PayloadFormatter.class, PayloadFormatManager.class.getClassLoader());
8384
for (PayloadFormatter formatter : payloadFormatters) {
8485
if (formatter == null) {
8586
logger.error("PayloadFormatManager(), formatter is null.");

integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeForecastIT.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public static void forecastTableFunctionErrorTest(
133133
"701: The OUTPUT_START_TIME should be greater than the maximum timestamp of target time series. Expected greater than [5759] but found [5759].");
134134

135135
// OUTPUT_LENGTH error
136-
String invalidOutputLengthSQL =
136+
String invalidOutputLengthSQLWithZero =
137137
String.format(
138138
FORECAST_TABLE_FUNCTION_SQL_TEMPLATE,
139139
modelInfo.getModelId(),
@@ -144,7 +144,24 @@ public static void forecastTableFunctionErrorTest(
144144
0,
145145
1,
146146
"time");
147-
errorTest(statement, invalidOutputLengthSQL, "701: OUTPUT_LENGTH should be greater than 0");
147+
errorTest(
148+
statement, invalidOutputLengthSQLWithZero, "701: OUTPUT_LENGTH should be greater than 0");
149+
150+
String invalidOutputLengthSQLWithOutOfRange =
151+
String.format(
152+
FORECAST_TABLE_FUNCTION_SQL_TEMPLATE,
153+
modelInfo.getModelId(),
154+
0,
155+
5760,
156+
2880,
157+
5760,
158+
2881,
159+
1,
160+
"time");
161+
errorTest(
162+
statement,
163+
invalidOutputLengthSQLWithOutOfRange,
164+
"1599: Error occurred while executing forecast:[Attribute output_length expect value between 1 and 2880, got 2881 instead.]");
148165

149166
// OUTPUT_INTERVAL error
150167
String invalidOutputIntervalSQL =

integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeInstanceManagementIT.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,5 +155,13 @@ private void failTest(Statement statement) {
155155
statement,
156156
"UNLOAD MODEL sundial FROM DEVICES \"unknown\"",
157157
"1507: Device ID [unknown] is not available. You can use 'SHOW AI_DEVICES' to retrieve the available devices.");
158+
errorTest(
159+
statement,
160+
"LOAD MODEL sundial TO DEVICES \"0,0\"",
161+
"1509: Device ID list contains duplicate entries.");
162+
errorTest(
163+
statement,
164+
"UNLOAD MODEL sundial FROM DEVICES \"0,0\"",
165+
"1510: Device ID list contains duplicate entries.");
158166
}
159167
}

integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,4 +192,46 @@ public void testLastQueryWithBlobType() {
192192
};
193193
resultSetEqualTest("select last s1 from root.sg.d1;", expectedHeader, retArray);
194194
}
195+
196+
@Test
197+
public void testNonAlignedLastQueryWithTimeFilterWithoutCache() throws SQLException {
198+
try (Connection connection = EnvFactory.getEnv().getConnection();
199+
Statement statement = connection.createStatement()) {
200+
statement.execute("insert into root.db1.g1.d3(time,s_3) values (1,1)");
201+
statement.execute("insert into root.db1.g1.d3(time,s_3) values (2,2)");
202+
statement.execute("insert into root.db1.g1.d3(time,s_3) values (3,3)");
203+
statement.execute("insert into root.db1.g1.d3(time,s_3) values (4,4)");
204+
statement.execute("insert into root.db1.g1.d3(time,s_1) values (1,1)");
205+
statement.execute("insert into root.db1.g1.d3(time,s_1) values (2,2)");
206+
}
207+
String[] expectedHeader =
208+
new String[] {TIMESTAMP_STR, TIMESERIES_STR, VALUE_STR, DATA_TYPE_STR};
209+
String[] retArray =
210+
new String[] {
211+
"4,root.db1.g1.d3.s_3,4.0,DOUBLE,",
212+
};
213+
resultSetEqualTest(
214+
"select last s_1, s_3 from root.db1.g1.d3 where time > 2;", expectedHeader, retArray);
215+
}
216+
217+
@Test
218+
public void testAlignedLastQueryWithTimeFilterWithoutCache() throws SQLException {
219+
try (Connection connection = EnvFactory.getEnv().getConnection();
220+
Statement statement = connection.createStatement()) {
221+
statement.execute("insert into root.db1.g1.d4(time,s_3) aligned values (1,1)");
222+
statement.execute("insert into root.db1.g1.d4(time,s_3) aligned values (2,2)");
223+
statement.execute("insert into root.db1.g1.d4(time,s_3) aligned values (3,3)");
224+
statement.execute("insert into root.db1.g1.d4(time,s_3) aligned values (4,4)");
225+
statement.execute("insert into root.db1.g1.d4(time,s_1) aligned values (1,1)");
226+
statement.execute("insert into root.db1.g1.d4(time,s_1) aligned values (2,2)");
227+
}
228+
String[] expectedHeader =
229+
new String[] {TIMESTAMP_STR, TIMESERIES_STR, VALUE_STR, DATA_TYPE_STR};
230+
String[] retArray =
231+
new String[] {
232+
"4,root.db1.g1.d4.s_3,4.0,DOUBLE,",
233+
};
234+
resultSetEqualTest(
235+
"select last s_1, s_3 from root.db1.g1.d4 where time > 2;", expectedHeader, retArray);
236+
}
195237
}

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,13 @@ public void testSourcePermission() {
323323
"create database root.db", "create timeSeries root.db.device.measurement int32"),
324324
null);
325325

326+
// Write some aligned historical data
327+
TestUtils.executeNonQueries(
328+
senderEnv,
329+
Arrays.asList(
330+
"create aligned timeSeries root.vehicle.plane(temperature DOUBLE, pressure INT32)",
331+
"insert into root.vehicle.plane(temperature, pressure) values (36.5, 1103)"));
332+
326333
// Transfer snapshot
327334
try (final Connection connection = senderEnv.getConnection();
328335
final Statement statement = connection.createStatement()) {
@@ -384,6 +391,13 @@ public void testSourcePermission() {
384391
"count(root.vehicle.car.pressure),",
385392
Collections.singleton("0,"));
386393

394+
// Exception, skip
395+
TestUtils.assertDataAlwaysOnEnv(
396+
receiverEnv,
397+
"select count(temperature) from root.vehicle.plane",
398+
"count(root.vehicle.plane.temperature),",
399+
Collections.singleton("0,"));
400+
387401
// Alter pipe, throw exception if no privileges
388402
try (final Connection connection = senderEnv.getConnection();
389403
final Statement statement = connection.createStatement()) {

integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,16 @@ public void testOPCUAServerSink() throws Exception {
9494
final Map<String, String> sinkAttributes = new HashMap<>();
9595

9696
sinkAttributes.put("sink", "opc-ua-sink");
97-
sinkAttributes.put("opcua.model", "client-server");
98-
sinkAttributes.put("security-policy", "None");
97+
sinkAttributes.put("model", "client-server");
98+
sinkAttributes.put("opcua.security-policy", "None");
9999

100100
OpcUaClient opcUaClient;
101101
DataValue value;
102102
while (true) {
103103
final int[] ports = EnvUtils.searchAvailablePorts();
104104
tcpPort = ports[0];
105105
httpsPort = ports[1];
106-
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
106+
sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort));
107107
sinkAttributes.put("https.port", Integer.toString(httpsPort));
108108

109109
Assert.assertEquals(
@@ -146,7 +146,7 @@ public void testOPCUAServerSink() throws Exception {
146146
final int[] ports = EnvUtils.searchAvailablePorts();
147147
tcpPort = ports[0];
148148
httpsPort = ports[1];
149-
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
149+
sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort));
150150
sinkAttributes.put("https.port", Integer.toString(httpsPort));
151151
sinkAttributes.put("with-quality", "true");
152152

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.relational.it.db.it;
21+
22+
import org.apache.iotdb.it.env.EnvFactory;
23+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
24+
import org.apache.iotdb.itbase.category.TableClusterIT;
25+
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
26+
27+
import org.junit.AfterClass;
28+
import org.junit.BeforeClass;
29+
import org.junit.Test;
30+
import org.junit.experimental.categories.Category;
31+
import org.junit.runner.RunWith;
32+
33+
import java.sql.Connection;
34+
import java.sql.Statement;
35+
36+
import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
37+
import static org.junit.Assert.fail;
38+
39+
@RunWith(IoTDBTestRunner.class)
40+
@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
41+
public class IoTDBWindowFunction3IT {
42+
private static final String DATABASE_NAME = "test";
43+
private static final String[] sqls =
44+
new String[] {
45+
"CREATE DATABASE " + DATABASE_NAME,
46+
"USE " + DATABASE_NAME,
47+
"create table demo (device string tag, value double field)",
48+
"insert into demo values (2021-01-01T09:05:00, 'd1', 3)",
49+
"insert into demo values (2021-01-01T09:07:00, 'd1', 5)",
50+
"insert into demo values (2021-01-01T09:09:00, 'd1', 3)",
51+
"insert into demo values (2021-01-01T09:10:00, 'd1', 1)",
52+
"insert into demo values (2021-01-01T09:08:00, 'd2', 2)",
53+
"insert into demo values (2021-01-01T09:15:00, 'd2', 4)",
54+
"FLUSH",
55+
"CLEAR ATTRIBUTE CACHE",
56+
};
57+
58+
protected static void insertData() {
59+
try (Connection connection = EnvFactory.getEnv().getTableConnection();
60+
Statement statement = connection.createStatement()) {
61+
for (String sql : sqls) {
62+
statement.execute(sql);
63+
}
64+
} catch (Exception e) {
65+
fail("insertData failed.");
66+
}
67+
}
68+
69+
@BeforeClass
70+
public static void setUp() {
71+
EnvFactory.getEnv().getConfig().getCommonConfig().setSortBufferSize(1024 * 1024);
72+
EnvFactory.getEnv().initClusterEnvironment();
73+
insertData();
74+
}
75+
76+
@AfterClass
77+
public static void tearDown() {
78+
EnvFactory.getEnv().cleanClusterEnvironment();
79+
}
80+
81+
@Test
82+
public void testMergeWindowFunctions() {
83+
String[] expectedHeader = new String[] {"time", "device", "value", "a", "b"};
84+
String[] retArray =
85+
new String[] {
86+
"2021-01-01T09:05:00.000Z,d1,3.0,3.0,4.0,",
87+
"2021-01-01T09:07:00.000Z,d1,5.0,5.0,6.0,",
88+
"2021-01-01T09:09:00.000Z,d1,3.0,3.0,4.0,",
89+
"2021-01-01T09:10:00.000Z,d1,1.0,1.0,2.0,",
90+
"2021-01-01T09:08:00.000Z,d2,2.0,2.0,4.0,",
91+
"2021-01-01T09:15:00.000Z,d2,4.0,4.0,6.0,",
92+
};
93+
tableResultSetEqualTest(
94+
"SELECT *, a + min(value) OVER (PARTITION BY device ORDER BY value) as b FROM (SELECT *, max(value) OVER (PARTITION BY device ORDER BY value) as a FROM demo) ORDER BY device, time",
95+
expectedHeader,
96+
retArray,
97+
DATABASE_NAME);
98+
}
99+
100+
@Test
101+
public void testSwapWindowFunctions() {
102+
String[] expectedHeader = new String[] {"time", "device", "value", "p1", "p2"};
103+
String[] retArray =
104+
new String[] {
105+
"2021-01-01T09:05:00.000Z,d1,3.0,1.0,6.0,",
106+
"2021-01-01T09:07:00.000Z,d1,5.0,1.0,5.0,",
107+
"2021-01-01T09:09:00.000Z,d1,3.0,1.0,6.0,",
108+
"2021-01-01T09:10:00.000Z,d1,1.0,1.0,1.0,",
109+
"2021-01-01T09:08:00.000Z,d2,2.0,2.0,2.0,",
110+
"2021-01-01T09:15:00.000Z,d2,4.0,2.0,4.0,",
111+
};
112+
tableResultSetEqualTest(
113+
"SELECT *, min(value) OVER (PARTITION BY device) as p1, sum(value) OVER (PARTITION BY device, value) as p2 FROM demo ORDER BY device, time",
114+
expectedHeader,
115+
retArray,
116+
DATABASE_NAME);
117+
}
118+
119+
@Test
120+
public void testPushDownFilterIntoWindow() {
121+
String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
122+
String[] retArray =
123+
new String[] {
124+
"2021-01-01T09:10:00.000Z,d1,1.0,1,",
125+
"2021-01-01T09:05:00.000Z,d1,3.0,2,",
126+
"2021-01-01T09:08:00.000Z,d2,2.0,1,",
127+
"2021-01-01T09:15:00.000Z,d2,4.0,2,",
128+
};
129+
tableResultSetEqualTest(
130+
"SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY device ORDER BY value) as rn FROM demo) WHERE rn <= 2 ORDER BY device, time",
131+
expectedHeader,
132+
retArray,
133+
DATABASE_NAME);
134+
}
135+
136+
@Test
137+
public void testPushDownLimitIntoWindow() {
138+
String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
139+
String[] retArray =
140+
new String[] {
141+
"2021-01-01T09:05:00.000Z,d1,3.0,2,", "2021-01-01T09:07:00.000Z,d1,5.0,4,",
142+
};
143+
tableResultSetEqualTest(
144+
"SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY device ORDER BY value) as rn FROM demo) ORDER BY device, time LIMIT 2 ",
145+
expectedHeader,
146+
retArray,
147+
DATABASE_NAME);
148+
}
149+
150+
@Test
151+
public void testReplaceWindowWithRowNumber() {
152+
String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
153+
String[] retArray =
154+
new String[] {
155+
"2021-01-01T09:05:00.000Z,d1,3.0,1,",
156+
"2021-01-01T09:07:00.000Z,d1,5.0,2,",
157+
"2021-01-01T09:09:00.000Z,d1,3.0,3,",
158+
"2021-01-01T09:10:00.000Z,d1,1.0,4,",
159+
"2021-01-01T09:08:00.000Z,d2,2.0,1,",
160+
"2021-01-01T09:15:00.000Z,d2,4.0,2,",
161+
};
162+
tableResultSetEqualTest(
163+
"SELECT *, row_number() OVER (PARTITION BY device) AS rn FROM demo ORDER BY device, time",
164+
expectedHeader,
165+
retArray,
166+
DATABASE_NAME);
167+
}
168+
169+
@Test
170+
public void testRemoveRedundantWindow() {
171+
String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
172+
String[] retArray = new String[] {};
173+
tableResultSetEqualTest(
174+
"SELECT *, row_number() OVER (PARTITION BY device) AS rn FROM demo WHERE 1 = 2",
175+
expectedHeader,
176+
retArray,
177+
DATABASE_NAME);
178+
}
179+
}

iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ public Map<String, String> getAttribute() {
6060
}
6161

6262
public boolean hasAttribute(final String key) {
63-
return attributes.containsKey(key) || attributes.containsKey(KeyReducer.reduce(key));
63+
return attributes.containsKey(key)
64+
|| attributes.containsKey(KeyReducer.shallowReduce(key))
65+
|| attributes.containsKey(KeyReducer.reduce(key));
6466
}
6567

6668
public boolean hasAnyAttributes(final String... keys) {
@@ -92,7 +94,11 @@ public void computeAttributeIfExists(
9294
}
9395

9496
public String getString(final String key) {
95-
final String value = attributes.get(key);
97+
String value = attributes.get(key);
98+
if (Objects.nonNull(value)) {
99+
return value;
100+
}
101+
value = attributes.get(KeyReducer.shallowReduce(key));
96102
return value != null ? value : attributes.get(KeyReducer.reduce(key));
97103
}
98104

@@ -380,6 +386,19 @@ private static class KeyReducer {
380386
SECOND_PREFIXES.add("opcua.");
381387
}
382388

389+
static String shallowReduce(String key) {
390+
if (key == null) {
391+
return null;
392+
}
393+
final String lowerCaseKey = key.toLowerCase();
394+
for (final String prefix : FIRST_PREFIXES) {
395+
if (lowerCaseKey.startsWith(prefix)) {
396+
return key.substring(prefix.length());
397+
}
398+
}
399+
return key;
400+
}
401+
383402
static String reduce(String key) {
384403
if (key == null) {
385404
return null;

0 commit comments

Comments
 (0)