Skip to content

Commit 285cc29

Browse files
committed
Add E2E test cases
1 parent c2f50d3 commit 285cc29

File tree

2 files changed

+123
-25
lines changed

2 files changed

+123
-25
lines changed

quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java

Lines changed: 100 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import static java.nio.charset.StandardCharsets.UTF_8;
2222
import static org.apache.iceberg.types.Types.NestedField.required;
23+
import static org.assertj.core.api.Assertions.assertThat;
24+
import static org.assertj.core.api.Fail.fail;
2325
import static org.mockito.ArgumentMatchers.any;
2426
import static org.mockito.ArgumentMatchers.isA;
2527
import static org.mockito.Mockito.doReturn;
@@ -30,6 +32,8 @@
3032
import com.azure.core.exception.HttpResponseException;
3133
import com.google.cloud.storage.StorageException;
3234
import com.google.common.collect.ImmutableMap;
35+
import com.google.common.collect.Iterables;
36+
import com.google.common.collect.Streams;
3337
import io.quarkus.test.junit.QuarkusMock;
3438
import io.quarkus.test.junit.QuarkusTestProfile;
3539
import io.quarkus.test.junit.TestProfile;
@@ -38,6 +42,7 @@
3842
import jakarta.inject.Inject;
3943
import jakarta.ws.rs.core.SecurityContext;
4044
import java.io.IOException;
45+
import java.io.UncheckedIOException;
4146
import java.lang.reflect.Method;
4247
import java.time.Clock;
4348
import java.util.Arrays;
@@ -49,12 +54,18 @@
4954
import java.util.UUID;
5055
import java.util.function.Function;
5156
import java.util.function.Supplier;
57+
import java.util.stream.Collectors;
5258
import java.util.stream.Stream;
5359
import org.apache.commons.lang3.NotImplementedException;
5460
import org.apache.iceberg.BaseTable;
5561
import org.apache.iceberg.CatalogProperties;
62+
import org.apache.iceberg.ContentFile;
63+
import org.apache.iceberg.ContentScanTask;
64+
import org.apache.iceberg.DataOperations;
5665
import org.apache.iceberg.DeleteFile;
5766
import org.apache.iceberg.FileMetadata;
67+
import org.apache.iceberg.FileScanTask;
68+
import org.apache.iceberg.MetadataUpdate;
5869
import org.apache.iceberg.PartitionSpec;
5970
import org.apache.iceberg.RowDelta;
6071
import org.apache.iceberg.Schema;
@@ -63,6 +74,9 @@
6374
import org.apache.iceberg.Table;
6475
import org.apache.iceberg.TableMetadata;
6576
import org.apache.iceberg.TableMetadataParser;
77+
import org.apache.iceberg.TableOperations;
78+
import org.apache.iceberg.UpdateRequirement;
79+
import org.apache.iceberg.UpdateRequirements;
6680
import org.apache.iceberg.UpdateSchema;
6781
import org.apache.iceberg.catalog.CatalogTests;
6882
import org.apache.iceberg.catalog.Namespace;
@@ -73,8 +87,11 @@
7387
import org.apache.iceberg.exceptions.ForbiddenException;
7488
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
7589
import org.apache.iceberg.inmemory.InMemoryFileIO;
90+
import org.apache.iceberg.io.CloseableIterable;
7691
import org.apache.iceberg.io.FileIO;
92+
import org.apache.iceberg.rest.requests.UpdateTableRequest;
7793
import org.apache.iceberg.types.Types;
94+
import org.apache.iceberg.util.CharSequenceSet;
7895
import org.apache.polaris.core.PolarisCallContext;
7996
import org.apache.polaris.core.PolarisDiagnostics;
8097
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
@@ -115,6 +132,7 @@
115132
import org.apache.polaris.service.admin.PolarisAdminService;
116133
import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
117134
import org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
135+
import org.apache.polaris.service.catalog.iceberg.IcebergCatalogHandler;
118136
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
119137
import org.apache.polaris.service.catalog.io.ExceptionMappingFileIO;
120138
import org.apache.polaris.service.catalog.io.FileIOFactory;
@@ -129,7 +147,9 @@
129147
import org.apache.polaris.service.types.NotificationRequest;
130148
import org.apache.polaris.service.types.NotificationType;
131149
import org.apache.polaris.service.types.TableUpdateNotification;
150+
import org.assertj.core.api.AbstractCollectionAssert;
132151
import org.assertj.core.api.Assertions;
152+
import org.assertj.core.api.ListAssert;
133153
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
134154
import org.junit.jupiter.api.AfterEach;
135155
import org.junit.jupiter.api.Assumptions;
@@ -520,36 +540,100 @@ public void testConcurrentWritesWithRollbackNonEmptyTable() {
520540
catalog.createNamespace(NS);
521541
}
522542

