diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index af54a66d3df75e..9237e272e508be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -344,6 +344,15 @@ public void executeQuery(MysqlCommand mysqlCommand, String originStmt) throws Ex executor = new StmtExecutor(ctx, parsedStmt); executor.getProfile().getSummaryProfile().setParseSqlStartTime(parseSqlStartTime); executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime); + // Here we set the MoreStmtExists flag without considering CLIENT_MULTI_STATEMENTS. + // So the master will always set SERVER_MORE_RESULTS_EXISTS when the statement is not the last one. + // When the Follower/Observer received the return result of Master, the Follower/Observer + // will check CLIENT_MULTI_STATEMENTS is set or not. It sends SERVER_MORE_RESULTS_EXISTS back to client + // only when CLIENT_MULTI_STATEMENTS is set. + // See the code below : if (getConnectContext().getMysqlChannel().clientMultiStatements()) + if (i != stmts.size() - 1 && connectType.equals(ConnectType.MYSQL)) { + executor.setMoreStmtExists(true); + } ctx.setExecutor(executor); if (cacheKeyType != null) { @@ -782,6 +791,9 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException result.setQueryId(ctx.queryId()); } result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId()); + if (request.moreResultExists) { + ctx.getState().serverStatus |= MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS; + } result.setPacket(getResultPacket()); result.setStatus(ctx.getState().toString()); if (ctx.getState().getStateType() == MysqlStateType.OK) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 0e7b5a2f473d88..2a2ea2f5f0780d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -60,6 +60,8 @@ public class MasterOpExecutor { private boolean shouldNotRetry; + protected boolean moreStmtExists = false; + public MasterOpExecutor(OriginStatement originStmt, ConnectContext ctx, RedirectStatus status, boolean isQuery) { this.originStmt = originStmt; this.ctx = ctx; @@ -175,6 +177,7 @@ private TMasterOpRequest buildStmtForwardParams() { params.setUserIp(ctx.getRemoteIP()); params.setStmtId(ctx.getStmtId()); params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + params.setMoreResultExists(moreStmtExists); // query options params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables()); @@ -241,6 +244,10 @@ private TMasterOpRequest buildUpdateLoadDataParams(long tableId, long receiveDat return params; } + public void setMoreStmtExists(boolean moreStmtExists) { + this.moreStmtExists = moreStmtExists; + } + public ByteBuffer getOutputPacket() { if (result == null) { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 8f0e1f41c414c5..1b1a5bfe290b43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -283,6 +283,9 @@ public class StmtExecutor { private Boolean isForwardedToMaster = null; // Flag for execute prepare statement, need to use binary protocol resultset private boolean isComStmtExecute = false; + // Set to true if there are more stmt need to execute. + // Mainly for forward to master, so that master can set the mysql server status correctly. + private boolean moreStmtExists = false; private ExecuteStmt execStmt; PrepareStmtContext preparedStmtCtx = null; @@ -429,6 +432,14 @@ public boolean isForwardToMaster() { return isForwardedToMaster; } + public boolean isMoreStmtExists() { + return moreStmtExists; + } + + public void setMoreStmtExists(boolean moreStmtExists) { + this.moreStmtExists = moreStmtExists; + } + private boolean shouldForwardToMaster() { if (Env.getCurrentEnv().isMaster()) { return false; @@ -1134,6 +1145,7 @@ private void forwardToMaster() throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("need to transfer to Master. stmt: {}", context.getStmtId()); } + masterOpExecutor.setMoreStmtExists(moreStmtExists); masterOpExecutor.execute(); if (parsedStmt instanceof SetStmt) { SetStmt setStmt = (SetStmt) parsedStmt; diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 8903eba29e4a50..45edb0ef6f56c5 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -572,6 +572,7 @@ struct TMasterOpRequest { 29: optional TTxnLoadInfo txnLoadInfo 30: optional TGroupCommitInfo groupCommitInfo 31: optional binary prepareExecuteBuffer + 32: optional bool moreResultExists // Server has more result to send } struct TColumnDefinition { diff --git a/regression-test/suites/query_p0/test_multiple_stmt.groovy b/regression-test/suites/query_p0/test_multiple_stmt.groovy new file mode 100644 index 00000000000000..4a7e75816d46c7 --- /dev/null +++ b/regression-test/suites/query_p0/test_multiple_stmt.groovy @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_multiple_stmt") { + sql """drop database if exists test_multiple_stmt""" + sql """create database test_multiple_stmt""" + sql """use test_multiple_stmt""" + sql """CREATE TABLE test_multiple_stmt ( + key1 int NOT NULL, + value1 int NOT NULL, + )ENGINE=OLAP + DUPLICATE KEY(`key1`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`key1`) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + def result = sql """insert into test_multiple_stmt values (1, 1); select * from test_multiple_stmt;""" + assertEquals(1, result.size()) + assertEquals(1, result[0][0]) + assertEquals(1, result[0][1]) +} \ No newline at end of file