Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 32 additions & 34 deletions src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ def wrap(*args, **kwargs):

query = getattr(e, "query", None)
tb = sys.exc_info()[2]
sf_plan_in_arg = None
for arg in args:
if isinstance(arg, SnowflakePlan):
# this wrapper is triggered through collect or describe queries through
# ServerConnection.get_result_set, SnowflakePlan._analyze_attributes,
# or Selectable._analyze_attributes. In all these cases, there can be at
# most one SnowflakePlan in the args.
sf_plan_in_arg = arg
break

assert e.msg is not None
if "unexpected 'as'" in e.msg.lower():
ne = SnowparkClientExceptionMessages.SQL_PYTHON_REPORT_UNEXPECTED_ALIAS(
Expand All @@ -175,16 +185,14 @@ def wrap(*args, **kwargs):
)
raise ne.with_traceback(tb) from None
col = match.group(1)
children = [
arg for arg in args if isinstance(arg, SnowflakePlan)
]
remapped = [
SnowflakePlan.Decorator.__wrap_exception_regex_sub.sub(
"", val
)
for child in children
for val in child.expr_to_alias.values()
]
remapped = []
if sf_plan_in_arg is not None:
remapped = [
SnowflakePlan.Decorator.__wrap_exception_regex_sub.sub(
"", val
)
for val in sf_plan_in_arg.expr_to_alias.values()
]
if col in remapped:
unaliased_cols = (
snowflake.snowpark.dataframe._get_unaliased(col)
Expand Down Expand Up @@ -226,8 +234,8 @@ def wrap(*args, **kwargs):
col = match.group(1)

quoted_identifiers = []
for child in children:
plan_nodes = child.children_plan_nodes
if sf_plan_in_arg is not None:
plan_nodes = sf_plan_in_arg.children_plan_nodes
for node in plan_nodes:
if isinstance(node, Selectable):
quoted_identifiers.extend(
Expand Down Expand Up @@ -305,28 +313,18 @@ def search_read_file_node(
return result
return None

for arg in args:
if isinstance(arg, SnowflakePlan):
read_file_node = search_read_file_node(arg)
if (
read_file_node
and read_file_node.xml_reader_udtf is not None
):
row_tag = read_file_node.options.get(
XML_ROW_TAG_STRING
)
file_path = read_file_node.path
ne = SnowparkClientExceptionMessages.DF_XML_ROW_TAG_NOT_FOUND(
row_tag, file_path
)
raise ne.with_traceback(tb) from None
# when the describe query fails, the arg is a query string
elif isinstance(arg, str):
if f'"{XML_ROW_DATA_COLUMN_NAME}"' in arg:
ne = (
SnowparkClientExceptionMessages.DF_XML_ROW_TAG_NOT_FOUND()
)
raise ne.with_traceback(tb) from None
if sf_plan_in_arg is not None:
read_file_node = search_read_file_node(sf_plan_in_arg)
if (
read_file_node
and read_file_node.xml_reader_udtf is not None
):
row_tag = read_file_node.options.get(XML_ROW_TAG_STRING)
file_path = read_file_node.path
ne = SnowparkClientExceptionMessages.DF_XML_ROW_TAG_NOT_FOUND(
row_tag, file_path
)
raise ne.with_traceback(tb) from None

ne = SnowparkClientExceptionMessages.SQL_EXCEPTION_FROM_PROGRAMMING_ERROR(
e
Expand Down
Loading