523-
Table table = catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create();
543+
Table table =
544+
catalog
545+
.buildTable(TABLE, SCHEMA)
546+
.withPartitionSpec(SPEC)
547+
.withProperties(Map.of("rollback.compaction.on-conflicts.enabled", "true"))
548+
.create();
524549
this.assertNoFiles(table);
550+
525551
// commit FILE_A
526552
catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
527553
this.assertFiles(catalog.loadTable(TABLE), FILE_A);
528554
table.refresh();
555+
529556
long lastSnapshotId = table.currentSnapshot().snapshotId();
530-
// Apply the deletes based on FILE_A
531-
// this should conflict when we try to commit
532-
// without the change
533-
RowDelta rowDelta =
557+
558+
// Apply the deletes based on FILE_A
559+
// this should conflict when we try to commit without the change.
560+
RowDelta originalRowDelta =
534561
table
535562
.newRowDelta()
536563
.addDeletes(FILE_A_DELETES)
537564
.validateFromSnapshot(lastSnapshotId)
538565
.validateDataFilesExist(List.of(FILE_A.location()));
539-
Snapshot uncomittedSnapshot = rowDelta.apply();
566+
// Make client ready with updates, don't reach out to IRC server yet
567+
Snapshot s = originalRowDelta.apply();
568+
TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
569+
TableMetadata base = ops.current();
570+
TableMetadata.Builder update = TableMetadata.buildFrom(base);
571+
update.setBranchSnapshot(s, "main");
572+
TableMetadata updatedMetadata = update.build();
573+
List<MetadataUpdate> updates = updatedMetadata.changes();
574+
List<UpdateRequirement> requirements = UpdateRequirements.forUpdateTable(base, updates);
575+
UpdateTableRequest request = UpdateTableRequest.create(TABLE, requirements, updates);
540576

541577
// replace FILE_A with FILE_B
578+
// commit the transaction.
542579
catalog.loadTable(TABLE).newRewrite().addFile(FILE_B).deleteFile(FILE_A).commit();
543580

544-
// no try commit FILE_A delete, It's impossible to mimic the contention
545-
// situation presently as commit triggers client side validation again
546-
// even when the apply() method has been called before
547-
// TODO: to better add such test cases, may be mocking the taskOps to not
548-
// do the refresh, hence make on base snapshot.
549-
// also there is an issue on overriding this as all things are package scope
550-
// For ex MergingSnapshotPro
551-
// rowDelta.commit();
552-
this.assertFiles(catalog.loadTable(TABLE), FILE_B);
581+
try {
582+
// Now call IRC server to commit delete operation.
583+
IcebergCatalogHandler.commit(((BaseTable) catalog.loadTable(TABLE)).operations(), request);
584+
} catch (Exception e) {
585+
fail("Rollback Compaction on conflict feature failed : " + e.getMessage());
586+
}
587+
588+
table.refresh();
589+
590+
// Assert only 2 snapshots and no snapshot of REPLACE left.
591+
Snapshot currentSnapshot = table.snapshot(table.refs().get("main").snapshotId());
592+
int totalSnapshots = 1;
593+
while (currentSnapshot.parentId() != null) {
594+
// no snapshot in the hierarchy for REPLACE operations
595+
assertThat(currentSnapshot.operation()).isNotEqualTo(DataOperations.REPLACE);
596+
currentSnapshot = table.snapshot(currentSnapshot.parentId());
597+
totalSnapshots += 1;
598+
}
599+
assertThat(totalSnapshots).isEqualTo(2);
600+
601+
// Inspect the files 1 DELETE file i.e. FILE_A_DELETES and 1 DATA FILE FILE_A
602+
try {
603+
try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
604+
List<CharSequence> dataFilePaths =
605+
Streams.stream(tasks)
606+
.map(ContentScanTask::file)
607+
.map(ContentFile::location)
608+
.collect(Collectors.toList());
609+
List<CharSequence> deleteFilePaths =
610+
Streams.stream(tasks)
611+
.flatMap(t -> t.deletes().stream().map(ContentFile::location))
612+
.collect(Collectors.toList());
613+
((ListAssert)
614+
Assertions.assertThat(dataFilePaths)
615+
.as("Should contain expected number of data files", new Object[0]))
616+
.hasSize(1);
617+
((ListAssert)
618+
Assertions.assertThat(deleteFilePaths)
619+
.as("Should contain expected number of delete files", new Object[0]))
620+
.hasSize(1);
621+
((AbstractCollectionAssert)
622+
Assertions.assertThat(CharSequenceSet.of(dataFilePaths))
623+
.as("Should contain correct file paths", new Object[0]))
624+
.isEqualTo(
625+
CharSequenceSet.of(
626+
Iterables.transform(Arrays.asList(FILE_A), ContentFile::location)));
627+
((AbstractCollectionAssert)
628+
Assertions.assertThat(CharSequenceSet.of(deleteFilePaths))
629+
.as("Should contain correct file paths", new Object[0]))
630+
.isEqualTo(
631+
CharSequenceSet.of(
632+
Iterables.transform(Arrays.asList(FILE_A_DELETES), ContentFile::location)));
633+
}
634+
} catch (IOException e) {
635+
throw new UncheckedIOException(e);
636+
}
553637
}
554638

