Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to overwrite retry policy #58

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/main/scala/models/settings/RetrySettings.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.sneaksanddata.arcane.framework
package models.settings

import zio.Schedule

import java.time.Duration


/**
 * The kinds of retry policy that can be selected through [[RetrySettings]].
 *
 *  - `ExponentialBackoff`: `RetrySettings.toSchedule` yields an exponential schedule.
 *  - `None`: `RetrySettings.toSchedule` yields no schedule at all.
 *
 * Note: the `None` case shares its simple name with `scala.None`; refer to it as
 * `RetryPolicyType.None` to keep the two apart.
 */
enum RetryPolicyType:
  case ExponentialBackoff, None

/**
 * Configuration describing how a failing operation should be retried.
 *
 * Implementations supply plain values; the companion object's `toSchedule`
 * extension turns them into a ZIO schedule.
 */
trait RetrySettings:

  /** Which retry strategy to use (see [[RetryPolicyType]]). */
  val policyType: RetryPolicyType

  /** Base delay, in seconds, handed to the exponential schedule. */
  val initialDurationSeconds: Int

  /** Intended upper bound on the number of retries (NOTE(review): confirm it is honoured by the schedule builder). */
  val retryCount: Int


/**
 * Companion helpers that convert [[RetrySettings]] into a ZIO [[Schedule]].
 */
object RetrySettings:

  /**
   * The schedule type produced by `toSchedule`: an exponential schedule combined (`&&`)
   * with a recurrence counter. Its state is the pair of both schedules' states and its
   * output is the pair (current delay, recurrence count).
   */
  type ScheduleType = Schedule.WithState[(Long, Long), Any, Any, (zio.Duration, Long)]

  extension (settings: RetrySettings)

    /**
     * Builds the retry schedule described by these settings.
     *
     * @return `Some(schedule)` for `RetryPolicyType.ExponentialBackoff`, where the schedule
     *         starts at `initialDurationSeconds` and recurs at most `retryCount` times;
     *         `None` (the `scala.None`) for `RetryPolicyType.None`, meaning "do not retry".
     */
    def toSchedule: Option[ScheduleType] = settings.policyType match
      case RetryPolicyType.ExponentialBackoff =>
        // The number of recurrences is bounded by retryCount. The previous code passed
        // initialDurationSeconds here, so the retry limit silently followed the delay setting.
        Some(
          Schedule.exponential(Duration.ofSeconds(settings.initialDurationSeconds))
            && Schedule.recurs(settings.retryCount)
        )
      case RetryPolicyType.None => None

3 changes: 2 additions & 1 deletion src/main/scala/models/settings/StagingDataSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import java.time.format.DateTimeFormatter
import java.util.UUID

trait StagingDataSettings:
protected val stagingTablePrefix: String
val stagingTablePrefix: String
val retryPolicy: RetrySettings

def newStagingTableName: String =
val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy_MM_dd_HH_mm_ss")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,13 @@ object IcebergS3CatalogWriter:
/**
* The ZLayer that creates the LazyOutputDataProcessor.
*/
val layer: ZLayer[IcebergCatalogSettings, Nothing, CatalogWriter[RESTCatalog, Table, Schema]] =
val layer: ZLayer[IcebergCatalogSettings, Throwable, IcebergS3CatalogWriter] =
ZLayer {
for
settings <- ZIO.service[IcebergCatalogSettings]
yield IcebergS3CatalogWriter(settings)
icebergS3CatalogWriterBuilder = IcebergS3CatalogWriter(settings)
writer <- ZIO.attemptBlocking(icebergS3CatalogWriterBuilder.initialize())
yield writer
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import services.lakehouse.given_Conversion_ArcaneSchema_Schema
import services.streaming.base.{MetadataEnrichedRowStreamElement, RowGroupTransformer, StagedBatchProcessor}
import services.streaming.processors.transformers.StagingProcessor.toStagedBatch

import com.sneaksanddata.arcane.framework.models.settings.RetryPolicyType.ExponentialBackoff
import org.apache.iceberg.rest.RESTCatalog
import org.apache.iceberg.{Schema, Table}
import zio.Cause.Fail
import zio.stream.ZPipeline
import zio.{Chunk, Schedule, Task, ZIO, ZLayer}

Expand Down Expand Up @@ -46,9 +48,16 @@ class StagingProcessor(stagingDataSettings: StagingDataSettings,
.map { case ((batches, others), index) => toInFlightBatch(batches, index, others) }

private def writeDataRows(rows: Chunk[DataRow], arcaneSchema: ArcaneSchema): Task[StagedVersionedBatch & MergeableBatch & ArchiveableBatch] =
val tableWriterEffect = zlog("Attempting to write data to staging table") *> catalogWriter.write(rows, stagingDataSettings.newStagingTableName, arcaneSchema)
val tableWriterEffect = zlog("Attempting to write data to staging table") *>
catalogWriter
.write(rows, stagingDataSettings.newStagingTableName, arcaneSchema)
.tapErrorCause(e => zlog("Failed to write data to staging table", e))

val retryEffect = stagingDataSettings.retryPolicy.toSchedule match
case Some(schedule) => tableWriterEffect.retry(schedule)
case None => tableWriterEffect
for
table <- tableWriterEffect.retry(retryPolicy)
table <- retryEffect
batch = table.toStagedBatch(icebergCatalogSettings.namespace,
icebergCatalogSettings.warehouse,
arcaneSchema,
Expand Down
6 changes: 5 additions & 1 deletion src/test/scala/utils/TestStagingDataSettings.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.sneaksanddata.arcane.framework
package utils
import models.settings.StagingDataSettings
import models.settings.{RetryPolicyType, RetrySettings, StagingDataSettings}

/**
 * Staging settings fixture for tests: a fixed table prefix and a retry policy
 * of type `RetryPolicyType.None` with zeroed numeric values.
 */
object TestStagingDataSettings extends StagingDataSettings:

  /** Retry settings that select `RetryPolicyType.None`; the numeric fields are zero. */
  private object NoRetry extends RetrySettings:
    override val policyType: RetryPolicyType = RetryPolicyType.None
    override val initialDurationSeconds: Int = 0
    override val retryCount: Int = 0

  override val stagingTablePrefix = "staging_"
  override val retryPolicy: RetrySettings = NoRetry
Loading