Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AJ-1347: entityQuery streams entities #2665

Merged
merged 19 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import org.broadinstitute.dsde.rawls.{
RawlsFatalExceptionWithErrorReport
}
import slick.dbio.Effect.Read
import slick.jdbc.{GetResult, JdbcProfile, SQLActionBuilder}
import slick.jdbc.{GetResult, JdbcProfile, ResultSetConcurrency, ResultSetType, SQLActionBuilder, TransactionIsolation}
import slick.sql.SqlStreamingAction

import java.sql.Timestamp
Expand Down Expand Up @@ -421,11 +421,75 @@ trait EntityComponent {
)
}

def activeActionForPagination(workspaceContext: Workspace,
entityType: String,
entityQuery: model.EntityQuery,
parentContext: RawlsRequestContext
): ReadWriteAction[(Int, Int, Seq[EntityAndAttributesResult])] = {
/**
 * Generates the SQL clause that filters entities by the user's free-text search terms.
 *
 * Each space-separated term in `entityQuery.filterTerms` becomes one
 * `concat(<alias>.name, ' ', <alias>.all_attribute_values) like '%term%'` predicate (term lower-cased);
 * the predicates are joined with the SQL form of `entityQuery.filterOperator` and wrapped in parentheses.
 *
 * @param prefix      raw SQL keyword spliced in front of the parenthesized clause, e.g. "and"
 * @param alias       raw SQL table alias whose `name` and `all_attribute_values` columns are searched
 * @param entityQuery the query whose `filterTerms` and `filterOperator` drive the clause
 * @return the filter clause, or an empty SQL fragment when there are no non-blank search terms
 */
def paginationFilterSql(prefix: String, alias: String, entityQuery: model.EntityQuery) = {
  // Splitting on a single space yields empty strings for leading or repeated spaces, and an empty
  // array for a spaces-only input. An empty term would become the pattern '%%', and an empty term
  // list would leave nothing inside the parentheses, so blank terms are discarded up front.
  val searchTerms: Seq[String] =
    entityQuery.filterTerms.toSeq.flatMap(_.split(" ").toSeq).filter(_.nonEmpty)

  if (searchTerms.isEmpty) {
    sql""
  } else {
    val filters = searchTerms.map { term =>
      sql"concat(#$alias.name, ' ', #$alias.all_attribute_values) like ${'%' + term.toLowerCase + '%'}"
    }
    concatSqlActions(
      sql"#$prefix (",
      reduceSqlActionsWithDelim(filters, sql" #${FilterOperators.toSql(entityQuery.filterOperator)} "),
      sql") "
    )
  }
}

/**
 * Builds the action that computes the two counts accompanying an entity query: the count taken from
 * `findActiveEntityByType` for this workspace and entity type, and the count of entities that match
 * the query's filter.
 *
 * @param workspaceContext the workspace whose entities are counted
 * @param entityType       the entity type to count
 * @param entityQuery      supplies the optional `filterTerms` and `columnFilter`
 * @param parentContext    tracing context handed to `traceDBIOWithParent`
 * @return a tuple of (unfiltered count, filtered count)
 */
def activeActionForMetadata(workspaceContext: Workspace,
                            entityType: String,
                            entityQuery: model.EntityQuery,
                            parentContext: RawlsRequestContext
): ReadWriteAction[(Int, Int)] = {
  // Standalone count of the entities matching the user's filter.
  // The SQL text below is kept flush-left so the string literals stay unchanged.
  def matchingCountQuery: ReadAction[Vector[Int]] =
    entityQuery.columnFilter match {
      case None =>
        // no column filter: count on ENTITY alone, narrowed by any free-text search terms
        val baseCount =
          sql"""select count(1) from ENTITY e
where e.deleted = 0
and e.entity_type = $entityType
and e.workspace_id = ${workspaceContext.workspaceIdAsUUID} """
        concatSqlActions(baseCount, paginationFilterSql("and", "e", entityQuery)).as[Int]
      case Some(columnFilter) =>
        // column filter: join to this workspace's attribute shard and match the attribute value exactly.
        // NOTE(review): this branch does not apply filterTerms; confirm callers never combine both filters.
        sql"""select count(distinct(e.id)) from ENTITY e, ENTITY_ATTRIBUTE_#${determineShard(workspaceContext.workspaceIdAsUUID)} a
where a.owner_id = e.id
and e.deleted = 0
and e.entity_type = $entityType
and e.workspace_id = ${workspaceContext.workspaceIdAsUUID}
and a.namespace = ${columnFilter.attributeName.namespace}
and a.name = ${columnFilter.attributeName.name}
and COALESCE(a.value_string, a.value_number) = ${columnFilter.term}
""".as[Int]
    }

  val queryHasNoFilter = entityQuery.filterTerms.isEmpty && entityQuery.columnFilter.isEmpty

  traceDBIOWithParent("findActiveEntityByType", parentContext) { _ =>
    findActiveEntityByType(workspaceContext.workspaceIdAsUUID, entityType).length.result
  } flatMap { totalCount =>
    val matchingCounts =
      if (queryHasNoFilter) {
        // without a filter the two counts are always equal, so skip the second query
        DBIO.successful(Vector(totalCount))
      } else {
        traceDBIOWithParent("filteredCountQuery", parentContext)(_ => matchingCountQuery)
      }
    matchingCounts.map(counts => (totalCount, counts.head))
  }
}
// END activeActionForMetadata

def activeActionForEntityAndAttributesSource(workspaceContext: Workspace,
entityType: String,
entityQuery: model.EntityQuery,
parentContext: RawlsRequestContext
): SqlStreamingAction[Seq[EntityAndAttributesResult], EntityAndAttributesResult, Read] = {
/*
Lots of conditionals in here, to achieve the optimal SQL query for any given request. Pseudocode:

Expand All @@ -443,25 +507,6 @@ trait EntityComponent {
sort by attribute, limit, offset) to get the proper pagination
*/

// generate the clause to filter based on user search terms
def filterSql(prefix: String, alias: String) = {
val filtersOption = entityQuery.filterTerms.map {
_.split(" ").toSeq.map { term =>
sql"concat(#$alias.name, ' ', #$alias.all_attribute_values) like ${'%' + term.toLowerCase + '%'}"
}
}

filtersOption match {
case None => sql""
case Some(filters) =>
concatSqlActions(
sql"#$prefix (",
reduceSqlActionsWithDelim(filters, sql" #${FilterOperators.toSql(entityQuery.filterOperator)} "),
sql") "
)
}
}

// generate the clauses to limit the attributes returned to the user
def attrSelectionSql(prefix: String) = {
val fieldsOption = entityQuery.fields.fields.map { fieldList =>
Expand Down Expand Up @@ -524,36 +569,12 @@ trait EntityComponent {
val paginationJoin = concatSqlActions(
sql""" join (""",
paginationSubquery(workspaceContext.workspaceIdAsUUID, entityType, entityQuery.sortField),
filterSql("and", "e"),
paginationFilterSql("and", "e", entityQuery),
filterByColumn,
order(""),
sql" limit #${entityQuery.pageSize} offset #${(entityQuery.page - 1) * entityQuery.pageSize} ) p on p.id = e.id "
)

// standalone query to calculate the count of results that match our filter
def filteredCountQuery: ReadAction[Vector[Int]] =
entityQuery.columnFilter match {
case Some(columnFilter) =>
sql"""select count(distinct(e.id)) from ENTITY e, ENTITY_ATTRIBUTE_#${determineShard(
workspaceContext.workspaceIdAsUUID
)} a
where a.owner_id = e.id
and e.deleted = 0
and e.entity_type = $entityType
and e.workspace_id = ${workspaceContext.workspaceIdAsUUID}
and a.namespace = ${columnFilter.attributeName.namespace}
and a.name = ${columnFilter.attributeName.name}
and COALESCE(a.value_string, a.value_number) = ${columnFilter.term}
""".as[Int]
case _ =>
val filteredQuery =
sql"""select count(1) from ENTITY e
where e.deleted = 0
and e.entity_type = $entityType
and e.workspace_id = ${workspaceContext.workspaceIdAsUUID} """
concatSqlActions(filteredQuery, filterSql("and", "e")).as[Int]
}

// the full query to generate the page of results, as requested by the user
def pageQuery = {
// did the user request any attributes other than "name" and "entityType"?
Expand All @@ -573,7 +594,7 @@ trait EntityComponent {
sql"""select e.id, e.name, e.entity_type, e.workspace_id, e.record_version, e.deleted, e.deleted_date, null
from ENTITY e
where e.deleted = 'false' and e.entity_type = $entityType and e.workspace_id = ${workspaceContext.workspaceIdAsUUID} """,
filterSql("and", "e"),
paginationFilterSql("and", "e", entityQuery),
order("e"),
sql" limit #${entityQuery.pageSize} offset #${(entityQuery.page - 1) * entityQuery.pageSize}"
).as[EntityAndAttributesResult]
Expand All @@ -600,29 +621,17 @@ trait EntityComponent {
}
}

for {
unfilteredCount <- traceDBIOWithParent("findActiveEntityByType", parentContext)(_ =>
findActiveEntityByType(workspaceContext.workspaceIdAsUUID, entityType).length.result
)
filteredCount <-
if (entityQuery.filterTerms.isEmpty && entityQuery.columnFilter.isEmpty) {
// if the query has no filter, then "filteredCount" and "unfilteredCount" will always be the same; no need to make another query
DBIO.successful(Vector(unfilteredCount))
} else {
traceDBIOWithParent("filteredCountQuery", parentContext)(_ => filteredCountQuery)
}
page <- traceDBIOWithParent("pageQuery", parentContext)(_ => pageQuery)
} yield (unfilteredCount, filteredCount.head, page)
pageQuery
}
// END activeActionForPagination
// END activeActionForEntityAndAttributesSource

// actions which may include "deleted" hidden entities

def actionForTypeName(workspaceContext: Workspace,
def streamForTypeName(workspaceContext: Workspace,
entityType: String,
entityName: String,
desiredFields: Set[AttributeName]
): ReadAction[Seq[EntityAndAttributesResult]] = {
): SqlStreamingAction[Seq[EntityAndAttributesResult], EntityAndAttributesResult, Read] = {
// user requested specific attributes. include them in the where clause.
val attrNamespaceNameTuples = reduceSqlActionsWithDelim(desiredFields.toSeq.map { attrName =>
sql"(${attrName.namespace}, ${attrName.name})"
Expand Down Expand Up @@ -870,7 +879,7 @@ trait EntityComponent {
entityName: String,
desiredFields: Set[AttributeName] = Set.empty
): ReadAction[Option[Entity]] =
EntityAndAttributesRawSqlQuery.actionForTypeName(workspaceContext, entityType, entityName, desiredFields) map (
EntityAndAttributesRawSqlQuery.streamForTypeName(workspaceContext, entityType, entityName, desiredFields) map (
query => unmarshalEntities(query)
) map (_.headOption)

Expand Down Expand Up @@ -959,12 +968,28 @@ trait EntityComponent {
(unfilteredCount, page.size, page)
}

// get paginated entities for UI display, as a result of executing a query
def loadEntityPage(workspaceContext: Workspace,
entityType: String,
entityQuery: model.EntityQuery,
parentContext: RawlsRequestContext
): ReadWriteAction[(Int, Int, Iterable[Entity])] = {
/**
 * Returns the counts associated with an entity query by delegating to
 * `EntityAndAttributesRawSqlQuery.activeActionForMetadata`.
 *
 * @param workspaceContext the workspace containing the entities to be counted
 * @param entityType       the type of entities to be counted
 * @param entityQuery      criteria for querying entities
 * @param parentContext    a tracing context passed through to the underlying action
 * @return the pair of counts produced by `activeActionForMetadata`
 */
def loadEntityPageCounts(workspaceContext: Workspace,
                         entityType: String,
                         entityQuery: model.EntityQuery,
                         parentContext: RawlsRequestContext
): ReadWriteAction[(Int, Int)] = {
  EntityAndAttributesRawSqlQuery.activeActionForMetadata(
    workspaceContext = workspaceContext,
    entityType = entityType,
    entityQuery = entityQuery,
    parentContext = parentContext
  )
}

/**
* Returns a streaming result set of EntityAndAttributesResult objects, representing the individual attributes
* within a set of Entities. Respects pagination, filtering, sorting, and other features of EntityQuery.
*
* @param workspaceContext the workspace containing the entities to be queried
* @param entityType the type of entities to be queried
* @param entityQuery criteria for querying entities
* @param parentContext a tracing context under which this method should add its own traces
* @return the result set, configured for streaming
*/
def loadEntityPageSource(workspaceContext: Workspace,
entityType: String,
entityQuery: model.EntityQuery,
parentContext: RawlsRequestContext
): SqlStreamingAction[Seq[EntityAndAttributesResult], EntityAndAttributesResult, Read] = {
// look for a columnFilter that specifies the primary key for this entityType;
// such a columnFilter means we are filtering by name and can greatly simplify the underlying query.
val nameFilter: Option[String] = entityQuery.columnFilter match {
Expand All @@ -976,29 +1001,21 @@ trait EntityComponent {
case _ => None
}

// Note: if the user specified a column filter, we do not validate the specified column
// exists. Instead, we return 0 results. We may want to add such validation at a later point.

// if filtering by name, retrieve that entity directly, else do the full query:
parentContext.tracingSpan.foreach { span =>
span.putAttribute("isFilterByName", OpenCensusAttributeValue.booleanAttributeValue(nameFilter.isDefined))
}
nameFilter match {
case Some(entityName) =>
parentContext.tracingSpan.map { span =>
span.putAttribute("isFilterByName", OpenCensusAttributeValue.booleanAttributeValue(true))
}
loadSingleEntityForPage(workspaceContext, entityType, entityName, entityQuery)
val desiredFields = entityQuery.fields.fields.getOrElse(Set.empty).map(AttributeName.fromDelimitedName)
EntityAndAttributesRawSqlQuery
.streamForTypeName(workspaceContext, entityType, entityName, desiredFields)
case _ =>
parentContext.tracingSpan.map { span =>
span.putAttribute("isFilterByName", OpenCensusAttributeValue.booleanAttributeValue(false))
}
EntityAndAttributesRawSqlQuery.activeActionForPagination(workspaceContext,
entityType,
entityQuery,
parentContext
) map { case (unfilteredCount, filteredCount, pagination) =>
(unfilteredCount, filteredCount, unmarshalEntities(pagination, validateOrdering = true))
}
EntityAndAttributesRawSqlQuery
.activeActionForEntityAndAttributesSource(workspaceContext, entityType, entityQuery, parentContext)
}
}
// END loadEntityPageSource

// create or replace entities

Expand Down
Loading
Loading