From 13370f1b83e2ffea61ff72a9a8892c5af1be1276 Mon Sep 17 00:00:00 2001
From: Vitalii Savitskii
Date: Tue, 4 Mar 2025 11:05:41 +0100
Subject: [PATCH 1/5] Add ability to customize retry settings

---
Review note: RetrySettings.toSchedule caps the exponential backoff with
Schedule.recurs(settings.retryCount). As first submitted it passed
settings.initialDurationSeconds to Schedule.recurs, so the configured
retryCount was never read and the attempt limit followed the initial delay.

 .../scala/models/settings/RetrySettings.scala | 27 +++++++++++++++++++
 .../models/settings/StagingDataSettings.scala |  3 ++-
 .../transformers/StagingProcessor.scala       |  6 ++++-
 3 files changed, 34 insertions(+), 2 deletions(-)
 create mode 100644 src/main/scala/models/settings/RetrySettings.scala

diff --git a/src/main/scala/models/settings/RetrySettings.scala b/src/main/scala/models/settings/RetrySettings.scala
new file mode 100644
index 00000000..1118ffb1
--- /dev/null
+++ b/src/main/scala/models/settings/RetrySettings.scala
@@ -0,0 +1,27 @@
+package com.sneaksanddata.arcane.framework
+package models.settings
+
+import zio.Schedule
+
+import java.time.Duration
+
+
+enum RetryPolicyType:
+  case ExponentialBackoff
+  case None
+
+trait RetrySettings:
+  val policyType: RetryPolicyType
+
+  val initialDurationSeconds: Int
+  val retryCount: Int
+
+
+object RetrySettings:
+  type ScheduleType = Schedule.WithState[(Long, Long), Any, Any, (zio.Duration, Long)]
+
+  extension (settings: RetrySettings)
+    def toSchedule: Option[ScheduleType] = settings.policyType match
+      case RetryPolicyType.ExponentialBackoff => Some(Schedule.exponential(Duration.ofSeconds(settings.initialDurationSeconds)) && Schedule.recurs(settings.retryCount))
+      case RetryPolicyType.None => None
+
diff --git a/src/main/scala/models/settings/StagingDataSettings.scala b/src/main/scala/models/settings/StagingDataSettings.scala
index 2daeab6d..9563eb7d 100644
--- a/src/main/scala/models/settings/StagingDataSettings.scala
+++ b/src/main/scala/models/settings/StagingDataSettings.scala
@@ -6,7 +6,8 @@ import java.time.format.DateTimeFormatter
 import java.util.UUID
 
 trait StagingDataSettings:
-  protected val stagingTablePrefix: String
+  val stagingTablePrefix: String
+  val retryPolicy: RetrySettings
 
   def newStagingTableName: String =
     val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy_MM_dd_HH_mm_ss")
diff --git a/src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala b/src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala
index 98ae6c58..4aaf7cfb 100644
--- a/src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala
+++ b/src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala
@@ -11,6 +11,7 @@ import services.lakehouse.given_Conversion_ArcaneSchema_Schema
 import services.streaming.base.{MetadataEnrichedRowStreamElement, RowGroupTransformer, StagedBatchProcessor}
 import services.streaming.processors.transformers.StagingProcessor.toStagedBatch
 
+import com.sneaksanddata.arcane.framework.models.settings.RetryPolicyType.ExponentialBackoff
 import org.apache.iceberg.rest.RESTCatalog
 import org.apache.iceberg.{Schema, Table}
 import zio.stream.ZPipeline
