Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #455: Next in line Rollup (rebased) #514

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 41 additions & 6 deletions core/src/main/scala/io/qbeast/spark/writer/Rollup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,21 @@ private[writer] class Rollup(limit: Double) {
* the rollup result
*/
def compute(): Map[CubeId, CubeId] = {
val queue = new mutable.PriorityQueue()(Ordering.by[CubeId, Int](_.depth))
val queue = new mutable.PriorityQueue()(CubeIdRollupOrdering)
groups.keys.foreach(queue.enqueue(_))
while (queue.nonEmpty) {
val cubeId = queue.dequeue()
val group = groups(cubeId)
if (group.size < limit && !cubeId.isRoot) {
val Some(parentCubeId) = cubeId.parent
if (groups.contains(parentCubeId)) {
groups(parentCubeId).add(group)
val nextInLine = queue.headOption match {
case Some(cube) if areSiblings(cube, cubeId) => cube
case _ => cubeId.parent.get
}
if (groups.contains(nextInLine)) {
groups(nextInLine).add(group)
} else {
groups.put(parentCubeId, group)
queue.enqueue(parentCubeId)
groups.put(nextInLine, group)
queue.enqueue(nextInLine)
}
groups.remove(cubeId)
}
Expand All @@ -75,6 +78,38 @@ private[writer] class Rollup(limit: Double) {
}.toMap
}

/**
* Checks if the given cube identifiers are siblings. Two cube identifiers are siblings if they
* have the same parent. It is assumed that the cube identifiers are different.
*/
private def areSiblings(cube_a: CubeId, cube_b: CubeId): Boolean =
cube_a.parent == cube_b.parent

/*
* Ordering for cube identifiers. The cube identifiers are ordered by their depth in ascending
* order. If the depth is the same then the cube identifiers are ordered by in reverse order.
* This ordering is used in the priority queue to process the cube identifiers in the correct
* order, i.e., from the deepest to the shallowest, and from the leftmost to the rightmost:
* 0 root
* 1 c0 c1
* 2 c00 c01 c10 c11
* The priority queue will process the cube identifiers in the following order:
* c00, c01, c10, c11, c0, c1, root.
* c00 -> c01 -> c0, c10 -> c11 -> c1, c0 -> c1 -> root
*/
private[writer] object CubeIdRollupOrdering extends Ordering[CubeId] {

override def compare(x: CubeId, y: CubeId): Int = {
val depthComparison = x.depth.compareTo(y.depth)
if (depthComparison == 0) {
y.compare(x)
} else {
depthComparison
}
}

}

private class Group(val cubeIds: mutable.Set[CubeId], var size: Long) {

def add(cubeId: CubeId, size: Long): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class DeltaRollupDataWriterTest extends QbeastIntegrationTestSpec {
}

it should "compute rollup correctly when optimizing" in
withSparkAndTmpDir { (spark, tmpDir) =>
withSparkAndTmpDir { (_, tmpDir) =>
val revision =
Revision(1L, 0, QTableID(tmpDir), 20, Vector(EmptyTransformer("col_1")), Vector.empty)

Expand All @@ -86,7 +86,7 @@ class DeltaRollupDataWriterTest extends QbeastIntegrationTestSpec {
Map(root -> 20L, c1 -> 1L, c2 -> 20L))

val rollup = DeltaRollupDataWriter.computeRollup(tc)
rollup shouldBe Map(root -> root, c1 -> root, c2 -> c2)
rollup shouldBe Map(root -> root, c1 -> c2, c2 -> c2)
}

}
97 changes: 81 additions & 16 deletions src/test/scala/io/qbeast/spark/writer/RollupTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,98 @@ import io.qbeast.core.model.CubeId
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.collection.mutable

/**
* Tests of Rollup.
*/
class RollupTest extends AnyFlatSpec with Matchers {

"Rollup" should "work correctly" in {
val root = CubeId.root(1)
"Rollup" should "work correctly with basic cube structure" in {
// root(100)
// |
// c0(100)
// / | \ \
// c00(50) c01(50) c02(50) c03(50)
val root = CubeId.root(2)
val c0 = root.firstChild
val c1 = c0.nextSibling.get
val c00 = c0.firstChild
val c01 = c00.nextSibling.get
val c10 = c1.firstChild
val c11 = c10.nextSibling.get
val Seq(c00, c01, c02, c03) = c0.children.toSeq

val rollup = new Rollup(100)
rollup.populate(root, 100)
rollup.populate(c0, 100)
rollup.populate(c00, 50)
rollup.populate(c01, 50)
rollup.populate(c02, 50)
rollup.populate(c03, 50)

rollup.compute() shouldBe Map(
root -> root,
c0 -> c0,
c00 -> c01,
c01 -> c01,
c02 -> c03,
c03 -> c03)
}

it should "rollup a cube up to the parent after checking all sibling cubes" in {
// root(100)
// / | \ \
// c0(20) c1(20) c2(20) c3(20)
val root = CubeId.root(2)
val Seq(c0, c1, c2, c3) = root.children.toSeq

val rollup = new Rollup(100)
rollup.populate(root, 100)
rollup.populate(c0, 20)
rollup.populate(c1, 20)
rollup.populate(c2, 20)
rollup.populate(c3, 20)

rollup.compute() shouldBe Map(root -> root, c0 -> root, c1 -> root, c2 -> root, c3 -> root)
}

it should "handle empty rollup" in {
val result = new Rollup(3).compute()
result shouldBe empty
}

it should "handle single cube" in {
val root = CubeId.root(1)
val result = new Rollup(3)
.populate(root, 1)
.populate(c00, 1)
.populate(c01, 2)
.populate(c10, 2)
.populate(c11, 3)
.populate(root, 2)
.compute()

result(root) shouldBe root
result(c00) shouldBe c0
result(c01) shouldBe c0
result(c10) shouldBe root
result(c11) shouldBe c11
}

it should "order CubeIds correctly" in {
val root = CubeId.root(4)
val Seq(c0, c1, c2) = root.children.take(3).toSeq
val Seq(c00, c01, c02, c03) = c0.children.take(4).toSeq
val Seq(c10, c11, c12, c13) = c1.children.take(4).toSeq
val Seq(c20, c21, c22, c23) = c2.children.take(4).toSeq

val rollupCubeOrdering = new Rollup(0).CubeIdRollupOrdering
val queue = new mutable.PriorityQueue()(rollupCubeOrdering)

queue.enqueue(root, c0, c1, c2, c00, c01, c02, c03, c10, c11, c12, c13, c20, c21, c22, c23)
queue.dequeue() shouldBe c00
queue.dequeue() shouldBe c01
queue.dequeue() shouldBe c02
queue.dequeue() shouldBe c03
queue.dequeue() shouldBe c10
queue.dequeue() shouldBe c11
queue.dequeue() shouldBe c12
queue.dequeue() shouldBe c13
queue.dequeue() shouldBe c20
queue.dequeue() shouldBe c21
queue.dequeue() shouldBe c22
queue.dequeue() shouldBe c23
queue.dequeue() shouldBe c0
queue.dequeue() shouldBe c1
queue.dequeue() shouldBe c2
queue.dequeue() shouldBe root
}

}
Loading