Skip to content

Commit 4b9bd82

Browse files
authored
branch-3.0: [fix](mysql protocol)Set more stmt exists flag correctly when forward to master. (#55711) (#55868)
backport: #55711
1 parent 5ae407c commit 4b9bd82

File tree

5 files changed

+70
-0
lines changed

5 files changed

+70
-0
lines changed

fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,15 @@ public void executeQuery(String originStmt) throws Exception {
354354
executor = new StmtExecutor(ctx, parsedStmt);
355355
executor.getProfile().getSummaryProfile().setParseSqlStartTime(parseSqlStartTime);
356356
executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime);
357+
// Here we set the MoreStmtExists flag without considering CLIENT_MULTI_STATEMENTS.
358+
// So the master will always set SERVER_MORE_RESULTS_EXISTS when the statement is not the last one.
359+
// When the Follower/Observer received the return result of Master, the Follower/Observer
360+
// will check CLIENT_MULTI_STATEMENTS is set or not. It sends SERVER_MORE_RESULTS_EXISTS back to client
361+
// only when CLIENT_MULTI_STATEMENTS is set.
362+
// See the code below : if (getConnectContext().getMysqlChannel().clientMultiStatements())
363+
if (i != stmts.size() - 1 && connectType.equals(ConnectType.MYSQL)) {
364+
executor.setMoreStmtExists(true);
365+
}
357366
ctx.setExecutor(executor);
358367

359368
if (cacheKeyType != null) {
@@ -771,6 +780,9 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException
771780
result.setQueryId(ctx.queryId());
772781
}
773782
result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
783+
if (request.moreResultExists) {
784+
ctx.getState().serverStatus |= MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
785+
}
774786
result.setPacket(getResultPacket());
775787
result.setStatus(ctx.getState().toString());
776788
if (ctx.getState().getStateType() == MysqlStateType.OK) {

fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ public class MasterOpExecutor {
6666

6767
private boolean shouldNotRetry;
6868

69+
protected boolean moreStmtExists = false;
70+
6971
public MasterOpExecutor(OriginStatement originStmt, ConnectContext ctx, RedirectStatus status, boolean isQuery) {
7072
this.originStmt = originStmt;
7173
this.ctx = ctx;
@@ -212,6 +214,7 @@ private TMasterOpRequest buildStmtForwardParams() throws AnalysisException {
212214
params.setUserIp(ctx.getRemoteIP());
213215
params.setStmtId(ctx.getStmtId());
214216
params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
217+
params.setMoreResultExists(moreStmtExists);
215218

216219
if (Config.isCloudMode()) {
217220
String cluster = "";
@@ -295,6 +298,10 @@ private TMasterOpRequest buildUpdateLoadDataParams(long tableId, long receiveDat
295298
return params;
296299
}
297300

301+
public void setMoreStmtExists(boolean moreStmtExists) {
302+
this.moreStmtExists = moreStmtExists;
303+
}
304+
298305
public ByteBuffer getOutputPacket() {
299306
if (result == null) {
300307
return null;

fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,9 @@ public class StmtExecutor {
290290
private Boolean isForwardedToMaster = null;
291291
// Flag for execute prepare statement, need to use binary protocol resultset
292292
private boolean isComStmtExecute = false;
293+
// Set to true if there are more stmt need to execute.
294+
// Mainly for forward to master, so that master can set the mysql server status correctly.
295+
private boolean moreStmtExists = false;
293296

294297
// The result schema if "dry_run_query" is true.
295298
// Only one column to indicate the real return row numbers.
@@ -433,6 +436,14 @@ public boolean isForwardToMaster() {
433436
return isForwardedToMaster;
434437
}
435438

439+
public boolean isMoreStmtExists() {
440+
return moreStmtExists;
441+
}
442+
443+
public void setMoreStmtExists(boolean moreStmtExists) {
444+
this.moreStmtExists = moreStmtExists;
445+
}
446+
436447
private boolean shouldForwardToMaster() {
437448
if (Env.getCurrentEnv().isMaster()) {
438449
return false;
@@ -1233,6 +1244,7 @@ private void forwardToMaster() throws Exception {
12331244
if (LOG.isDebugEnabled()) {
12341245
LOG.debug("need to transfer to Master. stmt: {}", context.getStmtId());
12351246
}
1247+
masterOpExecutor.setMoreStmtExists(moreStmtExists);
12361248
masterOpExecutor.execute();
12371249
if (parsedStmt instanceof SetStmt) {
12381250
SetStmt setStmt = (SetStmt) parsedStmt;

gensrc/thrift/FrontendService.thrift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,7 @@ struct TMasterOpRequest {
598598
29: optional TTxnLoadInfo txnLoadInfo
599599
30: optional TGroupCommitInfo groupCommitInfo
600600
31: optional binary prepareExecuteBuffer
601+
32: optional bool moreResultExists // Server has more result to send
601602

602603
// selectdb cloud
603604
1000: optional string cloud_cluster
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("test_multiple_stmt") {
19+
sql """drop database if exists test_multiple_stmt"""
20+
sql """create database test_multiple_stmt"""
21+
sql """use test_multiple_stmt"""
22+
sql """CREATE TABLE test_multiple_stmt (
23+
key1 int NOT NULL,
24+
value1 int NOT NULL,
25+
)ENGINE=OLAP
26+
DUPLICATE KEY(`key1`)
27+
COMMENT "OLAP"
28+
DISTRIBUTED BY HASH(`key1`) BUCKETS 2
29+
PROPERTIES (
30+
"replication_num" = "1"
31+
)
32+
"""
33+
34+
def result = sql """insert into test_multiple_stmt values (1, 1); select * from test_multiple_stmt;"""
35+
assertEquals(1, result.size())
36+
assertEquals(1, result[0][0])
37+
assertEquals(1, result[0][1])
38+
}

0 commit comments

Comments
 (0)