555639
@Test
@@ -584,7 +668,7 @@ public void testValidateNotificationWhenTableAndNamespacesDontExist() {
584668
// We should be able to send the notification without creating the metadata file since it's
585669
// only validating the ability to send the CREATE/UPDATE notification possibly before actually
586670
// creating the table at all on the remote catalog.
587-
Assertions.assertThat(catalog.sendNotification(table, request))
671+
assertThat(catalog.sendNotification(table, request))
588672
.as("Notification should be sent successfully")
589673
.isTrue();
590674
Assertions.assertThat(catalog.namespaceExists(namespace))

service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
*/
1919
package org.apache.polaris.service.catalog.iceberg;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import com.google.common.base.Preconditions;
2223
import com.google.common.collect.Maps;
2324
import jakarta.ws.rs.core.SecurityContext;
2425
import java.io.Closeable;
26+
import java.lang.reflect.Field;
2527
import java.time.OffsetDateTime;
2628
import java.time.ZoneOffset;
2729
import java.util.ArrayList;
@@ -127,6 +129,9 @@
127129
public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseable {
128130
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergCatalogHandler.class);
129131

132+
private static final String ROLLBACK_REPLACE_ENABLED_PROPERTY =
133+
"rollback.compaction.on-conflicts.enabled";
134+
130135
private final PolarisMetaStoreManager metaStoreManager;
131136
private final UserSecretsManager userSecretsManager;
132137
private final CallContextCatalogFactory catalogFactory;
@@ -815,7 +820,8 @@ private static LoadTableResponse updateTableWithRollback(
815820
return LoadTableResponse.builder().withTableMetadata(finalMetadata).build();
816821
}
817822

818-
static TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
823+
@VisibleForTesting
824+
public static TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
819825
AtomicBoolean isRetry = new AtomicBoolean(false);
820826

821827
try {
@@ -827,15 +833,11 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
827833
(taskOps) -> {
828834
TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current();
829835
isRetry.set(true);
830-
// My prev pr : https://github.com/apache/iceberg/pull/5888
831-
// Taking this feature behind a table property presently.
836+
// Prev PR: https://github.com/apache/iceberg/pull/5888
832837
boolean rollbackCompaction =
833838
PropertyUtil.propertyAsBoolean(
834-
taskOps.current().properties(),
835-
"rollback.compaction.on-conflicts.enabled",
836-
false);
837-
// otherwise create a metadataUpdate to remove the snapshots we had
838-
// applied our rollback requests first
839+
taskOps.current().properties(), ROLLBACK_REPLACE_ENABLED_PROPERTY, false);
840+
839841
TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base);
840842
TableMetadata newBase = base;
841843
try {
@@ -870,7 +872,7 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
870872
Long parentSnapshotId = addSnapshot.snapshotId();
871873
// so we will first check all the snapshots on the top of
872874
// base on which the snapshot we want to commit is of type REPLACE ops.
873-
Long parentToRollbackTo = ops.current().currentSnapshot().snapshotId();
875+
Long parentToRollbackTo = base.currentSnapshot().snapshotId();
874876
List<MetadataUpdate> updateToRemoveSnapshot = new ArrayList<>();
875877
while (!Objects.equals(parentToRollbackTo, parentSnapshotId)) {
876878
Snapshot snap = ops.current().snapshot(parentToRollbackTo);
@@ -883,6 +885,7 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
883885
}
884886

885887
MetadataUpdate.SetSnapshotRef ref = null;
888+
found = 0;
886889
// find the SetRefName snapshot update
887890
for (MetadataUpdate update : request.updates()) {
888891
if (update instanceof MetadataUpdate.SetSnapshotRef) {
@@ -910,6 +913,17 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
910913
updateToRemoveSnapshot.forEach((update -> update.applyTo(metadataBuilder)));
911914
// Ref rolled back update correctly to snapshot to be committed parent now.
912915
newBase = metadataBuilder.build();
916+
// move the lastSequenceNumber back, to apply snapshot properly.
917+
// Seq number are considered increasing monotonically, snapshot over snapshot, so
918+
// this is important.
919+
Class<?> clazz = newBase.getClass();
920+
try {
921+
Field field = clazz.getDeclaredField("lastSequenceNumber");
922+
field.setAccessible(true);
923+
field.set(newBase, newBase.lastSequenceNumber() - 1);
924+
} catch (NoSuchFieldException | IllegalAccessException ex) {
925+
throw new RuntimeException(ex);
926+
}
913927
}
914928

915929
// double check if the requirements passes now.

0 commit comments

Comments
 (0)