Skip to content

[CORE-580] Enable Compact Data Table Migration from Workspace Settings #3407

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jul 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions core/src/main/scala/org/broadinstitute/dsde/rawls/Boot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -365,24 +365,27 @@ object Boot extends IOApp with LazyLogging {

val workspaceSettingRepository = new WorkspaceSettingRepository(slickDataSource)

val workspaceSettingServiceConstructor: RawlsRequestContext => WorkspaceSettingService =
new WorkspaceSettingService(_,
workspaceSettingRepository,
workspaceRepository,
gcsDAO,
samDAO,
appDependencies.googleStorageService
)(implicitly, IORuntime.global)

val entityServiceConstructor: RawlsRequestContext => EntityService = EntityService.constructor(
// Define both constructors using lazy evaluation to break the circular dependency
lazy val entityServiceConstructor: RawlsRequestContext => EntityService = EntityService.constructor(
slickDataSource,
samDAO,
workbenchMetricBaseName = metricsPrefix,
entityManager,
appConfigManager.conf.getInt("entities.pageSizeLimit"),
Option(workspaceSettingServiceConstructor)
Some(workspaceSettingServiceConstructor)
)

lazy val workspaceSettingServiceConstructor: RawlsRequestContext => WorkspaceSettingService =
(ctx: RawlsRequestContext) =>
new WorkspaceSettingService(ctx,
workspaceSettingRepository,
workspaceRepository,
gcsDAO,
samDAO,
appDependencies.googleStorageService,
entityServiceConstructor(ctx)
)(implicitly, IORuntime.global)

val workspaceServiceConstructor: RawlsRequestContext => WorkspaceService = WorkspaceService.constructor(
slickDataSource,
shardedExecutionServiceCluster,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,16 @@ trait WorkspaceSettingComponent {
&& rec.settingType === settingType.toString
).take(1).result.map(_.map(WorkspaceSettingRecord.toWorkspaceSetting).toList)
)

def getPendingSettingForWorkspaceByType(workspaceId: UUID,
settingType: WorkspaceSettingType
): ReadAction[Option[WorkspaceSetting]] =
uniqueResult(
filter(rec =>
rec.workspaceId === workspaceId
&& rec.status === WorkspaceSettingRecord.SettingStatus.Pending.toString
&& rec.settingType === settingType.toString
).take(1).result.map(_.map(WorkspaceSettingRecord.toWorkspaceSetting).toList)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,28 @@ class EntityManager(providerBuilders: Set[EntityProviderBuilder[_ <: EntityProvi
)
}

// If the workspace has the CompactDataTables setting enabled, use CompactEntityProvider; else use LocalEntityProvider.
val compactDataTables =
workspaceSettingRepository.getWorkspaceSettingOfType(requestArguments.workspace.workspaceIdAsUUID,
CompactDataTables
) map {
case Some(qs: CompactDataTablesSetting) => qs.config.enabled
case _ => false
// If the workspace has the CompactDataTables setting enabled and no pending CompactDataTables Settings,
// use CompactEntityProvider; else use LocalEntityProvider.
val compactDataTables = for {
settingOpt <- workspaceSettingRepository.getWorkspaceSettingOfType(
requestArguments.workspace.workspaceIdAsUUID,
CompactDataTables
)
hasPending <- workspaceSettingRepository.hasPendingSettings(
requestArguments.workspace.workspaceIdAsUUID,
CompactDataTables
)
} yield {
if (hasPending) {
throw new DataEntityException(
s"CompactDataTable migration is in progress for workspace ${requestArguments.workspace.toWorkspaceName}. Writes are temporarily disabled."
)
}
settingOpt match {
case Some(qs: CompactDataTablesSetting) if qs.config.enabled => true
case _ => false
}
}
val targetTagFuture = compactDataTables map {
case true => typeTag[CompactEntityProvider]
case false => typeTag[LocalEntityProvider]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,31 @@ class EntityService(protected val ctx: RawlsRequestContext,
throw new RawlsExceptionWithErrorReport(ErrorReport("Quicksilver already enabled for this workspace"))
}

// Check if there are any pending compactDataTables settings for this workspace
// Pending settings indicate that this originated from a workspace setting request
hasPendingSettings <- traceFutureWithParent("workspaceHasPendingSettings", s) { _ =>
workspaceSettingService.workspaceHasPendingSettings(workspaceName, CompactDataTables)
}

// If there are pending settings, it means migration has already been requested and is currently in progress.
// Prevent starting another migration to avoid conflicts or an inconsistent state.
_ = if (hasPendingSettings) {
throw new RawlsExceptionWithErrorReport(
ErrorReport(StatusCodes.BadRequest, "Quicksilver migration is already in progress for this workspace")
)
}

// If there are no pending settings, we can set the workspace settings to enable Quicksilver migration
// This is done before the migration starts to ensure that the workspace setting marked as "Pending"
_ = if (!hasPendingSettings) {
traceFutureWithParent("setWorkspaceSettings", s) { _ =>
workspaceSettingService.setWorkspaceSettings(
workspaceName,
List(CompactDataTablesSetting(CompactDataTablesConfig(enabled = true)))
)
}
}

// start a transaction; here's where we do a bunch of writes
userResult <- dataSource.inTransaction { dataAccess =>
val shardId: String = dataAccess.determineShard(workspaceId)
Expand Down Expand Up @@ -662,14 +687,6 @@ class EntityService(protected val ctx: RawlsRequestContext,

}

// finally, change the workspace to be quicksilver-enabled
_ <- traceFutureWithParent("setWorkspaceSettings", s) { _ =>
workspaceSettingService.setWorkspaceSettings(
workspaceName,
List(CompactDataTablesSetting(CompactDataTablesConfig(enabled = true)))
)
}

// return a count of entities updated
} yield userResult
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ class WorkspaceSettingRepository(dataSource: SlickDataSource) {
access.workspaceSettingQuery.getAppliedSettingForWorkspaceByType(workspaceId, settingType)
}

// Return if the workspace has pending settings for the settingType. Applied and deleted settings are not returned.
def hasPendingSettings(workspaceId: UUID, settingType: WorkspaceSettingType)(implicit
ec: ExecutionContext
): Future[Boolean] =
dataSource.inTransaction { access =>
access.workspaceSettingQuery
.getPendingSettingForWorkspaceByType(workspaceId, settingType)
.map(_.nonEmpty)
}

// Create new settings for a workspace as pending. If there are any existing pending settings, throw an exception.
def createWorkspaceSettingsRecords(workspaceId: UUID,
workspaceSettings: List[WorkspaceSetting],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.google.cloud.storage.BucketInfo.{LifecycleRule, SoftDeletePolicy}
import com.google.cloud.storage.Storage
import com.typesafe.scalalogging.LazyLogging
import org.broadinstitute.dsde.rawls.dataaccess.{GoogleServicesDAO, SamDAO}
import org.broadinstitute.dsde.rawls.entities.EntityService
import org.broadinstitute.dsde.rawls.model.WorkspaceSettingConfig._
import org.broadinstitute.dsde.rawls.model.WorkspaceSettingTypes.WorkspaceSettingType
import org.broadinstitute.dsde.rawls.model.{
Expand All @@ -29,7 +30,8 @@ import org.broadinstitute.dsde.rawls.model.{
Workspace,
WorkspaceName,
WorkspaceSetting,
WorkspaceSettingResponse
WorkspaceSettingResponse,
WorkspaceSettingTypes
}
import org.broadinstitute.dsde.rawls.util.WorkspaceSupport
import org.broadinstitute.dsde.rawls.{RawlsException, RawlsExceptionWithErrorReport}
Expand All @@ -47,7 +49,8 @@ class WorkspaceSettingService(protected val ctx: RawlsRequestContext,
val workspaceRepository: WorkspaceRepository,
gcsDAO: GoogleServicesDAO,
val samDAO: SamDAO,
googleStorageService: GoogleStorageService[IO]
googleStorageService: GoogleStorageService[IO],
entityService: EntityService
)(implicit protected val executionContext: ExecutionContext, ioRuntime: IORuntime)
extends WorkspaceSupport
with LazyLogging {
Expand All @@ -74,6 +77,11 @@ class WorkspaceSettingService(protected val ctx: RawlsRequestContext,
workspaceSettingRepository.getWorkspaceSettings(workspace.workspaceIdAsUUID)
}

// Returns true if the workspace has any pending settings.
def workspaceHasPendingSettings(workspaceName: WorkspaceName, settingType: WorkspaceSettingType): Future[Boolean] =
getV2WorkspaceContextAndPermissions(workspaceName, SamWorkspaceActions.readSettings).flatMap { workspace =>
workspaceSettingRepository.hasPendingSettings(workspace.workspaceIdAsUUID, settingType)
}
def setWorkspaceSettings(workspaceName: WorkspaceName,
workspaceSettings: List[WorkspaceSetting]
): Future[WorkspaceSettingResponse] = {
Expand Down Expand Up @@ -201,8 +209,8 @@ class WorkspaceSettingService(protected val ctx: RawlsRequestContext,
// SeparateSubmissionFinalOutputsSetting, UseCromwellGcpBatchBackendSetting, and CompactDataTablesSetting
// are not bucket settings, so we do not need to apply anything here

case CompactDataTablesSetting(CompactDataTablesConfig(_)) =>
Future.successful(())
case CompactDataTablesSetting(CompactDataTablesConfig(enabled)) =>
applyCompactDataTablesSetting(WorkspaceName(workspace.namespace, workspace.name), enabled)

case SeparateSubmissionFinalOutputsSetting(SeparateSubmissionFinalOutputsConfig(_)) =>
Future.successful(())
Expand Down Expand Up @@ -270,4 +278,32 @@ class WorkspaceSettingService(protected val ctx: RawlsRequestContext,
}
_ <- iamPolicyAction.compile.drain.unsafeToFuture()
} yield ()

/**
* Call to handle entity attributes migration when compact data tables setting enabled.
*/
private def applyCompactDataTablesSetting(workspaceName: WorkspaceName, enabled: Boolean): Future[Unit] =
if (!enabled) {
// Check if the setting is already enabled in the database
getWorkspaceSettingOfType(workspaceName, WorkspaceSettingTypes.CompactDataTables).flatMap {
case Some(CompactDataTablesSetting(CompactDataTablesConfig(true))) =>
throw new RawlsExceptionWithErrorReport(
ErrorReport(StatusCodes.BadRequest, "Cannot disable compact data tables setting once enabled.")
)
case _ =>
Future.successful(())
}
} else {
// If compact data tables setting is enabled, we need to migrate the entity attributes.
Future {
entityService
.quicksilverMigration(workspaceName = workspaceName)
.map(_ => ())
.recover { case e: Exception =>
throw new RawlsExceptionWithErrorReport(
ErrorReport(StatusCodes.InternalServerError, s"Quicksilver migration failed: ${e.getMessage}")
)
}
}.flatten
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import akka.http.scaladsl.model.headers.OAuth2BearerToken
import org.broadinstitute.dsde.rawls.dataaccess.SlickDataSource
import org.broadinstitute.dsde.rawls.entities.base.AuditLoggingEntityProvider
import org.broadinstitute.dsde.rawls.entities.compact.CompactEntityProvider
import org.broadinstitute.dsde.rawls.entities.exceptions.DataEntityException
import org.broadinstitute.dsde.rawls.entities.local.LocalEntityProvider
import org.broadinstitute.dsde.rawls.model.WorkspaceSettingConfig.CompactDataTablesConfig
import org.broadinstitute.dsde.rawls.model.WorkspaceSettingTypes.CompactDataTables
Expand Down Expand Up @@ -65,6 +66,10 @@ class EntityManagerSpec extends AnyFlatSpec with MockitoTestUtils with Matchers

val entityRequestArguments = EntityRequestArguments(workspace, defaultRequestContext)

// Mock workspace has no pending compact data tables setting
when(workspaceSettingRepository.hasPendingSettings(workspaceId, CompactDataTables))
.thenReturn(Future.successful(false))

// check the EntityManager behavior when the compact data tables setting is not set
when(workspaceSettingRepository.getWorkspaceSettingOfType(workspaceId, CompactDataTables))
.thenReturn(Future.successful(None))
Expand Down Expand Up @@ -94,6 +99,16 @@ class EntityManagerSpec extends AnyFlatSpec with MockitoTestUtils with Matchers
afterUpdate shouldBe a[AuditLoggingEntityProvider]
val afterUpdateAuditProvider = afterUpdate.asInstanceOf[AuditLoggingEntityProvider]
afterUpdateAuditProvider.delegate shouldBe a[LocalEntityProvider]

// check the EntityManager behavior when there are pending compact data tables settings
when(workspaceSettingRepository.hasPendingSettings(workspaceId, CompactDataTables))
.thenReturn(Future.successful(true))
when(workspaceSettingRepository.getWorkspaceSettingOfType(workspaceId, CompactDataTables))
.thenReturn(Future.successful(Option(CompactDataTablesSetting(CompactDataTablesConfig(enabled = true)))))
val afterSettingPending = intercept[DataEntityException] {
Await.result(entityManager.resolveProviderFuture(entityRequestArguments), Duration.Inf)
}
afterSettingPending.getMessage should include("migration is in progress")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ class EntityServiceCompactMigrationSpec
new WorkspaceRepository(dataSource),
new MockGoogleServicesDAO("groupsPrefix"),
samDAO,
googleStorageService
googleStorageService,
entityService
)(executionContext, global)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,16 @@ trait ApiServiceSpec
new RequesterPaysSetupServiceImpl(slickDataSource, gcsDAO, bondApiDAO, requesterPaysRole = "requesterPaysRole")

override val workspaceSettingServiceConstructor: RawlsRequestContext => WorkspaceSettingService =
new WorkspaceSettingService(_,
new WorkspaceSettingRepository(slickDataSource),
new WorkspaceRepository(slickDataSource),
gcsDAO,
samDAO,
mock[GoogleStorageService[IO]]
)
ctx =>
new WorkspaceSettingService(
ctx,
new WorkspaceSettingRepository(slickDataSource),
new WorkspaceRepository(slickDataSource),
gcsDAO,
samDAO,
mock[GoogleStorageService[IO]],
entityServiceConstructor(ctx)
)

val entityManager = EntityManager.defaultEntityManager(
dataSource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ class EntityApiServiceProviderEquivalenceSpec extends ApiServiceSpec with SprayJ

def withApiServices[T](dataSource: SlickDataSource)(testCode: TestApiService => T): T = {
val apiService = TestApiService(dataSource, new MockGoogleServicesDAO("test"), new MockGooglePubSubDAO)
try {
setupProviders(apiService) // tweaks to minimalTestData for this test suite
try
testCode(apiService)
} finally
finally
apiService.cleanupSupervisor
}

Expand Down Expand Up @@ -271,18 +270,6 @@ class EntityApiServiceProviderEquivalenceSpec extends ApiServiceSpec with SprayJ
// helper methods
// ====================================================================================================

private def setupProviders(services: TestApiService): Unit = {
// configure compactWs to use compact ("Quicksilver") data tables
// no changes to legacyWs, so that will use legacy data tables
val compactDataTablesSetting = CompactDataTablesSetting(CompactDataTablesConfig(enabled = true))
val workspaceSettingService = services.workspaceSettingServiceConstructor(testContext)
val workspaceSettingResponse = Await.result(
workspaceSettingService.setWorkspaceSettings(compactWs.toWorkspaceName, List(compactDataTablesSetting)),
atMost
)
workspaceSettingResponse.successes should have size 1
}

private def withHandlers(route: server.Route): server.Route =
(handleExceptions(RawlsApiService.exceptionHandler) & handleRejections(RawlsApiService.rejectionHandler)) {
route
Expand Down
Loading
Loading