Skip to content

Commit aa7d04b

Browse files
committed
review feedback
1 parent 7ac8448 commit aa7d04b

File tree

3 files changed

+188
-86
lines changed

3 files changed

+188
-86
lines changed

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java

+68-1
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,73 @@ public void testConcurrentWritesWithRollbackNonEmptyTable() {
643643
}
644644
}
645645

646+
@Test
647+
public void testConcurrentWritesWithRollbackWithNonReplaceSnapshotInBetween() {
648+
IcebergCatalog catalog = this.catalog();
649+
if (this.requiresNamespaceCreate()) {
650+
catalog.createNamespace(NS);
651+
}
652+
653+
Table table =
654+
catalog
655+
.buildTable(TABLE, SCHEMA)
656+
.withPartitionSpec(SPEC)
657+
.withProperties(Map.of("rollback.compaction.on-conflicts.enabled", "true"))
658+
.create();
659+
this.assertNoFiles(table);
660+
661+
// commit FILE_A
662+
catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
663+
this.assertFiles(catalog.loadTable(TABLE), FILE_A);
664+
table.refresh();
665+
666+
long lastSnapshotId = table.currentSnapshot().snapshotId();
667+
668+
// Apply the deletes based on FILE_A
669+
// this should conflict when we try to commit without the change.
670+
RowDelta originalRowDelta =
671+
table
672+
.newRowDelta()
673+
.addDeletes(FILE_A_DELETES)
674+
.validateFromSnapshot(lastSnapshotId)
675+
.validateDataFilesExist(List.of(FILE_A.location()));
676+
// Make client ready with updates, don't reach out to IRC server yet
677+
Snapshot s = originalRowDelta.apply();
678+
TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
679+
TableMetadata base = ops.current();
680+
TableMetadata.Builder update = TableMetadata.buildFrom(base);
681+
update.setBranchSnapshot(s, "main");
682+
TableMetadata updatedMetadata = update.build();
683+
List<MetadataUpdate> updates = updatedMetadata.changes();
684+
List<UpdateRequirement> requirements = UpdateRequirements.forUpdateTable(base, updates);
685+
UpdateTableRequest request = UpdateTableRequest.create(TABLE, requirements, updates);
686+
687+
// replace FILE_A with FILE_B
688+
// commit the transaction.
689+
catalog.loadTable(TABLE).newRewrite().addFile(FILE_B).deleteFile(FILE_A).commit();
690+
691+
// commit FILE_C
692+
catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_C).commit();
693+
Assertions.assertThatThrownBy(
694+
() ->
695+
IcebergCatalogHandler.commit(
696+
((BaseTable) catalog.loadTable(TABLE)).operations(), request))
697+
.isInstanceOf(CommitFailedException.class)
698+
.hasMessageContaining("Requirement failed: branch main has changed");
699+
700+
table.refresh();
701+
702+
// Assert only 3 snapshots
703+
Snapshot currentSnapshot = table.snapshot(table.refs().get("main").snapshotId());
704+
int totalSnapshots = 1;
705+
while (currentSnapshot.parentId() != null) {
706+
currentSnapshot = table.snapshot(currentSnapshot.parentId());
707+
totalSnapshots += 1;
708+
}
709+
assertThat(totalSnapshots).isEqualTo(3);
710+
this.assertFiles(catalog.loadTable(TABLE), FILE_B, FILE_C);
711+
}
712+
646713
@Test
647714
public void testValidateNotificationWhenTableAndNamespacesDontExist() {
648715
Assumptions.assumeTrue(
@@ -675,7 +742,7 @@ public void testValidateNotificationWhenTableAndNamespacesDontExist() {
675742
// We should be able to send the notification without creating the metadata file since it's
676743
// only validating the ability to send the CREATE/UPDATE notification possibly before actually
677744
// creating the table at all on the remote catalog.
678-
assertThat(catalog.sendNotification(table, request))
745+
Assertions.assertThat(catalog.sendNotification(table, request))
679746
.as("Notification should be sent successfully")
680747
.isTrue();
681748
Assertions.assertThat(catalog.namespaceExists(namespace))

service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java

+112-76
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@
4343
import org.apache.iceberg.DataOperations;
4444
import org.apache.iceberg.MetadataUpdate;
4545
import org.apache.iceberg.PartitionSpec;
46-
import org.apache.iceberg.SnapshotRef;
4746
import org.apache.iceberg.Schema;
4847
import org.apache.iceberg.Snapshot;
48+
import org.apache.iceberg.SnapshotRef;
4949
import org.apache.iceberg.SortOrder;
5050
import org.apache.iceberg.Table;
5151
import org.apache.iceberg.TableMetadata;
@@ -782,23 +782,6 @@ public LoadTableResponse updateTable(
782782
return updateTableWithRollback(baseCatalog, tableIdentifier, applyUpdateFilters(request));
783783
}
784784

785-
private static TableMetadata create(TableOperations ops, UpdateTableRequest request) {
786-
request.requirements().forEach((requirement) -> requirement.validate(ops.current()));
787-
Optional<Integer> formatVersion =
788-
request.updates().stream()
789-
.filter((update) -> update instanceof MetadataUpdate.UpgradeFormatVersion)
790-
.map((update) -> ((MetadataUpdate.UpgradeFormatVersion) update).formatVersion())
791-
.findFirst();
792-
TableMetadata.Builder builder =
793-
(TableMetadata.Builder)
794-
formatVersion
795-
.map(TableMetadata::buildFromEmpty)
796-
.orElseGet(TableMetadata::buildFromEmpty);
797-
request.updates().forEach((update) -> update.applyTo(builder));
798-
ops.commit((TableMetadata) null, builder.build());
799-
return ops.current();
800-
}
801-
802785
// TODO: Clean this up when CatalogHandler become extensible.
803786
// Copy of CatalogHandler#update
804787
private static LoadTableResponse updateTableWithRollback(
@@ -828,7 +811,28 @@ private static LoadTableResponse updateTableWithRollback(
828811
return LoadTableResponse.builder().withTableMetadata(finalMetadata).build();
829812
}
830813

814+
// TODO: Clean this up when CatalogHandler become extensible.
815+
// Copy of CatalogHandler#create
816+
private static TableMetadata create(TableOperations ops, UpdateTableRequest request) {
817+
request.requirements().forEach((requirement) -> requirement.validate(ops.current()));
818+
Optional<Integer> formatVersion =
819+
request.updates().stream()
820+
.filter((update) -> update instanceof MetadataUpdate.UpgradeFormatVersion)
821+
.map((update) -> ((MetadataUpdate.UpgradeFormatVersion) update).formatVersion())
822+
.findFirst();
823+
TableMetadata.Builder builder =
824+
(TableMetadata.Builder)
825+
formatVersion
826+
.map(TableMetadata::buildFromEmpty)
827+
.orElseGet(TableMetadata::buildFromEmpty);
828+
request.updates().forEach((update) -> update.applyTo(builder));
829+
ops.commit((TableMetadata) null, builder.build());
830+
return ops.current();
831+
}
832+
831833
@VisibleForTesting
834+
// TODO: Clean this up when CatalogHandler become extensible.
835+
// Copy of CatalogHandler#commit
832836
public static TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
833837
AtomicBoolean isRetry = new AtomicBoolean(false);
834838

@@ -840,8 +844,6 @@ public static TableMetadata commit(TableOperations ops, UpdateTableRequest reque
840844
.run(
841845
(taskOps) -> {
842846
TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current();
843-
isRetry.set(true);
844-
// Prev PR: https://github.com/apache/iceberg/pull/5888
845847
boolean rollbackCompaction =
846848
PropertyUtil.propertyAsBoolean(
847849
taskOps.current().properties(), ROLLBACK_REPLACE_ENABLED_PROPERTY, false);
@@ -854,76 +856,61 @@ public static TableMetadata commit(TableOperations ops, UpdateTableRequest reque
854856
if (!rollbackCompaction) {
855857
throw new ValidationFailureException(e);
856858
}
857-
// Since snapshot has already been created at the client end.
858-
// Nothing much can be done, we can move this
859-
// to writer specific thing, but it would be cool if catalog does this for us.
860-
// Inspect that the requirements states that snapshot
861-
// ref needs to be asserted this usually means in the update section
862-
// it has addSnapshot and setSnapshotRef
863-
UpdateRequirement.AssertRefSnapshotID addSnapshot = null;
864-
int found = 0;
865-
for (UpdateRequirement requirement : request.requirements()) {
866-
// there should be only add snapshot request
867-
if (requirement instanceof UpdateRequirement.AssertRefSnapshotID) {
868-
++found;
869-
addSnapshot = (UpdateRequirement.AssertRefSnapshotID) requirement;
870-
}
871-
}
859+
UpdateRequirement.AssertRefSnapshotID assertRefSnapshotId =
860+
findAssertRefSnapshotID(request);
861+
MetadataUpdate.SetSnapshotRef setSnapshotRef = findSetSnapshotRefUpdate(request);
872862

873-
if (found != 1) {
874-
// TODO: handle this case, find min snapshot id, to rollback to give it creates
875-
// lineage
876-
// lets not complicate things rn
863+
if (assertRefSnapshotId == null || setSnapshotRef == null) {
864+
// This implies the request was not trying to add a snapshot.
877865
throw new ValidationFailureException(e);
878866
}
879867

880-
Long parentSnapshotId = addSnapshot.snapshotId();
881-
// so we will first check all the snapshots on the top of
882-
// base on which the snapshot we want to commit is of type REPLACE ops.
883-
Long parentToRollbackTo = base.currentSnapshot().snapshotId();
884-
List<MetadataUpdate> updateToRemoveSnapshot = new ArrayList<>();
885-
while (!Objects.equals(parentToRollbackTo, parentSnapshotId)) {
886-
Snapshot snap = ops.current().snapshot(parentToRollbackTo);
887-
if (!DataOperations.REPLACE.equals(snap.operation())) {
888-
break;
889-
}
890-
updateToRemoveSnapshot.add(
891-
new MetadataUpdate.RemoveSnapshot(snap.snapshotId()));
892-
parentToRollbackTo = snap.parentId();
893-
}
894-
895-
MetadataUpdate.SetSnapshotRef ref = null;
896-
found = 0;
897-
// find the SetRefName snapshot update
898-
for (MetadataUpdate update : request.updates()) {
899-
if (update instanceof MetadataUpdate.SetSnapshotRef) {
900-
++found;
901-
ref = (MetadataUpdate.SetSnapshotRef) update;
902-
}
868+
if (!hasJustMainBranch(base)) {
869+
// There can be cases when the snapshot we want to rollback
870+
// is being referenced by another branch and just checking
871+
// the tip of these branches is not sufficient.
872+
// TODO: handle cases when tables have >1 branches.
873+
throw new ValidationFailureException(e);
903874
}
904875

905-
if (found != 1 || !Objects.equals(parentToRollbackTo, parentSnapshotId)) {
906-
// nothing can be done as this implies there was a non replace
907-
// snapshot in between or there is more than setRef ops, we don't know where
908-
// to go.
876+
// snapshot-id the client expects the table current_snapshot_id
877+
long expectedCurrentSnapshotId = assertRefSnapshotId.snapshotId();
878+
// table current_snapshot_id.
879+
long currentSnapshotId = base.currentSnapshot().snapshotId();
880+
List<MetadataUpdate> metadataUpdates =
881+
generateUpdatesToRemoveNoopSnapshot(
882+
ops, currentSnapshotId, expectedCurrentSnapshotId);
883+
884+
if (metadataUpdates == null || metadataUpdates.isEmpty()) {
885+
// Nothing can be done as this implies that there were not all
886+
// No-op snapshots (REPLACE) between expectedCurrentSnapshotId and
887+
// currentSnapshotId.
888+
// hence re-throw the exception caught.
909889
throw new ValidationFailureException(e);
910890
}
911891

912-
// first we should also set back the ref we wanted to set, back to the base
913-
// on which the current update is based on.
914-
metadataBuilder.setBranchSnapshot(parentSnapshotId, ref.name());
892+
// Set back the ref we wanted to set, back to the snapshot-id
893+
// the client is expecting the table to be at.
894+
metadataBuilder.setBranchSnapshot(
895+
expectedCurrentSnapshotId, setSnapshotRef.name());
915896

916897
// apply the remove snapshots update in the current metadata.
917-
// NOTE: we need to setRef to parent first and then apply remove as the remove
918-
// will drop. The tags / branch which don't have reference.
898+
// NOTE: we need to setRef to expectedCurrentSnapshotId first and then apply
899+
// remove,
900+
// as otherwise the remove will drop.
919901
// NOTE: we can skip removing the now orphan base. Its not a hard requirement.
920902
// just something good to do, and not leave for Remove Orphans.
921-
updateToRemoveSnapshot.forEach((update -> update.applyTo(metadataBuilder)));
903+
metadataUpdates.forEach((update -> update.applyTo(metadataBuilder)));
922904
// Ref rolled back update correctly to snapshot to be committed parent now.
923905
newBase = metadataBuilder.build();
924-
// move the lastSequenceNumber back, to apply snapshot properly.
925-
// Seq number are considered increasing monotonically, snapshot over snapshot, so
926-
// this is important.
906+
// move the lastSequenceNumber back, to apply snapshot properly on the
907+
// current-metadata
908+
// Seq number are considered increasing monotonically, snapshot over snapshot, the
909+
// client
910+
// generates the manifest list and hence the sequence number can't be changed for
911+
// a snapshot
912+
// the only possible option then is to change the sequenceNumber tracked by
913+
// metadata.json
927914
Class<?> clazz = newBase.getClass();
928915
try {
929916
Field field = clazz.getDeclaredField("lastSequenceNumber");
@@ -935,7 +922,6 @@ public static TableMetadata commit(TableOperations ops, UpdateTableRequest reque
935922
throw new RuntimeException(ex);
936923
}
937924
}
938-
939925
// double check if the requirements passes now.
940926
try {
941927
TableMetadata baseWithRemovedSnaps = newBase;
@@ -959,7 +945,57 @@ public static TableMetadata commit(TableOperations ops, UpdateTableRequest reque
959945
return ops.current();
960946
}
961947

948+
private static UpdateRequirement.AssertRefSnapshotID findAssertRefSnapshotID(
949+
UpdateTableRequest request) {
950+
UpdateRequirement.AssertRefSnapshotID assertRefSnapshotID = null;
951+
int total = 0;
952+
for (UpdateRequirement requirement : request.requirements()) {
953+
if (requirement instanceof UpdateRequirement.AssertRefSnapshotID) {
954+
++total;
955+
assertRefSnapshotID = (UpdateRequirement.AssertRefSnapshotID) requirement;
956+
}
957+
}
958+
959+
// if > 1 assertion for refs, then it's not safe to rollback, make this Noop.
960+
return total != 1 ? null : assertRefSnapshotID;
961+
}
962+
963+
private static List<MetadataUpdate> generateUpdatesToRemoveNoopSnapshot(
964+
TableOperations ops, long currentSnapshotId, long expectedCurrentSnapshotId) {
965+
List<MetadataUpdate> updateToRemoveSnapshot = new ArrayList<>();
966+
Long snapshotId = currentSnapshotId;
967+
while (snapshotId != null && !Objects.equals(snapshotId, expectedCurrentSnapshotId)) {
968+
Snapshot snap = ops.current().snapshot(snapshotId);
969+
if (!DataOperations.REPLACE.equals(snap.operation())) {
970+
break;
971+
}
972+
updateToRemoveSnapshot.add(new MetadataUpdate.RemoveSnapshot(snap.snapshotId()));
973+
snapshotId = snap.parentId();
974+
}
975+
976+
boolean wasExpectedSnapshotReached = Objects.equals(snapshotId, expectedCurrentSnapshotId);
977+
return wasExpectedSnapshotReached ? updateToRemoveSnapshot : null;
978+
}
979+
980+
private static MetadataUpdate.SetSnapshotRef findSetSnapshotRefUpdate(
981+
UpdateTableRequest request) {
982+
int total = 0;
983+
MetadataUpdate.SetSnapshotRef setSnapshotRefUpdate = null;
984+
// find the SetRefName snapshot update
985+
for (MetadataUpdate update : request.updates()) {
986+
if (update instanceof MetadataUpdate.SetSnapshotRef) {
987+
total++;
988+
setSnapshotRefUpdate = (MetadataUpdate.SetSnapshotRef) update;
989+
}
990+
}
962991

992+
// if > 1 assertion for refs, then it's not safe to rollback, make this Noop.
993+
return total != 1 ? null : setSnapshotRefUpdate;
994+
}
995+
996+
private static boolean hasJustMainBranch(TableMetadata tableMetadata) {
997+
return tableMetadata.refs().values().stream().filter(SnapshotRef::isBranch).count() == 1;
998+
}
963999

9641000
public LoadTableResponse updateTableForStagedCreate(
9651001
TableIdentifier tableIdentifier, UpdateTableRequest request) {

service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/ValidationFailureException.java

+8-9
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,19 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
2019
package org.apache.polaris.service.catalog.iceberg;
2120

2221
import org.apache.iceberg.exceptions.CommitFailedException;
2322

2423
public class ValidationFailureException extends RuntimeException {
25-
private final CommitFailedException wrapped;
24+
private final CommitFailedException wrapped;
2625

27-
public ValidationFailureException(CommitFailedException cause) {
28-
super(cause);
29-
this.wrapped = cause;
30-
}
26+
public ValidationFailureException(CommitFailedException cause) {
27+
super(cause);
28+
this.wrapped = cause;
29+
}
3130

32-
public CommitFailedException wrapped() {
33-
return this.wrapped;
34-
}
31+
public CommitFailedException wrapped() {
32+
return this.wrapped;
33+
}
3534
}

0 commit comments

Comments
 (0)