Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: airbnb/omniduct
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v1.2.1
Choose a base ref
...
head repository: airbnb/omniduct
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: main
Choose a head ref
  • 2 commits
  • 1 file changed
  • 1 contributor

Commits on Jun 1, 2024

  1. presto: Improve error handling, fix duplicate queries when things fai…

    …l, and avoid running more than one query when writing to a table.
    matthewwardrop committed Jun 1, 2024
    Copy the full SHA
    2afcb64 View commit details
  2. Copy the full SHA
    e50a768 View commit details
Showing with 14 additions and 27 deletions.
  1. +14 −27 omniduct/databases/presto.py
41 changes: 14 additions & 27 deletions omniduct/databases/presto.py
Original file line number Diff line number Diff line change
@@ -5,7 +5,6 @@
import ast
import logging
import re
import sys

import pandas.io.sql
from interface_meta import override
@@ -76,7 +75,6 @@ def _init(
self.schema = schema
self.server_protocol = server_protocol
self.source = source
self.__presto = None
self.connection_fields += ("catalog", "schema")
self._requests_session = requests_session

@@ -103,18 +101,11 @@ def _connect(self):

@override
def _is_connected(self):
try:
return self.__presto is not None
except: # pylint: disable=bare-except
return False
return True

@override
def _disconnect(self):
logger.info("Disconnecting from Presto coordinator...")
try:
self.__presto.close()
except: # pylint: disable=bare-except
pass
self._sqlalchemy_engine = None
self._sqlalchemy_metadata = None
self._schemas = None # pylint: disable=attribute-defined-outside-init
@@ -162,10 +153,6 @@ def _execute(self, statement, cursor, wait, session_properties):
logger.progress(100, complete=True)
return cursor
except (DatabaseError, pandas.io.sql.DatabaseError) as e:
# Attempt to parse database error, before ultimately reraising the same
# exception, maintaining the full stacktrace.
exception, exception_args, traceback = sys.exc_info()

try:
message = e.args[0]
if isinstance(message, str):
@@ -191,12 +178,6 @@ def _execute(self, statement, cursor, wait, session_properties):
)
)

class ErrContext:
def __repr__(self):
return context

# logged twice so that both notebook and console users see the error context
exception_args.args = [exception_args, ErrContext()]
logger.error(context)
except: # pylint: disable=bare-except
logger.warn(
@@ -206,17 +187,14 @@ def __repr__(self):
)
)

if isinstance(exception, type):
exception = exception(exception_args)

raise exception.with_traceback(traceback)
raise

@override
def _query_to_table(self, statement, table, if_exists, **kwargs):
from pyhive.exc import DatabaseError

statements = []

if if_exists == "fail" and self.table_exists(table):
raise RuntimeError(f"Table {table} already exists!")
if if_exists == "replace":
statements.append(f"DROP TABLE IF EXISTS {table};\n")
elif if_exists == "append":
@@ -225,7 +203,16 @@ def _query_to_table(self, statement, table, if_exists, **kwargs):
)

statements.append(f"CREATE TABLE {table} AS ({statement})")
return self.execute("\n".join(statements), **kwargs)

try:
return self.execute("\n".join(statements), **kwargs)
except DatabaseError as e:
if (
isinstance(e.args, dict)
and e.args.get("errorName") == "TABLE_ALREADY_EXISTS"
):
raise RuntimeError(f"Table {table} already exists!") from e
raise

@override
def _dataframe_to_table(self, df, table, if_exists="fail", **kwargs):