Skip to content

Commit 2b4cfd4

Browse files
committed
[hotfix-407][all] fix bugs and add dirty-plugins.
1 parent ef4bfee commit 2b4cfd4

File tree

3 files changed

+33
-1
lines changed

3 files changed

+33
-1
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ FlinkStreamSQL
3131

3232
## 目录
3333

34-
[ 1.0 变更记录](docs/changelog.md)
3534
[ 1.1 demo](docs/demo.md)
3635
[ 1.2 快速开始](docs/quickStart.md)
3736
[ 1.3 参数配置](docs/config.md)

core/src/main/java/com/dtstack/flink/sql/exception/ExceptionTrace.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,21 @@ public static String traceOriginalCause(Throwable e) {
2020
}
2121
return errorMsg;
2222
}
23+
24+
/**
25+
* 根据异常的种类来判断是否需要强制跳过Flink的重启{@link SuppressRestartsException}
26+
* @param e exception
27+
* @param errorMsg 需要抛出的异常信息
28+
*/
29+
public static void dealExceptionWithSuppressStart(Exception e, String errorMsg) {
30+
if (e instanceof SuppressRestartsException) {
31+
throw new SuppressRestartsException(
32+
new Throwable(
33+
errorMsg
34+
)
35+
);
36+
} else {
37+
throw new RuntimeException(errorMsg);
38+
}
39+
}
2340
}

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,22 @@ private void checkSupport(SqlIdentifier identifier) {
218218
Preconditions.checkState(isSide, errorMsg);
219219
}
220220

221+
private void checkSupport(SqlIdentifier identifier) {
222+
String tableName = identifier.getComponent(0).getSimple();
223+
String sideTableName;
224+
String sideTableAlias;
225+
if (joinInfo.isLeftIsSideTable()) {
226+
sideTableName = joinInfo.getLeftTableName();
227+
sideTableAlias = joinInfo.getLeftTableAlias();
228+
} else {
229+
sideTableName = joinInfo.getRightTableName();
230+
sideTableAlias = joinInfo.getRightTableAlias();
231+
}
232+
boolean isSide = tableName.equals(sideTableName) || tableName.equals(sideTableAlias);
233+
String errorMsg = "only support set side table constant field, error field " + identifier;
234+
Preconditions.checkState(isSide, errorMsg);
235+
}
236+
221237
private void associateField(String sourceTableField, String sideTableField, SqlNode sqlNode) {
222238
String errorMsg = "can't deal equal field: " + sqlNode;
223239
equalFieldList.add(sideTableField);

0 commit comments

Comments
 (0)