From d620a82b9f1c672b24d34ba21702c2d0ff2d1bee Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Wed, 2 Jul 2025 00:42:24 +0000
Subject: [PATCH] perf: Use CTEs more aggressively for smaller sql

---
 bigframes/core/compile/compiler.py     | 12 ++++++++
 bigframes/core/nodes.py                | 38 +++++++++++++++++++++++
 bigframes/core/rewrite/__init__.py     |  2 ++
 bigframes/core/rewrite/extract_ctes.py | 42 ++++++++++++++++++++++++++
 bigframes/core/rewrite/order.py        |  3 ++
 5 files changed, 97 insertions(+)
 create mode 100644 bigframes/core/rewrite/extract_ctes.py

diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py
index 0efbd47ae4..55f0019c03 100644
--- a/bigframes/core/compile/compiler.py
+++ b/bigframes/core/compile/compiler.py
@@ -50,6 +50,10 @@ def compile_sql(request: configs.CompileRequest) -> configs.CompileResult:
     # Need to do this before replacing unsupported ops, as that will rewrite slice ops
     result_node = rewrites.pull_up_limits(result_node)
     result_node = _replace_unsupported_ops(result_node)
+
+    # must extract ctes before column pruning, which pushes constraints down
+    result_node = cast(nodes.ResultNode, rewrites.extract_ctes(result_node))
+
     # prune before pulling up order to avoid unnnecessary row_number() ops
     result_node = cast(nodes.ResultNode, rewrites.column_pruning(result_node))
     result_node = rewrites.defer_order(
@@ -284,3 +288,11 @@ def compile_explode(node: nodes.ExplodeNode, child: compiled.UnorderedIR):
 @_compile_node.register
 def compile_random_sample(node: nodes.RandomSampleNode, child: compiled.UnorderedIR):
     return child._uniform_sampling(node.fraction)
+
+
+@_compile_node.register
+def compile_cte_node(node: nodes.CteNode, child: compiled.UnorderedIR):
+    """Return the compiled child unchanged."""
+    # CteNode is just an optimization barrier for the ibis compiler.
+    # Ibis itself will identify CTE candidates and extract them.
+    return child
diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py
index 205621fee2..6081704823 100644
--- a/bigframes/core/nodes.py
+++ b/bigframes/core/nodes.py
@@ -1235,6 +1235,44 @@ def remap_refs(
         return dataclasses.replace(self, input_output_pairs=new_fields)  # type: ignore
 
 
+@dataclasses.dataclass(frozen=True, eq=False)
+class CteNode(UnaryNode):
+    """Marks its child subtree as a common table expression (CTE) candidate.
+
+    The node is a pure pass-through: it exposes the child's fields and row
+    count unchanged and defines no column ids of its own.
+    """
+
+    @property
+    def fields(self) -> Sequence[Field]:
+        return self.child.fields
+
+    @property
+    def variables_introduced(self) -> int:
+        # Pass-through marker: no variables are created or renamed here.
+        return 0
+
+    @property
+    def row_count(self) -> Optional[int]:
+        return self.child.row_count
+
+    @property
+    def node_defined_ids(self) -> Tuple[identifiers.ColumnId, ...]:
+        return ()
+
+    def remap_vars(
+        self, mappings: Mapping[identifiers.ColumnId, identifiers.ColumnId]
+    ) -> CteNode:
+        # Defines no ids of its own, so there is nothing to remap.
+        return self
+
+    def remap_refs(
+        self, mappings: Mapping[identifiers.ColumnId, identifiers.ColumnId]
+    ) -> CteNode:
+        # Declares no column references of its own, so there is nothing to remap.
+        return self
+
+
 @dataclasses.dataclass(frozen=True, eq=False)
 class ProjectionNode(UnaryNode, AdditiveNode):
     """Assigns new variables (without modifying existing ones)"""
diff --git a/bigframes/core/rewrite/__init__.py b/bigframes/core/rewrite/__init__.py
index 4e5295ae9d..dff6d406fc 100644
--- a/bigframes/core/rewrite/__init__.py
+++ b/bigframes/core/rewrite/__init__.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from bigframes.core.rewrite.extract_ctes import extract_ctes
 from bigframes.core.rewrite.fold_row_count import fold_row_counts
 from bigframes.core.rewrite.identifiers import remap_variables
 from bigframes.core.rewrite.implicit_align import try_row_join
@@ -44,4 +45,5 @@
     "fold_row_counts",
     "pull_out_window_order",
     "defer_selection",
+    "extract_ctes",
 ]
diff --git a/bigframes/core/rewrite/extract_ctes.py b/bigframes/core/rewrite/extract_ctes.py
new file mode 100644
index 0000000000..ccaea6b832
--- /dev/null
+++ b/bigframes/core/rewrite/extract_ctes.py
@@ -0,0 +1,42 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+from collections import Counter
+
+from bigframes.core import nodes
+
+
+def extract_ctes(root: nodes.BigFrameNode) -> nodes.BigFrameNode:
+    """Wrap every node that has more than one parent in a CteNode marker.
+
+    Args:
+        root: Root of the expression tree to scan.
+
+    Returns:
+        The tree produced by root.top_down() with shared nodes wrapped.
+    """
+    # Count the incoming edges of every node; only the child end matters.
+    parent_counts = Counter(child for _, child in root.edges())
+
+    # Candidates are marked in place rather than pulled out of the tree. If
+    # they were pulled out, extraction would have to proceed bottom-up.
+    def insert_cte_markers(node: nodes.BigFrameNode) -> nodes.BigFrameNode:
+        # NOTE(review): the wrapped child still has a parent count above one;
+        # confirm top_down() does not visit it again and wrap it repeatedly.
+        if parent_counts[node] > 1:
+            return nodes.CteNode(node)
+        return node
+
+    return root.top_down(insert_cte_markers)
diff --git a/bigframes/core/rewrite/order.py b/bigframes/core/rewrite/order.py
index 5b5fb10753..c43f65d79b 100644
--- a/bigframes/core/rewrite/order.py
+++ b/bigframes/core/rewrite/order.py
@@ -113,6 +113,9 @@ def pull_up_order_inner(
         elif isinstance(node, bigframes.core.nodes.ProjectionNode):
             child_result, child_order = pull_up_order_inner(node.child)
             return node.replace_child(child_result), child_order
+        elif isinstance(node, bigframes.core.nodes.CteNode):
+            child_result, child_order = pull_up_order_inner(node.child)
+            return node.replace_child(child_result), child_order
         elif isinstance(node, bigframes.core.nodes.JoinNode):
             if node.propogate_order:
                 return pull_order_join(node)