diff --git a/src/main/scala/services/streaming/base/BackfillStreamingDataProvider.scala b/src/main/scala/services/streaming/base/BackfillStreamingDataProvider.scala new file mode 100644 index 0000000..97d9173 --- /dev/null +++ b/src/main/scala/services/streaming/base/BackfillStreamingDataProvider.scala @@ -0,0 +1,19 @@ +package com.sneaksanddata.arcane.framework +package services.streaming.base + +import services.mssql.MsSqlConnection.BackfillBatch + +import com.sneaksanddata.arcane.framework.services.consumers.{StagedBackfillBatch, StagedBackfillOverwriteBatch} +import zio.Task + +/** + * A trait that represents a backfill data provider. + */ +trait BackfillStreamingDataProvider: + + /** + * Provides the backfill data. + * + * @return A task that represents the backfill data. + */ + def requestBackfill: Task[StagedBackfillOverwriteBatch] diff --git a/src/main/scala/services/streaming/base/BatchProcessor.scala b/src/main/scala/services/streaming/base/BatchProcessor.scala index d3ca98e..bbec55f 100644 --- a/src/main/scala/services/streaming/base/BatchProcessor.scala +++ b/src/main/scala/services/streaming/base/BatchProcessor.scala @@ -1,12 +1,14 @@ package com.sneaksanddata.arcane.framework package services.streaming.base +import org.apache.hc.core5.annotation.Obsolete import zio.stream.ZPipeline /** * A trait that represents a batch processor. * @tparam IncomingType The type of the incoming data. */ +@Obsolete trait BatchProcessor[IncomingType, OutgoingType] { /** diff --git a/src/main/scala/services/streaming/graph_builders/GenericGraphBuilderFactory.scala b/src/main/scala/services/streaming/graph_builders/GenericGraphBuilderFactory.scala index 75879f6..73f3fea 100644 --- a/src/main/scala/services/streaming/graph_builders/GenericGraphBuilderFactory.scala +++ b/src/main/scala/services/streaming/graph_builders/GenericGraphBuilderFactory.scala @@ -6,7 +6,6 @@ import models.app.StreamContext import models.settings.{BackfillBehavior, BackfillSettings} import services.streaming.base.StreamingGraphBuilder import services.streaming.graph_builders.backfill.{GenericBackfillMergeGraphBuilder, GenericBackfillOverwriteGraphBuilder} -import services.streaming.graph_builders.base.GenericStreamingGraphBuilder import zio.{ZIO, ZLayer} diff --git a/src/main/scala/services/streaming/graph_builders/base/GenericStreamingGraphBuilder.scala b/src/main/scala/services/streaming/graph_builders/GenericStreamingGraphBuilder.scala similarity index 98% rename from src/main/scala/services/streaming/graph_builders/base/GenericStreamingGraphBuilder.scala rename to src/main/scala/services/streaming/graph_builders/GenericStreamingGraphBuilder.scala index fbab11d..0bf2f91 100644 --- a/src/main/scala/services/streaming/graph_builders/base/GenericStreamingGraphBuilder.scala +++ b/src/main/scala/services/streaming/graph_builders/GenericStreamingGraphBuilder.scala @@ -1,5 +1,5 @@ package com.sneaksanddata.arcane.framework -package services.streaming.graph_builders.base +package services.streaming.graph_builders import services.app.base.StreamLifetimeService import services.streaming.base.{HookManager, StreamDataProvider, StreamingGraphBuilder} diff --git a/src/main/scala/services/streaming/graph_builders/backfill/GenericBackfillMergeGraphBuilder.scala b/src/main/scala/services/streaming/graph_builders/backfill/GenericBackfillMergeGraphBuilder.scala index c15f485..280db07 100644 --- a/src/main/scala/services/streaming/graph_builders/backfill/GenericBackfillMergeGraphBuilder.scala +++ b/src/main/scala/services/streaming/graph_builders/backfill/GenericBackfillMergeGraphBuilder.scala @@ -1,7 +1,7 @@ package com.sneaksanddata.arcane.framework package services.streaming.graph_builders.backfill -import services.streaming.base.{BackfillStreamingGraphBuilder, BackfillStreamingMergeDataProvider, BackfillStreamingOverwriteDataProvider, StreamDataProvider} +import services.streaming.base.{BackfillStreamingGraphBuilder, BackfillStreamingMergeDataProvider, StreamDataProvider} import services.streaming.processors.batch_processors.backfill.BackfillApplyBatchProcessor import zio.stream.ZStream diff --git a/src/test/scala/services/streaming/GenericStreamRunnerServiceTests.scala b/src/test/scala/services/streaming/GenericStreamRunnerServiceTests.scala index 2bf8dc5..885e7e5 100644 --- a/src/test/scala/services/streaming/GenericStreamRunnerServiceTests.scala +++ b/src/test/scala/services/streaming/GenericStreamRunnerServiceTests.scala @@ -9,8 +9,7 @@ import services.consumers.SqlServerChangeTrackingMergeBatch import services.filters.FieldsFilteringService import services.lakehouse.base.CatalogWriter import services.merging.JdbcTableManager -import services.streaming.base.{HookManager, StreamDataProvider} -import services.streaming.graph_builders.base.GenericStreamingGraphBuilder +import services.streaming.base.{BackfillDataProvider, BackfillStreamingDataProvider, HookManager, StreamDataProvider} import services.streaming.processors.GenericGroupingTransformer import services.streaming.processors.batch_processors.streaming.{DisposeBatchProcessor, MergeBatchProcessor} import services.streaming.processors.transformers.FieldFilteringTransformer.Environment @@ -19,6 +18,8 @@ import services.streaming.processors.utils.TestIndexedStagedBatches import utils.* import com.sneaksanddata.arcane.framework.models.app.StreamContext +import com.sneaksanddata.arcane.framework.services.streaming.graph_builders.GenericStreamingGraphBuilder +import com.sneaksanddata.arcane.framework.services.streaming.graph_builders.backfill.GenericBackfillOverwriteGraphBuilder import org.apache.iceberg.rest.RESTCatalog import org.apache.iceberg.{Schema, Table} import org.easymock.EasyMock @@ -47,6 +48,7 @@ class GenericStreamRunnerServiceTests extends AsyncFlatSpec with Matchers with E val jdbcTableManager = mock[JdbcTableManager] val hookManager = mock[HookManager] val streamDataProvider = mock[StreamDataProvider] + val backfillDataProvider = mock[BackfillStreamingDataProvider] val catalogWriter = mock[CatalogWriter[RESTCatalog, Table, Schema]] val tableMock = mock[Table] diff --git a/src/test/scala/services/streaming/graph_builders/GenericGraphBuilderFactoryTests.scala b/src/test/scala/services/streaming/graph_builders/GenericGraphBuilderFactoryTests.scala index f0f20f9..c9d91cf 100644 --- a/src/test/scala/services/streaming/graph_builders/GenericGraphBuilderFactoryTests.scala +++ b/src/test/scala/services/streaming/graph_builders/GenericGraphBuilderFactoryTests.scala @@ -6,7 +6,7 @@ import com.sneaksanddata.arcane.framework.models.settings.{BackfillBehavior, Bac import com.sneaksanddata.arcane.framework.services.app.base.StreamLifetimeService import com.sneaksanddata.arcane.framework.services.streaming.base.{BackfillStreamingMergeDataProvider, BackfillStreamingOverwriteDataProvider, HookManager, StreamDataProvider, StreamingGraphBuilder} import com.sneaksanddata.arcane.framework.services.streaming.graph_builders.backfill.{GenericBackfillMergeGraphBuilder, GenericBackfillOverwriteGraphBuilder} -import com.sneaksanddata.arcane.framework.services.streaming.graph_builders.base.GenericStreamingGraphBuilder +import com.sneaksanddata.arcane.framework.services.streaming.graph_builders.GenericStreamingGraphBuilder import com.sneaksanddata.arcane.framework.services.streaming.processors.GenericGroupingTransformer import com.sneaksanddata.arcane.framework.services.streaming.processors.batch_processors.backfill.BackfillApplyBatchProcessor import com.sneaksanddata.arcane.framework.services.streaming.processors.batch_processors.streaming.{DisposeBatchProcessor, MergeBatchProcessor} diff --git a/src/test/scala/services/streaming/processors/MergeBatchProcessorTests.scala b/src/test/scala/services/streaming/processors/MergeBatchProcessorTests.scala index c346ef2..ba42320 100644 --- a/src/test/scala/services/streaming/processors/MergeBatchProcessorTests.scala +++ b/src/test/scala/services/streaming/processors/MergeBatchProcessorTests.scala @@ -8,6 +8,10 @@ import services.consumers.SynapseLinkMergeBatch import services.merging.JdbcTableManager import services.streaming.processors.batch_processors.streaming.MergeBatchProcessor import services.streaming.processors.utils.TestIndexedStagedBatches +import services.merging.models.{JdbcOptimizationRequest, JdbcOrphanFilesExpirationRequest, JdbcSnapshotExpirationRequest} +import services.streaming.base.{OptimizationRequestConvertable, OrphanFilesExpirationRequestConvertable, SnapshotExpirationRequestConvertable} +import services.streaming.processors.transformers.IndexedStagedBatches + import com.sneaksanddata.arcane.framework.utils.{TablePropertiesSettings, TestTargetTableSettings, TestTargetTableSettingsWithMaintenance} import org.easymock.EasyMock