
Commit 05f9bad

POC: Rollback compaction on conflict
1 parent dd3c71c commit 05f9bad

File tree: 2 files changed, +244 −1 lines

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java

+52
@@ -53,8 +53,12 @@
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
@@ -144,6 +148,15 @@
 @TestProfile(IcebergCatalogTest.Profile.class)
 public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> {

+  DeleteFile FILE_A_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofPositionDeletes()
+          .withPath("/path/to/data-a-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("id_bucket=0") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
   public static class Profile implements QuarkusTestProfile {

     @Override
@@ -459,6 +472,45 @@ public void testCreateNestedNamespaceUnderMissingParent() {
         .hasMessageContaining("Parent");
   }

+  @Test
+  public void testConcurrentWritesWithRollbackNonEmptyTable() {
+    IcebergCatalog catalog = this.catalog();
+    if (this.requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create();
+    this.assertNoFiles(table);
+    // commit FILE_A
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+    this.assertFiles(catalog.loadTable(TABLE), FILE_A);
+    table.refresh();
+    long lastSnapshotId = table.currentSnapshot().snapshotId();
+    // Apply the deletes based on FILE_A; without the change, this commit should conflict.
+    RowDelta rowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(FILE_A_DELETES)
+            .validateFromSnapshot(lastSnapshotId)
+            .validateDataFilesExist(List.of(FILE_A.location()));
+    Snapshot uncommittedSnapshot = rowDelta.apply();
+
+    // replace FILE_A with FILE_B
+    catalog.loadTable(TABLE).newRewrite().addFile(FILE_B).deleteFile(FILE_A).commit();
+
+    // Now try to commit the FILE_A delete. It is currently impossible to mimic the
+    // contention here, because commit() triggers client-side validation again even
+    // when apply() has already been called.
+    // TODO: to add such test cases, consider mocking taskOps so it skips the refresh
+    // and commits against the stale base snapshot. Overriding this is also hard because
+    // the relevant classes (for example MergingSnapshotPro) are package scoped.
+    // rowDelta.commit();
+    this.assertFiles(catalog.loadTable(TABLE), FILE_B);
+  }
+
   @Test
   public void testValidateNotificationWhenTableAndNamespacesDontExist() {
     Assumptions.assumeTrue(
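
Note: the TODO in the new test points at mocking the table operations so the conflicting commit is validated on the server rather than on the client. A minimal, untested sketch of that idea, assuming Mockito is available on the test classpath; the names realOps, spyOps, staleBase and staleTable are hypothetical and not part of this commit:

    // Capture the metadata before the rewrite, then make refresh() return it so the
    // RowDelta commit is built against a stale base and conflicts on the server side.
    TableOperations realOps = ((BaseTable) catalog.loadTable(TABLE)).operations();
    TableMetadata staleBase = realOps.current();
    catalog.loadTable(TABLE).newRewrite().addFile(FILE_B).deleteFile(FILE_A).commit();
    TableOperations spyOps = Mockito.spy(realOps);
    Mockito.doReturn(staleBase).when(spyOps).refresh();
    Mockito.doReturn(staleBase).when(spyOps).current();
    Table staleTable = new BaseTable(spyOps, TABLE.name());
    // This commit now carries requirements based on the stale base, exercising the
    // rollback path in IcebergCatalogHandler.
    staleTable.newRowDelta().addDeletes(FILE_A_DELETES).commit();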

service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java

+192 −1
@@ -29,17 +29,24 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.BaseTransaction;
+import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.MetadataUpdate;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.UpdateRequirement;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
@@ -67,6 +74,9 @@
 import org.apache.iceberg.rest.responses.LoadTableResponse;
 import org.apache.iceberg.rest.responses.LoadViewResponse;
 import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
 import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
 import org.apache.polaris.core.auth.PolarisAuthorizer;
 import org.apache.polaris.core.config.FeatureConfiguration;
@@ -699,7 +709,188 @@ public LoadTableResponse updateTable(
     if (isExternal(catalog)) {
       throw new BadRequestException("Cannot update table on external catalogs.");
     }
-    return CatalogHandlers.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request));
+    // TODO: pending discussion whether a table property is the right knob here, or whether a
+    // writer-specific setting is required.
+    return updateTableWithRollback(baseCatalog, tableIdentifier, applyUpdateFilters(request));
+  }
+
+  private static TableMetadata create(TableOperations ops, UpdateTableRequest request) {
+    request.requirements().forEach((requirement) -> requirement.validate(ops.current()));
+    Optional<Integer> formatVersion =
+        request.updates().stream()
+            .filter((update) -> update instanceof MetadataUpdate.UpgradeFormatVersion)
+            .map((update) -> ((MetadataUpdate.UpgradeFormatVersion) update).formatVersion())
+            .findFirst();
+    TableMetadata.Builder builder =
+        formatVersion.map(TableMetadata::buildFromEmpty).orElseGet(TableMetadata::buildFromEmpty);
+    request.updates().forEach((update) -> update.applyTo(builder));
+    ops.commit((TableMetadata) null, builder.build());
+    return ops.current();
+  }
+
+  // TODO: Clean this up once CatalogHandlers becomes extensible.
+  // Copy of CatalogHandlers#update
+  private static LoadTableResponse updateTableWithRollback(
+      Catalog catalog, TableIdentifier ident, UpdateTableRequest request) {
+    Schema EMPTY_SCHEMA = new Schema(new Types.NestedField[0]);
+    TableMetadata finalMetadata;
+    if (isCreate(request)) {
+      Transaction transaction =
+          catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction();
+      if (!(transaction instanceof BaseTransaction)) {
+        throw new IllegalStateException(
+            "Cannot wrap catalog that does not produce BaseTransaction");
+      }
+
+      BaseTransaction baseTransaction = (BaseTransaction) transaction;
+      finalMetadata = create(baseTransaction.underlyingOps(), request);
+    } else {
+      Table table = catalog.loadTable(ident);
+      if (!(table instanceof BaseTable)) {
+        throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
+      }
+
+      TableOperations ops = ((BaseTable) table).operations();
+      finalMetadata = commit(ops, request);
+    }
+
+    return LoadTableResponse.builder().withTableMetadata(finalMetadata).build();
+  }
+
+  static TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
+    AtomicBoolean isRetry = new AtomicBoolean(false);
+
+    try {
+      Tasks.foreach(new TableOperations[] {ops})
+          .retry(4)
+          .exponentialBackoff(100L, 60000L, 1800000L, 2.0)
+          .onlyRetryOn(CommitFailedException.class)
+          .run(
+              (taskOps) -> {
+                TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current();
+                isRetry.set(true);
+                // Previous PR: https://github.com/apache/iceberg/pull/5888
+                // The feature is gated behind a table property for now.
+                boolean rollbackCompaction =
+                    PropertyUtil.propertyAsBoolean(
+                        taskOps.current().properties(),
+                        "rollback.compaction.on-conflicts.enabled",
+                        false);
+                // If validation fails, build metadata updates that first roll back the
+                // conflicting REPLACE snapshots and then re-run the requirement checks.
+                TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base);
+                TableMetadata newBase = base;
+                try {
+                  request.requirements().forEach((requirement) -> requirement.validate(base));
+                } catch (CommitFailedException e) {
+                  if (!rollbackCompaction) {
+                    throw new ValidationFailureException(e);
+                  }
+                  // The snapshot has already been created on the client side, so not much can
+                  // be done there. This could live in the writer instead, but it is convenient
+                  // if the catalog handles it.
+                  // Inspect the requirements to confirm that a snapshot ref is being asserted;
+                  // this usually means the update section contains an AddSnapshot and a
+                  // SetSnapshotRef.
+                  UpdateRequirement.AssertRefSnapshotID addSnapshot = null;
+                  int found = 0;
+                  for (UpdateRequirement requirement : request.requirements()) {
+                    // there should be exactly one ref-snapshot assertion
+                    if (requirement instanceof UpdateRequirement.AssertRefSnapshotID) {
+                      ++found;
+                      addSnapshot = (UpdateRequirement.AssertRefSnapshotID) requirement;
+                    }
+                  }
+
+                  if (found != 1) {
+                    // TODO: handle this case by finding the minimum snapshot id to roll back
+                    // to, given the lineage it creates; keep it simple for now.
+                    throw new ValidationFailureException(e);
+                  }
+
+                  Long parentSnapshotId = addSnapshot.snapshotId();
+                  // Walk back from the current snapshot and check that every snapshot on top
+                  // of the base we want to commit against is a REPLACE operation.
+                  Long parentToRollbackTo = ops.current().currentSnapshot().snapshotId();
+                  List<MetadataUpdate> updateToRemoveSnapshot = new ArrayList<>();
+                  while (!Objects.equals(parentToRollbackTo, parentSnapshotId)) {
+                    Snapshot snap = ops.current().snapshot(parentToRollbackTo);
+                    if (!DataOperations.REPLACE.equals(snap.operation())) {
+                      break;
+                    }
+                    updateToRemoveSnapshot.add(
+                        new MetadataUpdate.RemoveSnapshot(snap.snapshotId()));
+                    parentToRollbackTo = snap.parentId();
+                  }
+
+                  MetadataUpdate.SetSnapshotRef ref = null;
+                  // find the SetSnapshotRef update; reset the counter so we can require
+                  // exactly one such update
+                  found = 0;
+                  for (MetadataUpdate update : request.updates()) {
+                    if (update instanceof MetadataUpdate.SetSnapshotRef) {
+                      ++found;
+                      ref = (MetadataUpdate.SetSnapshotRef) update;
+                    }
+                  }
+
+                  if (found != 1 || !Objects.equals(parentToRollbackTo, parentSnapshotId)) {
+                    // Nothing can be done: either a non-REPLACE snapshot sits in between, or
+                    // there is more than one SetSnapshotRef update, so we don't know where to
+                    // roll back to.
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // First point the ref we want to set back at the base that the incoming
+                  // update was built on.
+                  metadataBuilder.setBranchSnapshot(parentSnapshotId, ref.name());
+
+                  // Then apply the RemoveSnapshot updates to the current metadata.
+                  // NOTE: the ref must be moved to the parent first, because RemoveSnapshot
+                  // drops tags/branches that no longer have a reference.
+                  // NOTE: removing the now-orphaned base is not a hard requirement, just good
+                  // hygiene rather than leaving it to remove-orphans.
+                  updateToRemoveSnapshot.forEach((update) -> update.applyTo(metadataBuilder));
+                  // The ref now points at the parent of the snapshot to be committed.
+                  newBase = metadataBuilder.build();
+                }
+
+                // Double-check that the requirements pass against the (possibly rolled back)
+                // base.
+                try {
+                  TableMetadata baseWithRemovedSnaps = newBase;
+                  request
+                      .requirements()
+                      .forEach((requirement) -> requirement.validate(baseWithRemovedSnaps));
+                } catch (CommitFailedException e) {
+                  throw new ValidationFailureException(e);
+                }
+
+                TableMetadata.Builder newMetadataBuilder = TableMetadata.buildFrom(newBase);
+                request.updates().forEach((update) -> update.applyTo(newMetadataBuilder));
+                TableMetadata updated = newMetadataBuilder.build();
+                // always commit against the original base
+                taskOps.commit(base, updated);
+              });
+    } catch (ValidationFailureException e) {
+      throw e.wrapped();
+    }
+
+    return ops.current();
+  }
+
+  private static class ValidationFailureException extends RuntimeException {
+    private final CommitFailedException wrapped;
+
+    private ValidationFailureException(CommitFailedException cause) {
+      super(cause);
+      this.wrapped = cause;
+    }
+
+    public CommitFailedException wrapped() {
+      return this.wrapped;
+    }
   }

   public LoadTableResponse updateTableForStagedCreate(
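
Note: the handler only takes the rollback path when the table property "rollback.compaction.on-conflicts.enabled" is set (see commit() above). A writer could opt a table in through the standard Iceberg API; this is a sketch, and the namespace and table names are placeholders:

    // Enable the POC behaviour on an existing table ("ns" and "tbl" are illustrative).
    Table table = catalog.loadTable(TableIdentifier.of("ns", "tbl"));
    table.updateProperties()
        .set("rollback.compaction.on-conflicts.enabled", "true")
        .commit();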
