Commit b74cf18

[FLINK-38488][tests] Use 'throws Exception' instead of try-catch-fail in tests in flink-tests
1 parent 1f34a07 commit b74cf18

15 files changed: +968 −1137 lines
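
The pattern is the same in every file below: a test body wrapped in try { ... } catch (Exception e) { e.printStackTrace(); Assert.fail(e.getMessage()); } discards the original stack trace and reduces the failure to its message string, while declaring throws Exception lets JUnit report the unaltered exception itself. A minimal before/after sketch of the pattern (hypothetical test, not taken from this commit):

import org.junit.Assert;
import org.junit.Test;

public class ThrowsInsteadOfFailExample {

    // Before: the catch block swallows the stack trace and fails with only the message.
    @Test
    public void jobRunsBefore() {
        try {
            runJobAndVerify();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    // After: the exception propagates to JUnit, which reports it with its full stack trace.
    @Test
    public void jobRunsAfter() throws Exception {
        runJobAndVerify();
    }

    // Hypothetical stand-in for the real test body.
    private void runJobAndVerify() throws Exception {}
}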

flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java

Lines changed: 264 additions & 285 deletions
Large diffs are not rendered by default.

flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java

Lines changed: 351 additions & 382 deletions
Large diffs are not rendered by default.

flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java

Lines changed: 163 additions & 189 deletions
Large diffs are not rendered by default.

flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java

Lines changed: 5 additions & 10 deletions
@@ -141,16 +141,11 @@ public static void shutDownExistingCluster() {
      * completed checkpoint's.
      */
     @Test(timeout = 60000)
-    public void testMultiRegionFailover() {
-        try {
-            JobGraph jobGraph = createJobGraph();
-            ClusterClient<?> client = cluster.getClusterClient();
-            submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
-            verifyAfterJobExecuted();
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail(e.getMessage());
-        }
+    public void testMultiRegionFailover() throws Exception {
+        JobGraph jobGraph = createJobGraph();
+        ClusterClient<?> client = cluster.getClusterClient();
+        submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
+        verifyAfterJobExecuted();
     }

     private void verifyAfterJobExecuted() {

flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java

Lines changed: 45 additions & 53 deletions
@@ -53,7 +53,6 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;

 /**
  * Integration test for the {@link CheckpointListener} interface. The test ensures that {@link
@@ -83,74 +82,67 @@ public class StreamCheckpointNotifierITCase extends AbstractTestBaseJUnit4 {
      * </pre>
      */
     @Test
-    public void testProgram() {
-        try {
-            final StreamExecutionEnvironment env =
-                    StreamExecutionEnvironment.getExecutionEnvironment();
-            assertEquals("test setup broken", PARALLELISM, env.getParallelism());
+    public void testProgram() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        assertEquals("test setup broken", PARALLELISM, env.getParallelism());

-            env.enableCheckpointing(500);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 0L);
+        env.enableCheckpointing(500);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 0L);

-            final int numElements = 10000;
-            final int numTaskTotal = PARALLELISM * 5;
+        final int numElements = 10000;
+        final int numTaskTotal = PARALLELISM * 5;

-            DataStream<Long> stream =
-                    env.addSource(new GeneratingSourceFunction(numElements, numTaskTotal));
+        DataStream<Long> stream =
+                env.addSource(new GeneratingSourceFunction(numElements, numTaskTotal));

-            stream
-                    // -------------- first vertex, chained to the src ----------------
-                    .filter(new LongRichFilterFunction())
+        stream
+                // -------------- first vertex, chained to the src ----------------
+                .filter(new LongRichFilterFunction())

-                    // -------------- second vertex, applying the co-map ----------------
-                    .connect(stream)
-                    .flatMap(new LeftIdentityCoRichFlatMapFunction())
+                // -------------- second vertex, applying the co-map ----------------
+                .connect(stream)
+                .flatMap(new LeftIdentityCoRichFlatMapFunction())

-                    // -------------- third vertex - the stateful one that also fails
-                    // ----------------
-                    .map(new IdentityMapFunction())
-                    .startNewChain()
+                // -------------- third vertex - the stateful one that also fails
+                // ----------------
+                .map(new IdentityMapFunction())
+                .startNewChain()

-                    // -------------- fourth vertex - reducer and the sink ----------------
-                    .keyBy(x -> x.f0)
-                    .reduce(new OnceFailingReducer(numElements))
-                    .sinkTo(new DiscardingSink<>());
+                // -------------- fourth vertex - reducer and the sink ----------------
+                .keyBy(x -> x.f0)
+                .reduce(new OnceFailingReducer(numElements))
+                .sinkTo(new DiscardingSink<>());

-            env.execute();
+        env.execute();

-            final long failureCheckpointID = OnceFailingReducer.failureCheckpointID;
-            assertNotEquals(0L, failureCheckpointID);
+        final long failureCheckpointID = OnceFailingReducer.failureCheckpointID;
+        assertNotEquals(0L, failureCheckpointID);

-            List<List<Long>[]> allLists =
-                    Arrays.asList(
-                            GeneratingSourceFunction.COMPLETED_CHECKPOINTS,
-                            LongRichFilterFunction.COMPLETED_CHECKPOINTS,
-                            LeftIdentityCoRichFlatMapFunction.COMPLETED_CHECKPOINTS,
-                            IdentityMapFunction.COMPLETED_CHECKPOINTS,
-                            OnceFailingReducer.COMPLETED_CHECKPOINTS);
+        List<List<Long>[]> allLists =
+                Arrays.asList(
+                        GeneratingSourceFunction.COMPLETED_CHECKPOINTS,
+                        LongRichFilterFunction.COMPLETED_CHECKPOINTS,
+                        LeftIdentityCoRichFlatMapFunction.COMPLETED_CHECKPOINTS,
+                        IdentityMapFunction.COMPLETED_CHECKPOINTS,
+                        OnceFailingReducer.COMPLETED_CHECKPOINTS);

-            for (List<Long>[] parallelNotifications : allLists) {
-                for (List<Long> notifications : parallelNotifications) {
+        for (List<Long>[] parallelNotifications : allLists) {
+            for (List<Long> notifications : parallelNotifications) {

-                    assertTrue(
-                            "No checkpoint notification was received.", notifications.size() > 0);
+                assertTrue("No checkpoint notification was received.", notifications.size() > 0);

-                    assertFalse(
-                            "Failure checkpoint was marked as completed.",
-                            notifications.contains(failureCheckpointID));
+                assertFalse(
+                        "Failure checkpoint was marked as completed.",
+                        notifications.contains(failureCheckpointID));

-                    assertFalse(
-                            "No checkpoint received after failure.",
-                            notifications.get(notifications.size() - 1) == failureCheckpointID);
+                assertFalse(
+                        "No checkpoint received after failure.",
+                        notifications.get(notifications.size() - 1) == failureCheckpointID);

-                    assertTrue(
-                            "Checkpoint notification was received multiple times",
-                            notifications.size() == new HashSet<Long>(notifications).size());
-                }
+                assertTrue(
+                        "Checkpoint notification was received multiple times",
+                        notifications.size() == new HashSet<Long>(notifications).size());
             }
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
         }
     }

flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java

Lines changed: 13 additions & 19 deletions
@@ -123,28 +123,22 @@ public void shutDownExistingCluster() {
      */
     @Test
     public void runCheckpointedProgram() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        env.enableCheckpointing(500);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 0L);
+
+        testProgram(env);
+
+        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
         try {
-            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-            env.setParallelism(PARALLELISM);
-            env.enableCheckpointing(500);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 0L);
-
-            testProgram(env);
-
-            JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-            try {
-                submitJobAndWaitForResult(
-                        cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
-            } catch (Exception e) {
-                Assert.assertTrue(
-                        ExceptionUtils.findThrowable(e, SuccessException.class).isPresent());
-            }
-
-            postSubmit();
+            submitJobAndWaitForResult(
+                    cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
         } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail(e.getMessage());
+            Assert.assertTrue(ExceptionUtils.findThrowable(e, SuccessException.class).isPresent());
         }
+
+        postSubmit();
     }

     // --------------------------------------------------------------------------------------------
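
Note that the remaining try-catch in this diff is kept deliberately: StreamFaultToleranceTestBase jobs signal success by throwing a SuccessException, so that catch is an assertion rather than error handling. A sketch of the idiom using the names from the diff above (the surrounding cluster harness is assumed):

// The job is expected to terminate exceptionally, with a SuccessException
// somewhere in the cause chain; anything else fails the assertion.
try {
    submitJobAndWaitForResult(cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
} catch (Exception e) {
    Assert.assertTrue(ExceptionUtils.findThrowable(e, SuccessException.class).isPresent());
}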

flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java

Lines changed: 3 additions & 9 deletions
@@ -87,16 +87,10 @@ public void testSplitComparison() {
         Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0);
     }

-    @Test
+    @Test(expected = IllegalArgumentException.class)
     public void testIllegalArgument() {
-        try {
-            new TimestampedFileInputSplit(
-                    -10, 2, new Path("test"), 0, 100, null); // invalid modification time
-        } catch (Exception e) {
-            if (!(e instanceof IllegalArgumentException)) {
-                Assert.fail(e.getMessage());
-            }
-        }
+        new TimestampedFileInputSplit(
+                -10, 2, new Path("test"), 0, 100, null); // invalid modification time
     }

     @Test
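
For a pure exception-type check like this one, the commit reaches for JUnit 4's expected attribute instead of keeping any try-catch at all. A self-contained illustration of the mechanism (hypothetical test, not part of this commit):

import org.junit.Test;

public class ExpectedExceptionExample {

    // JUnit 4 passes this test only if the body throws an IllegalArgumentException;
    // completing normally, or throwing a different exception type, fails it.
    @Test(expected = IllegalArgumentException.class)
    public void rejectsNegativeModificationTime() {
        throw new IllegalArgumentException("invalid modification time");
    }
}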

flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java

Lines changed: 18 additions & 25 deletions
@@ -36,7 +36,6 @@
 import org.apache.flink.test.testfunctions.Tokenizer;
 import org.apache.flink.util.TestLogger;

-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;

@@ -67,32 +66,26 @@ public void before() {
     }

     @Test(timeout = 60_000)
-    public void testLocalExecutorWithWordCount() throws InterruptedException {
-        try {
-            // set up the files
-            File inFile = File.createTempFile("wctext", ".in");
-            File outFile = File.createTempFile("wctext", ".out");
-            inFile.deleteOnExit();
-            outFile.deleteOnExit();
-
-            try (FileWriter fw = new FileWriter(inFile)) {
-                fw.write(WordCountData.TEXT);
-            }
-
-            final Configuration config = new Configuration();
-            config.set(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
-            config.set(DeploymentOptions.ATTACHED, true);
-
-            StreamGraph wcStreamGraph = getWordCountStreamGraph(inFile, outFile, parallelism);
-            JobClient jobClient =
-                    executor.execute(wcStreamGraph, config, ClassLoader.getSystemClassLoader())
-                            .get();
-            jobClient.getJobExecutionResult().get();
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail(e.getMessage());
+    public void testLocalExecutorWithWordCount() throws Exception {
+        // set up the files
+        File inFile = File.createTempFile("wctext", ".in");
+        File outFile = File.createTempFile("wctext", ".out");
+        inFile.deleteOnExit();
+        outFile.deleteOnExit();
+
+        try (FileWriter fw = new FileWriter(inFile)) {
+            fw.write(WordCountData.TEXT);
         }

+        final Configuration config = new Configuration();
+        config.set(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
+        config.set(DeploymentOptions.ATTACHED, true);
+
+        StreamGraph wcStreamGraph = getWordCountStreamGraph(inFile, outFile, parallelism);
+        JobClient jobClient =
+                executor.execute(wcStreamGraph, config, ClassLoader.getSystemClassLoader()).get();
+        jobClient.getJobExecutionResult().get();
+
         assertThat(miniCluster.isRunning(), is(false));
     }

flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java

Lines changed: 2 additions & 9 deletions
@@ -24,8 +24,6 @@
 import org.apache.flink.streaming.api.functions.sink.legacy.OutputFormatSinkFunction;
 import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;

-import static org.junit.Assert.fail;
-
 /**
  * Tests for non rich DataSource and DataSink input output formats being correctly used at runtime.
  */
@@ -38,12 +36,7 @@ protected void testProgram() throws Exception {
         TestNonRichOutputFormat output = new TestNonRichOutputFormat();
         env.createInput(new TestNonRichInputFormat())
                 .addSink(new OutputFormatSinkFunction<>(output));
-        try {
-            env.execute();
-        } catch (Exception e) {
-            // we didn't break anything by making everything rich.
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        env.execute();
+        // we didn't break anything by making everything rich.
     }
 }

flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java

Lines changed: 4 additions & 17 deletions
@@ -40,7 +40,6 @@

 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;

 /**
  * Test for proper error messages in case user-defined serialization is broken and detected in the
@@ -67,7 +66,7 @@ public static Configuration getConfiguration() {
     }

     @Test
-    public void testIncorrectSerializer1() {
+    public void testIncorrectSerializer1() throws Exception {
         try {
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             env.setParallelism(PARLLELISM);
@@ -93,14 +92,11 @@ public ConsumesTooMuch map(Long value) throws Exception {
                             .getMessage()
                             .contains("broken serialization."))
                     .isPresent());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
         }
     }

     @Test
-    public void testIncorrectSerializer2() {
+    public void testIncorrectSerializer2() throws Exception {
         try {
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             env.setParallelism(PARLLELISM);
@@ -126,14 +122,11 @@ public ConsumesTooMuchSpanning map(Long value) throws Exception {
                             .getMessage()
                             .contains("broken serialization."))
                     .isPresent());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
         }
     }

     @Test
-    public void testIncorrectSerializer3() {
+    public void testIncorrectSerializer3() throws Exception {
         try {
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             env.setParallelism(PARLLELISM);
@@ -159,14 +152,11 @@ public ConsumesTooLittle map(Long value) throws Exception {
                             .getMessage()
                             .contains("broken serialization."))
                     .isPresent());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
         }
     }

     @Test
-    public void testIncorrectSerializer4() {
+    public void testIncorrectSerializer4() throws Exception {
         try {
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             env.setParallelism(PARLLELISM);
@@ -192,9 +182,6 @@ public ConsumesTooLittleSpanning map(Long value) throws Exception {
                             .getMessage()
                             .contains("broken serialization."))
                     .isPresent());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
         }
     }

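Here the outer try survives because the remaining catch clause is itself the assertion: each test expects the job to fail with a "broken serialization." message, and only the catch-all that converted unexpected exceptions into fail(e.getMessage()) is removed. A condensed sketch of the resulting shape (the catch type and helper are assumptions; the diff only shows the assertion):

@Test
public void testIncorrectSerializer() throws Exception {
    try {
        runJobWithBrokenSerializer(); // hypothetical stand-in for the pipeline each test builds
    } catch (JobExecutionException e) {
        // Expected path: the cause chain must mention broken serialization.
        assertTrue(
                findThrowable(e, c -> c.getMessage().contains("broken serialization."))
                        .isPresent());
    }
    // Any other exception now propagates and fails the test with its full stack trace.
}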