@@ -257,7 +257,13 @@ pub async fn publish_kafka_schemas(
     Ok((key_schema_id, value_schema_id))
 }
 
-/// Ensures that the Kafka sink's data and consistency collateral exist.
+/// Ensures that the Kafka sink's data and consistency collateral exist, as well
+/// as returning the last complete timestamp that the last incarnation of the
+/// sink committed to Kafka.
+///
+/// IMPORTANT: to achieve exactly once guarantees, the producer that will resume
+/// production at the returned timestamp *must* have called `init_transactions`
+/// prior to calling this method.
 ///
 /// # Errors
 /// - If the [`KafkaSinkConnection`]'s consistency collateral exists and
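The exactly-once requirement stated in the new doc comment relies on Kafka's producer fencing: `init_transactions` bumps the epoch for a `transactional.id` and fences out any earlier producer that used the same id, so the prior incarnation of the sink can no longer commit progress records after this point. A minimal sketch of the intended call order using rust-rdkafka; the broker address and `transactional.id` below are illustrative placeholders, not the values Materialize derives for a sink:

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::error::KafkaError;
use rdkafka::producer::{BaseProducer, Producer};

fn fence_out_previous_incarnation(brokers: &str) -> Result<BaseProducer, KafkaError> {
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        // Placeholder id; the real sink derives a stable per-sink id.
        .set("transactional.id", "example-sink")
        .create()?;

    // Fences out any earlier producer using the same transactional.id and
    // aborts its in-flight transactions. Only after this returns is it safe
    // to read the progress topic to determine the resume timestamp.
    producer.init_transactions(Duration::from_secs(30))?;

    Ok(producer)
}
```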
@@ -267,95 +273,84 @@ pub async fn build_kafka(
     connection: &KafkaSinkConnection,
     connection_cx: &ConnectionContext,
 ) -> Result<Option<Timestamp>, ContextCreationError> {
-    let client: AdminClient<_> = connection
-        .connection
-        .create_with_context(connection_cx, MzClientContext::default(), &BTreeMap::new())
-        .await
-        .add_context("creating admin client failed")?;
-
-    // Check for existence of progress topic; if it exists and contains data for
-    // this sink, we expect the data topic to exist, as well. Note that we don't
-    // expect the converse to be true because we don't want to prevent users
-    // from creating topics before setting up their sinks.
-    let meta = client
-        .inner()
-        .fetch_metadata(None, Duration::from_secs(10))
-        .check_ssh_status(client.inner().context())
-        .add_context("fetching metadata")?;
-
-    // Check if the broker's metadata already contains the progress topic.
+    // Fetch the progress of the last incarnation of the sink, if any.
     let progress_topic = match &connection.consistency_config {
-        KafkaConsistencyConfig::Progress { topic } => {
-            meta.topics().iter().find(|t| t.name() == topic)
-        }
+        KafkaConsistencyConfig::Progress { topic } => topic,
     };
-
-    // If the consistency topic exists, check to see if it contains this sink's
-    // data.
-    let latest_ts = if let Some(progress_topic) = progress_topic {
-        let progress_client: BaseConsumer<_> = connection
+    // For details about the two clients constructed here, see
+    // `determine_latest_progress_record`.
+    let make_progress_client = |isolation_level: &'static str| async {
+        connection
             .connection
             .create_with_context(
                 connection_cx,
                 MzClientContext::default(),
                 &btreemap! {
                     "group.id" => SinkGroupId::new(sink_id),
-                    "isolation.level" => "read_committed".into(),
+                    "isolation.level" => isolation_level.into(),
                     "enable.auto.commit" => "false".into(),
                     "auto.offset.reset" => "earliest".into(),
                     "enable.partition.eof" => "true".into(),
                 },
             )
-            .await?;
-
-        let progress_client = Arc::new(progress_client);
-        let latest_ts = determine_latest_progress_record(
-            format!("build_kafka_{}", sink_id),
-            progress_topic.name().to_string(),
-            ProgressKey::new(sink_id),
-            Arc::clone(&progress_client),
-        )
+            .await
+    };
+    let progress_client_read_committed = Arc::new(make_progress_client("read_committed").await?);
+    let progress_client_read_uncommitted =
+        Arc::new(make_progress_client("read_uncommitted").await?);
+    let latest_ts = determine_latest_progress_record(
+        format!("build_kafka_{}", sink_id),
+        Arc::clone(&progress_client_read_committed),
+        progress_client_read_uncommitted,
+        progress_topic.to_string(),
+        ProgressKey::new(sink_id),
+    )
+    .await
+    .check_ssh_status(progress_client_read_committed.client().context())?;
+
+    let admin_client: AdminClient<_> = connection
+        .connection
+        .create_with_context(connection_cx, MzClientContext::default(), &BTreeMap::new())
         .await
-        .check_ssh_status(progress_client.client().context())?;
-
-        // If we have progress data, we should have the topic listed in the
-        // broker's metadata. If we don't, error.
-        if latest_ts.is_some() && !meta.topics().iter().any(|t| t.name() == connection.topic) {
-            Err(anyhow::anyhow!(
-                "sink progress data exists, but sink data topic is missing"
-            ))?
-        }
+        .add_context("creating admin client failed")?;
 
-        latest_ts
-    } else {
-        None
-    };
+    // If the progress topic existed and contained data for this sink, we expect
+    // the data topic to exist, as well. Note that we don't expect the converse
+    // to be true because we don't want to prevent users from creating topics
+    // before setting up their sinks.
+    let meta = admin_client
+        .inner()
+        .fetch_metadata(None, Duration::from_secs(10))
+        .check_ssh_status(admin_client.inner().context())
+        .add_context("fetching metadata")?;
+    if latest_ts.is_some() && !meta.topics().iter().any(|t| t.name() == connection.topic) {
+        Err(anyhow::anyhow!(
+            "sink progress data exists, but sink data topic is missing"
+        ))?;
+    }
 
-    // Create Kafka topic.
+    // Create Kafka topics.
+    ensure_kafka_topic(
+        &admin_client,
+        progress_topic,
+        1,
+        connection.replication_factor,
+        KafkaSinkConnectionRetention::default(),
+    )
+    .await
+    .check_ssh_status(admin_client.inner().context())
+    .add_context("error registering kafka progress topic for sink")?;
     ensure_kafka_topic(
-        &client,
+        &admin_client,
         &connection.topic,
         connection.partition_count,
         connection.replication_factor,
         connection.retention,
     )
     .await
-    .check_ssh_status(client.inner().context())
+    .check_ssh_status(admin_client.inner().context())
     .add_context("error registering kafka topic for sink")?;
 
-    match &connection.consistency_config {
-        KafkaConsistencyConfig::Progress { topic } => ensure_kafka_topic(
-            &client,
-            topic,
-            1,
-            connection.replication_factor,
-            KafkaSinkConnectionRetention::default(),
-        )
-        .await
-        .check_ssh_status(client.inner().context())
-        .add_context("error registering kafka consistency topic for sink")?,
-    };
-
     Ok(latest_ts)
 }
 
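For reference, the two progress consumers built by `make_progress_client` differ only in their `isolation.level`. A standalone sketch of the same configuration using rust-rdkafka directly; the broker address and group id are placeholders and the helper name is hypothetical:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::BaseConsumer;
use rdkafka::error::KafkaError;

fn make_progress_consumer(
    brokers: &str,
    isolation_level: &str, // "read_committed" or "read_uncommitted"
) -> Result<BaseConsumer, KafkaError> {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("group.id", "example-progress-reader") // placeholder group id
        .set("isolation.level", isolation_level)
        .set("enable.auto.commit", "false")
        .set("auto.offset.reset", "earliest")
        // Lets the read loop observe partition EOF events, which
        // `determine_latest_progress_record` requires.
        .set("enable.partition.eof", "true")
        .create()
}
```

Calling such a helper once with `"read_committed"` and once with `"read_uncommitted"` yields the pair of clients that `determine_latest_progress_record` now expects.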
@@ -376,17 +371,18 @@ pub struct ProgressRecord {
 /// Determines the latest progress record from the specified topic for the given
 /// progress key.
 ///
-/// IMPORTANT: to achieve exactly once guarantees, the producer that will act on
-/// this information *must* have called `init_transactions` prior to calling
-/// this method.
+/// IMPORTANT: to achieve exactly once guarantees, the producer that will resume
+/// production at the returned timestamp *must* have called `init_transactions`
+/// prior to calling this method.
 ///
 /// IMPORTANT: the `progress_client` must have `enable.partition.eof` set to
 /// `true`.
 async fn determine_latest_progress_record(
     name: String,
+    progress_client_read_committed: Arc<BaseConsumer<TunnelingClientContext<MzClientContext>>>,
+    progress_client_read_uncommitted: Arc<BaseConsumer<TunnelingClientContext<MzClientContext>>>,
     progress_topic: String,
     progress_key: ProgressKey,
-    progress_client: Arc<BaseConsumer<TunnelingClientContext<MzClientContext>>>,
 ) -> Result<Option<Timestamp>, anyhow::Error> {
     // ****************************** WARNING ******************************
     // Be VERY careful when editing the code in this function. It is very easy
@@ -403,9 +399,10 @@ async fn determine_latest_progress_record(
     ///
     /// Blocking so should always be called on background thread.
     fn get_latest_ts<C>(
+        progress_client_read_committed: &BaseConsumer<C>,
+        progress_client_read_uncommitted: &BaseConsumer<C>,
         progress_topic: &str,
         progress_key: &ProgressKey,
-        progress_client: &BaseConsumer<C>,
     ) -> Result<Option<Timestamp>, anyhow::Error>
     where
         C: ConsumerContext,
@@ -414,7 +411,7 @@ async fn determine_latest_progress_record(
         // guarantees ordering within a single partition, and we need a strict
         // order on the progress messages we read and write.
         let partitions = match mz_kafka_util::client::get_partitions(
-            progress_client.client(),
+            progress_client_read_committed.client(),
            progress_topic,
            DEFAULT_FETCH_METADATA_TIMEOUT,
        ) {
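The ordering argument in the surrounding context is also why the progress topic is created with a single partition (`ensure_kafka_topic(&admin_client, progress_topic, 1, ...)` earlier in this diff): Kafka only orders messages within one partition. A hedged sketch of the kind of guard the reader can apply; the function name and error text are illustrative:

```rust
use anyhow::bail;

// Hypothetical guard: the progress topic must have exactly one partition,
// because Kafka only orders messages within a single partition and progress
// records must be read back in the order they were written.
fn single_partition(partitions: &[i32]) -> Result<i32, anyhow::Error> {
    match partitions {
        [partition] => Ok(*partition),
        other => bail!(
            "progress topic should have exactly one partition, found {}",
            other.len()
        ),
    }
}
```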
@@ -452,20 +449,27 @@ async fn determine_latest_progress_record(
         // before we can conclude that we've seen the latest progress record for
         // the specified `progress_key`. A safety argument:
         //
-        // * The producer has initialized transactions before calling this
-        //   method. At this point, any writes from a previous version of this
-        //   sink should be visible, so the offset we fetch should be larger
-        //   than any previously-written progress record and thus picked up by
-        //   our scan.
+        // * Our caller has initialized transactions before calling this
+        //   method, which prevents the prior incarnation of this sink from
+        //   committing any further progress records.
         //
-        //   TODO: this is only true when fetching offsets with an isolation
-        //   level of "read uncomitted"! Fix this.
+        // * We use `read_uncommitted` isolation to ensure that we fetch the
+        //   true high water mark for the topic, even if there are pending
+        //   transactions in the topic. If we used the `read_committed`
+        //   isolation level, we'd instead get the "last stable offset" (LSO),
+        //   which is the offset of the first message in an open transaction,
+        //   which might not include the last progress message committed for
+        //   this sink! (While the caller of this function has fenced out
+        //   older producers for this sink, *other* sinks writing using the
+        //   same progress topic might have long-running transactions that
+        //   hold back the LSO.)
         //
-        // * If another sink spins up and fences the producer out, we may not
-        //   see the latest progress record... but since the producer has been
-        //   fenced out, it will be unable to act on our stale information.
+        // * If another sink spins up and fences out the producer for this
+        //   incarnation of the sink, we may not see the latest progress
+        //   record... but since the producer has been fenced out, it will be
+        //   unable to act on our stale information.
         //
-        let (lo, hi) = progress_client
+        let (lo, hi) = progress_client_read_uncommitted
             .fetch_watermarks(progress_topic, partition, DEFAULT_FETCH_METADATA_TIMEOUT)
             .map_err(|e| {
                 anyhow!(
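The safety argument above turns on the difference between the topic's true high water mark and the last stable offset (LSO) that a `read_committed` consumer is limited to. A minimal sketch of fetching the watermarks with the uncommitted-isolation consumer; the helper name is hypothetical and the 10-second timeout stands in for the real code's `DEFAULT_FETCH_METADATA_TIMEOUT`:

```rust
use std::time::Duration;

use anyhow::Context;
use rdkafka::consumer::{BaseConsumer, Consumer};

// Returns the (low, high) watermarks for one partition of the progress topic.
// The consumer is assumed to use `isolation.level=read_uncommitted`, so `high`
// is the true high water mark rather than the last stable offset.
fn progress_watermarks(
    consumer: &BaseConsumer,
    topic: &str,
    partition: i32,
) -> Result<(i64, i64), anyhow::Error> {
    consumer
        .fetch_watermarks(topic, partition, Duration::from_secs(10))
        .with_context(|| format!("fetching watermarks for {topic}:{partition}"))
}
```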
@@ -478,16 +482,18 @@ async fn determine_latest_progress_record(
         let mut tps = TopicPartitionList::new();
         tps.add_partition(progress_topic, partition);
         tps.set_partition_offset(progress_topic, partition, Offset::Beginning)?;
-        progress_client.assign(&tps).with_context(|| {
-            format!(
-                "Error seeking in progress topic {}:{}",
-                progress_topic, partition
-            )
-        })?;
+        progress_client_read_committed
+            .assign(&tps)
+            .with_context(|| {
+                format!(
+                    "Error seeking in progress topic {}:{}",
+                    progress_topic, partition
+                )
+            })?;
 
         // Helper to get the progress consumer's current position.
         let get_position = || {
-            let position = progress_client
+            let position = progress_client_read_committed
                 .position()?
                 .find_partition(progress_topic, partition)
                 .ok_or_else(|| {
@@ -516,13 +522,29 @@ async fn determine_latest_progress_record(
         // Read messages until the consumer is positioned at or beyond the high
         // water mark.
         //
+        // We use `read_committed` isolation to ensure we don't see progress
+        // records for transactions that did not commit. This means we have to
+        // wait for the LSO to progress to the high water mark `hi`, which means
+        // waiting for any open transactions for other sinks using the same
+        // progress topic to complete. We set a short transaction timeout (10s)
+        // to ensure we never need to wait more than 10s.
+        //
+        // Note that the stall time on the progress topic is not a function of
+        // transaction size. We've designed our transactions so that the
+        // progress record is always written last, after all the data has been
+        // written, and so the window of time in which the progress topic has an
+        // open transaction is quite small. The only vulnerability is if another
+        // sink using the same progress topic crashes in that small window
+        // between writing the progress record and committing the transaction,
+        // in which case we have to wait out the transaction timeout.
+        //
         // Important invariant: we only exit this loop successfully (i.e., not
         // returning an error) if we have positive proof of a position at or
         // beyond the high water mark. To make this invariant easy to check, do
         // not use `break` in the body of the loop.
         let mut last_timestamp = None;
         while get_position()? < hi {
-            let message = match progress_client.poll(PROGRESS_RECORD_FETCH_TIMEOUT) {
+            let message = match progress_client_read_committed.poll(PROGRESS_RECORD_FETCH_TIMEOUT) {
                 Some(Ok(message)) => message,
                 Some(Err(KafkaError::PartitionEOF(_))) => {
                     // No message, but the consumer's position may have advanced
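Taken together, this hunk's read side is a loop that polls the `read_committed` consumer until its position reaches the high water mark `hi` obtained earlier, treating partition EOF as a benign wake-up rather than an error. A simplified sketch of that shape; it elides the progress-key filtering and timestamp decoding that the real loop performs, and the function name is illustrative:

```rust
use std::time::Duration;

use anyhow::bail;
use rdkafka::consumer::BaseConsumer;
use rdkafka::error::KafkaError;
use rdkafka::message::Message;

// Drains one partition until the consumer's position reaches `hi`, returning
// how many messages were seen. `position` plays the role of the diff's
// `get_position` helper.
fn read_to_high_water_mark(
    consumer: &BaseConsumer,
    mut position: impl FnMut() -> Result<i64, anyhow::Error>,
    hi: i64,
) -> Result<u64, anyhow::Error> {
    let mut seen = 0;
    while position()? < hi {
        match consumer.poll(Duration::from_secs(60)) {
            Some(Ok(message)) => {
                // The real loop decodes the message here and remembers the
                // timestamp of the latest record for this sink's progress key.
                let _payload = message.payload();
                seen += 1;
            }
            // EOF only means the consumer caught up with the end of the
            // partition; the loop condition decides whether we are done.
            Some(Err(KafkaError::PartitionEOF(_))) => continue,
            Some(Err(e)) => bail!("polling progress topic: {e}"),
            None => bail!("timed out before reaching high water mark {hi}"),
        }
    }
    Ok(seen)
}
```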
@@ -566,7 +588,14 @@ async fn determine_latest_progress_record(
 
     task::spawn_blocking(
         || format!("get_latest_ts:{name}"),
-        move || get_latest_ts(&progress_topic, &progress_key, &progress_client),
+        move || {
+            get_latest_ts(
+                &progress_client_read_committed,
+                &progress_client_read_uncommitted,
+                &progress_topic,
+                &progress_key,
+            )
+        },
     )
     .await?
 }