Skip to content

Commit

Permalink
better messages
Browse files Browse the repository at this point in the history
  • Loading branch information
buremba committed Aug 5, 2024
1 parent 86e490a commit 99e009f
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 16 deletions.
16 changes: 15 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ sqlalchemy = "^2.0.31"
fastapi-utils = "^0.7.0"

fakesnow = "^0.9.20"
humanize = "^4.10.0"
[tool.poetry.dev-dependencies]
pylint = ">=2.11.1"

Expand Down
2 changes: 1 addition & 1 deletion universql/catalog/snow/polaris.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _get_table(self, cursor: duckdb.DuckDBPyConnection, table: sqlglot.exp.Table
try:
iceberg_table = self.rest_catalog.load_table(table_ref)
except NoSuchTableError:
raise SnowflakeError(self.query_id, f"Table {table_ref} doesn't exist in Polaris catalog!")
raise SnowflakeError(self.query_id, f"Table {table_ref} doesn't exist in Polaris catalog `{self.credentials.get('database')}` or your role doesn't have access to the table.")
table_ref_sql = table.sql()
cursor.register(table_ref_sql, iceberg_table.scan().to_arrow())
return sqlglot.exp.parse_identifier(table_ref_sql)
Expand Down
6 changes: 3 additions & 3 deletions universql/lake/cloud.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import aiobotocore
import gcsfs
import s3fs
from duckdb import DuckDBPyConnection
# from fsspec.utils import setup_logging
from fsspec.core import logger
from fsspec.utils import setup_logging
from pyiceberg.io import PY_IO_IMPL
from pyiceberg.io.fsspec import FsspecFileIO
from pyiceberg.table import StaticTable
Expand All @@ -21,7 +21,7 @@ def s3(cache_storage: str, profile: str = "default"):


def gcs(cache_storage, project=None, token=None):
# setup_logging(logger=logger, level="DEBUG")
setup_logging(logger=logger, level="ERROR")
gcs_file_system = gcsfs.GCSFileSystem(project=project, token=token)
caching_fs = MonitoredSimpleCacheFileSystem(
fs=gcs_file_system,
Expand Down
13 changes: 13 additions & 0 deletions universql/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from enum import Enum
from typing import Any, List

import humanize
from pyarrow import Schema
from starlette.exceptions import HTTPException
from starlette.requests import Request
Expand Down Expand Up @@ -311,3 +312,15 @@ def sizeof_fmt(num, suffix="B"):
return f"{num:3.1f}{unit}{suffix}"
num /= 1000.0
return f"{num:.1f}Y{suffix}"


def get_friendly_time_since(start_time):
    """Return a human-readable description of the time elapsed since *start_time*.

    *start_time* is a ``time.perf_counter()`` reading taken earlier; the elapsed
    interval is rendered via ``humanize.precisedelta`` with sub-second precision
    (three decimal places) and without collapsing into day units.
    """
    elapsed_seconds = time.perf_counter() - start_time
    elapsed_delta = datetime.timedelta(seconds=elapsed_seconds)
    return humanize.precisedelta(elapsed_delta, suppress=["days"], format="%0.3f")


def prepend_to_lines(input_string, prepend_string=" ", vertical_string='------'):
    """Prefix every line of *input_string* with *prepend_string* and append a rule.

    Each line (split on ``'\\n'``) gets *prepend_string* prepended — by default a
    single space of indentation — and *vertical_string* is added as a final line,
    acting as a visual terminator in log output.
    """
    indented = '\n'.join(prepend_string + line for line in input_string.split('\n'))
    return indented + '\n' + vertical_string
21 changes: 10 additions & 11 deletions universql/warehouse/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import List, Optional

import duckdb
import humanize
import pyarrow
import pyarrow as pa
import sqlglot
Expand All @@ -19,7 +20,8 @@
from universql.catalog import get_catalog
from universql.catalog.snow.show_iceberg_tables import cloud_logger
from universql.lake.cloud import s3, gcs
from universql.util import get_columns_for_duckdb, SnowflakeError, Compute, Catalog
from universql.util import get_columns_for_duckdb, SnowflakeError, Compute, Catalog, get_friendly_time_since, \
prepend_to_lines

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("🐥")
Expand Down Expand Up @@ -57,7 +59,7 @@ def register_data_lake(self, args: dict):
def sync_duckdb_catalog(self, tables: List[sqlglot.exp.Expression], ast: sqlglot.exp.Expression) -> Optional[
sqlglot.exp.Expression]:
try:
locations = self.catalog.get_table_references(self.duckdb_emulator, tables)
locations = self.catalog.get_table_references(self.duckdb, tables)
except DatabaseError as e:
error_message = (f"[{self.token}] Unable to find location of Iceberg tables. "
f"See: https://github.com/buremba/universql#cant-query-native-snowflake-tables. Cause: {e.msg}")
Expand All @@ -69,7 +71,7 @@ def sync_duckdb_catalog(self, tables: List[sqlglot.exp.Expression], ast: sqlglot
views_sql = "\n".join(views)
if views:
self.duckdb.execute(views_sql)
logger.info(f"[{self.token}] Creating views for Iceberg tables: \n{views_sql}")
logger.info(f"[{self.token}] DuckDB environment is setting up, creating views for remote tables: \n{prepend_to_lines(views_sql)}")

def replace_icebergs_with_duckdb_reference(
expression: sqlglot.exp.Expression) -> sqlglot.exp.Expression:
Expand Down Expand Up @@ -113,21 +115,18 @@ def _do_query(self, raw_query: str) -> (str, List, pyarrow.Table):
can_run_locally = False
break
sql = transformed_ast.sql(dialect="duckdb", pretty=True)
planned_duration = time.perf_counter() - start_time

try:
self.duckdb_emulator.execute(sql)
timedelta = datetime.timedelta(seconds=planned_duration)
logger.info("[%s] Duckdb environment is prepared. (%s)\n%s" % (self.token, timedelta, sql))
logger.info(f"[{self.token}] executing DuckDB query:\n{prepend_to_lines(sql)}")
# except duckdb.Error as e:
except DatabaseError as e:
local_error_message = f"Unable to run the query locally on DuckDB. {e.msg}"
can_run_locally = False
break

if can_run_locally and not run_snowflake_already and should_run_locally:
formatting = (self.token, datetime.timedelta(seconds=time.perf_counter() - start_time))
logger.info(f"[{self.token}] Run locally 🚀 ({formatting[1]})")
logger.info(f"[{self.token}] Run locally 🚀 ({get_friendly_time_since(start_time)})")
return self.get_duckdb_result()
else:
if local_error_message:
Expand All @@ -140,15 +139,15 @@ def _do_query(self, raw_query: str) -> (str, List, pyarrow.Table):
def do_snowflake_query(self, queries, raw_query, start_time, local_error_message):
try:
self.snowflake.execute(queries, raw_query)
formatting = (self.token, datetime.timedelta(seconds=time.perf_counter() - start_time))
logger.info(f"[{self.token}] Query is done. ({formatting[1]})")
logger.info(f"[{self.token}] Query is done. ({get_friendly_time_since(start_time)})")
except SnowflakeError as e:
final_error = f"{local_error_message}. {e.message}"
cloud_logger.error(f"[{self.token}] {final_error}")
raise SnowflakeError(self.token, final_error, e.sql_state)


def do_query(self, raw_query: str) -> (str, List, pyarrow.Table):
logger.info("[%s] Executing \n%s" % (self.token, raw_query))
logger.info(f"[{self.token}] Executing \n{prepend_to_lines(raw_query)}")
self.processing = True
try:
return self._do_query(raw_query)
Expand Down

0 comments on commit 99e009f

Please sign in to comment.