@@ -47,8 +48,11 @@ class StagingProcessor(stagingDataSettings: StagingDataSettings,
 
   private def writeDataRows(rows: Chunk[DataRow], arcaneSchema: ArcaneSchema): Task[StagedVersionedBatch & MergeableBatch & ArchiveableBatch] =
     val tableWriterEffect = zlog("Attempting to write data to staging table") *> catalogWriter.write(rows, stagingDataSettings.newStagingTableName, arcaneSchema)
+    val retryEffect = stagingDataSettings.retryPolicy.toSchedule match
+      case Some(schedule) => tableWriterEffect.retry(schedule)
+      case None => tableWriterEffect
     for
-      table <- tableWriterEffect.retry(retryPolicy)
+      table <- retryEffect
       batch = table.toStagedBatch(icebergCatalogSettings.namespace,
         icebergCatalogSettings.warehouse,
         arcaneSchema,

From 2136db8b4fe3a96de62c1ed2f3cc01adca1ea084 Mon Sep 17 00:00:00 2001
From: Vitalii Savitskii
Date: Tue, 4 Mar 2025 11:06:19 +0100
Subject: [PATCH 2/5] Add ability to customize retry settings

---
 .../streaming/processors/transformers/StagingProcessor.scala  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala b/src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala
index 4aaf7cfb..c6ed8f55 100644
--- a/src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala
+++ b/src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala
@@ -47,7 +47,9 @@ class StagingProcessor(stagingDataSettings: StagingDataSettings,
       .map { case ((batches, others), index) => toInFlightBatch(batches, index, others) }
 
   private def writeDataRows(rows: Chunk[DataRow], arcaneSchema: ArcaneSchema): Task[StagedVersionedBatch & MergeableBatch & ArchiveableBatch] =
-    val tableWriterEffect = zlog("Attempting to write data to staging table") *> catalogWriter.write(rows, stagingDataSettings.newStagingTableName, arcaneSchema)
+    val tableWriterEffect = zlog("Attempting to write data to staging table") *>
+      catalogWriter.write(rows, stagingDataSettings.newStagingTableName, arcaneSchema)
+
     val retryEffect = stagingDataSettings.retryPolicy.toSchedule match
       case Some(schedule) => tableWriterEffect.retry(schedule)
       case None => tableWriterEffect

From 4b233a816915ed30e1cd40bda9248f7b263d92be Mon Sep 17 00:00:00 2001
From: Vitalii Savitskii
Date: Tue, 4 Mar 2025 11:09:50 +0100
Subject: [PATCH 3/5] Add error logs to retries

---
 .../streaming/processors/transformers/StagingProcessor.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala b/src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala
index c6ed8f55..cd555b70 100644
--- a/src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala
+++ b/src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala
@@ -14,6 +14,7 @@ import services.streaming.processors.transformers.StagingProcessor.toStagedBatch
 import com.sneaksanddata.arcane.framework.models.settings.RetryPolicyType.ExponentialBackoff
 import org.apache.iceberg.rest.RESTCatalog
 import org.apache.iceberg.{Schema, Table}
+import zio.Cause.Fail
 import zio.stream.ZPipeline
 import zio.{Chunk, Schedule, Task, ZIO, ZLayer}
 
@@ -48,7 +49,9 @@ class StagingProcessor(stagingDataSettings: StagingDataSettings,
 
   private def writeDataRows(rows: Chunk[DataRow], arcaneSchema: ArcaneSchema): Task[StagedVersionedBatch & MergeableBatch & ArchiveableBatch] =
     val tableWriterEffect = zlog("Attempting to write data to staging table") *>
-      catalogWriter.write(rows, stagingDataSettings.newStagingTableName, arcaneSchema)
+      catalogWriter
+        .write(rows, stagingDataSettings.newStagingTableName, arcaneSchema)
+        .tapErrorCause(e => zlog("Failed to write data to staging table", e))
 
     val retryEffect = stagingDataSettings.retryPolicy.toSchedule match
       case Some(schedule) => tableWriterEffect.retry(schedule)

From 7640087f9c093478add75409ae71b59c074254f0 Mon Sep 17 00:00:00 2001
From: Vitalii Savitskii
Date: Tue, 4 Mar 2025 11:31:56 +0100
Subject: [PATCH 4/5] Initialize IcebergS3CatalogWriter on inject

---
 .../scala/services/lakehouse/IcebergS3CatalogWriter.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/main/scala/services/lakehouse/IcebergS3CatalogWriter.scala b/src/main/scala/services/lakehouse/IcebergS3CatalogWriter.scala
index b235f70d..666df1e8 100644
--- a/src/main/scala/services/lakehouse/IcebergS3CatalogWriter.scala
+++ b/src/main/scala/services/lakehouse/IcebergS3CatalogWriter.scala
@@ -115,11 +115,13 @@ object IcebergS3CatalogWriter:
   /**
    * The ZLayer that creates the LazyOutputDataProcessor.
    */
-  val layer: ZLayer[IcebergCatalogSettings, Nothing, CatalogWriter[RESTCatalog, Table, Schema]] =
+  val layer: ZLayer[IcebergCatalogSettings, Throwable, IcebergS3CatalogWriter] =
     ZLayer {
       for
         settings <- ZIO.service[IcebergCatalogSettings]
-      yield IcebergS3CatalogWriter(settings)
+        icebergS3CatalogWriterBuilder = IcebergS3CatalogWriter(settings)
+        writer <- ZIO.attemptBlocking(icebergS3CatalogWriterBuilder.initialize())
+      yield writer
     }
 
   /**

From 8b34b0a6158ad0534946240118f724dc4624f3d3 Mon Sep 17 00:00:00 2001
From: Vitalii Savitskii
Date: Tue, 4 Mar 2025 11:48:18 +0100
Subject: [PATCH 5/5] Build fix

---
 src/test/scala/utils/TestStagingDataSettings.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/test/scala/utils/TestStagingDataSettings.scala b/src/test/scala/utils/TestStagingDataSettings.scala
index e486453a..c5f4e2f7 100644
--- a/src/test/scala/utils/TestStagingDataSettings.scala
+++ b/src/test/scala/utils/TestStagingDataSettings.scala
@@ -1,6 +1,10 @@
 package com.sneaksanddata.arcane.framework
 package utils
 
-import models.settings.StagingDataSettings
+import models.settings.{RetryPolicyType, RetrySettings, StagingDataSettings}
 object TestStagingDataSettings extends StagingDataSettings:
   override val stagingTablePrefix = "staging_"
+  override val retryPolicy: RetrySettings = new RetrySettings:
+    override val policyType: RetryPolicyType = RetryPolicyType.None
+    override val initialDurationSeconds: Int = 0
+    override val retryCount: Int = 0