|
21 | 21 | import jakarta.enterprise.context.ApplicationScoped; |
22 | 22 | import jakarta.inject.Inject; |
23 | 23 | import java.util.HashMap; |
| 24 | +import java.util.List; |
24 | 25 | import java.util.Map; |
25 | 26 | import java.util.Set; |
26 | 27 | import java.util.function.BiFunction; |
27 | 28 | import org.apache.iceberg.CatalogProperties; |
| 29 | +import org.apache.iceberg.catalog.TableIdentifier; |
28 | 30 | import org.apache.iceberg.io.FileIO; |
29 | | -import org.apache.polaris.core.PolarisConfiguration; |
30 | | -import org.apache.polaris.core.PolarisConfigurationStore; |
31 | 31 | import org.apache.polaris.core.context.RealmId; |
32 | 32 | import org.apache.polaris.core.entity.PolarisTaskConstants; |
| 33 | +import org.apache.polaris.core.entity.TableLikeEntity; |
33 | 34 | import org.apache.polaris.core.entity.TaskEntity; |
34 | | -import org.apache.polaris.core.persistence.MetaStoreManagerFactory; |
35 | | -import org.apache.polaris.core.persistence.PolarisMetaStoreManager; |
36 | | -import org.apache.polaris.core.persistence.PolarisMetaStoreSession; |
| 35 | +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; |
| 36 | +import org.apache.polaris.core.persistence.ResolvedPolarisEntity; |
| 37 | +import org.apache.polaris.core.storage.PolarisStorageActions; |
37 | 38 | import org.apache.polaris.service.catalog.io.FileIOFactory; |
38 | 39 |
|
39 | 40 | @ApplicationScoped |
40 | 41 | public class TaskFileIOSupplier implements BiFunction<TaskEntity, RealmId, FileIO> { |
41 | | - private final MetaStoreManagerFactory metaStoreManagerFactory; |
42 | 42 | private final FileIOFactory fileIOFactory; |
43 | | - private final PolarisConfigurationStore configurationStore; |
44 | 43 |
|
45 | 44 | @Inject |
46 | | - public TaskFileIOSupplier( |
47 | | - MetaStoreManagerFactory metaStoreManagerFactory, |
48 | | - FileIOFactory fileIOFactory, |
49 | | - PolarisConfigurationStore configurationStore) { |
50 | | - this.metaStoreManagerFactory = metaStoreManagerFactory; |
| 45 | + public TaskFileIOSupplier(FileIOFactory fileIOFactory) { |
51 | 46 | this.fileIOFactory = fileIOFactory; |
52 | | - this.configurationStore = configurationStore; |
53 | 47 | } |
54 | 48 |
|
55 | 49 | @Override |
56 | 50 | public FileIO apply(TaskEntity task, RealmId realmId) { |
57 | 51 | Map<String, String> internalProperties = task.getInternalPropertiesAsMap(); |
58 | | - String location = internalProperties.get(PolarisTaskConstants.STORAGE_LOCATION); |
59 | | - PolarisMetaStoreManager metaStoreManager = |
60 | | - metaStoreManagerFactory.getOrCreateMetaStoreManager(realmId); |
61 | | - PolarisMetaStoreSession metaStoreSession = |
62 | | - metaStoreManagerFactory.getOrCreateSessionSupplier(realmId).get(); |
63 | 52 | Map<String, String> properties = new HashMap<>(internalProperties); |
64 | 53 |
|
65 | | - Boolean skipCredentialSubscopingIndirection = |
66 | | - configurationStore.getConfiguration( |
67 | | - realmId, |
68 | | - PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.key, |
69 | | - PolarisConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION.defaultValue); |
| 54 | + TableLikeEntity tableEntity = TableLikeEntity.of(task); |
| 55 | + TableIdentifier identifier = tableEntity.getTableIdentifier(); |
| 56 | + String location = properties.get(PolarisTaskConstants.STORAGE_LOCATION); |
| 57 | + Set<String> locations = Set.of(location); |
| 58 | + Set<PolarisStorageActions> storageActions = Set.of(PolarisStorageActions.ALL); |
| 59 | + ResolvedPolarisEntity resolvedTaskEntity = |
| 60 | + new ResolvedPolarisEntity(task, List.of(), List.of()); |
| 61 | + PolarisResolvedPathWrapper resolvedPath = |
| 62 | + new PolarisResolvedPathWrapper(List.of(resolvedTaskEntity)); |
70 | 63 |
|
71 | | - if (!skipCredentialSubscopingIndirection) { |
72 | | - properties.putAll( |
73 | | - metaStoreManagerFactory |
74 | | - .getOrCreateStorageCredentialCache(realmId) |
75 | | - .getOrGenerateSubScopeCreds( |
76 | | - metaStoreManager, |
77 | | - metaStoreSession, |
78 | | - task, |
79 | | - true, |
80 | | - Set.of(location), |
81 | | - Set.of(location))); |
82 | | - } |
83 | 64 | String ioImpl = |
84 | 65 | properties.getOrDefault( |
85 | 66 | CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.io.ResolvingFileIO"); |
86 | | - return fileIOFactory.loadFileIO(realmId, ioImpl, properties); |
| 67 | + |
| 68 | + return fileIOFactory.loadFileIO( |
| 69 | + realmId, ioImpl, properties, identifier, locations, storageActions, resolvedPath); |
87 | 70 | } |
88 | 71 | } |
0 commit comments