diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 8cfe183ebf0701..d09c1ef1c85ca2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -89,6 +89,10 @@ private long getPushDownCount() {
         return tableLevelRowCount;
     }
 
+    public long getTotalFileSize() {
+        return totalFileSize;
+    }
+
     @Override
     public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
         StringBuilder output = new StringBuilder();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 02a7af54da0f73..9e90df00403042 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -248,6 +248,11 @@ public void setLoadFileInfo(int fileNum, long fileSize) {
         this.loadStatistic.totalFileSizeB = fileSize;
     }
 
+    public void addLoadFileInfo(int fileNum, long fileSize) {
+        this.loadStatistic.fileNum += fileNum;
+        this.loadStatistic.totalFileSizeB += fileSize;
+    }
+
     /**
      * Show table names for frontend
      * If table name could not be found by id, the table id will be used instead.
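The new accumulator complements the overwrite-style setter directly above it: a one-shot load knows its whole input up front, while a streaming job discovers files batch by batch and must add to the running totals. A minimal, self-contained sketch of the distinction (LoadFileInfoSketch is an invented stand-in, not the real LoadJob/LoadStatistic):

// Invented stand-in for org.apache.doris.load.loadv2.{LoadJob, LoadStatistic};
// it only illustrates the set-vs-add semantics, nothing else.
public class LoadFileInfoSketch {
    private int fileNum = 0;
    private long totalFileSizeB = 0;

    // Overwrite: fine for one-shot loads that know their whole input up front.
    void setLoadFileInfo(int fileNum, long fileSize) {
        this.fileNum = fileNum;
        this.totalFileSizeB = fileSize;
    }

    // Accumulate: what a streaming job needs, since each scheduled run
    // only sees the files of its own batch.
    void addLoadFileInfo(int fileNum, long fileSize) {
        this.fileNum += fileNum;
        this.totalFileSizeB += fileSize;
    }

    public static void main(String[] args) {
        LoadFileInfoSketch stats = new LoadFileInfoSketch();
        stats.addLoadFileInfo(1, 138L); // first batch
        stats.addLoadFileInfo(2, 256L); // later batch appends instead of overwriting
        // fileNum == 3, totalFileSizeB == 394: the accumulated totals that the
        // updated regression assertions below check for.
        System.out.println(stats.fileNum + " files, " + stats.totalFileSizeB + " bytes");
    }
}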
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index ba666a1d5c632c..e618d1dffbcf2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -29,10 +29,12 @@
 import org.apache.doris.common.profile.ProfileManager.ProfileType;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.FileScanNode;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.jdbc.JdbcExternalTable;
 import org.apache.doris.dictionary.Dictionary;
+import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.load.loadv2.LoadStatistic;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.CascadesContext;
@@ -67,6 +69,7 @@
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.util.RelationUtil;
 import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ConnectContext.ConnectType;
 import org.apache.doris.qe.Coordinator;
@@ -508,11 +511,29 @@ protected void doDistribute(boolean canUseNereidsDistributePlanner, ExplainLevel
             LOG.debug("insert into plan for query_id: {} is: {}.", DebugUtil.printId(ctx.queryId()),
                     planner.getPhysicalPlan().treeString());
         }
+        // step 4
         BuildInsertExecutorResult build = executorFactoryRef.get().build();
+
+        // apply the statistics gathered from the insert plan to the load job
+        applyInsertPlanStatistic(planner);
         return build;
     }
 
+    private void applyInsertPlanStatistic(FastInsertIntoValuesPlanner planner) {
+        LoadJob loadJob = Env.getCurrentEnv().getLoadManager().getLoadJob(getJobId());
+        if (loadJob == null) {
+            return;
+        }
+        for (PlanFragment fragment : planner.getFragments()) {
+            if (fragment.getPlanRoot() instanceof FileScanNode) {
+                FileScanNode fileScanNode = (FileScanNode) fragment.getPlanRoot();
+                loadJob.addLoadFileInfo((int) fileScanNode.getSelectedSplitNum(),
+                        fileScanNode.getTotalFileSize());
+            }
+        }
+    }
+
     private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception {
         AbstractInsertExecutor insertExecutor = initPlan(ctx, executor);
         // if the insert stmt data source is empty, directly return, no need to be executed.
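Stripped of the Doris plumbing, applyInsertPlanStatistic is a fold over the plan fragments: any fragment whose root is a FileScanNode contributes its selected split count and total byte size, and everything else is skipped. A self-contained sketch of that traversal pattern with stand-in types (the real PlanFragment/FileScanNode carry far more state; all names ending in Sketch are invented for this illustration):

import java.util.List;

// Invented stand-ins for the planner types; illustration of the traversal only.
interface PlanNodeSketch { }

class FileScanNodeSketch implements PlanNodeSketch {
    final long selectedSplitNum; // splits chosen for this scan
    final long totalFileSize;    // total bytes across the selected files

    FileScanNodeSketch(long selectedSplitNum, long totalFileSize) {
        this.selectedSplitNum = selectedSplitNum;
        this.totalFileSize = totalFileSize;
    }
}

class FragmentSketch {
    private final PlanNodeSketch planRoot;

    FragmentSketch(PlanNodeSketch planRoot) {
        this.planRoot = planRoot;
    }

    PlanNodeSketch getPlanRoot() {
        return planRoot;
    }
}

class PlanStatisticSketch {
    // Only fragments rooted at a file scan contribute; fragments rooted at
    // exchanges, sinks, etc. are skipped, as in the patch above.
    static long[] fileNumAndBytes(List<FragmentSketch> fragments) {
        int fileNum = 0;
        long bytes = 0;
        for (FragmentSketch fragment : fragments) {
            if (fragment.getPlanRoot() instanceof FileScanNodeSketch) {
                FileScanNodeSketch scan = (FileScanNodeSketch) fragment.getPlanRoot();
                fileNum += (int) scan.selectedSplitNum; // same narrowing cast as the patch
                bytes += scan.totalFileSize;
            }
        }
        return new long[] { fileNum, bytes };
    }
}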
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 93dc64dfd4ef96..d87b5aa17b8e42 100644
--- a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -108,7 +108,7 @@ suite("test_streaming_insert_job") {
         log.info("jobInfo: " + jobInfo)
         assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
         assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
-        assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
+        assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
 
         // alter streaming job
         sql """
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
index 31f8421ff4213a..2971222e9a0ede 100644
--- a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
@@ -173,7 +173,7 @@ suite("test_streaming_insert_job_offset") {
         log.info("jobInfo: " + jobInfo)
         assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
         assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
-        assert jobInfo.get(0).get(2) == "{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":0,\"fileSize\":0}"
+        assert jobInfo.get(0).get(2) == "{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":1,\"fileSize\":138}"
         assert jobInfo.get(0).get(3) == "{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/example_0.csv\\\"}\"}"
 
         // alter job init offset, Lexicographic order includes example_[0-1]
@@ -211,7 +211,7 @@ suite("test_streaming_insert_job_offset") {
         log.info("jobInfo: " + jobInfo)
         assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
         assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
-        assert jobInfo.get(0).get(2) == "{\"scannedRows\":30,\"loadBytes\":643,\"fileNumber\":0,\"fileSize\":0}"
+        assert jobInfo.get(0).get(2) == "{\"scannedRows\":30,\"loadBytes\":643,\"fileNumber\":3,\"fileSize\":394}"
         assert jobInfo.get(0).get(3) == "{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
 
         // has double example_1.csv and example_0.csv data
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
index 34941325ca7189..f2e8bef87ab99e 100644
--- a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
@@ -97,7 +97,7 @@ suite("test_streaming_job_restart_fe", "docker") {
         log.info("jobInfo: " + jobInfo)
         assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
         assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
-        assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
+        assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
 
         // Restart FE
         cluster.restartFrontends()
@@ -115,7 +115,7 @@ suite("test_streaming_job_restart_fe", "docker") {
         log.info("jobInfo: " + jobInfo)
         assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
         assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
-        assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
+        assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
 
         sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """
         sql """drop table if exists `${tableName}` force"""
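Worked check of the updated expectations: in the offset suite, the first batch contributes fileNumber=1/fileSize=138; after the offset is reset, the next batch re-reads both sample files and contributes 2 files/256 bytes, so the accumulated statistic ends at 1 + 2 = 3 files and 138 + 256 = 394 bytes, which is what the final assertion pins. The 2/256 in the other two suites likewise reflects both CSVs being read in a single batch. (The figures come from the assertions themselves; the individual file sizes implied by the arithmetic, 138 and 118 bytes, are an inference, not something stated in the patch.)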