Skip to content
This repository was archived by the owner on Dec 14, 2022. It is now read-only.

Commit 74cd95a

Browse files
author
Jianyun Zhao
authored
Fix txn not found (#443)
1 parent fbb8d17 commit 74cd95a

File tree

3 files changed

+277
-1
lines changed

3 files changed

+277
-1
lines changed

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ public void close() throws Exception {
314314
checkErroneous();
315315
producerClose();
316316
checkErroneous();
317+
super.close();
317318
}
318319

319320
protected Producer<T> getProducer(String topic) {
@@ -375,7 +376,7 @@ public void producerFlush(PulsarTransactionState<T> transaction) throws Exceptio
375376
}
376377
}
377378

378-
if (transaction.isTransactional()) {
379+
if (transaction != null && transaction.isTransactional()) {
379380
// we check the future was completed and add the messageId to list for persistence.
380381
List<CompletableFuture<MessageId>> futureList =
381382
tid2FuturesMap.get(transaction.transactionalId);
@@ -538,6 +539,14 @@ protected void recoverAndCommit(PulsarTransactionState<T> transaction) {
538539
// abort the transaction again, then Pulsar will throw a duplicate operation error,
539540
// we catch the error without doing anything to deal with it
540541
log.debug("transaction {} is already committed...", transaction.transactionalId);
542+
} catch (
543+
TransactionCoordinatorClientException.TransactionNotFoundException
544+
notFoundException) {
545+
// In some cases, the transaction has been committed or aborted before the recovery,
546+
// but Flink has not yet sensed it. When flink recover this job, it will commit or
547+
// abort the transaction again, then Pulsar will throw a duplicate operation error,
548+
// we catch the error without doing anything to deal with it
549+
log.debug("transaction {} is not found...", transaction.transactionalId);
541550
} catch (TransactionCoordinatorClientException e) {
542551
throw new IllegalStateException(e);
543552
}
@@ -564,6 +573,14 @@ protected void recoverAndAbort(PulsarTransactionState<T> transaction) {
564573
// abort the transaction again, then Pulsar will throw a duplicate operation error,
565574
// we catch the error without doing anything to deal with it
566575
log.debug("transaction {} is already aborted...", transaction.transactionalId);
576+
} catch (
577+
TransactionCoordinatorClientException.TransactionNotFoundException
578+
notFoundException) {
579+
// In some cases, the transaction has been committed or aborted before the recovery,
580+
// but Flink has not yet sensed it. When flink recover this job, it will commit or
581+
// abort the transaction again, then Pulsar will throw a duplicate operation error,
582+
// we catch the error without doing anything to deal with it
583+
log.debug("transaction {} is not found...", transaction.transactionalId);
567584
} catch (TransactionCoordinatorClientException e) {
568585
throw new IllegalStateException(e);
569586
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package org.apache.flink.streaming.connectors.pulsar;
16+
17+
import org.apache.flink.api.common.serialization.SimpleStringSchema;
18+
import org.apache.flink.api.common.typeutils.base.StringSerializer;
19+
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
20+
import org.apache.flink.streaming.api.operators.StreamSink;
21+
import org.apache.flink.streaming.connectors.pulsar.serialization.PulsarSerializationSchema;
22+
import org.apache.flink.streaming.connectors.pulsar.serialization.PulsarSerializationSchemaWrapper;
23+
import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic;
24+
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
25+
import org.apache.flink.table.api.DataTypes;
26+
27+
import org.apache.commons.io.IOUtils;
28+
import org.apache.pulsar.client.api.Consumer;
29+
import org.apache.pulsar.client.api.Message;
30+
import org.apache.pulsar.client.api.Schema;
31+
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
32+
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
33+
import org.junit.After;
34+
import org.junit.Before;
35+
import org.junit.Test;
36+
37+
import java.util.ArrayList;
38+
import java.util.Arrays;
39+
import java.util.Collections;
40+
import java.util.List;
41+
import java.util.Optional;
42+
import java.util.Properties;
43+
import java.util.concurrent.CompletableFuture;
44+
import java.util.concurrent.ExecutionException;
45+
import java.util.concurrent.TimeUnit;
46+
import java.util.concurrent.TimeoutException;
47+
48+
import static org.junit.Assert.assertEquals;
49+
import static org.junit.Assert.fail;
50+
51+
/** test exactly-once for pulsar sink. */
52+
public class PulsarSinkExactlyOnceTest extends PulsarTestBaseWithFlink {
53+
54+
private TestFlinkPulsarSink<String> sinkFunction;
55+
private String topic;
56+
private OneInputStreamOperatorTestHarness<String, Object> harness;
57+
58+
@Before
59+
public void setUp() throws Exception {
60+
this.topic = newTopic();
61+
this.setUpTestHarness();
62+
}
63+
64+
@After
65+
public void tearDown() throws Exception {
66+
this.closeTestHarness();
67+
}
68+
69+
private void setUpTestHarness() throws Exception {
70+
final Properties properties = new Properties();
71+
final PulsarSerializationSchemaWrapper<String> schemaWrapper =
72+
new PulsarSerializationSchemaWrapper.Builder<>(new SimpleStringSchema())
73+
.useAtomicMode(DataTypes.STRING())
74+
.build();
75+
this.sinkFunction =
76+
new TestFlinkPulsarSink<String>(
77+
getAdminUrl(),
78+
Optional.of(topic),
79+
clientConfigurationData,
80+
properties,
81+
schemaWrapper,
82+
PulsarSinkSemantic.EXACTLY_ONCE);
83+
this.harness =
84+
new OneInputStreamOperatorTestHarness(
85+
new StreamSink(this.sinkFunction), StringSerializer.INSTANCE);
86+
this.harness.setup();
87+
}
88+
89+
private void closeTestHarness() throws Exception {
90+
this.harness.close();
91+
}
92+
93+
@Test
94+
public void testSubsumedNotificationOfPreviousCheckpoint() throws Exception {
95+
this.harness.open();
96+
this.harness.processElement("42", 0L);
97+
this.harness.snapshot(0L, 1L);
98+
this.harness.processElement("43", 2L);
99+
this.harness.snapshot(1L, 3L);
100+
this.harness.processElement("44", 4L);
101+
this.harness.snapshot(2L, 5L);
102+
this.harness.notifyOfCompletedCheckpoint(2L);
103+
this.harness.notifyOfCompletedCheckpoint(1L);
104+
assertEquals(Arrays.asList("42", "43", "44"), getActualValues(3));
105+
}
106+
107+
@Test
108+
public void testNotifyOfCompletedCheckpoint() throws Exception {
109+
this.harness.open();
110+
this.harness.processElement("42", 0L);
111+
this.harness.snapshot(0L, 1L);
112+
this.harness.processElement("43", 2L);
113+
this.harness.snapshot(1L, 3L);
114+
this.harness.processElement("44", 4L);
115+
this.harness.snapshot(2L, 5L);
116+
this.harness.notifyOfCompletedCheckpoint(1L);
117+
assertEquals(Arrays.asList("42", "43"), getActualValues(2));
118+
}
119+
120+
@Test
121+
public void testRestoreCheckpoint() throws Exception {
122+
this.harness.open();
123+
this.harness.processElement("42", 0L);
124+
this.harness.snapshot(0L, 1L);
125+
this.harness.processElement("43", 2L);
126+
final OperatorSubtaskState snapshot = this.harness.snapshot(1L, 3L);
127+
this.harness.notifyOfCompletedCheckpoint(1L);
128+
129+
int count = 100;
130+
for (int i = 3; i < count; i++) {
131+
this.harness.processElement(Integer.toString(41 + i), i);
132+
this.harness.snapshot(i, i);
133+
this.harness.notifyOfCompletedCheckpoint(i);
134+
}
135+
this.closeTestHarness();
136+
this.setUpTestHarness();
137+
this.harness.initializeState(snapshot);
138+
assertEquals(Arrays.asList("42", "43"), getActualValues(2));
139+
}
140+
141+
@Test
142+
public void testFailBeforeNotify() throws Exception {
143+
this.harness.open();
144+
this.harness.processElement("42", 0L);
145+
this.harness.snapshot(0L, 1L);
146+
this.harness.processElement("43", 2L);
147+
OperatorSubtaskState snapshot = this.harness.snapshot(1L, 3L);
148+
this.sinkFunction.setWritable(false);
149+
150+
try {
151+
this.harness.processElement("44", 4L);
152+
this.harness.snapshot(2L, 5L);
153+
fail("something should fail");
154+
} catch (NotWritableException ignore) {
155+
}
156+
157+
this.closeTestHarness();
158+
this.sinkFunction.setWritable(true);
159+
this.setUpTestHarness();
160+
this.harness.initializeState(snapshot);
161+
assertEquals(Arrays.asList("42", "43"), getActualValues(2));
162+
this.closeTestHarness();
163+
}
164+
165+
private List<String> getActualValues(int expectedSize) throws Exception {
166+
final List<String> actualValues = consumeMessage(topic, Schema.STRING, expectedSize, 2000);
167+
Collections.sort(actualValues);
168+
return actualValues;
169+
}
170+
171+
public <T> List<T> consumeMessage(String topic, Schema<T> schema, int count, int timeout)
172+
throws TimeoutException, ExecutionException, InterruptedException {
173+
return CompletableFuture.supplyAsync(
174+
() -> {
175+
Consumer<T> consumer = null;
176+
try {
177+
consumer =
178+
getPulsarClient()
179+
.newConsumer(schema)
180+
.topic(topic)
181+
.subscriptionInitialPosition(
182+
SubscriptionInitialPosition.Earliest)
183+
.subscriptionName("test")
184+
.subscribe();
185+
List<T> result = new ArrayList<>(count);
186+
for (int i = 0; i < count; i++) {
187+
final Message<T> message = consumer.receive();
188+
result.add(message.getValue());
189+
consumer.acknowledge(message);
190+
}
191+
consumer.close();
192+
return result;
193+
} catch (Exception e) {
194+
sneakyThrow(e);
195+
return null;
196+
} finally {
197+
IOUtils.closeQuietly(consumer, i -> {});
198+
}
199+
})
200+
.get(timeout, TimeUnit.MILLISECONDS);
201+
}
202+
203+
/** javac hack for unchecking the checked exception. */
204+
@SuppressWarnings("unchecked")
205+
public static <T extends Exception> void sneakyThrow(Exception t) throws T {
206+
throw (T) t;
207+
}
208+
209+
/**
210+
* mock Pulsar Sink,Support for throwing unwritable exceptions.
211+
*
212+
* @param <T> record
213+
*/
214+
public static class TestFlinkPulsarSink<T> extends FlinkPulsarSink<T> {
215+
216+
private boolean writable = true;
217+
218+
public TestFlinkPulsarSink(
219+
String adminUrl,
220+
Optional<String> defaultTopicName,
221+
ClientConfigurationData clientConf,
222+
Properties properties,
223+
PulsarSerializationSchema serializationSchema,
224+
PulsarSinkSemantic semantic) {
225+
super(
226+
adminUrl,
227+
defaultTopicName,
228+
clientConf,
229+
properties,
230+
serializationSchema,
231+
semantic);
232+
}
233+
234+
@Override
235+
public void invoke(PulsarTransactionState<T> transactionState, T value, Context context)
236+
throws Exception {
237+
if (!writable) {
238+
throw new NotWritableException("TestFlinkPulsarSink");
239+
}
240+
super.invoke(transactionState, value, context);
241+
}
242+
243+
public void setWritable(boolean writable) {
244+
this.writable = writable;
245+
}
246+
}
247+
248+
/** not writable exception. */
249+
public static class NotWritableException extends RuntimeException {
250+
public NotWritableException(String name) {
251+
super(String.format("Pulsar [%s] is not writable", name));
252+
}
253+
}
254+
}

pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
4848
import org.junit.AfterClass;
4949
import org.junit.BeforeClass;
50+
import org.testcontainers.containers.BindMode;
5051
import org.testcontainers.containers.PulsarContainer;
5152
import org.testcontainers.containers.output.Slf4jLogConsumer;
5253
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
@@ -112,6 +113,10 @@ public static void prepare() throws Exception {
112113
DockerImageName.parse(pulsarImage)
113114
.asCompatibleSubstituteFor("apachepulsar/pulsar");
114115
pulsarService = new PulsarContainer(pulsar);
116+
pulsarService.withClasspathResourceMapping(
117+
"pulsar/txnStandalone.conf",
118+
"/pulsar/conf/standalone.conf",
119+
BindMode.READ_ONLY);
115120
pulsarService.addExposedPort(2181);
116121
pulsarService.waitingFor(
117122
new HttpWaitStrategy()

0 commit comments

Comments
 (0)