@@ -115,7 +115,8 @@ public class CatalogHandlerUtils {
115
115
116
116
static {
117
117
try {
118
- LAST_SEQUENCE_NUMBER_FIELD = TableMetadata .class .getDeclaredField ("lastSequenceNumber" );
118
+ LAST_SEQUENCE_NUMBER_FIELD =
119
+ TableMetadata .Builder .class .getDeclaredField ("lastSequenceNumber" );
119
120
LAST_SEQUENCE_NUMBER_FIELD .setAccessible (true );
120
121
} catch (NoSuchFieldException e ) {
121
122
throw new RuntimeException ("Unable to access field" , e );
@@ -461,7 +462,7 @@ public TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
461
462
2.0 /* exponential */ )
462
463
.onlyRetryOn (CommitFailedException .class )
463
464
.run (
464
- ( taskOps ) -> {
465
+ taskOps -> {
465
466
TableMetadata base = isRetry .get () ? taskOps .refresh () : taskOps .current ();
466
467
467
468
TableMetadata .Builder metadataBuilder = TableMetadata .buildFrom (base );
@@ -470,6 +471,7 @@ public TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
470
471
request .requirements ().forEach ((requirement ) -> requirement .validate (base ));
471
472
} catch (CommitFailedException e ) {
472
473
if (!isRollbackCompactionEnabled ()) {
474
+ // wrap and rethrow outside of tasks to avoid unnecessary retry
473
475
throw new ValidationFailureException (e );
474
476
}
475
477
LOGGER .debug (
@@ -486,6 +488,7 @@ public TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
486
488
"Giving up on Rollback replace operations for table={}, with current-snapshot-id={}, as operation doesn't attempts to add a single snapshot" ,
487
489
base .uuid (),
488
490
base .currentSnapshot ().snapshotId ());
491
+ // wrap and rethrow outside of tasks to avoid unnecessary retry
489
492
throw new ValidationFailureException (e );
490
493
}
491
494
@@ -495,6 +498,7 @@ public TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
495
498
MetadataUpdate .AddSnapshot snapshotToBeAdded = findAddSnapshotUpdate (request );
496
499
if (snapshotToBeAdded == null ) {
497
500
// Re-throw if, there's no snapshot data to be added.
501
+ // wrap and rethrow outside of tasks to avoid unnecessary retry
498
502
throw new ValidationFailureException (e );
499
503
}
500
504
@@ -512,6 +516,7 @@ public TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
512
516
// Nothing can be done as this implies that there were not all
513
517
// No-op snapshots (REPLACE) between expectedCurrentSnapshotId and
514
518
// currentSnapshotId. hence re-throw the exception caught.
519
+ // wrap and rethrow outside of tasks to avoid unnecessary retry
515
520
throw new ValidationFailureException (e );
516
521
}
517
522
// Set back the ref we wanted to set, back to the snapshot-id
@@ -522,11 +527,17 @@ public TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
522
527
// apply the remove snapshots update in the current metadata.
523
528
// NOTE: we need to setRef to expectedCurrentSnapshotId first and then apply
524
529
// remove, as otherwise the remove will drop the reference.
525
- // NOTE: we can skip removing the now orphan base. Its not a hard requirement.
530
+ // NOTE: we can skip removing the now orphan base. It's not a hard requirement.
526
531
// just something good to do, and not leave for Remove Orphans.
527
532
// Ref rolled back update correctly to snapshot to be committed parent now.
528
533
metadataUpdates .forEach ((update -> update .applyTo (metadataBuilder )));
529
- newBase = setAppropriateLastSeqNumber (metadataBuilder .build ());
534
+ newBase =
535
+ setAppropriateLastSeqNumber (
536
+ metadataBuilder ,
537
+ base .uuid (),
538
+ base .lastSequenceNumber (),
539
+ base .snapshot (expectedCurrentSnapshotId ).sequenceNumber ())
540
+ .build ();
530
541
LOGGER .info (
531
542
"Successfully roll-backed replace operation for table={}, with current-snapshot-id={}, to snapshot={}" ,
532
543
base .uuid (),
@@ -540,13 +551,17 @@ public TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
540
551
.requirements ()
541
552
.forEach ((requirement ) -> requirement .validate (baseWithRemovedSnaps ));
542
553
} catch (CommitFailedException e ) {
554
+ // wrap and rethrow outside of tasks to avoid unnecessary retry
543
555
throw new ValidationFailureException (e );
544
556
}
545
557
546
558
TableMetadata .Builder newMetadataBuilder = TableMetadata .buildFrom (newBase );
547
559
request .updates ().forEach ((update ) -> update .applyTo (newMetadataBuilder ));
548
560
TableMetadata updated = newMetadataBuilder .build ();
549
- // always commit this
561
+ if (updated .changes ().isEmpty ()) {
562
+ // do not commit if the metadata has not changed
563
+ return ;
564
+ }
550
565
taskOps .commit (base , updated );
551
566
});
552
567
} catch (ValidationFailureException e ) {
@@ -595,19 +610,20 @@ private List<MetadataUpdate> generateUpdatesToRemoveNoopSnapshot(
595
610
Long snapshotId = base .ref (updateRefName ).snapshotId (); // current tip of the given branch
596
611
// ensure this branch has the latest sequence number.
597
612
long expectedSequenceNumber = base .lastSequenceNumber ();
613
+ // Unexpected state as table's current sequence number is not equal to the
614
+ // most recent snapshot the ref points to.
615
+ if (expectedSequenceNumber != base .snapshot (snapshotId ).sequenceNumber ()) {
616
+ LOGGER .debug (
617
+ "Giving up rolling back table {} to snapshot {}, ref current snapshot sequence number {} is not equal expected sequence number {}" ,
618
+ base .uuid (),
619
+ snapshotId ,
620
+ base .snapshot (snapshotId ).sequenceNumber (),
621
+ expectedSequenceNumber );
622
+ return null ;
623
+ }
598
624
Set <Long > snapshotsToRemove = new LinkedHashSet <>();
599
625
while (snapshotId != null && !Objects .equals (snapshotId , expectedCurrentSnapshotId )) {
600
626
Snapshot snap = base .snapshot (snapshotId );
601
- // catch un-expected state the commit sequence number are
602
- // not continuous can happen for a table with multiple branches.
603
- if (expectedSequenceNumber != snap .sequenceNumber ()) {
604
- LOGGER .debug (
605
- "Giving up rolling back table {} to snapshot {}, Sequence Number are not continuous from {}" ,
606
- base .uuid (),
607
- snapshotId ,
608
- expectedSequenceNumber );
609
- break ;
610
- }
611
627
if (!isRollbackSnapshot (snap ) || idsToRetain .contains (snapshotId )) {
612
628
// Either encountered a non no-op snapshot or the snapshot is being referenced by any other
613
629
// reference either by branch or a tag.
@@ -619,8 +635,6 @@ private List<MetadataUpdate> generateUpdatesToRemoveNoopSnapshot(
619
635
}
620
636
snapshotsToRemove .add (snap .snapshotId ());
621
637
snapshotId = snap .parentId ();
622
- // we need continuous sequence number to correctly rollback
623
- expectedSequenceNumber --;
624
638
}
625
639
626
640
boolean wasExpectedSnapshotReached = Objects .equals (snapshotId , expectedCurrentSnapshotId );
@@ -665,27 +679,30 @@ private MetadataUpdate.AddSnapshot findAddSnapshotUpdate(UpdateTableRequest requ
665
679
return total != 1 ? null : addSnapshot ;
666
680
}
667
681
668
- private TableMetadata setAppropriateLastSeqNumber (TableMetadata newBase ) {
682
+ private TableMetadata .Builder setAppropriateLastSeqNumber (
683
+ TableMetadata .Builder metadataBuilder ,
684
+ String tableUUID ,
685
+ long currentSequenceNumber ,
686
+ long expectedSequenceNumber ) {
669
687
// TODO: Get rid of the reflection call once TableMetadata have API for it.
670
688
// move the lastSequenceNumber back, to apply snapshot properly on the
671
689
// current-metadata Seq number are considered increasing monotonically
672
690
// snapshot over snapshot, the client generates the manifest list and hence
673
691
// the sequence number can't be changed for a snapshot the only possible option
674
692
// then is to change the sequenceNumber tracked by metadata.json
675
693
try {
676
- long lastSeqNumber = newBase .lastSequenceNumber ();
677
694
// this should point to the sequence number that current tip of the
678
695
// branch belongs to, as the new commit will be applied on top of this.
679
- LAST_SEQUENCE_NUMBER_FIELD .set (newBase , newBase . currentSnapshot (). sequenceNumber () );
696
+ LAST_SEQUENCE_NUMBER_FIELD .set (metadataBuilder , expectedSequenceNumber );
680
697
LOGGER .info (
681
- "Setting table :{} last sequence number from {} to {}" ,
682
- newBase . uuid () ,
683
- lastSeqNumber ,
684
- newBase . lastSequenceNumber () );
698
+ "Setting table uuid :{} last sequence number from: {} to {}" ,
699
+ tableUUID ,
700
+ currentSequenceNumber ,
701
+ expectedSequenceNumber );
685
702
} catch (IllegalAccessException ex ) {
686
703
throw new RuntimeException (ex );
687
704
}
688
- return newBase ;
705
+ return metadataBuilder ;
689
706
}
690
707
691
708
private BaseView asBaseView (View view ) {
0 commit comments