Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
isLeftUnique: JBoolean,
isRightUnique: JBoolean): JSet[ImmutableBitSet] = {

// first add the different combinations of concatenated unique keys
// First add the different combinations of concatenated unique keys
// from the left and the right, adjusting the right hand side keys to
// reflect the addition of the left hand side
//
Expand All @@ -576,15 +576,25 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
// an alternative way of getting unique key information.
val retSet = new JHashSet[ImmutableBitSet]
val nFieldsOnLeft = leftFieldsCount
val rightSet = if (rightUniqueKeys != null) {

// We only want to do this if the right side has unique keys AND
// they cannot be null. If the side of the join can generate null keys,
// then we can't use it in the unique key.
val rightSet = if (rightUniqueKeys != null && !joinRelType.generatesNullsOnRight) {
val res = new JHashSet[ImmutableBitSet]
// This only collects the unique keys from the right side, offsetting
// the column positions appropriately.
rightUniqueKeys.foreach {
colMask =>
val tmpMask = ImmutableBitSet.builder
colMask.foreach(bit => tmpMask.set(bit + nFieldsOnLeft))
res.add(tmpMask.build())
}
if (leftUniqueKeys != null) {

// If the left side has unique keys AND the keys cannot be null,
// then we can combine them with the right side unique keys to form
// a superset of unique keys.
if (leftUniqueKeys != null && !joinRelType.generatesNullsOnLeft) {
res.foreach {
// 1) Concatenate unique keys from both sides to get a superset that is unique.
// If left is unique on {0,1} and right on {0}, then {0,1} (after offset) remains unique,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,25 +299,22 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase {
assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalInnerJoinWithoutEquiCond).toSet)
assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalInnerJoinWithEquiAndNonEquiCond).toSet)

assertEquals(
uniqueKeys(Array(1), Array(1, 5), Array(1, 5, 6)),
mq.getUniqueKeys(logicalLeftJoinOnUniqueKeys).toSet)
assertEquals(uniqueKeys(Array(1)), mq.getUniqueKeys(logicalLeftJoinOnUniqueKeys).toSet)

assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalLeftJoinNotOnUniqueKeys).toSet)
assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalLeftJoinOnRHSUniqueKeys).toSet)
assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalLeftJoinWithoutEquiCond).toSet)
assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalLeftJoinWithEquiAndNonEquiCond).toSet)

assertEquals(
uniqueKeys(Array(5), Array(1, 5), Array(5, 6), Array(1, 5, 6)),
uniqueKeys(Array(5), Array(5, 6)),
mq.getUniqueKeys(logicalRightJoinOnUniqueKeys).toSet)
assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalRightJoinNotOnUniqueKeys).toSet)
assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalRightJoinOnLHSUniqueKeys).toSet)
assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalRightJoinWithoutEquiCond).toSet)
assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalRightJoinWithEquiAndNonEquiCond).toSet)

assertEquals(
uniqueKeys(Array(1, 5), Array(1, 5, 6)),
mq.getUniqueKeys(logicalFullJoinOnUniqueKeys).toSet)
assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalFullJoinOnUniqueKeys).toSet)
assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalFullJoinNotOnUniqueKeys).toSet)
assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalFullJoinOnRHSUniqueKeys).toSet)
assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalFullJoinWithoutEquiCond).toSet)
Expand Down Expand Up @@ -381,11 +378,11 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase {
@Test
def testGetUniqueKeysOnTableScanTable(): Unit = {
assertEquals(
uniqueKeys(Array(0, 1), Array(0, 1, 5)),
uniqueKeys(Array(0, 1)),
mq.getUniqueKeys(logicalLeftJoinOnContainedUniqueKeys).toSet
)
assertEquals(
uniqueKeys(Array(0, 1, 5)),
uniqueKeys(),
mq.getUniqueKeys(logicalLeftJoinOnDisjointUniqueKeys).toSet
)
assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,25 +327,21 @@ class FlinkRelMdUpsertKeysTest extends FlinkRelMdHandlerTestBase {
assertEquals(toBitSet(), mq.getUpsertKeys(logicalInnerJoinWithoutEquiCond).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalInnerJoinWithEquiAndNonEquiCond).toSet)

assertEquals(
toBitSet(Array(1), Array(1, 5), Array(1, 5, 6)),
mq.getUpsertKeys(logicalLeftJoinOnUniqueKeys).toSet)
assertEquals(toBitSet(Array(1)), mq.getUpsertKeys(logicalLeftJoinOnUniqueKeys).toSet)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am i missing something, I do not see tests for nulls on each side?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above is the test: 5, and 6 were columns coming from the right side of the join. So they can't be included in unique keys adn that's why "Array(1, 5), Array(1, 5, 6)" were remove as possible unique keys.

The same for all the other tests that were updated, they test the new behaviour

assertEquals(toBitSet(), mq.getUpsertKeys(logicalLeftJoinNotOnUniqueKeys).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalLeftJoinOnRHSUniqueKeys).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalLeftJoinWithoutEquiCond).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalLeftJoinWithEquiAndNonEquiCond).toSet)

assertEquals(
toBitSet(Array(5), Array(1, 5), Array(5, 6), Array(1, 5, 6)),
toBitSet(Array(5), Array(5, 6)),
mq.getUpsertKeys(logicalRightJoinOnUniqueKeys).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalRightJoinNotOnUniqueKeys).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalRightJoinOnLHSUniqueKeys).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalRightJoinWithoutEquiCond).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalRightJoinWithEquiAndNonEquiCond).toSet)

assertEquals(
toBitSet(Array(1, 5), Array(1, 5, 6)),
mq.getUpsertKeys(logicalFullJoinOnUniqueKeys).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalFullJoinOnUniqueKeys).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalFullJoinNotOnUniqueKeys).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalFullJoinOnRHSUniqueKeys).toSet)
assertEquals(toBitSet(), mq.getUpsertKeys(logicalFullJoinWithoutEquiCond).toSet)
Expand Down