diff --git a/src/main/scala/models/settings/RetrySettings.scala b/src/main/scala/models/settings/RetrySettings.scala
new file mode 100644
index 00000000..1118ffb1
--- /dev/null
+++ b/src/main/scala/models/settings/RetrySettings.scala
@@ -0,0 +1,37 @@
+package com.sneaksanddata.arcane.framework
+package models.settings
+
+import zio.Schedule
+
+import java.time.Duration
+
+
+/**
+ * Supported retry policy kinds for transient failures.
+ */
+enum RetryPolicyType:
+  case ExponentialBackoff
+  case None
+
+/**
+ * Settings that describe how a failed operation should be retried.
+ */
+trait RetrySettings:
+  /** The retry policy to apply. */
+  val policyType: RetryPolicyType
+
+  /** Initial delay, in seconds, before the first retry attempt. */
+  val initialDurationSeconds: Int
+  /** Maximum number of retry attempts. */
+  val retryCount: Int
+
+
+object RetrySettings:
+  type ScheduleType = Schedule.WithState[(Long, Long), Any, Any, (zio.Duration, Long)]
+
+  extension (settings: RetrySettings)
+    /** Converts the settings to a ZIO retry schedule; `None` disables retries. */
+    def toSchedule: Option[ScheduleType] = settings.policyType match
+      case RetryPolicyType.ExponentialBackoff => Some(Schedule.exponential(Duration.ofSeconds(settings.initialDurationSeconds)) && Schedule.recurs(settings.retryCount))
+      case RetryPolicyType.None => None
+
diff --git a/src/main/scala/models/settings/StagingDataSettings.scala b/src/main/scala/models/settings/StagingDataSettings.scala
index 2daeab6d..9563eb7d 100644
--- a/src/main/scala/models/settings/StagingDataSettings.scala
+++ b/src/main/scala/models/settings/StagingDataSettings.scala
@@ -6,7 +6,8 @@ import java.time.format.DateTimeFormatter
 import java.util.UUID
 
 trait StagingDataSettings:
-  protected val stagingTablePrefix: String
+  val stagingTablePrefix: String
+  val retryPolicy: RetrySettings
 
   def newStagingTableName: String =
     val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy_MM_dd_HH_mm_ss")
diff --git a/src/main/scala/services/lakehouse/IcebergS3CatalogWriter.scala b/src/main/scala/services/lakehouse/IcebergS3CatalogWriter.scala
index b235f70d..666df1e8 100644
--- a/src/main/scala/services/lakehouse/IcebergS3CatalogWriter.scala
+++ b/src/main/scala/services/lakehouse/IcebergS3CatalogWriter.scala
@@ -115,11 +115,13 @@ object IcebergS3CatalogWriter:
   /**
    * The ZLayer that creates the LazyOutputDataProcessor.
    */
-  val layer: ZLayer[IcebergCatalogSettings, Nothing, CatalogWriter[RESTCatalog, Table, Schema]] =
+  val layer: ZLayer[IcebergCatalogSettings, Throwable, IcebergS3CatalogWriter] =
     ZLayer {
       for
         settings <- ZIO.service[IcebergCatalogSettings]
-      yield IcebergS3CatalogWriter(settings)
+        icebergS3CatalogWriterBuilder = IcebergS3CatalogWriter(settings)
+        writer <- ZIO.attemptBlocking(icebergS3CatalogWriterBuilder.initialize())
+      yield writer
     }
 
   /**
diff --git a/src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala b/src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala
index 98ae6c58..cd555b70 100644
--- a/src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala
+++ b/src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala
@@ -11,8 +11,10 @@ import services.lakehouse.given_Conversion_ArcaneSchema_Schema
 import services.streaming.base.{MetadataEnrichedRowStreamElement, RowGroupTransformer, StagedBatchProcessor}
 import services.streaming.processors.transformers.StagingProcessor.toStagedBatch
 
+import com.sneaksanddata.arcane.framework.models.settings.RetryPolicyType.ExponentialBackoff
 import org.apache.iceberg.rest.RESTCatalog
 import org.apache.iceberg.{Schema, Table}
+import zio.Cause.Fail
 import zio.stream.ZPipeline
 import zio.{Chunk, Schedule, Task, ZIO, ZLayer}
 
@@ -46,9 +48,16 @@ class StagingProcessor(stagingDataSettings: StagingDataSettings,
       .map { case ((batches, others), index) => toInFlightBatch(batches, index, others) }
 
   private def writeDataRows(rows: Chunk[DataRow], arcaneSchema: ArcaneSchema): Task[StagedVersionedBatch & MergeableBatch & ArchiveableBatch] =
-    val tableWriterEffect = zlog("Attempting to write data to staging table") *> catalogWriter.write(rows, stagingDataSettings.newStagingTableName, arcaneSchema)
+    val tableWriterEffect = zlog("Attempting to write data to staging table") *>
+      catalogWriter
+        .write(rows, stagingDataSettings.newStagingTableName, arcaneSchema)
+        .tapErrorCause(e => zlog("Failed to write data to staging table", e))
+
+    val retryEffect = stagingDataSettings.retryPolicy.toSchedule match
+      case Some(schedule) => tableWriterEffect.retry(schedule)
+      case None => tableWriterEffect
     for
-      table <- tableWriterEffect.retry(retryPolicy)
+      table <- retryEffect
       batch = table.toStagedBatch(icebergCatalogSettings.namespace,
         icebergCatalogSettings.warehouse,
         arcaneSchema,
diff --git a/src/test/scala/utils/TestStagingDataSettings.scala b/src/test/scala/utils/TestStagingDataSettings.scala
index e486453a..c5f4e2f7 100644
--- a/src/test/scala/utils/TestStagingDataSettings.scala
+++ b/src/test/scala/utils/TestStagingDataSettings.scala
@@ -1,6 +1,10 @@
 package com.sneaksanddata.arcane.framework
 package utils
 
-import models.settings.StagingDataSettings
+import models.settings.{RetryPolicyType, RetrySettings, StagingDataSettings}
 object TestStagingDataSettings extends StagingDataSettings:
   override val stagingTablePrefix = "staging_"
+  override val retryPolicy: RetrySettings = new RetrySettings:
+    override val policyType: RetryPolicyType = RetryPolicyType.None
+    override val initialDurationSeconds: Int = 0
+    override val retryCount: Int = 0