Skip to content

Commit

Permalink
bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryan Waldorf committed Feb 5, 2025
1 parent 8b5be61 commit 314b8e5
Showing 1 changed file with 51 additions and 14 deletions.
65 changes: 51 additions & 14 deletions universql/protocol/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def perform_query(self, alternative_executor: Executor, raw_query, ast=None) ->
must_run_on_catalog = must_run_on_catalog or self._must_run_on_catalog(tables_list, ast)
if not must_run_on_catalog:
op_name = alternative_executor.__class__.__name__
new_locations = None
with sentry_sdk.start_span(op=op_name, name="Get table paths"):
locations = self.get_table_paths_from_catalog(alternative_executor.catalog, tables_list)
with sentry_sdk.start_span(op=op_name, name="Execute query"):
Expand All @@ -188,25 +189,61 @@ def perform_query(self, alternative_executor: Executor, raw_query, ast=None) ->
message = f"Unable to perform transformation {transform.__class__}"
logger.error(message, exc_info=e)
raise QueryError(f"{message}: {str(e)}")
max_query_attempts = 1
if isinstance(ast, Copy):
max_query_attempts = 3
query_attempt = 0
while query_attempt < max_query_attempts:
try:
new_locations = alternative_executor.execute(current_ast, self.catalog_executor, locations)
break
except Exception as e:
print(f"There was an issue executing this query: {e}. Trying again.")
query_attempt += 1
if query_attempt >= max_query_attempts:
raise e # Re-raise the last exception if all attempts fail
time.sleep(query_attempt * 1.0/2)
new_locations = alternative_executor.execute(current_ast, self.catalog_executor, locations)
if new_locations is not None:
with sentry_sdk.start_span(op=op_name, name="Register new locations"):
self.catalog.register_locations(new_locations)
return alternative_executor

# def perform_query(self, alternative_executor: Executor, raw_query, ast=None) -> Executor:
# if ast is not None and alternative_executor != self.catalog_executor:
# must_run_on_catalog = False
# if isinstance(ast, Create):
# if ast.kind in ('TABLE', 'VIEW'):
# tables = self._find_tables(ast.expression) if ast.expression is not None else []
# else:
# tables = []
# must_run_on_catalog = True
# elif isinstance(ast, Use):
# tables = []
# else:
# tables = self._find_tables(ast)
# tables_list = [table[0] for table in tables]
# must_run_on_catalog = must_run_on_catalog or self._must_run_on_catalog(tables_list, ast)
# if not must_run_on_catalog:
# op_name = alternative_executor.__class__.__name__
# new_locations = None
# with sentry_sdk.start_span(op=op_name, name="Get table paths"):
# locations = self.get_table_paths_from_catalog(alternative_executor.catalog, tables_list)
# with sentry_sdk.start_span(op=op_name, name="Execute query"):
# current_ast = ast
# for transform in self.transforms:
# try:
# current_ast = transform.transform_sql(current_ast, alternative_executor)
# except Exception as e:
# print_exc(10)
# message = f"Unable to perform transformation {transform.__class__}"
# logger.error(message, exc_info=e)
# raise QueryError(f"{message}: {str(e)}")
# max_query_attempts = 1
# if isinstance(ast, Copy):
# max_query_attempts = 3
# query_attempt = 0
# while query_attempt < max_query_attempts:
# try:
# new_locations = alternative_executor.execute(current_ast, self.catalog_executor, locations)
# break
# except Exception as e:
# print(f"There was an issue executing this query: {e}. Trying again.")
# query_attempt += 1
# if query_attempt >= max_query_attempts:
# raise e # Re-raise the last exception if all attempts fail
# time.sleep(query_attempt * 1.0/2)
# if new_locations is not None:
# with sentry_sdk.start_span(op=op_name, name="Register new locations"):
# self.catalog.register_locations(new_locations)
# return alternative_executor

with sentry_sdk.start_span(name="Execute query on Snowflake"):
last_executor = self.catalog_executor
if ast is None:
Expand Down

0 comments on commit 314b8e5

Please sign in to comment.