Skip to content

Commit 7a59633

Browse files
authored
[fix](mysql protocol)Set more stmt exists flag correctly when forward to master. (#55711)
When execute multiple statements in one batch and CLIENT_MULTI_STATEMENTS is set, Doris server need to set SERVER_MORE_RESULTS_EXISTS flag in the return packet before the last statement. But when the Observer forward stmt to Master, this SERVER_MORE_RESULTS_EXISTS is not set, cause the following statements failed to execute. This pr forward a boolean value to Master, so the Master FE knows it is the last statement or not, and could set SERVER_MORE_RESULTS_EXISTS correctly.
1 parent d23a87d commit 7a59633

File tree

5 files changed

+68
-0
lines changed

5 files changed

+68
-0
lines changed

fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,15 @@ public void executeQuery(String originStmt) throws Exception {
348348
executor.getProfile().getSummaryProfile().setParseSqlStartTime(parseSqlStartTime);
349349
executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime);
350350
executor.getProfile().getSummaryProfile().parsedByConnectionProcess = true;
351+
// Here we set the MoreStmtExists flag without considering CLIENT_MULTI_STATEMENTS.
352+
// So the master will always set SERVER_MORE_RESULTS_EXISTS when the statement is not the last one.
353+
// When the Follower/Observer received the return result of Master, the Follower/Observer
354+
// will check CLIENT_MULTI_STATEMENTS is set or not. It sends SERVER_MORE_RESULTS_EXISTS back to client
355+
// only when CLIENT_MULTI_STATEMENTS is set.
356+
// See the code below : if (getConnectContext().getMysqlChannel().clientMultiStatements())
357+
if (i != stmts.size() - 1 && connectType.equals(ConnectType.MYSQL)) {
358+
executor.setMoreStmtExists(true);
359+
}
351360
ctx.setExecutor(executor);
352361

353362
if (cacheKeyType != null) {
@@ -706,6 +715,9 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException
706715
result.setQueryId(ctx.queryId());
707716
}
708717
result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
718+
if (request.moreResultExists) {
719+
ctx.getState().serverStatus |= MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
720+
}
709721
result.setPacket(getResultPacket());
710722
result.setStatus(ctx.getState().toString());
711723
if (ctx.getState().getStateType() == MysqlStateType.OK) {

fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class FEOpExecutor {
6363
protected int thriftTimeoutMs;
6464

6565
protected boolean shouldNotRetry;
66+
protected boolean moreStmtExists = false;
6667

6768
public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt, ConnectContext ctx, boolean isQuery) {
6869
this.feAddr = feAddress;
@@ -173,6 +174,7 @@ protected TMasterOpRequest buildStmtForwardParams() throws AnalysisException {
173174
params.setStmtId(ctx.getStmtId());
174175
params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
175176
params.setSessionId(ctx.getSessionId());
177+
params.setMoreResultExists(moreStmtExists);
176178

177179
if (Config.isCloudMode()) {
178180
String cluster = "";
@@ -224,6 +226,9 @@ public String getErrMsg() {
224226
return result.getErrMessage();
225227
}
226228

229+
public void setMoreStmtExists(boolean moreStmtExists) {
230+
this.moreStmtExists = moreStmtExists;
231+
}
227232

228233
public ByteBuffer getOutputPacket() {
229234
if (result == null) {

fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ public class StmtExecutor {
201201
private Boolean isForwardedToMaster = null;
202202
// Flag for execute prepare statement, need to use binary protocol resultset
203203
private boolean isComStmtExecute = false;
204+
// Set to true if there are more stmt need to execute.
205+
// Mainly for forward to master, so that master can set the mysql server status correctly.
206+
private boolean moreStmtExists = false;
204207

205208
// The result schema if "dry_run_query" is true.
206209
// Only one column to indicate the real return row numbers.
@@ -354,6 +357,14 @@ public boolean isForwardToMaster() {
354357
return isForwardedToMaster;
355358
}
356359

360+
public boolean isMoreStmtExists() {
361+
return moreStmtExists;
362+
}
363+
364+
public void setMoreStmtExists(boolean moreStmtExists) {
365+
this.moreStmtExists = moreStmtExists;
366+
}
367+
357368
private boolean shouldForwardToMaster() {
358369
if (Env.getCurrentEnv().isMaster()) {
359370
return false;
@@ -945,6 +956,7 @@ private void forwardToMaster() throws Exception {
945956
if (LOG.isDebugEnabled()) {
946957
LOG.debug("need to transfer to Master. stmt: {}", context.getStmtId());
947958
}
959+
masterOpExecutor.setMoreStmtExists(moreStmtExists);
948960
masterOpExecutor.execute();
949961
if (parsedStmt instanceof LogicalPlanAdapter) {
950962
// for nereids command

gensrc/thrift/FrontendService.thrift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,7 @@ struct TMasterOpRequest {
383383
29: optional TTxnLoadInfo txnLoadInfo
384384
30: optional TGroupCommitInfo groupCommitInfo
385385
31: optional binary prepareExecuteBuffer
386+
32: optional bool moreResultExists // Server has more result to send
386387

387388
// selectdb cloud
388389
1000: optional string cloud_cluster
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("test_multiple_stmt") {
19+
sql """drop database if exists test_multiple_stmt"""
20+
sql """create database test_multiple_stmt"""
21+
sql """use test_multiple_stmt"""
22+
sql """CREATE TABLE test_multiple_stmt (
23+
key1 int NOT NULL,
24+
value1 int NOT NULL,
25+
)ENGINE=OLAP
26+
DUPLICATE KEY(`key1`)
27+
COMMENT "OLAP"
28+
DISTRIBUTED BY HASH(`key1`) BUCKETS 2
29+
PROPERTIES (
30+
"replication_num" = "1"
31+
)
32+
"""
33+
34+
def result = sql """insert into test_multiple_stmt values (1, 1); select * from test_multiple_stmt;"""
35+
assertEquals(1, result.size())
36+
assertEquals(1, result[0][0])
37+
assertEquals(1, result[0][1])
38+
}

0 commit comments

Comments
 (0)