Skip to content

Commit c2f50d3

Browse files
committed
POC: Rollback compaction on conflict
1 parent 0b4539e commit c2f50d3

File tree

2 files changed

+244
-1
lines changed

2 files changed

+244
-1
lines changed

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,12 @@
5353
import org.apache.commons.lang3.NotImplementedException;
5454
import org.apache.iceberg.BaseTable;
5555
import org.apache.iceberg.CatalogProperties;
56+
import org.apache.iceberg.DeleteFile;
57+
import org.apache.iceberg.FileMetadata;
5658
import org.apache.iceberg.PartitionSpec;
59+
import org.apache.iceberg.RowDelta;
5760
import org.apache.iceberg.Schema;
61+
import org.apache.iceberg.Snapshot;
5862
import org.apache.iceberg.SortOrder;
5963
import org.apache.iceberg.Table;
6064
import org.apache.iceberg.TableMetadata;
@@ -147,6 +151,15 @@
147151
@TestProfile(IcebergCatalogTest.Profile.class)
148152
public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> {
149153

154+
DeleteFile FILE_A_DELETES =
155+
FileMetadata.deleteFileBuilder(SPEC)
156+
.ofPositionDeletes()
157+
.withPath("/path/to/data-a-deletes.parquet")
158+
.withFileSizeInBytes(10)
159+
.withPartitionPath("id_bucket=0") // easy way to set partition data for now
160+
.withRecordCount(1)
161+
.build();
162+
150163
public static class Profile implements QuarkusTestProfile {
151164

152165
@Override
@@ -500,6 +513,45 @@ public void testCreateNestedNamespaceUnderMissingParent() {
500513
.hasMessageContaining("Parent");
501514
}
502515

516+
@Test
517+
public void testConcurrentWritesWithRollbackNonEmptyTable() {
518+
IcebergCatalog catalog = this.catalog();
519+
if (this.requiresNamespaceCreate()) {
520+
catalog.createNamespace(NS);
521+
}
522+
523+
Table table = catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create();
524+
this.assertNoFiles(table);
525+
// commit FILE_A
526+
catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
527+
this.assertFiles(catalog.loadTable(TABLE), FILE_A);
528+
table.refresh();
529+
long lastSnapshotId = table.currentSnapshot().snapshotId();
530+
// Apply the deletes based on FILE_A
531+
// this should conflict when we try to commit
532+
// without the change
533+
RowDelta rowDelta =
534+
table
535+
.newRowDelta()
536+
.addDeletes(FILE_A_DELETES)
537+
.validateFromSnapshot(lastSnapshotId)
538+
.validateDataFilesExist(List.of(FILE_A.location()));
539+
Snapshot uncomittedSnapshot = rowDelta.apply();
540+
541+
// replace FILE_A with FILE_B
542+
catalog.loadTable(TABLE).newRewrite().addFile(FILE_B).deleteFile(FILE_A).commit();
543+
544+
// no try commit FILE_A delete, It's impossible to mimic the contention
545+
// situation presently as commit triggers client side validation again
546+
// even when the apply() method has been called before
547+
// TODO: to better add such test cases, may be mocking the taskOps to not
548+
// do the refresh, hence make on base snapshot.
549+
// also there is an issue on overriding this as all things are package scope
550+
// For ex MergingSnapshotPro
551+
// rowDelta.commit();
552+
this.assertFiles(catalog.loadTable(TABLE), FILE_B);
553+
}
554+
503555
@Test
504556
public void testValidateNotificationWhenTableAndNamespacesDontExist() {
505557
Assumptions.assumeTrue(

service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java

Lines changed: 192 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,24 @@
2929
import java.util.HashSet;
3030
import java.util.List;
3131
import java.util.Map;
32+
import java.util.Objects;
3233
import java.util.Optional;
3334
import java.util.Set;
35+
import java.util.concurrent.atomic.AtomicBoolean;
3436
import java.util.stream.Collectors;
3537
import org.apache.iceberg.BaseMetadataTable;
3638
import org.apache.iceberg.BaseTable;
39+
import org.apache.iceberg.BaseTransaction;
40+
import org.apache.iceberg.DataOperations;
3741
import org.apache.iceberg.MetadataUpdate;
3842
import org.apache.iceberg.PartitionSpec;
43+
import org.apache.iceberg.Schema;
44+
import org.apache.iceberg.Snapshot;
3945
import org.apache.iceberg.SortOrder;
4046
import org.apache.iceberg.Table;
4147
import org.apache.iceberg.TableMetadata;
4248
import org.apache.iceberg.TableOperations;
49+
import org.apache.iceberg.Transaction;
4350
import org.apache.iceberg.UpdateRequirement;
4451
import org.apache.iceberg.catalog.Catalog;
4552
import org.apache.iceberg.catalog.Namespace;
@@ -71,6 +78,9 @@
7178
import org.apache.iceberg.rest.responses.LoadTableResponse;
7279
import org.apache.iceberg.rest.responses.LoadViewResponse;
7380
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
81+
import org.apache.iceberg.types.Types;
82+
import org.apache.iceberg.util.PropertyUtil;
83+
import org.apache.iceberg.util.Tasks;
7484
import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
7585
import org.apache.polaris.core.auth.PolarisAuthorizer;
7686
import org.apache.polaris.core.config.FeatureConfiguration;
@@ -754,7 +764,188 @@ public LoadTableResponse updateTable(
754764
if (isStaticFacade(catalog)) {
755765
throw new BadRequestException("Cannot update table on static-facade external catalogs.");
756766
}
757-
return CatalogHandlers.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request));
767+
// TODO: pending discussion if table property is right way, or a writer specific knob is
768+
// required.
769+
return updateTableWithRollback(baseCatalog, tableIdentifier, applyUpdateFilters(request));
770+
}
771+
772+
private static TableMetadata create(TableOperations ops, UpdateTableRequest request) {
773+
request.requirements().forEach((requirement) -> requirement.validate(ops.current()));
774+
Optional<Integer> formatVersion =
775+
request.updates().stream()
776+
.filter((update) -> update instanceof MetadataUpdate.UpgradeFormatVersion)
777+
.map((update) -> ((MetadataUpdate.UpgradeFormatVersion) update).formatVersion())
778+
.findFirst();
779+
TableMetadata.Builder builder =
780+
(TableMetadata.Builder)
781+
formatVersion
782+
.map(TableMetadata::buildFromEmpty)
783+
.orElseGet(TableMetadata::buildFromEmpty);
784+
request.updates().forEach((update) -> update.applyTo(builder));
785+
ops.commit((TableMetadata) null, builder.build());
786+
return ops.current();
787+
}
788+
789+
// TODO: Clean this up when CatalogHandler become extensible.
790+
// Copy of CatalogHandler#update
791+
private static LoadTableResponse updateTableWithRollback(
792+
Catalog catalog, TableIdentifier ident, UpdateTableRequest request) {
793+
Schema EMPTY_SCHEMA = new Schema(new Types.NestedField[0]);
794+
TableMetadata finalMetadata;
795+
if (isCreate(request)) {
796+
Transaction transaction =
797+
catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction();
798+
if (!(transaction instanceof BaseTransaction)) {
799+
throw new IllegalStateException(
800+
"Cannot wrap catalog that does not produce BaseTransaction");
801+
}
802+
803+
BaseTransaction baseTransaction = (BaseTransaction) transaction;
804+
finalMetadata = create(baseTransaction.underlyingOps(), request);
805+
} else {
806+
Table table = catalog.loadTable(ident);
807+
if (!(table instanceof BaseTable)) {
808+
throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
809+
}
810+
811+
TableOperations ops = ((BaseTable) table).operations();
812+
finalMetadata = commit(ops, request);
813+
}
814+
815+
return LoadTableResponse.builder().withTableMetadata(finalMetadata).build();
816+
}
817+
818+
static TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
819+
AtomicBoolean isRetry = new AtomicBoolean(false);
820+
821+
try {
822+
Tasks.foreach(new TableOperations[] {ops})
823+
.retry(4)
824+
.exponentialBackoff(100L, 60000L, 1800000L, (double) 2.0F)
825+
.onlyRetryOn(CommitFailedException.class)
826+
.run(
827+
(taskOps) -> {
828+
TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current();
829+
isRetry.set(true);
830+
// My prev pr : https://github.com/apache/iceberg/pull/5888
831+
// Taking this feature behind a table property presently.
832+
boolean rollbackCompaction =
833+
PropertyUtil.propertyAsBoolean(
834+
taskOps.current().properties(),
835+
"rollback.compaction.on-conflicts.enabled",
836+
false);
837+
// otherwise create a metadataUpdate to remove the snapshots we had
838+
// applied our rollback requests first
839+
TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base);
840+
TableMetadata newBase = base;
841+
try {
842+
request.requirements().forEach((requirement) -> requirement.validate(base));
843+
} catch (CommitFailedException e) {
844+
if (!rollbackCompaction) {
845+
throw new ValidationFailureException(e);
846+
}
847+
// Since snapshot has already been created at the client end.
848+
// Nothing much can be done, we can move this
849+
// to writer specific thing, but it would be cool if catalog does this for us.
850+
// Inspect that the requirements states that snapshot
851+
// ref needs to be asserted this usually means in the update section
852+
// it has addSnapshot and setSnapshotRef
853+
UpdateRequirement.AssertRefSnapshotID addSnapshot = null;
854+
int found = 0;
855+
for (UpdateRequirement requirement : request.requirements()) {
856+
// there should be only add snapshot request
857+
if (requirement instanceof UpdateRequirement.AssertRefSnapshotID) {
858+
++found;
859+
addSnapshot = (UpdateRequirement.AssertRefSnapshotID) requirement;
860+
}
861+
}
862+
863+
if (found != 1) {
864+
// TODO: handle this case, find min snapshot id, to rollback to give it creates
865+
// lineage
866+
// lets not complicate things rn
867+
throw new ValidationFailureException(e);
868+
}
869+
870+
Long parentSnapshotId = addSnapshot.snapshotId();
871+
// so we will first check all the snapshots on the top of
872+
// base on which the snapshot we want to commit is of type REPLACE ops.
873+
Long parentToRollbackTo = ops.current().currentSnapshot().snapshotId();
874+
List<MetadataUpdate> updateToRemoveSnapshot = new ArrayList<>();
875+
while (!Objects.equals(parentToRollbackTo, parentSnapshotId)) {
876+
Snapshot snap = ops.current().snapshot(parentToRollbackTo);
877+
if (!DataOperations.REPLACE.equals(snap.operation())) {
878+
break;
879+
}
880+
updateToRemoveSnapshot.add(
881+
new MetadataUpdate.RemoveSnapshot(snap.snapshotId()));
882+
parentToRollbackTo = snap.parentId();
883+
}
884+
885+
MetadataUpdate.SetSnapshotRef ref = null;
886+
// find the SetRefName snapshot update
887+
for (MetadataUpdate update : request.updates()) {
888+
if (update instanceof MetadataUpdate.SetSnapshotRef) {
889+
++found;
890+
ref = (MetadataUpdate.SetSnapshotRef) update;
891+
}
892+
}
893+
894+
if (found != 1 || (!Objects.equals(parentToRollbackTo, parentSnapshotId))) {
895+
// nothing can be done as this implies there was a non replace
896+
// snapshot in between or there is more than setRef ops, we don't know where
897+
// to go.
898+
throw new ValidationFailureException(e);
899+
}
900+
901+
// first we should also set back the ref we wanted to set, back to the base
902+
// on which the current update is based on.
903+
metadataBuilder.setBranchSnapshot(parentSnapshotId, ref.name());
904+
905+
// apply the remove snapshots update in the current metadata.
906+
// NOTE: we need to setRef to parent first and then apply remove as the remove
907+
// will drop. The tags / branch which don't have reference.
908+
// NOTE: we can skip removing the now orphan base. Its not a hard requirement.
909+
// just something good to do, and not leave for Remove Orphans.
910+
updateToRemoveSnapshot.forEach((update -> update.applyTo(metadataBuilder)));
911+
// Ref rolled back update correctly to snapshot to be committed parent now.
912+
newBase = metadataBuilder.build();
913+
}
914+
915+
// double check if the requirements passes now.
916+
try {
917+
TableMetadata baseWithRemovedSnaps = newBase;
918+
request
919+
.requirements()
920+
.forEach((requirement) -> requirement.validate(baseWithRemovedSnaps));
921+
} catch (CommitFailedException e) {
922+
throw new ValidationFailureException(e);
923+
}
924+
925+
TableMetadata.Builder newMetadataBuilder = TableMetadata.buildFrom(newBase);
926+
request.updates().forEach((update) -> update.applyTo(newMetadataBuilder));
927+
TableMetadata updated = newMetadataBuilder.build();
928+
// always commit this
929+
taskOps.commit(base, updated);
930+
});
931+
} catch (ValidationFailureException e) {
932+
throw e.wrapped();
933+
}
934+
935+
return ops.current();
936+
}
937+
938+
private static class ValidationFailureException extends RuntimeException {
939+
private final CommitFailedException wrapped;
940+
941+
private ValidationFailureException(CommitFailedException cause) {
942+
super(cause);
943+
this.wrapped = cause;
944+
}
945+
946+
public CommitFailedException wrapped() {
947+
return this.wrapped;
948+
}
758949
}
759950

760951
public LoadTableResponse updateTableForStagedCreate(

0 commit comments

Comments
 (0)