Skip to content

Commit 5493d06

Browse files
committed
Set more stmt exists flag correctly when forward to master.
1 parent f3c94ed commit 5493d06

File tree

5 files changed

+68
-0
lines changed

5 files changed

+68
-0
lines changed

fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,15 @@ public void executeQuery(String originStmt) throws Exception {
347347
executor = new StmtExecutor(ctx, parsedStmt);
348348
executor.getProfile().getSummaryProfile().setParseSqlStartTime(parseSqlStartTime);
349349
executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime);
350+
// Here we set the MoreStmtExists flag without considering CLIENT_MULTI_STATEMENTS.
351+
// So the master will always set SERVER_MORE_RESULTS_EXISTS when the statement is not the last one.
352+
// When the Follower/Observer received the return result of Master, the Follower/Observer
353+
// will check CLIENT_MULTI_STATEMENTS is set or not. It sends SERVER_MORE_RESULTS_EXISTS back to client
354+
// only when CLIENT_MULTI_STATEMENTS is set.
355+
// See the code below : if (getConnectContext().getMysqlChannel().clientMultiStatements())
356+
if (i != stmts.size() - 1 && connectType.equals(ConnectType.MYSQL)) {
357+
executor.setMoreStmtExists(true);
358+
}
350359
ctx.setExecutor(executor);
351360

352361
if (cacheKeyType != null) {
@@ -705,6 +714,9 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException
705714
result.setQueryId(ctx.queryId());
706715
}
707716
result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
717+
if (request.moreResultExists) {
718+
ctx.getState().serverStatus |= MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
719+
}
708720
result.setPacket(getResultPacket());
709721
result.setStatus(ctx.getState().toString());
710722
if (ctx.getState().getStateType() == MysqlStateType.OK) {

fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class FEOpExecutor {
6363
protected int thriftTimeoutMs;
6464

6565
protected boolean shouldNotRetry;
66+
protected boolean moreStmtExists = false;
6667

6768
public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt, ConnectContext ctx, boolean isQuery) {
6869
this.feAddr = feAddress;
@@ -173,6 +174,7 @@ protected TMasterOpRequest buildStmtForwardParams() throws AnalysisException {
173174
params.setStmtId(ctx.getStmtId());
174175
params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
175176
params.setSessionId(ctx.getSessionId());
177+
params.setMoreResultExists(moreStmtExists);
176178

177179
if (Config.isCloudMode()) {
178180
String cluster = "";
@@ -224,6 +226,9 @@ public String getErrMsg() {
224226
return result.getErrMessage();
225227
}
226228

229+
public void setMoreStmtExists(boolean moreStmtExists) {
230+
this.moreStmtExists = moreStmtExists;
231+
}
227232

228233
public ByteBuffer getOutputPacket() {
229234
if (result == null) {

fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ public class StmtExecutor {
201201
private Boolean isForwardedToMaster = null;
202202
// Flag for execute prepare statement, need to use binary protocol resultset
203203
private boolean isComStmtExecute = false;
204+
// Set to true if there are more stmt need to execute.
205+
// Mainly for forward to master, so that master can set the mysql server status correctly.
206+
private boolean moreStmtExists = false;
204207

205208
// The result schema if "dry_run_query" is true.
206209
// Only one column to indicate the real return row numbers.
@@ -347,6 +350,14 @@ public boolean isForwardToMaster() {
347350
return isForwardedToMaster;
348351
}
349352

353+
public boolean isMoreStmtExists() {
354+
return moreStmtExists;
355+
}
356+
357+
public void setMoreStmtExists(boolean moreStmtExists) {
358+
this.moreStmtExists = moreStmtExists;
359+
}
360+
350361
private boolean shouldForwardToMaster() {
351362
if (Env.getCurrentEnv().isMaster()) {
352363
return false;
@@ -937,6 +948,7 @@ private void forwardToMaster() throws Exception {
937948
if (LOG.isDebugEnabled()) {
938949
LOG.debug("need to transfer to Master. stmt: {}", context.getStmtId());
939950
}
951+
masterOpExecutor.setMoreStmtExists(moreStmtExists);
940952
masterOpExecutor.execute();
941953
if (parsedStmt instanceof LogicalPlanAdapter) {
942954
// for nereids command

gensrc/thrift/FrontendService.thrift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,7 @@ struct TMasterOpRequest {
383383
29: optional TTxnLoadInfo txnLoadInfo
384384
30: optional TGroupCommitInfo groupCommitInfo
385385
31: optional binary prepareExecuteBuffer
386+
32: optional bool moreResultExists // Server has more result to send
386387

387388
// selectdb cloud
388389
1000: optional string cloud_cluster
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("test_multiple_stmt") {
19+
sql """drop database if exists test_multiple_stmt"""
20+
sql """create database test_multiple_stmt"""
21+
sql """use test_multiple_stmt"""
22+
sql """CREATE TABLE test_multiple_stmt (
23+
key1 int NOT NULL,
24+
value1 int NOT NULL,
25+
)ENGINE=OLAP
26+
DUPLICATE KEY(`key1`)
27+
COMMENT "OLAP"
28+
DISTRIBUTED BY HASH(`key1`) BUCKETS 2
29+
PROPERTIES (
30+
"replication_num" = "1"
31+
)
32+
"""
33+
34+
def result = sql """insert into test_multiple_stmt values (1, 1); select * from test_multiple_stmt;"""
35+
assertEquals(1, result.size())
36+
assertEquals(1, result[0][0])
37+
assertEquals(1, result[0][1])
38+
}

0 commit comments

Comments
 (0)