From fa3edfddb94923b318e28178ed0e4fe2607daf3a Mon Sep 17 00:00:00 2001 From: gaoliang Date: Tue, 25 Jun 2024 16:50:14 +0800 Subject: [PATCH] [Feature][chunjun-core] Supports capturing dirty data from the source and when the source sends it downstream #1901 --- .../source/DtInputFormatSourceFunction.java | 22 ++++++++++++++----- .../source/format/BaseRichInputFormat.java | 11 ++-------- .../throwable/ReadRecordException.java | 4 +++- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/source/DtInputFormatSourceFunction.java b/chunjun-core/src/main/java/com/dtstack/chunjun/source/DtInputFormatSourceFunction.java index 9e589b4089..221ac4f57b 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/source/DtInputFormatSourceFunction.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/source/DtInputFormatSourceFunction.java @@ -18,6 +18,9 @@ package com.dtstack.chunjun.source; +import com.dtstack.chunjun.constants.Metrics; +import com.dtstack.chunjun.dirty.manager.DirtyManager; +import com.dtstack.chunjun.metrics.AccumulatorCollector; import com.dtstack.chunjun.restore.FormatState; import com.dtstack.chunjun.source.format.BaseRichInputFormat; import com.dtstack.chunjun.util.ExceptionUtil; @@ -112,19 +115,28 @@ public void run(SourceContext ctx) throws Exception { if (isRunning && format instanceof RichInputFormat) { ((RichInputFormat) format).openInputFormat(); } - OUT nextElement = serializer.createInstance(); while (isRunning) { format.open(splitIterator.next()); - + AccumulatorCollector accumulatorCollector = + ((BaseRichInputFormat) format).getAccumulatorCollector(); + DirtyManager dirtyManager = ((BaseRichInputFormat) format).getDirtyManager(); // for each element we also check if cancel // was called by checking the isRunning flag while (isRunning && !format.reachedEnd()) { synchronized (ctx.getCheckpointLock()) { - nextElement = format.nextRecord(nextElement); - if (nextElement != null) { - ctx.collect(nextElement); + try { + nextElement = format.nextRecord(nextElement); + if (nextElement != null) { + ctx.collect(nextElement); + } + } catch (Exception e) { + // 脏数据总数应是所有slot的脏数据总数,而不是单个的 + long globalErrors = + accumulatorCollector.getAccumulatorValue( + Metrics.NUM_ERRORS, false); + dirtyManager.collect(nextElement, e, null, globalErrors); } } } diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/source/format/BaseRichInputFormat.java b/chunjun-core/src/main/java/com/dtstack/chunjun/source/format/BaseRichInputFormat.java index ee2c77f4b4..210645b548 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/source/format/BaseRichInputFormat.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/source/format/BaseRichInputFormat.java @@ -190,18 +190,11 @@ public void openInputFormat() throws IOException { } @Override - public RowData nextRecord(RowData rowData) { + public RowData nextRecord(RowData rowData) throws ReadRecordException { if (byteRateLimiter != null) { byteRateLimiter.acquire(); } - RowData internalRow = null; - try { - internalRow = nextRecordInternal(rowData); - } catch (ReadRecordException e) { - // 脏数据总数应是所有slot的脏数据总数,而不是单个的 - long globalErrors = accumulatorCollector.getAccumulatorValue(Metrics.NUM_ERRORS, false); - dirtyManager.collect(e.getRowData(), e, null, globalErrors); - } + RowData internalRow = nextRecordInternal(rowData); if (internalRow != null) { updateDuration(); if (numReadCounter != null) { diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/throwable/ReadRecordException.java b/chunjun-core/src/main/java/com/dtstack/chunjun/throwable/ReadRecordException.java index ff760bca0b..0beae83051 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/throwable/ReadRecordException.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/throwable/ReadRecordException.java @@ -18,8 +18,10 @@ package com.dtstack.chunjun.throwable; +import java.io.IOException; + /** The Exception describing errors when read a record */ -public class ReadRecordException extends Exception { +public class ReadRecordException extends IOException { private static final long serialVersionUID = 453087894656079820L; private final int colIndex;