Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,15 @@ public void executeQuery(String originStmt) throws Exception {
executor.getProfile().getSummaryProfile().setParseSqlStartTime(parseSqlStartTime);
executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime);
executor.getProfile().getSummaryProfile().parsedByConnectionProcess = true;
// Here we set the MoreStmtExists flag without considering CLIENT_MULTI_STATEMENTS.
// So the master will always set SERVER_MORE_RESULTS_EXISTS when the statement is not the last one.
// When the Follower/Observer received the return result of Master, the Follower/Observer
// will check CLIENT_MULTI_STATEMENTS is set or not. It sends SERVER_MORE_RESULTS_EXISTS back to client
// only when CLIENT_MULTI_STATEMENTS is set.
// See the code below : if (getConnectContext().getMysqlChannel().clientMultiStatements())
if (i != stmts.size() - 1 && connectType.equals(ConnectType.MYSQL)) {
executor.setMoreStmtExists(true);
}
ctx.setExecutor(executor);

if (cacheKeyType != null) {
Expand Down Expand Up @@ -706,6 +715,9 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException
result.setQueryId(ctx.queryId());
}
result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
if (request.moreResultExists) {
ctx.getState().serverStatus |= MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
}
result.setPacket(getResultPacket());
result.setStatus(ctx.getState().toString());
if (ctx.getState().getStateType() == MysqlStateType.OK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class FEOpExecutor {
protected int thriftTimeoutMs;

protected boolean shouldNotRetry;
protected boolean moreStmtExists = false;

public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt, ConnectContext ctx, boolean isQuery) {
this.feAddr = feAddress;
Expand Down Expand Up @@ -173,6 +174,7 @@ protected TMasterOpRequest buildStmtForwardParams() throws AnalysisException {
params.setStmtId(ctx.getStmtId());
params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
params.setSessionId(ctx.getSessionId());
params.setMoreResultExists(moreStmtExists);

if (Config.isCloudMode()) {
String cluster = "";
Expand Down Expand Up @@ -224,6 +226,9 @@ public String getErrMsg() {
return result.getErrMessage();
}

public void setMoreStmtExists(boolean moreStmtExists) {
this.moreStmtExists = moreStmtExists;
}

public ByteBuffer getOutputPacket() {
if (result == null) {
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ public class StmtExecutor {
private Boolean isForwardedToMaster = null;
// Flag for execute prepare statement, need to use binary protocol resultset
private boolean isComStmtExecute = false;
// Set to true if there are more stmt need to execute.
// Mainly for forward to master, so that master can set the mysql server status correctly.
private boolean moreStmtExists = false;

// The result schema if "dry_run_query" is true.
// Only one column to indicate the real return row numbers.
Expand Down Expand Up @@ -354,6 +357,14 @@ public boolean isForwardToMaster() {
return isForwardedToMaster;
}

public boolean isMoreStmtExists() {
return moreStmtExists;
}

public void setMoreStmtExists(boolean moreStmtExists) {
this.moreStmtExists = moreStmtExists;
}

private boolean shouldForwardToMaster() {
if (Env.getCurrentEnv().isMaster()) {
return false;
Expand Down Expand Up @@ -945,6 +956,7 @@ private void forwardToMaster() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("need to transfer to Master. stmt: {}", context.getStmtId());
}
masterOpExecutor.setMoreStmtExists(moreStmtExists);
masterOpExecutor.execute();
if (parsedStmt instanceof LogicalPlanAdapter) {
// for nereids command
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ struct TMasterOpRequest {
29: optional TTxnLoadInfo txnLoadInfo
30: optional TGroupCommitInfo groupCommitInfo
31: optional binary prepareExecuteBuffer
32: optional bool moreResultExists // Server has more result to send

// selectdb cloud
1000: optional string cloud_cluster
Expand Down
38 changes: 38 additions & 0 deletions regression-test/suites/query_p0/test_multiple_stmt.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_multiple_stmt") {
sql """drop database if exists test_multiple_stmt"""
sql """create database test_multiple_stmt"""
sql """use test_multiple_stmt"""
sql """CREATE TABLE test_multiple_stmt (
key1 int NOT NULL,
value1 int NOT NULL,
)ENGINE=OLAP
DUPLICATE KEY(`key1`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`key1`) BUCKETS 2
PROPERTIES (
"replication_num" = "1"
)
"""

def result = sql """insert into test_multiple_stmt values (1, 1); select * from test_multiple_stmt;"""
assertEquals(1, result.size())
assertEquals(1, result[0][0])
assertEquals(1, result[0][1])
}
Loading