Skip to content

Commit 0caa819

Browse files
authored
[FLINK-38495][table-runtime] Introduce cache in delta join operator (#27099)
1 parent 1a7c0f1 commit 0caa819

File tree

10 files changed

+1016
-69
lines changed

10 files changed

+1016
-69
lines changed

docs/layouts/shortcodes/generated/execution_config_configuration.html

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,24 @@
156156
<td>Boolean</td>
157157
<td>Set whether to compact the changes sent downstream in row-time mini-batch. If true, Flink will compact changes and send only the latest change downstream. Note that if the downstream needs the details of versioned data, this optimization cannot be applied. If false, Flink will send all changes to downstream just like when the mini-batch is not enabled.</td>
158158
</tr>
159+
<tr>
160+
<td><h5>table.exec.delta-join.cache-enabled</h5><br> <span class="label label-primary">Streaming</span></td>
161+
<td style="word-wrap: break-word;">true</td>
162+
<td>Boolean</td>
163+
<td>Whether to enable the cache of delta join. If enabled, the delta join caches the records from remote dim table. Default is true.</td>
164+
</tr>
165+
<tr>
166+
<td><h5>table.exec.delta-join.left.cache-size</h5><br> <span class="label label-primary">Streaming</span></td>
167+
<td style="word-wrap: break-word;">10000</td>
168+
<td>Long</td>
169+
<td>The cache size used to cache the lookup results of the left table in delta join. This value must be positive when enabling cache. Default is 10000.</td>
170+
</tr>
171+
<tr>
172+
<td><h5>table.exec.delta-join.right.cache-size</h5><br> <span class="label label-primary">Streaming</span></td>
173+
<td style="word-wrap: break-word;">10000</td>
174+
<td>Long</td>
175+
<td>The cache size used to cache the lookup results of the right table in delta join. This value must be positive when enabling cache. Default is 10000.</td>
176+
</tr>
159177
<tr>
160178
<td><h5>table.exec.disabled-operators</h5><br> <span class="label label-primary">Batch</span></td>
161179
<td style="word-wrap: break-word;">(none)</td>

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -838,6 +838,33 @@ public class ExecutionConfigOptions {
838838
"Set whether to use the SQL/Table operators based on the asynchronous state api. "
839839
+ "Default value is false.");
840840

841+
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
842+
public static final ConfigOption<Boolean> TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED =
843+
key("table.exec.delta-join.cache-enabled")
844+
.booleanType()
845+
.defaultValue(true)
846+
.withDescription(
847+
"Whether to enable the cache of delta join. If enabled, the delta join caches the "
848+
+ "records from remote dim table. Default is true.");
849+
850+
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
851+
public static final ConfigOption<Long> TABLE_EXEC_DELTA_JOIN_LEFT_CACHE_SIZE =
852+
key("table.exec.delta-join.left.cache-size")
853+
.longType()
854+
.defaultValue(10000L)
855+
.withDescription(
856+
"The cache size used to cache the lookup results of the left table in delta join. "
857+
+ "This value must be positive when enabling cache. Default is 10000.");
858+
859+
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
860+
public static final ConfigOption<Long> TABLE_EXEC_DELTA_JOIN_RIGHT_CACHE_SIZE =
861+
key("table.exec.delta-join.right.cache-size")
862+
.longType()
863+
.defaultValue(10000L)
864+
.withDescription(
865+
"The cache size used to cache the lookup results of the right table in delta join. "
866+
+ "This value must be positive when enabling cache. Default is 10000.");
867+
841868
// ------------------------------------------------------------------------------------------
842869
// Enum option types
843870
// ------------------------------------------------------------------------------------------

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020

2121
import org.apache.flink.FlinkVersion;
2222
import org.apache.flink.api.dag.Transformation;
23+
import org.apache.flink.api.java.tuple.Tuple2;
2324
import org.apache.flink.configuration.ReadableConfig;
2425
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
2526
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
2627
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
28+
import org.apache.flink.table.api.config.ExecutionConfigOptions;
2729
import org.apache.flink.table.catalog.DataTypeFactory;
2830
import org.apache.flink.table.data.RowData;
2931
import org.apache.flink.table.data.conversion.DataStructureConverter;
@@ -83,6 +85,7 @@
8385
import java.util.Map;
8486
import java.util.Optional;
8587
import java.util.stream.Collectors;
88+
import java.util.stream.IntStream;
8689

8790
import static org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeltaJoin.DELTA_JOIN_TRANSFORMATION;
8891
import static org.apache.flink.table.planner.plan.utils.DeltaJoinUtil.getUnwrappedAsyncLookupFunction;
@@ -234,11 +237,17 @@ protected Transformation<RowData> translateToPlanInternal(
234237
RowDataKeySelector leftJoinKeySelector =
235238
KeySelectorUtil.getRowDataSelector(
236239
classLoader, leftJoinKeys, InternalTypeInfo.of(leftStreamType));
240+
// currently, delta join only supports consuming INSERT-ONLY stream
241+
RowDataKeySelector leftUpsertKeySelector =
242+
getUpsertKeySelector(new int[0], leftStreamType, classLoader);
237243

238244
// right side selector
239245
RowDataKeySelector rightJoinKeySelector =
240246
KeySelectorUtil.getRowDataSelector(
241247
classLoader, rightJoinKeys, InternalTypeInfo.of(rightStreamType));
248+
// currently, delta join only supports consuming INSERT-ONLY stream
249+
RowDataKeySelector rightUpsertKeySelector =
250+
getUpsertKeySelector(new int[0], rightStreamType, classLoader);
242251

243252
StreamOperatorFactory<RowData> operatorFactory =
244253
createAsyncLookupDeltaJoin(
@@ -252,7 +261,9 @@ protected Transformation<RowData> translateToPlanInternal(
252261
leftStreamType,
253262
rightStreamType,
254263
leftJoinKeySelector,
264+
leftUpsertKeySelector,
255265
rightJoinKeySelector,
266+
rightUpsertKeySelector,
256267
classLoader);
257268

258269
final TwoInputTransformation<RowData, RowData, RowData> transform =
@@ -282,7 +293,9 @@ private StreamOperatorFactory<RowData> createAsyncLookupDeltaJoin(
282293
RowType leftStreamType,
283294
RowType rightStreamType,
284295
RowDataKeySelector leftJoinKeySelector,
296+
RowDataKeySelector leftUpsertKeySelector,
285297
RowDataKeySelector rightJoinKeySelector,
298+
RowDataKeySelector rightUpsertKeySelector,
286299
ClassLoader classLoader) {
287300

288301
DataTypeFactory dataTypeFactory =
@@ -299,6 +312,10 @@ private StreamOperatorFactory<RowData> createAsyncLookupDeltaJoin(
299312
leftStreamType,
300313
rightStreamType,
301314
leftLookupKeys,
315+
leftJoinKeySelector,
316+
leftUpsertKeySelector,
317+
rightJoinKeySelector,
318+
rightUpsertKeySelector,
302319
false);
303320

304321
AsyncDeltaJoinRunner rightLookupTableAsyncFunction =
@@ -312,15 +329,23 @@ private StreamOperatorFactory<RowData> createAsyncLookupDeltaJoin(
312329
leftStreamType,
313330
rightStreamType,
314331
rightLookupKeys,
332+
leftJoinKeySelector,
333+
leftUpsertKeySelector,
334+
rightJoinKeySelector,
335+
rightUpsertKeySelector,
315336
true);
316337

338+
Tuple2<Long, Long> leftRightCacheSize = getCacheSize(config);
339+
317340
return new StreamingDeltaJoinOperatorFactory(
318341
rightLookupTableAsyncFunction,
319342
leftLookupTableAsyncFunction,
320343
leftJoinKeySelector,
321344
rightJoinKeySelector,
322345
asyncLookupOptions.asyncTimeout,
323346
asyncLookupOptions.asyncBufferCapacity,
347+
leftRightCacheSize.f0,
348+
leftRightCacheSize.f1,
324349
leftStreamType,
325350
rightStreamType);
326351
}
@@ -336,6 +361,10 @@ private AsyncDeltaJoinRunner createAsyncDeltaJoinRunner(
336361
RowType leftStreamSideType,
337362
RowType rightStreamSideType,
338363
Map<Integer, FunctionParam> lookupKeys,
364+
RowDataKeySelector leftJoinKeySelector,
365+
RowDataKeySelector leftUpsertKeySelector,
366+
RowDataKeySelector rightJoinKeySelector,
367+
RowDataKeySelector rightUpsertKeySelector,
339368
boolean treatRightAsLookupTable) {
340369
RelOptTable lookupTable = treatRightAsLookupTable ? rightTempTable : leftTempTable;
341370
RowType streamSideType = treatRightAsLookupTable ? leftStreamSideType : rightStreamSideType;
@@ -409,8 +438,13 @@ private AsyncDeltaJoinRunner createAsyncDeltaJoinRunner(
409438
(DataStructureConverter<RowData, Object>) lookupSideFetcherConverter,
410439
lookupSideGeneratedResultFuture,
411440
InternalSerializers.create(lookupTableSourceRowType),
441+
leftJoinKeySelector,
442+
leftUpsertKeySelector,
443+
rightJoinKeySelector,
444+
rightUpsertKeySelector,
412445
asyncLookupOptions.asyncBufferCapacity,
413-
treatRightAsLookupTable);
446+
treatRightAsLookupTable,
447+
enableCache(config));
414448
}
415449

416450
/**
@@ -449,4 +483,33 @@ public RexNode visitInputRef(RexInputRef inputRef) {
449483

450484
return condition.accept(converter);
451485
}
486+
487+
private RowDataKeySelector getUpsertKeySelector(
488+
int[] upsertKey, RowType rowType, ClassLoader classLoader) {
489+
final int[] rightUpsertKeys;
490+
if (upsertKey.length > 0) {
491+
rightUpsertKeys = upsertKey;
492+
} else {
493+
rightUpsertKeys = IntStream.range(0, rowType.getFields().size()).toArray();
494+
}
495+
return KeySelectorUtil.getRowDataSelector(
496+
classLoader, rightUpsertKeys, InternalTypeInfo.of(rowType));
497+
}
498+
499+
private boolean enableCache(ReadableConfig config) {
500+
return config.get(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED);
501+
}
502+
503+
/** Get the left cache size and right size. */
504+
private Tuple2<Long, Long> getCacheSize(ReadableConfig config) {
505+
long leftCacheSize =
506+
config.get(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_LEFT_CACHE_SIZE);
507+
long rightCacheSize =
508+
config.get(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_RIGHT_CACHE_SIZE);
509+
if ((leftCacheSize <= 0 || rightCacheSize <= 0) && enableCache(config)) {
510+
throw new IllegalArgumentException(
511+
"Cache size in delta join must be positive when enabling cache.");
512+
}
513+
return Tuple2.of(leftCacheSize, rightCacheSize);
514+
}
452515
}

0 commit comments

Comments
 (0)