diff --git a/quarkus/service/build.gradle.kts b/quarkus/service/build.gradle.kts index 726464862b..7ab5ec87a9 100644 --- a/quarkus/service/build.gradle.kts +++ b/quarkus/service/build.gradle.kts @@ -90,6 +90,7 @@ dependencies { testFixturesApi(project(":polaris-tests")) testImplementation(project(":polaris-api-management-model")) + testImplementation(testFixtures(project(":polaris-service-common"))) testImplementation("org.apache.iceberg:iceberg-api:${libs.versions.iceberg.get()}:tests") testImplementation("org.apache.iceberg:iceberg-core:${libs.versions.iceberg.get()}:tests") diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/TestServices.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/TestServices.java deleted file mode 100644 index 81d34dd515..0000000000 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/TestServices.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.quarkus; - -import com.google.auth.oauth2.AccessToken; -import com.google.auth.oauth2.GoogleCredentials; -import jakarta.ws.rs.core.SecurityContext; -import java.security.Principal; -import java.time.Clock; -import java.time.Instant; -import java.util.Date; -import java.util.Map; -import java.util.Set; -import org.apache.polaris.core.PolarisDiagnostics; -import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; -import org.apache.polaris.core.auth.PolarisAuthorizer; -import org.apache.polaris.core.context.RealmId; -import org.apache.polaris.core.entity.PolarisEntity; -import org.apache.polaris.core.entity.PrincipalEntity; -import org.apache.polaris.core.persistence.PolarisEntityManager; -import org.apache.polaris.core.persistence.PolarisMetaStoreManager; -import org.apache.polaris.core.persistence.PolarisMetaStoreSession; -import org.apache.polaris.service.admin.PolarisServiceImpl; -import org.apache.polaris.service.admin.api.PolarisCatalogsApi; -import org.apache.polaris.service.catalog.IcebergCatalogAdapter; -import org.apache.polaris.service.catalog.api.IcebergRestCatalogApi; -import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService; -import org.apache.polaris.service.catalog.io.FileIOFactory; -import org.apache.polaris.service.config.DefaultConfigurationStore; -import org.apache.polaris.service.config.RealmEntityManagerFactory; -import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory; -import org.apache.polaris.service.quarkus.catalog.io.TestFileIOFactory; -import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; -import org.apache.polaris.service.task.TaskExecutor; -import org.mockito.Mockito; - -public record TestServices( - IcebergRestCatalogApi restApi, - PolarisCatalogsApi catalogsApi, - RealmId realmId, - SecurityContext securityContext) { - - private static final RealmId testRealm = RealmId.newRealmId("test-realm"); - - public static TestServices inMemory(Map config) { - return inMemory(new TestFileIOFactory(), config); - } - - public static TestServices inMemory(FileIOFactory ioFactory) { - return inMemory(ioFactory, Map.of()); - } - - public static TestServices inMemory(FileIOFactory ioFactory, Map config) { - - DefaultConfigurationStore configurationStore = new DefaultConfigurationStore(config); - PolarisDiagnostics polarisDiagnostics = Mockito.mock(PolarisDiagnostics.class); - - PolarisStorageIntegrationProviderImpl storageIntegrationProvider = - new PolarisStorageIntegrationProviderImpl( - Mockito::mock, - () -> GoogleCredentials.create(new AccessToken("abc", new Date())), - configurationStore); - - InMemoryPolarisMetaStoreManagerFactory metaStoreManagerFactory = - new InMemoryPolarisMetaStoreManagerFactory( - storageIntegrationProvider, - configurationStore, - polarisDiagnostics, - Clock.systemDefaultZone()); - - PolarisMetaStoreManager metaStoreManager = - metaStoreManagerFactory.getOrCreateMetaStoreManager(testRealm); - - PolarisMetaStoreSession session = - metaStoreManagerFactory.getOrCreateSessionSupplier(testRealm).get(); - - RealmEntityManagerFactory realmEntityManagerFactory = - new RealmEntityManagerFactory(metaStoreManagerFactory, polarisDiagnostics) {}; - - PolarisEntityManager entityManager = - realmEntityManagerFactory.getOrCreateEntityManager(testRealm); - - PolarisAuthorizer authorizer = Mockito.mock(PolarisAuthorizer.class); - - IcebergRestCatalogApiService service = - new IcebergCatalogAdapter( - testRealm, - entityManager, - metaStoreManager, - session, - configurationStore, - polarisDiagnostics, - authorizer, - Mockito.mock(TaskExecutor.class), - ioFactory); - - IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(service); - - PolarisMetaStoreManager.CreatePrincipalResult createdPrincipal = - metaStoreManager.createPrincipal( - session, - new PrincipalEntity.Builder() - .setName("test-principal") - .setCreateTimestamp(Instant.now().toEpochMilli()) - .setCredentialRotationRequiredState() - .build()); - - AuthenticatedPolarisPrincipal principal = - new AuthenticatedPolarisPrincipal( - PolarisEntity.of(createdPrincipal.getPrincipal()), Set.of()); - - SecurityContext securityContext = - new SecurityContext() { - @Override - public Principal getUserPrincipal() { - return principal; - } - - @Override - public boolean isUserInRole(String s) { - return false; - } - - @Override - public boolean isSecure() { - return true; - } - - @Override - public String getAuthenticationScheme() { - return ""; - } - }; - - PolarisCatalogsApi catalogsApi = - new PolarisCatalogsApi( - new PolarisServiceImpl( - entityManager, - metaStoreManager, - session, - configurationStore, - authorizer, - polarisDiagnostics)); - - return new TestServices(restApi, catalogsApi, testRealm, securityContext); - } -} diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/ManagementServiceTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/ManagementServiceTest.java index 4af7b2331f..c4e10852f7 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/ManagementServiceTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/ManagementServiceTest.java @@ -32,15 +32,14 @@ import org.apache.polaris.core.admin.model.PolarisCatalog; import org.apache.polaris.core.admin.model.StorageConfigInfo; import org.apache.polaris.core.admin.model.UpdateCatalogRequest; -import org.apache.polaris.service.quarkus.TestServices; -import org.apache.polaris.service.quarkus.catalog.io.TestFileIOFactory; +import org.apache.polaris.service.TestServices; import org.junit.jupiter.api.Test; public class ManagementServiceTest { static TestServices services = - TestServices.inMemory( - new TestFileIOFactory(), - Map.of("SUPPORTED_CATALOG_STORAGE_TYPES", List.of("S3", "GCS", "AZURE"))); + TestServices.builder() + .config(Map.of("SUPPORTED_CATALOG_STORAGE_TYPES", List.of("S3", "GCS", "AZURE"))) + .build(); @Test public void testCreateCatalogWithDisallowedStorageConfig() { diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java index 485f46348a..629951df7f 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java @@ -67,10 +67,11 @@ import org.apache.polaris.core.persistence.PolarisMetaStoreSession; import org.apache.polaris.service.admin.PolarisAdminService; import org.apache.polaris.service.catalog.BasePolarisCatalog; +import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView; import org.apache.polaris.service.catalog.io.DefaultFileIOFactory; +import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.config.DefaultConfigurationStore; import org.apache.polaris.service.config.RealmEntityManagerFactory; -import org.apache.polaris.service.quarkus.catalog.PolarisPassthroughResolutionView; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; @@ -160,6 +161,7 @@ public Map getConfigOverrides() { protected PolarisEntityManager entityManager; protected PolarisMetaStoreManager metaStoreManager; protected PolarisMetaStoreSession metaStoreSession; + protected FileIOFactory fileIOFactory; protected PolarisBaseEntity catalogEntity; protected PrincipalEntity principalEntity; protected RealmId realmId; @@ -385,6 +387,8 @@ private void initBaseCatalog() { PolarisPassthroughResolutionView passthroughView = new PolarisPassthroughResolutionView( entityManager, metaStoreSession, securityContext, CATALOG_NAME); + this.fileIOFactory = + new DefaultFileIOFactory(realmEntityManagerFactory, managerFactory, configurationStore); this.baseCatalog = new BasePolarisCatalog( realmId, @@ -396,7 +400,7 @@ private void initBaseCatalog() { passthroughView, securityContext, Mockito.mock(), - new DefaultFileIOFactory()); + fileIOFactory); this.baseCatalog.initialize( CATALOG_NAME, ImmutableMap.of( diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java index 61a3fdc53f..9e60049ce9 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java @@ -33,16 +33,14 @@ import org.apache.polaris.core.admin.model.CatalogProperties; import org.apache.polaris.core.admin.model.CreateCatalogRequest; import org.apache.polaris.core.admin.model.StorageConfigInfo; -import org.apache.polaris.service.quarkus.TestServices; -import org.apache.polaris.service.quarkus.catalog.io.TestFileIOFactory; +import org.apache.polaris.service.TestServices; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; public class PolarisOverlappingCatalogTest { static TestServices services = - TestServices.inMemory( - new TestFileIOFactory(), Map.of("ALLOW_OVERLAPPING_CATALOG_URLS", "false")); + TestServices.builder().config(Map.of("ALLOW_OVERLAPPING_CATALOG_URLS", "false")).build(); private Response createCatalog(String prefix, String defaultBaseLocation, boolean isExternal) { return createCatalog(prefix, defaultBaseLocation, isExternal, new ArrayList()); diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingTableTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingTableTest.java index 2791bfa733..03a5b72374 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingTableTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingTableTest.java @@ -36,7 +36,7 @@ import org.apache.polaris.core.admin.model.CreateCatalogRequest; import org.apache.polaris.core.admin.model.FileStorageConfigInfo; import org.apache.polaris.core.admin.model.StorageConfigInfo; -import org.apache.polaris.service.quarkus.TestServices; +import org.apache.polaris.service.TestServices; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -105,7 +105,7 @@ void testTableLocationRestrictions( Map serverConfig, Map catalogConfig, int expectedStatusForOverlaps) { - TestServices services = TestServices.inMemory(serverConfig); + TestServices services = TestServices.builder().config(serverConfig).build(); CatalogProperties.Builder propertiesBuilder = CatalogProperties.builder() diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java index 38580b9a10..cab70907f2 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java @@ -23,7 +23,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -96,11 +95,12 @@ import org.apache.polaris.core.storage.cache.StorageCredentialCache; import org.apache.polaris.service.admin.PolarisAdminService; import org.apache.polaris.service.catalog.BasePolarisCatalog; +import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView; import org.apache.polaris.service.catalog.io.DefaultFileIOFactory; import org.apache.polaris.service.catalog.io.FileIOFactory; +import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory; import org.apache.polaris.service.config.RealmEntityManagerFactory; import org.apache.polaris.service.exception.IcebergExceptionMapper; -import org.apache.polaris.service.quarkus.catalog.io.TestFileIOFactory; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; import org.apache.polaris.service.task.TableCleanupTaskHandler; import org.apache.polaris.service.task.TaskExecutor; @@ -163,6 +163,7 @@ public Map getConfigOverrides() { private PolarisMetaStoreSession metaStoreSession; private PolarisAdminService adminService; private PolarisEntityManager entityManager; + private FileIOFactory fileIOFactory; private AuthenticatedPolarisPrincipal authenticatedRoot; private PolarisEntity catalogEntity; private SecurityContext securityContext; @@ -241,22 +242,9 @@ public void before(TestInfo testInfo) { new PolarisPassthroughResolutionView( entityManager, metaStoreSession, securityContext, CATALOG_NAME); TaskExecutor taskExecutor = Mockito.mock(); - this.catalog = - new BasePolarisCatalog( - realmId, - entityManager, - metaStoreManager, - metaStoreSession, - configurationStore, - diagServices, - passthroughView, - securityContext, - taskExecutor, - new DefaultFileIOFactory()); - this.catalog.initialize( - CATALOG_NAME, - ImmutableMap.of( - CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); + this.fileIOFactory = + new DefaultFileIOFactory(entityManagerFactory, managerFactory, configurationStore); + StsClient stsClient = Mockito.mock(StsClient.class); when(stsClient.assumeRole(isA(AssumeRoleRequest.class))) .thenReturn( @@ -273,6 +261,23 @@ public void before(TestInfo testInfo) { when(storageIntegrationProvider.getStorageIntegrationForConfig( isA(AwsStorageConfigurationInfo.class))) .thenReturn((PolarisStorageIntegration) storageIntegration); + + this.catalog = + new BasePolarisCatalog( + realmId, + entityManager, + metaStoreManager, + metaStoreSession, + configurationStore, + diagServices, + passthroughView, + securityContext, + taskExecutor, + fileIOFactory); + this.catalog.initialize( + CATALOG_NAME, + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); } @AfterEach @@ -509,7 +514,8 @@ public void testValidateNotificationFailToCreateFileIO() throws IOException { // filename. final String tableLocation = "s3://externally-owned-bucket/validate_table/"; final String tableMetadataLocation = tableLocation + "metadata/"; - FileIOFactory fileIoFactory = spy(new DefaultFileIOFactory()); + FileIOFactory fileIoFactory = + spy(new DefaultFileIOFactory(entityManagerFactory, managerFactory, configurationStore)); BasePolarisCatalog catalog = new BasePolarisCatalog( realmId, @@ -541,7 +547,7 @@ public void testValidateNotificationFailToCreateFileIO() throws IOException { doThrow(new ForbiddenException("Fake failure applying downscoped credentials")) .when(fileIoFactory) - .loadFileIO(any(), any()); + .loadFileIO(any(), any(), any(), any(), any(), any(), any()); Assertions.assertThatThrownBy(() -> catalog.sendNotification(table, request)) .isInstanceOf(ForbiddenException.class) .hasMessageContaining("Fake failure applying downscoped credentials"); @@ -849,7 +855,7 @@ public void testUpdateNotificationCreateTableWithLocalFilePrefix() { passthroughView, securityContext, taskExecutor, - new DefaultFileIOFactory()); + fileIOFactory); catalog.initialize( catalogWithoutStorage, ImmutableMap.of( @@ -914,7 +920,7 @@ public void testUpdateNotificationCreateTableWithHttpPrefix() { passthroughView, securityContext, taskExecutor, - new DefaultFileIOFactory()); + fileIOFactory); catalog.initialize( catalogName, ImmutableMap.of( @@ -1406,10 +1412,7 @@ public void testDropTableWithPurge() { .containsEntry(PolarisCredentialProperty.AWS_KEY_ID, TEST_ACCESS_KEY) .containsEntry(PolarisCredentialProperty.AWS_SECRET_KEY, SECRET_ACCESS_KEY) .containsEntry(PolarisCredentialProperty.AWS_TOKEN, SESSION_TOKEN); - FileIO fileIO = - new TaskFileIOSupplier( - createMockMetaStoreManagerFactory(), new DefaultFileIOFactory(), configurationStore) - .apply(taskEntity, realmId); + FileIO fileIO = new TaskFileIOSupplier(fileIOFactory).apply(taskEntity, realmId); Assertions.assertThat(fileIO).isNotNull().isInstanceOf(InMemoryFileIO.class); } @@ -1450,7 +1453,7 @@ public void testDropTableWithPurgeDisabled() { passthroughView, securityContext, Mockito.mock(), - new DefaultFileIOFactory()); + fileIOFactory); noPurgeCatalog.initialize( noPurgeCatalogName, ImmutableMap.of( @@ -1522,7 +1525,8 @@ public void testFileIOWrapper() { new PolarisPassthroughResolutionView( entityManager, metaStoreSession, securityContext, CATALOG_NAME); - TestFileIOFactory measured = new TestFileIOFactory(); + MeasuredFileIOFactory measured = + new MeasuredFileIOFactory(entityManagerFactory, managerFactory, configurationStore); BasePolarisCatalog catalog = new BasePolarisCatalog( realmId, @@ -1558,22 +1562,24 @@ public void testFileIOWrapper() { .isGreaterThan(0); Assertions.assertThat(catalog.dropTable(TABLE)).as("Table deletion should succeed").isTrue(); + TaskEntity taskEntity = + TaskEntity.of( + metaStoreManager + .loadTasks(metaStoreSession, "testExecutor", 1) + .getEntities() + .getFirst()); + Map properties = taskEntity.getInternalPropertiesAsMap(); + properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"); + taskEntity.setInternalPropertiesAsMap(properties); TableCleanupTaskHandler handler = new TableCleanupTaskHandler( Mockito.mock(), createMockMetaStoreManagerFactory(), configurationStore, diagServices, - (task, rc) -> - measured.loadFileIO("org.apache.iceberg.inmemory.InMemoryFileIO", Map.of()), + new TaskFileIOSupplier(measured), clock); - handler.handleTask( - TaskEntity.of( - metaStoreManager - .loadTasks(metaStoreSession, "testExecutor", 1) - .getEntities() - .getFirst()), - realmId); + handler.handleTask(taskEntity, realmId); Assertions.assertThat(measured.getNumDeletedFiles()).as("A table was deleted").isGreaterThan(0); } } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogViewTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogViewTest.java index b6bbefef0d..f5cdbcafe8 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogViewTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogViewTest.java @@ -56,7 +56,9 @@ import org.apache.polaris.core.persistence.PolarisMetaStoreSession; import org.apache.polaris.service.admin.PolarisAdminService; import org.apache.polaris.service.catalog.BasePolarisCatalog; +import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView; import org.apache.polaris.service.catalog.io.DefaultFileIOFactory; +import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.config.RealmEntityManagerFactory; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; import org.junit.jupiter.api.AfterEach; @@ -171,6 +173,8 @@ public void before(TestInfo testInfo) { PolarisPassthroughResolutionView passthroughView = new PolarisPassthroughResolutionView( entityManager, metaStoreSession, securityContext, CATALOG_NAME); + FileIOFactory fileIOFactory = + new DefaultFileIOFactory(entityManagerFactory, managerFactory, configurationStore); this.catalog = new BasePolarisCatalog( realmId, @@ -182,7 +186,7 @@ public void before(TestInfo testInfo) { passthroughView, securityContext, Mockito.mock(), - new DefaultFileIOFactory()); + fileIOFactory); this.catalog.initialize( CATALOG_NAME, ImmutableMap.of( diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java index 5f6b0dec0c..ef3b7fa5ca 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisCatalogHandlerWrapperAuthzTest.java @@ -58,7 +58,6 @@ import org.apache.polaris.core.entity.PrincipalEntity; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.service.catalog.PolarisCatalogHandlerWrapper; -import org.apache.polaris.service.catalog.io.DefaultFileIOFactory; import org.apache.polaris.service.quarkus.admin.PolarisAuthzTestBase; import org.apache.polaris.service.types.NotificationRequest; import org.apache.polaris.service.types.NotificationType; @@ -117,7 +116,7 @@ private PolarisCatalogHandlerWrapper newWrapper( catalogName, polarisAuthorizer, Mockito.mock(), - new DefaultFileIOFactory()); + fileIOFactory); } /** diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java index 13349c8259..e78d9a688b 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/FileIOExceptionsTest.java @@ -38,7 +38,8 @@ import org.apache.polaris.core.admin.model.FileStorageConfigInfo; import org.apache.polaris.core.admin.model.PolarisCatalog; import org.apache.polaris.core.admin.model.StorageConfigInfo; -import org.apache.polaris.service.quarkus.TestServices; +import org.apache.polaris.service.TestServices; +import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; @@ -53,13 +54,13 @@ public class FileIOExceptionsTest { private static final String catalog = "test-catalog"; private static final String catalogBaseLocation = "file:/tmp/buckets/my-bucket/path/to/data"; - private static TestFileIOFactory ioFactory; private static TestServices services; + private static MeasuredFileIOFactory ioFactory; @BeforeAll public static void beforeAll() { - ioFactory = new TestFileIOFactory(); - services = TestServices.inMemory(ioFactory); + services = TestServices.builder().build(); + ioFactory = (MeasuredFileIOFactory) services.fileIOFactory(); FileStorageConfigInfo storageConfigInfo = FileStorageConfigInfo.builder() diff --git a/service/common/build.gradle.kts b/service/common/build.gradle.kts index a44e8e13f6..1ecd4c400e 100644 --- a/service/common/build.gradle.kts +++ b/service/common/build.gradle.kts @@ -19,6 +19,7 @@ plugins { id("polaris-server") + id("java-test-fixtures") alias(libs.plugins.jandex) } @@ -91,6 +92,31 @@ dependencies { testImplementation(libs.assertj.core) testImplementation(libs.mockito.core) testRuntimeOnly("org.junit.platform:junit-platform-launcher") + + testFixturesImplementation(project(":polaris-core")) + testFixturesImplementation(project(":polaris-api-management-model")) + testFixturesImplementation(project(":polaris-api-management-service")) + testFixturesImplementation(project(":polaris-api-iceberg-service")) + + testFixturesImplementation(libs.jakarta.enterprise.cdi.api) + testFixturesImplementation(libs.jakarta.annotation.api) + testFixturesImplementation(libs.jakarta.ws.rs.api) + + testFixturesImplementation(platform(libs.quarkus.bom)) + testFixturesImplementation("io.quarkus:quarkus-rest-client") + testFixturesImplementation("io.quarkus:quarkus-rest-client-jackson") + + testFixturesImplementation(platform(libs.iceberg.bom)) + testFixturesImplementation("org.apache.iceberg:iceberg-api") + testFixturesImplementation("org.apache.iceberg:iceberg-core") + testFixturesImplementation("org.apache.iceberg:iceberg-aws") + + testFixturesImplementation(platform(libs.google.cloud.storage.bom)) + testFixturesImplementation("com.google.cloud:google-cloud-storage") + testFixturesImplementation(platform(libs.awssdk.bom)) + testFixturesImplementation("software.amazon.awssdk:sts") + testFixturesImplementation("software.amazon.awssdk:iam-policy-builder") + testFixturesImplementation("software.amazon.awssdk:s3") } tasks.named("javadoc") { dependsOn("jandex") } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java index 06bbc84d6c..91c18b4883 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java @@ -93,6 +93,7 @@ import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.PolarisMetaStoreSession; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.persistence.ResolvedPolarisEntity; import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest; import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView; import org.apache.polaris.core.persistence.resolver.ResolverPath; @@ -104,6 +105,7 @@ import org.apache.polaris.core.storage.PolarisStorageIntegration; import org.apache.polaris.core.storage.StorageLocation; import org.apache.polaris.service.catalog.io.FileIOFactory; +import org.apache.polaris.service.catalog.io.FileIOUtil; import org.apache.polaris.service.exception.IcebergExceptionMapper; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.NotificationRequest; @@ -324,7 +326,7 @@ public Table registerTable(TableIdentifier identifier, String metadataFileLocati String.format("Failed to fetch resolved parent for TableIdentifier '%s'", identifier)); } FileIO fileIO = - refreshIOWithCredentials( + loadFileIOForTableLike( identifier, Set.of(locationDir), resolvedParent, @@ -818,10 +820,15 @@ public Map getCredentialConfig( .log("Table entity has no storage configuration in its hierarchy"); return Map.of(); } - return refreshCredentials( + return FileIOUtil.refreshCredentials( + realmId, + entityManager, + getCredentialVendor(), + metaStoreSession, + configurationStore, tableIdentifier, - storageActions, getLocationsAllowedToBeAccessed(tableMetadata), + storageActions, storageInfo.get()); } @@ -857,62 +864,7 @@ public String transformTableLikeLocation(String specifiedTableLikeLocation) { ? resolvedEntityView.getResolvedPath(tableIdentifier.namespace()) : resolvedTableEntities; - return findStorageInfoFromHierarchy(resolvedStorageEntity); - } - - private Map refreshCredentials( - TableIdentifier tableIdentifier, - Set storageActions, - String tableLocation, - PolarisEntity entity) { - return refreshCredentials(tableIdentifier, storageActions, Set.of(tableLocation), entity); - } - - private Map refreshCredentials( - TableIdentifier tableIdentifier, - Set storageActions, - Set tableLocations, - PolarisEntity entity) { - Boolean skipCredentialSubscopingIndirection = - getBooleanContextConfiguration( - PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key, - PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue); - if (Boolean.TRUE.equals(skipCredentialSubscopingIndirection)) { - LOGGER - .atInfo() - .addKeyValue("tableIdentifier", tableIdentifier) - .log("Skipping generation of subscoped creds for table"); - return Map.of(); - } - - boolean allowList = - storageActions.contains(PolarisStorageActions.LIST) - || storageActions.contains(PolarisStorageActions.ALL); - Set writeLocations = - storageActions.contains(PolarisStorageActions.WRITE) - || storageActions.contains(PolarisStorageActions.DELETE) - || storageActions.contains(PolarisStorageActions.ALL) - ? tableLocations - : Set.of(); - Map credentialsMap = - entityManager - .getCredentialCache() - .getOrGenerateSubScopeCreds( - getCredentialVendor(), - metaStoreSession, - entity, - allowList, - tableLocations, - writeLocations); - LOGGER - .atDebug() - .addKeyValue("tableIdentifier", tableIdentifier) - .addKeyValue("credentialKeys", credentialsMap.keySet()) - .log("Loaded scoped credentials for table"); - if (credentialsMap.isEmpty()) { - LOGGER.debug("No credentials found for table"); - } - return credentialsMap; + return FileIOUtil.findStorageInfoFromHierarchy(resolvedStorageEntity); } /** @@ -1243,7 +1195,7 @@ public void doRefresh() { // then we should use the actual current table properties for IO refresh here // instead of the general tableDefaultProperties. FileIO fileIO = - refreshIOWithCredentials( + loadFileIOForTableLike( tableIdentifier, Set.of(latestLocationDir), resolvedEntities, @@ -1279,7 +1231,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { // refresh credentials because we need to read the metadata file to validate its location tableFileIO = - refreshIOWithCredentials( + loadFileIOForTableLike( tableIdentifier, getLocationsAllowedToBeAccessed(metadata), resolvedStorageEntity, @@ -1418,18 +1370,6 @@ private void validateMetadataFileInTableDir( } } - private static @Nonnull Optional findStorageInfoFromHierarchy( - PolarisResolvedPathWrapper resolvedStorageEntity) { - Optional storageInfoEntity = - resolvedStorageEntity.getRawFullPath().reversed().stream() - .filter( - e -> - e.getInternalPropertiesAsMap() - .containsKey(PolarisEntityConstants.getStorageConfigInfoPropertyName())) - .findFirst(); - return storageInfoEntity; - } - private class BasePolarisViewOperations extends BaseViewOperations { private final TableIdentifier identifier; private final String fullViewName; @@ -1475,7 +1415,7 @@ public void doRefresh() { // then we should use the actual current table properties for IO refresh here // instead of the general tableDefaultProperties. FileIO fileIO = - refreshIOWithCredentials( + loadFileIOForTableLike( identifier, Set.of(latestLocationDir), resolvedEntities, @@ -1529,7 +1469,7 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { Map tableProperties = new HashMap<>(metadata.properties()); viewFileIO = - refreshIOWithCredentials( + loadFileIOForTableLike( identifier, getLocationsAllowedToBeAccessed(metadata), resolvedStorageEntity, @@ -1586,27 +1526,22 @@ protected String viewName() { } } - private FileIO refreshIOWithCredentials( + private FileIO loadFileIOForTableLike( TableIdentifier identifier, Set readLocations, PolarisResolvedPathWrapper resolvedStorageEntity, Map tableProperties, Set storageActions) { - Optional storageInfoEntity = findStorageInfoFromHierarchy(resolvedStorageEntity); - Map credentialsMap = - storageInfoEntity - .map( - storageInfo -> - refreshCredentials(identifier, storageActions, readLocations, storageInfo)) - .orElse(Map.of()); - - // Update the FileIO before we write the new metadata file - // update with table properties in case there are table-level overrides - // the credentials should always override table-level properties, since - // storage configuration will be found at whatever entity defines it - tableProperties.putAll(credentialsMap); - FileIO fileIO = null; - fileIO = loadFileIO(ioImplClassName, tableProperties); + // Reload fileIO based on table specific context + FileIO fileIO = + fileIOFactory.loadFileIO( + realmId, + ioImplClassName, + tableProperties, + identifier, + readLocations, + storageActions, + resolvedStorageEntity); // ensure the new fileIO is closed when the catalog is closed closeableGroup.addCloseable(fileIO); return fileIO; @@ -1888,7 +1823,7 @@ private boolean sendNotificationForTableLike( .toArray(String[]::new)); resolvedStorageEntity = resolvedEntityView.getResolvedPath(nsLevel); if (resolvedStorageEntity != null) { - storageInfoEntity = findStorageInfoFromHierarchy(resolvedStorageEntity); + storageInfoEntity = FileIOUtil.findStorageInfoFromHierarchy(resolvedStorageEntity); break; } } @@ -1905,7 +1840,7 @@ private boolean sendNotificationForTableLike( // Validate that we can construct a FileIO String locationDir = metadataLocation.substring(0, metadataLocation.lastIndexOf("/")); - refreshIOWithCredentials( + loadFileIOForTableLike( tableIdentifier, Set.of(locationDir), resolvedStorageEntity, @@ -1960,7 +1895,7 @@ private boolean sendNotificationForTableLike( String locationDir = newLocation.substring(0, newLocation.lastIndexOf("/")); FileIO fileIO = - refreshIOWithCredentials( + loadFileIOForTableLike( tableIdentifier, Set.of(locationDir), resolvedParent, @@ -2039,8 +1974,16 @@ private List listTableLike(PolarisEntitySubType subType, Namesp * @return FileIO object */ private FileIO loadFileIO(String ioImpl, Map properties) { - Map propertiesWithS3CustomizedClientFactory = new HashMap<>(properties); - return fileIOFactory.loadFileIO(ioImpl, propertiesWithS3CustomizedClientFactory); + TableLikeEntity tableLikeEntity = TableLikeEntity.of(catalogEntity); + TableIdentifier identifier = tableLikeEntity.getTableIdentifier(); + Set locations = Set.of(catalogEntity.getDefaultBaseLocation()); + ResolvedPolarisEntity resolvedCatalogEntity = + new ResolvedPolarisEntity(catalogEntity, List.of(), List.of()); + PolarisResolvedPathWrapper resolvedPath = + new PolarisResolvedPathWrapper(List.of(resolvedCatalogEntity)); + Set storageActions = Set.of(PolarisStorageActions.ALL); + return fileIOFactory.loadFileIO( + realmId, ioImpl, properties, identifier, locations, storageActions, resolvedPath); } private void blockedUserSpecifiedWriteLocation(Map properties) { diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java b/service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java index b962ec79d9..a92d4c358f 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java @@ -18,19 +18,104 @@ */ package org.apache.polaris.service.catalog.io; +import com.google.common.annotations.VisibleForTesting; import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.Nonnull; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.util.HashMap; import java.util.Map; +import java.util.Optional; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; +import org.apache.polaris.core.PolarisConfigurationStore; +import org.apache.polaris.core.context.RealmId; +import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.core.persistence.PolarisEntityManager; +import org.apache.polaris.core.persistence.PolarisMetaStoreSession; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.storage.PolarisCredentialVendor; +import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.service.config.RealmEntityManagerFactory; -/** A simple FileIOFactory implementation that defers all the work to the Iceberg SDK */ +/** + * A default FileIO factory implementation for creating Iceberg {@link FileIO} instances with + * contextual table-level properties. + * + *

This class acts as a translation layer between Polaris properties and the properties required + * by Iceberg's {@link FileIO}. For example, it evaluates storage actions and retrieves subscoped + * credentials to initialize a {@link FileIO} instance with the most limited permissions necessary. + */ @ApplicationScoped @Identifier("default") public class DefaultFileIOFactory implements FileIOFactory { + + private final RealmEntityManagerFactory realmEntityManagerFactory; + private final MetaStoreManagerFactory metaStoreManagerFactory; + private final PolarisConfigurationStore configurationStore; + + @Inject + public DefaultFileIOFactory( + RealmEntityManagerFactory realmEntityManagerFactory, + MetaStoreManagerFactory metaStoreManagerFactory, + PolarisConfigurationStore configurationStore) { + this.realmEntityManagerFactory = realmEntityManagerFactory; + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.configurationStore = configurationStore; + } + @Override - public FileIO loadFileIO(String impl, Map properties) { - return CatalogUtil.loadFileIO(impl, properties, new Configuration()); + public FileIO loadFileIO( + @Nonnull RealmId realmId, + @Nonnull String ioImplClassName, + @Nonnull Map properties, + @Nonnull TableIdentifier identifier, + @Nonnull Set tableLocations, + @Nonnull Set storageActions, + @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) { + PolarisEntityManager entityManager = + realmEntityManagerFactory.getOrCreateEntityManager(realmId); + PolarisCredentialVendor credentialVendor = + metaStoreManagerFactory.getOrCreateMetaStoreManager(realmId); + PolarisMetaStoreSession metaStoreSession = + metaStoreManagerFactory.getOrCreateSessionSupplier(realmId).get(); + + // Get subcoped creds + properties = new HashMap<>(properties); + Optional storageInfoEntity = + FileIOUtil.findStorageInfoFromHierarchy(resolvedEntityPath); + Map credentialsMap = + storageInfoEntity + .map( + storageInfo -> + FileIOUtil.refreshCredentials( + realmId, + entityManager, + credentialVendor, + metaStoreSession, + configurationStore, + identifier, + tableLocations, + storageActions, + storageInfo)) + .orElse(Map.of()); + + // Update the FileIO with the subscoped credentials + // Update with properties in case there are table-level overrides the credentials should + // always override table-level properties, since storage configuration will be found at + // whatever entity defines it + properties.putAll(credentialsMap); + + return loadFileIOInternal(ioImplClassName, properties); + } + + @VisibleForTesting + FileIO loadFileIOInternal( + @Nonnull String ioImplClassName, @Nonnull Map properties) { + return CatalogUtil.loadFileIO(ioImplClassName, properties, new Configuration()); } } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java b/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java index ca3c085117..905441d791 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java @@ -18,10 +18,44 @@ */ package org.apache.polaris.service.catalog.io; +import jakarta.annotation.Nonnull; +import jakarta.enterprise.context.ApplicationScoped; import java.util.Map; +import java.util.Set; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; +import org.apache.polaris.core.context.RealmId; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.storage.PolarisStorageActions; -/** Interface for providing a way to construct FileIO objects, such as for reading/writing S3. */ +/** + * Interface for providing a way to construct FileIO objects, such as for reading/writing S3. + * + *

Implementations are available via CDI as {@link ApplicationScoped @ApplicationScoped} beans. + */ public interface FileIOFactory { - FileIO loadFileIO(String impl, Map properties); + + /** + * Loads a FileIO implementation for a specific table in the given realm with detailed config. + * + *

This method may obtain subscoped credentials to restrict the FileIO's permissions, ensuring + * secure and limited access to the table's data and locations. + * + * @param realmId the realm for which the FileIO is being loaded. + * @param ioImplClassName the class name of the FileIO implementation to load. + * @param properties configuration properties for the FileIO. + * @param identifier the table identifier. + * @param tableLocations locations associated with the table. + * @param storageActions storage actions allowed for the table. + * @param resolvedEntityPath resolved paths for the entities. + * @return a configured FileIO instance. + */ + FileIO loadFileIO( + @Nonnull RealmId realmId, + @Nonnull String ioImplClassName, + @Nonnull Map properties, + @Nonnull TableIdentifier identifier, + @Nonnull Set tableLocations, + @Nonnull Set storageActions, + @Nonnull PolarisResolvedPathWrapper resolvedEntityPath); } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java b/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java new file mode 100644 index 0000000000..9775ebd848 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.catalog.io; + +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.polaris.core.PolarisConfiguration; +import org.apache.polaris.core.PolarisConfigurationStore; +import org.apache.polaris.core.context.RealmId; +import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PolarisEntityConstants; +import org.apache.polaris.core.persistence.PolarisEntityManager; +import org.apache.polaris.core.persistence.PolarisMetaStoreSession; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.storage.PolarisCredentialVendor; +import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.service.catalog.BasePolarisCatalog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FileIOUtil { + private static final Logger LOGGER = LoggerFactory.getLogger(FileIOUtil.class); + + private FileIOUtil() {} + + /** + * Finds storage configuration information in the hierarchy of the resolved storage entity. + * + *

This method starts at the "leaf" level (e.g., table) and walks "upwards" through namespaces + * in the hierarchy to the "root." It searches for the first entity containing storage config + * properties, identified using a key from {@link + * PolarisEntityConstants#getStorageConfigInfoPropertyName()}. + * + * @param resolvedStorageEntity the resolved entity wrapper containing the hierarchical path + * @return an {@link Optional} containing the entity with storage config, or empty if not found + */ + public static Optional findStorageInfoFromHierarchy( + PolarisResolvedPathWrapper resolvedStorageEntity) { + Optional storageInfoEntity = + resolvedStorageEntity.getRawFullPath().reversed().stream() + .filter( + e -> + e.getInternalPropertiesAsMap() + .containsKey(PolarisEntityConstants.getStorageConfigInfoPropertyName())) + .findFirst(); + return storageInfoEntity; + } + + /** + * Refreshes or generates subscoped creds for accessing table storage based on the params. + * + *

Use cases: + * + *

    + *
  • In {@link BasePolarisCatalog}, subscoped credentials are generated or refreshed when the + * client sends a loadTable request to vend credentials. + *
  • In {@link DefaultFileIOFactory}, subscoped credentials are obtained to access the storage + * and read/write metadata JSON files. + *
+ */ + public static Map refreshCredentials( + RealmId realmId, + PolarisEntityManager entityManager, + PolarisCredentialVendor credentialVendor, + PolarisMetaStoreSession metaStoreSession, + PolarisConfigurationStore configurationStore, + TableIdentifier tableIdentifier, + Set tableLocations, + Set storageActions, + PolarisEntity entity) { + boolean skipCredentialSubscopingIndirection = + configurationStore.getConfiguration( + realmId, + PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key, + PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue); + if (skipCredentialSubscopingIndirection) { + LOGGER + .atDebug() + .addKeyValue("tableIdentifier", tableIdentifier) + .log("Skipping generation of subscoped creds for table"); + return Map.of(); + } + + boolean allowList = + storageActions.contains(PolarisStorageActions.LIST) + || storageActions.contains(PolarisStorageActions.ALL); + Set writeLocations = + storageActions.contains(PolarisStorageActions.WRITE) + || storageActions.contains(PolarisStorageActions.DELETE) + || storageActions.contains(PolarisStorageActions.ALL) + ? tableLocations + : Set.of(); + Map credentialsMap = + entityManager + .getCredentialCache() + .getOrGenerateSubScopeCreds( + credentialVendor, + metaStoreSession, + entity, + allowList, + tableLocations, + writeLocations); + LOGGER + .atDebug() + .addKeyValue("tableIdentifier", tableIdentifier) + .addKeyValue("credentialKeys", credentialsMap.keySet()) + .log("Loaded scoped credentials for table"); + if (credentialsMap.isEmpty()) { + LOGGER.debug("No credentials found for table"); + } + return credentialsMap; + } +} diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java b/service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java index 469587a396..fe2cd0a752 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java @@ -19,19 +19,54 @@ package org.apache.polaris.service.catalog.io; import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.Nonnull; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; import java.util.Map; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.CatalogUtil; +import java.util.Set; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; +import org.apache.polaris.core.PolarisConfigurationStore; +import org.apache.polaris.core.context.RealmId; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.service.config.RealmEntityManagerFactory; /** A {@link FileIOFactory} that translates WASB paths to ABFS ones */ @ApplicationScoped @Identifier("wasb") public class WasbTranslatingFileIOFactory implements FileIOFactory { + + private final FileIOFactory defaultFileIOFactory; + + @Inject + public WasbTranslatingFileIOFactory( + RealmEntityManagerFactory realmEntityManagerFactory, + MetaStoreManagerFactory metaStoreManagerFactory, + PolarisConfigurationStore configurationStore) { + defaultFileIOFactory = + new DefaultFileIOFactory( + realmEntityManagerFactory, metaStoreManagerFactory, configurationStore); + } + @Override - public FileIO loadFileIO(String ioImpl, Map properties) { + public FileIO loadFileIO( + @Nonnull RealmId realmId, + @Nonnull String ioImplClassName, + @Nonnull Map properties, + @Nonnull TableIdentifier identifier, + @Nonnull Set tableLocations, + @Nonnull Set storageActions, + @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) { return new WasbTranslatingFileIO( - CatalogUtil.loadFileIO(ioImpl, properties, new Configuration())); + defaultFileIOFactory.loadFileIO( + realmId, + ioImplClassName, + properties, + identifier, + tableLocations, + storageActions, + resolvedEntityPath)); } } diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java b/service/common/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java index 71d2660eb0..7fa5802e44 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java @@ -21,68 +21,51 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.BiFunction; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; -import org.apache.polaris.core.PolarisConfiguration; -import org.apache.polaris.core.PolarisConfigurationStore; import org.apache.polaris.core.context.RealmId; import org.apache.polaris.core.entity.PolarisTaskConstants; +import org.apache.polaris.core.entity.TableLikeEntity; import org.apache.polaris.core.entity.TaskEntity; -import org.apache.polaris.core.persistence.MetaStoreManagerFactory; -import org.apache.polaris.core.persistence.PolarisMetaStoreManager; -import org.apache.polaris.core.persistence.PolarisMetaStoreSession; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.persistence.ResolvedPolarisEntity; +import org.apache.polaris.core.storage.PolarisStorageActions; import org.apache.polaris.service.catalog.io.FileIOFactory; @ApplicationScoped public class TaskFileIOSupplier implements BiFunction { - private final MetaStoreManagerFactory metaStoreManagerFactory; private final FileIOFactory fileIOFactory; - private final PolarisConfigurationStore configurationStore; @Inject - public TaskFileIOSupplier( - MetaStoreManagerFactory metaStoreManagerFactory, - FileIOFactory fileIOFactory, - PolarisConfigurationStore configurationStore) { - this.metaStoreManagerFactory = metaStoreManagerFactory; + public TaskFileIOSupplier(FileIOFactory fileIOFactory) { this.fileIOFactory = fileIOFactory; - this.configurationStore = configurationStore; } @Override public FileIO apply(TaskEntity task, RealmId realmId) { Map internalProperties = task.getInternalPropertiesAsMap(); - String location = internalProperties.get(PolarisTaskConstants.STORAGE_LOCATION); - PolarisMetaStoreManager metaStoreManager = - metaStoreManagerFactory.getOrCreateMetaStoreManager(realmId); - PolarisMetaStoreSession metaStoreSession = - metaStoreManagerFactory.getOrCreateSessionSupplier(realmId).get(); Map properties = new HashMap<>(internalProperties); - Boolean skipCredentialSubscopingIndirection = - configurationStore.getConfiguration( - realmId, - PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key, - PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue); + TableLikeEntity tableEntity = TableLikeEntity.of(task); + TableIdentifier identifier = tableEntity.getTableIdentifier(); + String location = properties.get(PolarisTaskConstants.STORAGE_LOCATION); + Set locations = Set.of(location); + Set storageActions = Set.of(PolarisStorageActions.ALL); + ResolvedPolarisEntity resolvedTaskEntity = + new ResolvedPolarisEntity(task, List.of(), List.of()); + PolarisResolvedPathWrapper resolvedPath = + new PolarisResolvedPathWrapper(List.of(resolvedTaskEntity)); - if (!skipCredentialSubscopingIndirection) { - properties.putAll( - metaStoreManagerFactory - .getOrCreateStorageCredentialCache(realmId) - .getOrGenerateSubScopeCreds( - metaStoreManager, - metaStoreSession, - task, - true, - Set.of(location), - Set.of(location))); - } String ioImpl = properties.getOrDefault( CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.io.ResolvingFileIO"); - return fileIOFactory.loadFileIO(ioImpl, properties); + + return fileIOFactory.loadFileIO( + realmId, ioImpl, properties, identifier, locations, storageActions, resolvedPath); } } diff --git a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java new file mode 100644 index 0000000000..6455c16824 --- /dev/null +++ b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.catalog.io; + +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import jakarta.annotation.Nonnull; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.Schema; +import org.apache.iceberg.aws.s3.S3FileIOProperties; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.inmemory.InMemoryFileIO; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.types.Types; +import org.apache.polaris.core.admin.model.AwsStorageConfigInfo; +import org.apache.polaris.core.admin.model.Catalog; +import org.apache.polaris.core.admin.model.CatalogProperties; +import org.apache.polaris.core.admin.model.CreateCatalogRequest; +import org.apache.polaris.core.admin.model.PolarisCatalog; +import org.apache.polaris.core.admin.model.StorageConfigInfo; +import org.apache.polaris.core.context.RealmId; +import org.apache.polaris.core.entity.*; +import org.apache.polaris.core.persistence.*; +import org.apache.polaris.service.TestServices; +import org.apache.polaris.service.catalog.BasePolarisCatalog; +import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView; +import org.apache.polaris.service.task.TaskFileIOSupplier; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.mockito.Mockito; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import software.amazon.awssdk.services.sts.model.AssumeRoleResponse; +import software.amazon.awssdk.services.sts.model.Credentials; + +public class FileIOFactoryTest { + + public static final String CATALOG_NAME = "polaris-catalog"; + public static final Namespace NS = Namespace.of("newdb"); + public static final TableIdentifier TABLE = TableIdentifier.of(NS, "table"); + public static final Schema SCHEMA = + new Schema( + required(3, "id", Types.IntegerType.get(), "unique ID 🤪"), + required(4, "data", Types.StringType.get())); + public static final String TEST_ACCESS_KEY = "test_access_key"; + public static final String SECRET_ACCESS_KEY = "secret_access_key"; + public static final String SESSION_TOKEN = "session_token"; + + private RealmId realmId; + private StsClient stsClient; + private TestServices testServices; + + @BeforeEach + public void before(TestInfo testInfo) { + String realmName = + "realm_%s_%s" + .formatted( + testInfo.getTestMethod().map(Method::getName).orElse("test"), System.nanoTime()); + realmId = RealmId.newRealmId(realmName); + + // Mock get subscoped creds + stsClient = Mockito.mock(StsClient.class); + when(stsClient.assumeRole(isA(AssumeRoleRequest.class))) + .thenReturn( + AssumeRoleResponse.builder() + .credentials( + Credentials.builder() + .accessKeyId(TEST_ACCESS_KEY) + .secretAccessKey(SECRET_ACCESS_KEY) + .sessionToken(SESSION_TOKEN) + .build()) + .build()); + + // Spy FileIOFactory and check if the credentials are passed to the FileIO + TestServices.FileIOFactorySupplier fileIOFactorySupplier = + (entityManagerFactory, metaStoreManagerFactory, configurationStore) -> + Mockito.spy( + new DefaultFileIOFactory( + entityManagerFactory, metaStoreManagerFactory, configurationStore) { + @Override + FileIO loadFileIOInternal( + @Nonnull String ioImplClassName, @Nonnull Map properties) { + // properties should contain credentials + Assertions.assertThat(properties) + .containsEntry(S3FileIOProperties.ACCESS_KEY_ID, TEST_ACCESS_KEY) + .containsEntry(S3FileIOProperties.SECRET_ACCESS_KEY, SECRET_ACCESS_KEY) + .containsEntry(S3FileIOProperties.SESSION_TOKEN, SESSION_TOKEN); + return super.loadFileIOInternal(ioImplClassName, properties); + } + }); + + testServices = + TestServices.builder() + .config(Map.of("ALLOW_SPECIFYING_FILE_IO_IMPL", true)) + .realmId(realmId) + .stsClient(stsClient) + .fileIOFactorySupplier(fileIOFactorySupplier) + .build(); + } + + @AfterEach + public void after() {} + + @Test + public void testLoadFileIOForTableLike() { + BasePolarisCatalog catalog = createCatalog(testServices); + catalog.createNamespace(NS); + catalog.createTable(TABLE, SCHEMA); + + // 1. BasePolarisCatalog:doCommit: for writing the table during the creation + Mockito.verify(testServices.fileIOFactory(), Mockito.times(1)) + .loadFileIO( + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any()); + } + + @Test + public void testLoadFileIOForCleanupTask() { + BasePolarisCatalog catalog = createCatalog(testServices); + catalog.createNamespace(NS); + catalog.createTable(TABLE, SCHEMA); + catalog.dropTable(TABLE, true); + + List tasks = + testServices + .metaStoreManagerFactory() + .getOrCreateMetaStoreManager(realmId) + .loadTasks( + testServices.metaStoreManagerFactory().getOrCreateSessionSupplier(realmId).get(), + "testExecutor", + 1) + .getEntities(); + Assertions.assertThat(tasks).hasSize(1); + TaskEntity taskEntity = TaskEntity.of(tasks.get(0)); + FileIO fileIO = new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, realmId); + Assertions.assertThat(fileIO).isNotNull().isInstanceOf(InMemoryFileIO.class); + + // 1. BasePolarisCatalog:doCommit: for writing the table during the creation + // 2. BasePolarisCatalog:doRefresh: for reading the table during the drop + // 3. TaskFileIOSupplier:apply: for clean up metadata files and merge files + Mockito.verify(testServices.fileIOFactory(), Mockito.times(3)) + .loadFileIO( + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any()); + } + + BasePolarisCatalog createCatalog(TestServices services) { + String storageLocation = "s3://my-bucket/path/to/data"; + AwsStorageConfigInfo awsStorageConfigInfo = + AwsStorageConfigInfo.builder() + .setStorageType(StorageConfigInfo.StorageTypeEnum.S3) + .setAllowedLocations(List.of(storageLocation)) + .setRoleArn("arn:aws:iam::012345678901:role/jdoe") + .build(); + + // Create Catalog Entity + Catalog catalog = + PolarisCatalog.builder() + .setType(Catalog.TypeEnum.INTERNAL) + .setName(CATALOG_NAME) + .setProperties(new CatalogProperties("s3://tmp/path/to/data")) + .setStorageConfigInfo(awsStorageConfigInfo) + .build(); + services + .catalogsApi() + .createCatalog( + new CreateCatalogRequest(catalog), services.realmId(), services.securityContext()); + + PolarisPassthroughResolutionView passthroughView = + new PolarisPassthroughResolutionView( + services.entityManagerFactory().getOrCreateEntityManager(realmId), + services.metaStoreManagerFactory().getOrCreateSessionSupplier(realmId).get(), + services.securityContext(), + CATALOG_NAME); + BasePolarisCatalog polarisCatalog = + new BasePolarisCatalog( + services.realmId(), + services.entityManagerFactory().getOrCreateEntityManager(realmId), + services.metaStoreManagerFactory().getOrCreateMetaStoreManager(realmId), + services.metaStoreManagerFactory().getOrCreateSessionSupplier(realmId).get(), + services.configurationStore(), + services.polarisDiagnostics(), + passthroughView, + services.securityContext(), + services.taskExecutor(), + services.fileIOFactory()); + polarisCatalog.initialize( + CATALOG_NAME, + ImmutableMap.of( + org.apache.iceberg.CatalogProperties.FILE_IO_IMPL, + "org.apache.iceberg.inmemory.InMemoryFileIO")); + return polarisCatalog; + } +} diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java new file mode 100644 index 0000000000..c09efeded6 --- /dev/null +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service; + +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.GoogleCredentials; +import jakarta.ws.rs.core.SecurityContext; +import java.security.Principal; +import java.time.Clock; +import java.time.Instant; +import java.util.Date; +import java.util.Map; +import java.util.Set; +import org.apache.polaris.core.PolarisConfigurationStore; +import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; +import org.apache.polaris.core.auth.PolarisAuthorizer; +import org.apache.polaris.core.context.RealmId; +import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PrincipalEntity; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.core.persistence.PolarisEntityManager; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.apache.polaris.core.persistence.PolarisMetaStoreSession; +import org.apache.polaris.service.admin.PolarisServiceImpl; +import org.apache.polaris.service.admin.api.PolarisCatalogsApi; +import org.apache.polaris.service.catalog.IcebergCatalogAdapter; +import org.apache.polaris.service.catalog.api.IcebergRestCatalogApi; +import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService; +import org.apache.polaris.service.catalog.io.FileIOFactory; +import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory; +import org.apache.polaris.service.config.DefaultConfigurationStore; +import org.apache.polaris.service.config.RealmEntityManagerFactory; +import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory; +import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; +import org.apache.polaris.service.task.TaskExecutor; +import org.assertj.core.util.TriFunction; +import org.mockito.Mockito; +import software.amazon.awssdk.services.sts.StsClient; + +public record TestServices( + PolarisCatalogsApi catalogsApi, + IcebergRestCatalogApi restApi, + PolarisConfigurationStore configurationStore, + PolarisDiagnostics polarisDiagnostics, + RealmEntityManagerFactory entityManagerFactory, + MetaStoreManagerFactory metaStoreManagerFactory, + RealmId realmId, + SecurityContext securityContext, + FileIOFactory fileIOFactory, + TaskExecutor taskExecutor) { + + private static final RealmId TEST_REALM = + org.apache.polaris.core.context.RealmId.newRealmId("test-realm"); + private static final String GCP_ACCESS_TOKEN = "abc"; + + @FunctionalInterface + public interface FileIOFactorySupplier + extends TriFunction< + RealmEntityManagerFactory, + MetaStoreManagerFactory, + PolarisConfigurationStore, + FileIOFactory> {} + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private RealmId realmId = TEST_REALM; + private Map config = Map.of(); + private StsClient stsClient = Mockito.mock(StsClient.class); + private FileIOFactorySupplier fileIOFactorySupplier = MeasuredFileIOFactory::new; + + private Builder() {} + + public Builder realmId(RealmId realmId) { + this.realmId = realmId; + return this; + } + + public Builder config(Map config) { + this.config = config; + return this; + } + + public Builder fileIOFactorySupplier(FileIOFactorySupplier fileIOFactorySupplier) { + this.fileIOFactorySupplier = fileIOFactorySupplier; + return this; + } + + public Builder stsClient(StsClient stsClient) { + this.stsClient = stsClient; + return this; + } + + public TestServices build() { + DefaultConfigurationStore configurationStore = new DefaultConfigurationStore(config); + PolarisDiagnostics polarisDiagnostics = Mockito.mock(PolarisDiagnostics.class); + PolarisAuthorizer authorizer = Mockito.mock(PolarisAuthorizer.class); + + // Application level + PolarisStorageIntegrationProviderImpl storageIntegrationProvider = + new PolarisStorageIntegrationProviderImpl( + () -> stsClient, + () -> GoogleCredentials.create(new AccessToken(GCP_ACCESS_TOKEN, new Date())), + configurationStore); + InMemoryPolarisMetaStoreManagerFactory metaStoreManagerFactory = + new InMemoryPolarisMetaStoreManagerFactory( + storageIntegrationProvider, + configurationStore, + polarisDiagnostics, + Clock.systemDefaultZone()); + RealmEntityManagerFactory realmEntityManagerFactory = + new RealmEntityManagerFactory(metaStoreManagerFactory, polarisDiagnostics) {}; + + PolarisEntityManager entityManager = + realmEntityManagerFactory.getOrCreateEntityManager(realmId); + PolarisMetaStoreManager metaStoreManager = + metaStoreManagerFactory.getOrCreateMetaStoreManager(realmId); + PolarisMetaStoreSession metaStoreSession = + metaStoreManagerFactory.getOrCreateSessionSupplier(realmId).get(); + + FileIOFactory fileIOFactory = + fileIOFactorySupplier.apply( + realmEntityManagerFactory, metaStoreManagerFactory, configurationStore); + + TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class); + IcebergRestCatalogApiService service = + new IcebergCatalogAdapter( + realmId, + entityManager, + metaStoreManager, + metaStoreSession, + configurationStore, + polarisDiagnostics, + authorizer, + taskExecutor, + fileIOFactory); + + IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(service); + + PolarisMetaStoreManager.CreatePrincipalResult createdPrincipal = + metaStoreManager.createPrincipal( + metaStoreSession, + new PrincipalEntity.Builder() + .setName("test-principal") + .setCreateTimestamp(Instant.now().toEpochMilli()) + .setCredentialRotationRequiredState() + .build()); + AuthenticatedPolarisPrincipal principal = + new AuthenticatedPolarisPrincipal( + PolarisEntity.of(createdPrincipal.getPrincipal()), Set.of()); + + SecurityContext securityContext = + new SecurityContext() { + @Override + public Principal getUserPrincipal() { + return principal; + } + + @Override + public boolean isUserInRole(String s) { + return false; + } + + @Override + public boolean isSecure() { + return true; + } + + @Override + public String getAuthenticationScheme() { + return ""; + } + }; + + PolarisCatalogsApi catalogsApi = + new PolarisCatalogsApi( + new PolarisServiceImpl( + entityManager, + metaStoreManager, + metaStoreSession, + configurationStore, + authorizer, + polarisDiagnostics)); + + return new TestServices( + catalogsApi, + restApi, + configurationStore, + polarisDiagnostics, + realmEntityManagerFactory, + metaStoreManagerFactory, + realmId, + securityContext, + fileIOFactory, + taskExecutor); + } + } +} diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisPassthroughResolutionView.java b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java similarity index 99% rename from quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisPassthroughResolutionView.java rename to service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java index db74ad3b49..54197d7ec7 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolarisPassthroughResolutionView.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.polaris.service.quarkus.catalog; +package org.apache.polaris.service.catalog; import jakarta.ws.rs.core.SecurityContext; import java.util.Arrays; diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIO.java b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIO.java similarity index 93% rename from quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIO.java rename to service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIO.java index 8acf2efd8e..0313f86fe3 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIO.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIO.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.polaris.service.quarkus.catalog.io; +package org.apache.polaris.service.catalog.io; import java.util.Map; import java.util.Optional; @@ -32,7 +32,7 @@ * File IO wrapper used for tests. It measures the number of bytes read, files written, and files * deleted. It can inject exceptions during InputFile and OutputFile creation. */ -public class TestFileIO implements FileIO { +public class MeasuredFileIO implements FileIO { private final FileIO io; // When present, the following will be used to throw exceptions at various parts of the IO @@ -44,7 +44,7 @@ public class TestFileIO implements FileIO { private int numOutputFiles; private int numDeletedFiles; - public TestFileIO( + public MeasuredFileIO( FileIO io, Optional> newInputFileExceptionSupplier, Optional> newOutputFileExceptionSupplier, @@ -73,9 +73,9 @@ private InputFile wrapInputFile(InputFile inner) { throw s.get(); }); - // Use the inner's length in case the TestInputFile throws a getLength exception + // Use the inner's length in case the MeasuredInputFile throws a getLength exception inputBytes += inner.getLength(); - return new TestInputFile(inner, getLengthExceptionSupplier); + return new MeasuredInputFile(inner, getLengthExceptionSupplier); } @Override diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIOFactory.java b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java similarity index 55% rename from quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIOFactory.java rename to service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java index 7504755995..5826c1daca 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestFileIOFactory.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java @@ -16,26 +16,33 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.polaris.service.quarkus.catalog.io; +package org.apache.polaris.service.catalog.io; +import jakarta.annotation.Nonnull; import jakarta.enterprise.inject.Vetoed; +import jakarta.inject.Inject; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; -import org.apache.polaris.service.catalog.io.DefaultFileIOFactory; +import org.apache.polaris.core.PolarisConfigurationStore; +import org.apache.polaris.core.context.RealmId; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.service.config.RealmEntityManagerFactory; /** * A FileIOFactory that measures the number of bytes read, files written, and files deleted. It can * inject exceptions at various parts of the IO construction. */ @Vetoed -public class TestFileIOFactory extends DefaultFileIOFactory { - private final List ios = new ArrayList<>(); +public class MeasuredFileIOFactory implements FileIOFactory { + private final List ios = new ArrayList<>(); // When present, the following will be used to throw exceptions at various parts of the IO public Optional> loadFileIOExceptionSupplier = Optional.empty(); @@ -43,18 +50,42 @@ public class TestFileIOFactory extends DefaultFileIOFactory { public Optional> newOutputFileExceptionSupplier = Optional.empty(); public Optional> getLengthExceptionSupplier = Optional.empty(); - public TestFileIOFactory() {} + private final FileIOFactory defaultFileIOFactory; + + @Inject + public MeasuredFileIOFactory( + RealmEntityManagerFactory realmEntityManagerFactory, + MetaStoreManagerFactory metaStoreManagerFactory, + PolarisConfigurationStore configurationStore) { + defaultFileIOFactory = + new DefaultFileIOFactory( + realmEntityManagerFactory, metaStoreManagerFactory, configurationStore); + } @Override - public FileIO loadFileIO(String ioImpl, Map properties) { + public FileIO loadFileIO( + @Nonnull RealmId realmId, + @Nonnull String ioImplClassName, + @Nonnull Map properties, + @Nonnull TableIdentifier identifier, + @Nonnull Set tableLocations, + @Nonnull Set storageActions, + @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) { loadFileIOExceptionSupplier.ifPresent( s -> { throw s.get(); }); - TestFileIO wrapped = - new TestFileIO( - CatalogUtil.loadFileIO(ioImpl, properties, new Configuration()), + MeasuredFileIO wrapped = + new MeasuredFileIO( + defaultFileIOFactory.loadFileIO( + realmId, + ioImplClassName, + properties, + identifier, + tableLocations, + storageActions, + resolvedEntityPath), newInputFileExceptionSupplier, newOutputFileExceptionSupplier, getLengthExceptionSupplier); @@ -64,7 +95,7 @@ public FileIO loadFileIO(String ioImpl, Map properties) { public long getInputBytes() { long sum = 0; - for (TestFileIO io : ios) { + for (MeasuredFileIO io : ios) { sum += io.getInputBytes(); } return sum; @@ -72,7 +103,7 @@ public long getInputBytes() { public long getNumOutputFiles() { long sum = 0; - for (TestFileIO io : ios) { + for (MeasuredFileIO io : ios) { sum += io.getNumOuptutFiles(); } return sum; @@ -80,7 +111,7 @@ public long getNumOutputFiles() { public long getNumDeletedFiles() { long sum = 0; - for (TestFileIO io : ios) { + for (MeasuredFileIO io : ios) { sum += io.getNumDeletedFiles(); } return sum; diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestInputFile.java b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredInputFile.java similarity index 93% rename from quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestInputFile.java rename to service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredInputFile.java index da050c1198..a4b2f2361c 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/io/TestInputFile.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredInputFile.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.polaris.service.quarkus.catalog.io; +package org.apache.polaris.service.catalog.io; import java.util.Optional; import java.util.function.Supplier; @@ -24,11 +24,11 @@ import org.apache.iceberg.io.SeekableInputStream; /** An InputFile wrapper that can be forced to throw exceptions. */ -public class TestInputFile implements InputFile { +public class MeasuredInputFile implements InputFile { private final InputFile inputFile; private final Optional> getLengthExceptionSupplier; - public TestInputFile( + public MeasuredInputFile( InputFile inputFile, Optional> getLengthExceptionSupplier) { this.inputFile = inputFile; this.getLengthExceptionSupplier = getLengthExceptionSupplier;