Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,15 @@ public void executeQuery(MysqlCommand mysqlCommand, String originStmt) throws Ex
executor = new StmtExecutor(ctx, parsedStmt);
executor.getProfile().getSummaryProfile().setParseSqlStartTime(parseSqlStartTime);
executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime);
// Here we set the MoreStmtExists flag without considering CLIENT_MULTI_STATEMENTS.
// So the master will always set SERVER_MORE_RESULTS_EXISTS when the statement is not the last one.
// When the Follower/Observer received the return result of Master, the Follower/Observer
// will check CLIENT_MULTI_STATEMENTS is set or not. It sends SERVER_MORE_RESULTS_EXISTS back to client
// only when CLIENT_MULTI_STATEMENTS is set.
// See the code below : if (getConnectContext().getMysqlChannel().clientMultiStatements())
if (i != stmts.size() - 1 && connectType.equals(ConnectType.MYSQL)) {
executor.setMoreStmtExists(true);
}
ctx.setExecutor(executor);

if (cacheKeyType != null) {
Expand Down Expand Up @@ -782,6 +791,9 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException
result.setQueryId(ctx.queryId());
}
result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
if (request.moreResultExists) {
ctx.getState().serverStatus |= MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
}
result.setPacket(getResultPacket());
result.setStatus(ctx.getState().toString());
if (ctx.getState().getStateType() == MysqlStateType.OK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public class MasterOpExecutor {

private boolean shouldNotRetry;

protected boolean moreStmtExists = false;

public MasterOpExecutor(OriginStatement originStmt, ConnectContext ctx, RedirectStatus status, boolean isQuery) {
this.originStmt = originStmt;
this.ctx = ctx;
Expand Down Expand Up @@ -175,6 +177,7 @@ private TMasterOpRequest buildStmtForwardParams() {
params.setUserIp(ctx.getRemoteIP());
params.setStmtId(ctx.getStmtId());
params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
params.setMoreResultExists(moreStmtExists);

// query options
params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables());
Expand Down Expand Up @@ -241,6 +244,10 @@ private TMasterOpRequest buildUpdateLoadDataParams(long tableId, long receiveDat
return params;
}

public void setMoreStmtExists(boolean moreStmtExists) {
this.moreStmtExists = moreStmtExists;
}

public ByteBuffer getOutputPacket() {
if (result == null) {
return null;
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ public class StmtExecutor {
private Boolean isForwardedToMaster = null;
// Flag for execute prepare statement, need to use binary protocol resultset
private boolean isComStmtExecute = false;
// Set to true if there are more stmt need to execute.
// Mainly for forward to master, so that master can set the mysql server status correctly.
private boolean moreStmtExists = false;

private ExecuteStmt execStmt;
PrepareStmtContext preparedStmtCtx = null;
Expand Down Expand Up @@ -429,6 +432,14 @@ public boolean isForwardToMaster() {
return isForwardedToMaster;
}

public boolean isMoreStmtExists() {
return moreStmtExists;
}

public void setMoreStmtExists(boolean moreStmtExists) {
this.moreStmtExists = moreStmtExists;
}

private boolean shouldForwardToMaster() {
if (Env.getCurrentEnv().isMaster()) {
return false;
Expand Down Expand Up @@ -1134,6 +1145,7 @@ private void forwardToMaster() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("need to transfer to Master. stmt: {}", context.getStmtId());
}
masterOpExecutor.setMoreStmtExists(moreStmtExists);
masterOpExecutor.execute();
if (parsedStmt instanceof SetStmt) {
SetStmt setStmt = (SetStmt) parsedStmt;
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ struct TMasterOpRequest {
29: optional TTxnLoadInfo txnLoadInfo
30: optional TGroupCommitInfo groupCommitInfo
31: optional binary prepareExecuteBuffer
32: optional bool moreResultExists // Server has more result to send
}

struct TColumnDefinition {
Expand Down
38 changes: 38 additions & 0 deletions regression-test/suites/query_p0/test_multiple_stmt.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_multiple_stmt") {
sql """drop database if exists test_multiple_stmt"""
sql """create database test_multiple_stmt"""
sql """use test_multiple_stmt"""
sql """CREATE TABLE test_multiple_stmt (
key1 int NOT NULL,
value1 int NOT NULL,
)ENGINE=OLAP
DUPLICATE KEY(`key1`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`key1`) BUCKETS 2
PROPERTIES (
"replication_num" = "1"
)
"""

def result = sql """insert into test_multiple_stmt values (1, 1); select * from test_multiple_stmt;"""
assertEquals(1, result.size())
assertEquals(1, result[0][0])
assertEquals(1, result[0][1])
}
Loading