From 2943e7f18df029a63f979f62cc1ec202f24d253b Mon Sep 17 00:00:00 2001 From: Muhammad Usama Date: Wed, 25 Dec 2024 19:01:07 +0500 Subject: [PATCH 1/3] [Bug Fix] Incorrect Query Results with Distributed Query Plans This PR addresses two reported issues: citusdata#7698 and citusdata#7697. The Problem: Both issues involve incorrect query results when a query references both local and distributed tables and also includes a WHERE EXISTS clause on a local table. For example: SELECT ... WHERE EXISTS (SELECT * FROM local_table); In such cases, the WHERE EXISTS clause typically generates an InitPlan whose result is used as a "one-time filter": it determines whether the rest of the plan produces any output at all. If this InitPlan relies on the contents of a local table, it must be executed locally on the coordinator. However, when the planner decides whether to convert the local or the distributed tables into intermediate results, it fails to account for the tables referenced within the InitPlan. This results in an incorrect query execution plan and, subsequently, incorrect data. The Fix: This PR ensures that when the standard planner (standard_planner) has generated InitPlans (recorded in PlannerInfo->init_plans), we collect the parameter IDs those InitPlans set and check whether the join qualifiers of a relation contain an executor parameter (a PARAM_EXEC Param node) with one of those IDs. If the local table candidate has such a dependency, the distributed tables are converted to intermediate results rather than the local tables. This adjustment ensures that local tables used in the InitPlan remain intact and behave as expected. This fix prevents incorrect query results in cases involving mixed local and distributed tables with WHERE EXISTS clauses and improves the accuracy of distributed query planning. 
--- .../planner/local_distributed_join_planner.c | 89 ++++++++++++++++++- 1 file changed, 87 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c index a6502bf43c4..75cdfe6c8b3 100644 --- a/src/backend/distributed/planner/local_distributed_join_planner.c +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -87,6 +87,7 @@ #include "optimizer/optimizer.h" #include "optimizer/planner.h" #include "optimizer/prep.h" +#include "optimizer/restrictinfo.h" #include "parser/parse_relation.h" #include "parser/parsetree.h" #include "utils/builtins.h" @@ -135,6 +136,7 @@ typedef struct RangeTableEntryDetails RangeTblEntry *rangeTableEntry; List *requiredAttributeNumbers; bool hasConstantFilterOnUniqueColumn; + bool hasDependencyOnInitPlanParam; #if PG_VERSION_NUM >= PG_VERSION_16 RTEPermissionInfo *perminfo; #endif @@ -175,6 +177,10 @@ typedef enum ConversionChoice static bool HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry, RelationRestriction *relationRestriction); + +static bool HasDependencyOnInitPlanParam(RangeTblEntry *rangeTableEntry, + RelationRestriction *relationRestriction); + static ConversionCandidates * CreateConversionCandidates(PlannerRestrictionContext * plannerRestrictionContext, List *rangeTableList, @@ -290,7 +296,12 @@ GetConversionChoice(ConversionCandidates *conversionCandidates, if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL) { - return localRTECandidate ? CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES; + /* + * If Local table is referenced by the InitPlan that is kind of a One time filter, + * In that case we should refrain from converting the local tables. + */ + return localRTECandidate && (!localRTECandidate->hasDependencyOnInitPlanParam) ? 
+ CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES; } else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED) { @@ -314,7 +325,8 @@ GetConversionChoice(ConversionCandidates *conversionCandidates, } else { - return localRTECandidate ? CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES; + return localRTECandidate && (!localRTECandidate->hasDependencyOnInitPlanParam) ? + CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES; } } } @@ -383,6 +395,76 @@ ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList) } +/* + * HasDependencyOnInitPlanParam + * + * This function returns true if the given rangeTableEntry has a dependency + * on an InitPlan parameter. + */ +static bool +HasDependencyOnInitPlanParam(RangeTblEntry *rangeTableEntry, + RelationRestriction *relationRestriction) +{ + List* whereClauseList; + List* initPlanParamIDs = NIL; + ListCell *lc = NULL; + + /* + * Exit early if the plan does not include the initPlan or if relationRestriction + * does not contain joininfo. + */ + if (rangeTableEntry == NULL || relationRestriction == NULL) + return false; + if (relationRestriction->relOptInfo->joininfo == NULL) + return false; + if (relationRestriction->plannerInfo->init_plans == NULL) + return false; + + /* + * Gather all parameter IDs referenced by the InitPlan + */ + foreach(lc, relationRestriction->plannerInfo->init_plans) + { + Node *plan = (Node *) lfirst(lc); + + if (IsA(plan, SubPlan)) + { + SubPlan *subplan = (SubPlan *) plan; + if (subplan->setParam != NIL) + { + initPlanParamIDs = list_concat_unique_int(initPlanParamIDs, subplan->setParam); + + } + } + } + if (initPlanParamIDs == NIL) + return false; + /* + * Check if any parameter in the join conditions (join info) for this relation + * is referenced by the initPlan. This is important to ensure that we can + * decide whether we want to convert local or remote tables. 
+ */ + whereClauseList = extract_actual_clauses(relationRestriction->relOptInfo->joininfo, true); + + foreach(lc, whereClauseList) + { + Node *clause = (Node *) lfirst(lc); + + if (IsA(clause, Param)) + { + Param *param = (Param *) clause; + if (param->paramkind == PARAM_EXEC) + { + if (list_member_int(initPlanParamIDs, param->paramid)) + { + return true; + } + } + } + } + return false; +} + /* * HasConstantFilterOnUniqueColumn returns true if the given rangeTableEntry has a constant * filter on a unique column. @@ -581,6 +663,9 @@ CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext, RequiredAttrNumbersForRelation(rangeTableEntry, plannerRestrictionContext); rangeTableEntryDetails->hasConstantFilterOnUniqueColumn = HasConstantFilterOnUniqueColumn(rangeTableEntry, relationRestriction); + rangeTableEntryDetails->hasDependencyOnInitPlanParam = + HasDependencyOnInitPlanParam(rangeTableEntry, relationRestriction); + #if PG_VERSION_NUM >= PG_VERSION_16 rangeTableEntryDetails->perminfo = NULL; if (rangeTableEntry->perminfoindex) From 5d148b75c3557669c54ea0cad46955e8c39943f3 Mon Sep 17 00:00:00 2001 From: Muhammad Usama Date: Thu, 26 Dec 2024 00:51:38 +0500 Subject: [PATCH 2/3] Running citus_indent to resolve indentation issues --- .../planner/local_distributed_join_planner.c | 35 +++++++++++++------ 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c index 75cdfe6c8b3..13fc4465b76 100644 --- a/src/backend/distributed/planner/local_distributed_join_planner.c +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -179,7 +179,7 @@ static bool HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry, RelationRestriction *relationRestriction); static bool HasDependencyOnInitPlanParam(RangeTblEntry *rangeTableEntry, - RelationRestriction *relationRestriction); + RelationRestriction 
*relationRestriction); static ConversionCandidates * CreateConversionCandidates(PlannerRestrictionContext * plannerRestrictionContext, @@ -300,8 +300,9 @@ GetConversionChoice(ConversionCandidates *conversionCandidates, * If Local table is referenced by the InitPlan that is kind of a One time filter, * In that case we should refrain from converting the local tables. */ - return localRTECandidate && (!localRTECandidate->hasDependencyOnInitPlanParam) ? - CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES; + return localRTECandidate && + (!localRTECandidate->hasDependencyOnInitPlanParam) ? + CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES; } else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED) { @@ -325,8 +326,9 @@ GetConversionChoice(ConversionCandidates *conversionCandidates, } else { - return localRTECandidate && (!localRTECandidate->hasDependencyOnInitPlanParam) ? - CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES; + return localRTECandidate && + (!localRTECandidate->hasDependencyOnInitPlanParam) ? + CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES; } } } @@ -403,10 +405,9 @@ ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList) */ static bool HasDependencyOnInitPlanParam(RangeTblEntry *rangeTableEntry, - RelationRestriction *relationRestriction) + RelationRestriction *relationRestriction) { - List* whereClauseList; - List* initPlanParamIDs = NIL; + List *initPlanParamIDs = NIL; ListCell *lc = NULL; /* @@ -414,11 +415,17 @@ HasDependencyOnInitPlanParam(RangeTblEntry *rangeTableEntry, * does not contain joininfo. 
*/ if (rangeTableEntry == NULL || relationRestriction == NULL) + { return false; + } if (relationRestriction->relOptInfo->joininfo == NULL) + { return false; + } if (relationRestriction->plannerInfo->init_plans == NULL) + { return false; + } /* * Gather all parameter IDs referenced by the InitPlan @@ -432,19 +439,24 @@ HasDependencyOnInitPlanParam(RangeTblEntry *rangeTableEntry, SubPlan *subplan = (SubPlan *) plan; if (subplan->setParam != NIL) { - initPlanParamIDs = list_concat_unique_int(initPlanParamIDs, subplan->setParam); - + initPlanParamIDs = list_concat_unique_int(initPlanParamIDs, + subplan->setParam); } } } if (initPlanParamIDs == NIL) + { return false; + } + /* * Check if any parameter in the join conditions (join info) for this relation * is referenced by the initPlan. This is important to ensure that we can * decide whether we want to convert local or remote tables. */ - whereClauseList = extract_actual_clauses(relationRestriction->relOptInfo->joininfo, true); + List *whereClauseList = extract_actual_clauses( + relationRestriction->relOptInfo->joininfo, + true); foreach(lc, whereClauseList) { @@ -465,6 +477,7 @@ HasDependencyOnInitPlanParam(RangeTblEntry *rangeTableEntry, return false; } + /* * HasConstantFilterOnUniqueColumn returns true if the given rangeTableEntry has a constant * filter on a unique column. 
From a89ab7e1769d20504596fe7460d324c9fe8c06e6 Mon Sep 17 00:00:00 2001 From: Muhammad Usama Date: Thu, 26 Dec 2024 01:31:55 +0500 Subject: [PATCH 3/3] Adding test case --- src/test/regress/expected/issue_7698_7697.out | 88 +++++++++++++++++++ src/test/regress/multi_schedule | 2 +- src/test/regress/sql/issue_7698_7697.sql | 66 ++++++++++++++ 3 files changed, 155 insertions(+), 1 deletion(-) create mode 100644 src/test/regress/expected/issue_7698_7697.out create mode 100644 src/test/regress/sql/issue_7698_7697.sql diff --git a/src/test/regress/expected/issue_7698_7697.out b/src/test/regress/expected/issue_7698_7697.out new file mode 100644 index 00000000000..d3c6f23eebe --- /dev/null +++ b/src/test/regress/expected/issue_7698_7697.out @@ -0,0 +1,88 @@ +-- Issue #7698: An incorrect query result, where the distributed query plan seems wrong +-- https://github.com/citusdata/citus/issues/7698 +CREATE TABLE t1 (vkey int4 ,c10 int4); +CREATE TABLE t3 (vkey int4); +INSERT INTO t3 (vkey) values (1); +INSERT INTO t1 (vkey,c10) values (4, -70); +SELECT t3.vkey + FROM (t1 RIGHT OUTER JOIN t3 + ON (t1.c10 = t3.vkey )) + WHERE EXISTS (SELECT * FROM t3); + vkey +--------------------------------------------------------------------- + 1 +(1 row) + +-- Make t1 a distributed table +SELECT create_distributed_table('t1', 'vkey'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.t1$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Result should remain the same after making t1 a distributed table +SELECT t3.vkey + FROM (t1 RIGHT OUTER JOIN t3 + ON (t1.c10 = t3.vkey )) + WHERE EXISTS (SELECT * FROM t3); + vkey +--------------------------------------------------------------------- + 1 +(1 row) + +--- cleanup +DROP TABLE t1; +DROP TABLE t3; +-- Issue #7697: Incorrect result from a distributed table full outer join an undistributed table. +-- https://github.com/citusdata/citus/issues/7697 +CREATE TABLE t0 (vkey int4 ,c3 timestamp); +CREATE TABLE t3 (vkey int4 ,c26 timestamp); +CREATE TABLE t4 (vkey int4); +INSERT INTO t0 (vkey, c3) VALUES + (13,make_timestamp(2019, 10, 23, 15, 34, 50)); +INSERT INTO t3 (vkey,c26) VALUES + (1, make_timestamp(2024, 3, 26, 17, 36, 53)); +INSERT INTO t4 (vkey) VALUES + (1); +SELECT * FROM + (t0 FULL OUTER JOIN t3 ON (t0.c3 = t3.c26 )) + WHERE ( + EXISTS (SELECT * FROM t4) + ); + vkey | c3 | vkey | c26 +--------------------------------------------------------------------- + 13 | Wed Oct 23 15:34:50 2019 | | + | | 1 | Tue Mar 26 17:36:53 2024 +(2 rows) + +-- change t0 to distributed table +SELECT create_distributed_table('t0', 'vkey'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.t0$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Result should remain the same after making t0 a distributed table +SELECT * FROM + (t0 FULL OUTER JOIN t3 ON (t0.c3 = t3.c26 )) + WHERE ( + EXISTS (SELECT * FROM t4) + ); + vkey | c3 | vkey | c26 +--------------------------------------------------------------------- + | | 1 | Tue Mar 26 17:36:53 2024 + 13 | Wed Oct 23 15:34:50 2019 | | +(2 rows) + +--- cleanup +DROP TABLE t0; +DROP TABLE t3; +DROP TABLE t4; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index bbb4047a950..322f836fd90 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -103,7 +103,7 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement test: binary_protocol test: alter_table_set_access_method test: alter_distributed_table -test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7705 +test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7705 issue_7698_7697 test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes diff --git a/src/test/regress/sql/issue_7698_7697.sql b/src/test/regress/sql/issue_7698_7697.sql new file mode 100644 index 00000000000..2b09376252d --- /dev/null +++ b/src/test/regress/sql/issue_7698_7697.sql @@ -0,0 +1,66 @@ + +-- Issue #7698: An incorrect query result, where the distributed query plan seems wrong +-- https://github.com/citusdata/citus/issues/7698 + +CREATE TABLE t1 (vkey int4 ,c10 int4); +CREATE TABLE t3 (vkey int4); +INSERT INTO t3 (vkey) values (1); +INSERT INTO t1 (vkey,c10) values (4, -70); + +SELECT t3.vkey + FROM (t1 RIGHT OUTER JOIN t3 + ON (t1.c10 = t3.vkey )) + WHERE EXISTS (SELECT * FROM t3); + +-- Make t1 a distributed table +SELECT create_distributed_table('t1', 'vkey'); + +-- Result 
should remain the same after making t1 a distributed table + +SELECT t3.vkey + FROM (t1 RIGHT OUTER JOIN t3 + ON (t1.c10 = t3.vkey )) + WHERE EXISTS (SELECT * FROM t3); + +--- cleanup +DROP TABLE t1; +DROP TABLE t3; + +-- Issue #7697: Incorrect result from a distributed table full outer join an undistributed table. +-- https://github.com/citusdata/citus/issues/7697 + +CREATE TABLE t0 (vkey int4 ,c3 timestamp); +CREATE TABLE t3 (vkey int4 ,c26 timestamp); +CREATE TABLE t4 (vkey int4); + + +INSERT INTO t0 (vkey, c3) VALUES + (13,make_timestamp(2019, 10, 23, 15, 34, 50)); + +INSERT INTO t3 (vkey,c26) VALUES + (1, make_timestamp(2024, 3, 26, 17, 36, 53)); + +INSERT INTO t4 (vkey) VALUES + (1); + +SELECT * FROM + (t0 FULL OUTER JOIN t3 ON (t0.c3 = t3.c26 )) + WHERE ( + EXISTS (SELECT * FROM t4) + ); + +-- change t0 to distributed table +SELECT create_distributed_table('t0', 'vkey'); + +-- Result should remain the same after making t0 a distributed table + +SELECT * FROM + (t0 FULL OUTER JOIN t3 ON (t0.c3 = t3.c26 )) + WHERE ( + EXISTS (SELECT * FROM t4) + ); + +--- cleanup +DROP TABLE t0; +DROP TABLE t3; +DROP TABLE t4;