Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ private long getPushDownCount() {
return tableLevelRowCount;
}

public long getTotalFileSize() {
return totalFileSize;
}

@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,11 @@ public void setLoadFileInfo(int fileNum, long fileSize) {
this.loadStatistic.totalFileSizeB = fileSize;
}

public void addLoadFileInfo(int fileNum, long fileSize) {
this.loadStatistic.fileNum += fileNum;
this.loadStatistic.totalFileSizeB += fileSize;
}

/**
* Show table names for frontend
* If table name could not be found by id, the table id will be used instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
import org.apache.doris.common.profile.ProfileManager.ProfileType;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.FileScanNode;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.dictionary.Dictionary;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.CascadesContext;
Expand Down Expand Up @@ -69,6 +71,7 @@
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.Coordinator;
Expand Down Expand Up @@ -529,11 +532,29 @@ protected void doDistribute(boolean canUseNereidsDistributePlanner, ExplainLevel
LOG.debug("insert into plan for query_id: {} is: {}.", DebugUtil.printId(ctx.queryId()),
planner.getPhysicalPlan().treeString());
}

// step 4
BuildInsertExecutorResult build = executorFactoryRef.get().build();

// apply insert plan Statistic
applyInsertPlanStatistic(planner);
return build;
}

private void applyInsertPlanStatistic(FastInsertIntoValuesPlanner planner) {
LoadJob loadJob = Env.getCurrentEnv().getLoadManager().getLoadJob(getJobId());
if (loadJob == null) {
return;
}
for (PlanFragment fragment : planner.getFragments()) {
if (fragment.getPlanRoot() instanceof FileScanNode) {
FileScanNode fileScanNode = (FileScanNode) fragment.getPlanRoot();
Env.getCurrentEnv().getLoadManager().getLoadJob(getJobId())
.addLoadFileInfo((int) fileScanNode.getSelectedSplitNum(), fileScanNode.getTotalFileSize());
}
}
}

private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception {
AbstractInsertExecutor insertExecutor = initPlan(ctx, executor);
// if the insert stmt data source is empty, directly return, no need to be executed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ suite("test_streaming_insert_job") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"

// alter streaming job
sql """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ suite("test_streaming_insert_job_offset") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(2) == "{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":0,\"fileSize\":0}"
assert jobInfo.get(0).get(2) == "{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":1,\"fileSize\":138}"
assert jobInfo.get(0).get(3) == "{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/example_0.csv\\\"}\"}"

// alter job init offset, Lexicographic order includes example_[0-1]
Expand Down Expand Up @@ -211,7 +211,7 @@ suite("test_streaming_insert_job_offset") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(2) == "{\"scannedRows\":30,\"loadBytes\":643,\"fileNumber\":0,\"fileSize\":0}"
assert jobInfo.get(0).get(2) == "{\"scannedRows\":30,\"loadBytes\":643,\"fileNumber\":3,\"fileSize\":394}"
assert jobInfo.get(0).get(3) == "{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"

// has double example_1.csv and example_0.csv data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ suite("test_streaming_job_restart_fe", "docker") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"

// Restart FE
cluster.restartFrontends()
Expand All @@ -115,7 +115,7 @@ suite("test_streaming_job_restart_fe", "docker") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"

sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """
sql """drop table if exists `${tableName}` force"""
Expand Down
Loading