From af156fab4f8b25045a72e0b01152b3ee95e7dd56 Mon Sep 17 00:00:00 2001 From: cugni Date: Wed, 11 Dec 2024 16:50:43 +0100 Subject: [PATCH 1/4] Rebasing the changes from #455 --- .../scala/io/qbeast/spark/writer/Rollup.scala | 13 +++++--- .../delta/DeltaRollupDataWriterTest.scala | 4 +-- .../io/qbeast/spark/writer/RollupTest.scala | 33 +++++++++++++++++-- 3 files changed, 41 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala b/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala index dcefe5628..e976379fa 100644 --- a/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala +++ b/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala @@ -60,12 +60,15 @@ private[writer] class Rollup(limit: Double) { val cubeId = queue.dequeue() val group = groups(cubeId) if (group.size < limit && !cubeId.isRoot) { - val Some(parentCubeId) = cubeId.parent - if (groups.contains(parentCubeId)) { - groups(parentCubeId).add(group) + val nextInLine = cubeId.nextSibling match { + case Some(a) => a + case None => cubeId.parent.get + } + if (groups.contains(nextInLine)) { + groups(nextInLine).add(group) } else { - groups.put(parentCubeId, group) - queue.enqueue(parentCubeId) + groups.put(nextInLine, group) + queue.enqueue(nextInLine) } groups.remove(cubeId) } diff --git a/src/test/scala/io/qbeast/spark/delta/DeltaRollupDataWriterTest.scala b/src/test/scala/io/qbeast/spark/delta/DeltaRollupDataWriterTest.scala index 60a5c726e..44eeb0f21 100644 --- a/src/test/scala/io/qbeast/spark/delta/DeltaRollupDataWriterTest.scala +++ b/src/test/scala/io/qbeast/spark/delta/DeltaRollupDataWriterTest.scala @@ -71,7 +71,7 @@ class DeltaRollupDataWriterTest extends QbeastIntegrationTestSpec { } it should "compute rollup correctly when optimizing" in - withSparkAndTmpDir { (spark, tmpDir) => + withSparkAndTmpDir { (_, tmpDir) => val revision = Revision(1L, 0, QTableID(tmpDir), 20, Vector(EmptyTransformer("col_1")), Vector.empty) @@ -86,7 +86,7 @@ class 
DeltaRollupDataWriterTest extends QbeastIntegrationTestSpec { Map(root -> 20L, c1 -> 1L, c2 -> 20L)) val rollup = DeltaRollupDataWriter.computeRollup(tc) - rollup shouldBe Map(root -> root, c1 -> root, c2 -> c2) + rollup shouldBe Map(root -> root, c1 -> c2, c2 -> c2) } } diff --git a/src/test/scala/io/qbeast/spark/writer/RollupTest.scala b/src/test/scala/io/qbeast/spark/writer/RollupTest.scala index 2aceec6fd..210933a92 100644 --- a/src/test/scala/io/qbeast/spark/writer/RollupTest.scala +++ b/src/test/scala/io/qbeast/spark/writer/RollupTest.scala @@ -24,7 +24,7 @@ import org.scalatest.matchers.should.Matchers */ class RollupTest extends AnyFlatSpec with Matchers { - "Rollup" should "work correctly" in { + "Rollup" should "work correctly with basic cube structure" in { val root = CubeId.root(1) val c0 = root.firstChild val c1 = c0.nextSibling.get @@ -44,8 +44,37 @@ class RollupTest extends AnyFlatSpec with Matchers { result(root) shouldBe root result(c00) shouldBe c0 result(c01) shouldBe c0 - result(c10) shouldBe root + result(c10) shouldBe c11 // rolliing up into the next siblings. 
result(c11) shouldBe c11 } + it should "handle empty rollup" in { + val result = new Rollup(3).compute() + result shouldBe empty + } + + it should "handle single cube" in { + val root = CubeId.root(1) + val result = new Rollup(3) + .populate(root, 2) + .compute() + + result(root) shouldBe root + } + + it should "roll up to parent when size exceeds limit" in { + val root = CubeId.root(1) + val kids = root.children.toSeq + val child = kids(0) + val grandChild = kids(1) + + val result = new Rollup(2) + .populate(root, 1) + .populate(child, 2) + .populate(grandChild, 3) // Exceeds limit + .compute() + + result(grandChild) shouldBe grandChild + } + } From 2f7a331a67858932a6b94673b033e89f2cf0d13e Mon Sep 17 00:00:00 2001 From: jiawei Date: Fri, 13 Dec 2024 16:09:12 +0100 Subject: [PATCH 2/4] Ensure sibling cubes are checked in the correct order (left to right) --- .../scala/io/qbeast/spark/writer/Rollup.scala | 27 ++++++- .../io/qbeast/spark/writer/RollupTest.scala | 71 ++++++++++--------- 2 files changed, 64 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala b/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala index e976379fa..de40c4ce9 100644 --- a/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala +++ b/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala @@ -54,7 +54,7 @@ private[writer] class Rollup(limit: Double) { * the rollup result */ def compute(): Map[CubeId, CubeId] = { - val queue = new mutable.PriorityQueue()(Ordering.by[CubeId, Int](_.depth)) + val queue = new mutable.PriorityQueue()(CubeIdOrdering) groups.keys.foreach(queue.enqueue(_)) while (queue.nonEmpty) { val cubeId = queue.dequeue() @@ -78,6 +78,31 @@ private[writer] class Rollup(limit: Double) { }.toMap } + /* + * Ordering for cube identifiers. The cube identifiers are ordered by their depth in ascending + * order. If the depth is the same then the cube identifiers are ordered by in reverse order. 
+ * This ordering is used in the priority queue to process the cube identifiers in the correct + * order, i.e., from the deepest to the shallowest, and from the leftmost to the rightmost: + * 0 root + * 1 c0 c1 + * 2 c00 c01 c10 c11 + * The priority queue will process the cube identifiers in the following order: + * c00, c01, c10, c11, c0, c1, root. + * c00 -> c01 -> c0, c10 -> c11 -> c1, c0 -> c1 -> root + */ + private object CubeIdOrdering extends Ordering[CubeId] { + + override def compare(x: CubeId, y: CubeId): Int = { + val depthComparison = x.depth.compareTo(y.depth) + if (depthComparison == 0) { + y.compare(x) + } else { + depthComparison + } + } + + } + private class Group(val cubeIds: mutable.Set[CubeId], var size: Long) { def add(cubeId: CubeId, size: Long): Unit = { diff --git a/src/test/scala/io/qbeast/spark/writer/RollupTest.scala b/src/test/scala/io/qbeast/spark/writer/RollupTest.scala index 210933a92..a16a82396 100644 --- a/src/test/scala/io/qbeast/spark/writer/RollupTest.scala +++ b/src/test/scala/io/qbeast/spark/writer/RollupTest.scala @@ -25,27 +25,47 @@ import org.scalatest.matchers.should.Matchers class RollupTest extends AnyFlatSpec with Matchers { "Rollup" should "work correctly with basic cube structure" in { - val root = CubeId.root(1) + // root(100) + // | + // c0(100) + // / | \ \ + // c00(50) c01(50) c02(50) c03(50) + val root = CubeId.root(2) val c0 = root.firstChild - val c1 = c0.nextSibling.get - val c00 = c0.firstChild - val c01 = c00.nextSibling.get - val c10 = c1.firstChild - val c11 = c10.nextSibling.get + val Seq(c00, c01, c02, c03) = c0.children.toSeq - val result = new Rollup(3) - .populate(root, 1) - .populate(c00, 1) - .populate(c01, 2) - .populate(c10, 2) - .populate(c11, 3) - .compute() + val rollup = new Rollup(100) + rollup.populate(root, 100) + rollup.populate(c0, 100) + rollup.populate(c00, 50) + rollup.populate(c01, 50) + rollup.populate(c02, 50) + rollup.populate(c03, 50) - result(root) shouldBe root - result(c00) 
shouldBe c0 - result(c01) shouldBe c0 - result(c10) shouldBe c11 // rolliing up into the next siblings. - result(c11) shouldBe c11 + rollup.compute() shouldBe Map( + root -> root, + c0 -> c0, + c00 -> c01, + c01 -> c01, + c02 -> c03, + c03 -> c03) + } + + it should "rollup a cube up to the parent after checking all sibling cubes" in { + // root(100) + // / | \ \ + // c0(20) c1(20) c2(20) c3(20) + val root = CubeId.root(2) + val Seq(c0, c1, c2, c3) = root.children.toSeq + + val rollup = new Rollup(100) + rollup.populate(root, 100) + rollup.populate(c0, 20) + rollup.populate(c1, 20) + rollup.populate(c2, 20) + rollup.populate(c3, 20) + + rollup.compute() shouldBe Map(root -> root, c0 -> root, c1 -> root, c2 -> root, c3 -> root) } it should "handle empty rollup" in { @@ -62,19 +82,4 @@ class RollupTest extends AnyFlatSpec with Matchers { result(root) shouldBe root } - it should "roll up to parent when size exceeds limit" in { - val root = CubeId.root(1) - val kids = root.children.toSeq - val child = kids(0) - val grandChild = kids(1) - - val result = new Rollup(2) - .populate(root, 1) - .populate(child, 2) - .populate(grandChild, 3) // Exceeds limit - .compute() - - result(grandChild) shouldBe grandChild - } - } From ef3928fc62d66ce551d883c99b682831886f9aef Mon Sep 17 00:00:00 2001 From: jiawei Date: Fri, 13 Dec 2024 17:41:04 +0100 Subject: [PATCH 3/4] Rollup is made more efficient --- .../main/scala/io/qbeast/spark/writer/Rollup.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala b/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala index de40c4ce9..70ebd0f6f 100644 --- a/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala +++ b/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala @@ -60,9 +60,9 @@ private[writer] class Rollup(limit: Double) { val cubeId = queue.dequeue() val group = groups(cubeId) if (group.size < limit && !cubeId.isRoot) { - val nextInLine = 
cubeId.nextSibling match { - case Some(a) => a - case None => cubeId.parent.get + val nextInLine = queue.headOption match { + case Some(cube) if areSiblings(cube, cubeId) => cube + case _ => cubeId.parent.get } if (groups.contains(nextInLine)) { groups(nextInLine).add(group) @@ -78,6 +78,12 @@ private[writer] class Rollup(limit: Double) { }.toMap } + private def areSiblings(cube_a: CubeId, cube_b: CubeId): Boolean = { + val sameParent = cube_a.parent == cube_b.parent + val differentCube = cube_a != cube_b + sameParent && differentCube + } + /* * Ordering for cube identifiers. The cube identifiers are ordered by their depth in ascending * order. If the depth is the same then the cube identifiers are ordered by in reverse order. From 57bb9752316a6dbca6862887841d2a195ea1db7a Mon Sep 17 00:00:00 2001 From: jiawei Date: Mon, 16 Dec 2024 14:05:20 +0100 Subject: [PATCH 4/4] Add ordering test --- .../scala/io/qbeast/spark/writer/Rollup.scala | 15 ++++----- .../io/qbeast/spark/writer/RollupTest.scala | 31 +++++++++++++++++++ 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala b/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala index 70ebd0f6f..54550d5cc 100644 --- a/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala +++ b/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala @@ -54,7 +54,7 @@ private[writer] class Rollup(limit: Double) { * the rollup result */ def compute(): Map[CubeId, CubeId] = { - val queue = new mutable.PriorityQueue()(CubeIdOrdering) + val queue = new mutable.PriorityQueue()(CubeIdRollupOrdering) groups.keys.foreach(queue.enqueue(_)) while (queue.nonEmpty) { val cubeId = queue.dequeue() @@ -78,11 +78,12 @@ private[writer] class Rollup(limit: Double) { }.toMap } - private def areSiblings(cube_a: CubeId, cube_b: CubeId): Boolean = { - val sameParent = cube_a.parent == cube_b.parent - val differentCube = cube_a != cube_b - sameParent && differentCube - } + /** + * Checks if 
the given cube identifiers are siblings. Two cube identifiers are siblings if they + * have the same parent. It is assumed that the cube identifiers are different. + */ + private def areSiblings(cube_a: CubeId, cube_b: CubeId): Boolean = + cube_a.parent == cube_b.parent /* * Ordering for cube identifiers. The cube identifiers are ordered by their depth in ascending @@ -96,7 +97,7 @@ private[writer] class Rollup(limit: Double) { * c00, c01, c10, c11, c0, c1, root. * c00 -> c01 -> c0, c10 -> c11 -> c1, c0 -> c1 -> root */ - private object CubeIdOrdering extends Ordering[CubeId] { + private[writer] object CubeIdRollupOrdering extends Ordering[CubeId] { override def compare(x: CubeId, y: CubeId): Int = { val depthComparison = x.depth.compareTo(y.depth) diff --git a/src/test/scala/io/qbeast/spark/writer/RollupTest.scala b/src/test/scala/io/qbeast/spark/writer/RollupTest.scala index a16a82396..b497094d4 100644 --- a/src/test/scala/io/qbeast/spark/writer/RollupTest.scala +++ b/src/test/scala/io/qbeast/spark/writer/RollupTest.scala @@ -19,6 +19,8 @@ import io.qbeast.core.model.CubeId import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import scala.collection.mutable + /** * Tests of Rollup. 
*/ @@ -82,4 +84,33 @@ class RollupTest extends AnyFlatSpec with Matchers { result(root) shouldBe root } + it should "order CubeIds correctly" in { + val root = CubeId.root(4) + val Seq(c0, c1, c2) = root.children.take(3).toSeq + val Seq(c00, c01, c02, c03) = c0.children.take(4).toSeq + val Seq(c10, c11, c12, c13) = c1.children.take(4).toSeq + val Seq(c20, c21, c22, c23) = c2.children.take(4).toSeq + + val rollupCubeOrdering = new Rollup(0).CubeIdRollupOrdering + val queue = new mutable.PriorityQueue()(rollupCubeOrdering) + + queue.enqueue(root, c0, c1, c2, c00, c01, c02, c03, c10, c11, c12, c13, c20, c21, c22, c23) + queue.dequeue() shouldBe c00 + queue.dequeue() shouldBe c01 + queue.dequeue() shouldBe c02 + queue.dequeue() shouldBe c03 + queue.dequeue() shouldBe c10 + queue.dequeue() shouldBe c11 + queue.dequeue() shouldBe c12 + queue.dequeue() shouldBe c13 + queue.dequeue() shouldBe c20 + queue.dequeue() shouldBe c21 + queue.dequeue() shouldBe c22 + queue.dequeue() shouldBe c23 + queue.dequeue() shouldBe c0 + queue.dequeue() shouldBe c1 + queue.dequeue() shouldBe c2 + queue.dequeue() shouldBe root + } + }