From 99e009fe7e85b3626262b0d8538a7d3d569b14b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Kabakc=C4=B1?= Date: Mon, 5 Aug 2024 04:12:40 +0100 Subject: [PATCH] better messages --- poetry.lock | 16 +++++++++++++++- pyproject.toml | 1 + universql/catalog/snow/polaris.py | 2 +- universql/lake/cloud.py | 6 +++--- universql/util.py | 13 +++++++++++++ universql/warehouse/duckdb.py | 21 ++++++++++----------- 6 files changed, 43 insertions(+), 16 deletions(-) diff --git a/poetry.lock b/poetry.lock index 7463da1..217a112 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1564,6 +1564,20 @@ files = [ [package.dependencies] pyreadline3 = {version = "*", markers = "sys_platform == \"win32\" and python_version >= \"3.8\""} +[[package]] +name = "humanize" +version = "4.10.0" +description = "Python humanize utilities" +optional = false +python-versions = ">=3.8" +files = [ + {file = "humanize-4.10.0-py3-none-any.whl", hash = "sha256:39e7ccb96923e732b5c2e27aeaa3b10a8dfeeba3eb965ba7b74a3eb0e30040a6"}, + {file = "humanize-4.10.0.tar.gz", hash = "sha256:06b6eb0293e4b85e8d385397c5868926820db32b9b654b932f57fa41c23c9978"}, +] + +[package.extras] +tests = ["freezegun", "pytest", "pytest-cov"] + [[package]] name = "idna" version = "3.7" @@ -4402,4 +4416,4 @@ test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "1b4d28aa8f31d91c4dbd9df823909e63f6ffe36c96acc728fcfaae3da6ea24a7" +content-hash = "6e3363d75ac049e422fece79b0d905de3941dcd90ffddc9dea5486442e1b3149" diff --git a/pyproject.toml b/pyproject.toml index 0d8844b..dadd30d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ sqlalchemy = "^2.0.31" fastapi-utils = "^0.7.0" fakesnow = "^0.9.20" +humanize = "^4.10.0" [tool.poetry.dev-dependencies] pylint = ">=2.11.1" diff --git a/universql/catalog/snow/polaris.py b/universql/catalog/snow/polaris.py index c88021e..67254c6 100644 --- a/universql/catalog/snow/polaris.py +++ 
b/universql/catalog/snow/polaris.py @@ -58,7 +58,7 @@ def _get_table(self, cursor: duckdb.DuckDBPyConnection, table: sqlglot.exp.Table try: iceberg_table = self.rest_catalog.load_table(table_ref) except NoSuchTableError: - raise SnowflakeError(self.query_id, f"Table {table_ref} doesn't exist in Polaris catalog!") + raise SnowflakeError(self.query_id, f"Table {table_ref} doesn't exist in Polaris catalog `{self.credentials.get('database')}` or your role doesn't have access to the table.") table_ref_sql = table.sql() cursor.register(table_ref_sql, iceberg_table.scan().to_arrow()) return sqlglot.exp.parse_identifier(table_ref_sql) diff --git a/universql/lake/cloud.py b/universql/lake/cloud.py index d1e96b8..28a8d66 100644 --- a/universql/lake/cloud.py +++ b/universql/lake/cloud.py @@ -1,8 +1,8 @@ import aiobotocore import gcsfs import s3fs -from duckdb import DuckDBPyConnection -# from fsspec.utils import setup_logging +from fsspec.core import logger +from fsspec.utils import setup_logging from pyiceberg.io import PY_IO_IMPL from pyiceberg.io.fsspec import FsspecFileIO from pyiceberg.table import StaticTable @@ -21,7 +21,7 @@ def s3(cache_storage: str, profile: str = "default"): def gcs(cache_storage, project=None, token=None): - # setup_logging(logger=logger, level="DEBUG") + setup_logging(logger=logger, level="ERROR") gcs_file_system = gcsfs.GCSFileSystem(project=project, token=token) caching_fs = MonitoredSimpleCacheFileSystem( fs=gcs_file_system, diff --git a/universql/util.py b/universql/util.py index 57a0dbd..a218459 100644 --- a/universql/util.py +++ b/universql/util.py @@ -6,6 +6,7 @@ from enum import Enum from typing import Any, List +import humanize from pyarrow import Schema from starlette.exceptions import HTTPException from starlette.requests import Request @@ -311,3 +312,15 @@ def sizeof_fmt(num, suffix="B"): return f"{num:3.1f}{unit}{suffix}" num /= 1000.0 return f"{num:.1f}Y{suffix}" + + +def get_friendly_time_since(start_time): + return 
humanize.precisedelta(datetime.timedelta(seconds=time.perf_counter() - start_time), + suppress=["days"], format="%0.3f") + + +def prepend_to_lines(input_string, prepend_string=" ", vertical_string='------'): + lines = input_string.split('\n') + modified_lines = [prepend_string + line for line in lines] + modified_string = '\n'.join(modified_lines) + return modified_string + '\n' + vertical_string diff --git a/universql/warehouse/duckdb.py b/universql/warehouse/duckdb.py index a9380b0..785ac35 100644 --- a/universql/warehouse/duckdb.py +++ b/universql/warehouse/duckdb.py @@ -6,6 +6,7 @@ from typing import List, Optional import duckdb +import humanize import pyarrow import pyarrow as pa import sqlglot @@ -19,7 +20,8 @@ from universql.catalog import get_catalog from universql.catalog.snow.show_iceberg_tables import cloud_logger from universql.lake.cloud import s3, gcs -from universql.util import get_columns_for_duckdb, SnowflakeError, Compute, Catalog +from universql.util import get_columns_for_duckdb, SnowflakeError, Compute, Catalog, get_friendly_time_since, \ + prepend_to_lines logging.basicConfig(level=logging.INFO) logger = logging.getLogger("🐥") @@ -57,7 +59,7 @@ def register_data_lake(self, args: dict): def sync_duckdb_catalog(self, tables: List[sqlglot.exp.Expression], ast: sqlglot.exp.Expression) -> Optional[ sqlglot.exp.Expression]: try: - locations = self.catalog.get_table_references(self.duckdb_emulator, tables) + locations = self.catalog.get_table_references(self.duckdb, tables) except DatabaseError as e: error_message = (f"[{self.token}] Unable to find location of Iceberg tables. " f"See: https://github.com/buremba/universql#cant-query-native-snowflake-tables. 
Cause: {e.msg}") @@ -69,7 +71,7 @@ def sync_duckdb_catalog(self, tables: List[sqlglot.exp.Expression], ast: sqlglot views_sql = "\n".join(views) if views: self.duckdb.execute(views_sql) - logger.info(f"[{self.token}] Creating views for Iceberg tables: \n{views_sql}") + logger.info(f"[{self.token}] DuckDB environment is setting up, creating views for remote tables: \n{prepend_to_lines(views_sql)}") def replace_icebergs_with_duckdb_reference( expression: sqlglot.exp.Expression) -> sqlglot.exp.Expression: @@ -113,12 +115,10 @@ def _do_query(self, raw_query: str) -> (str, List, pyarrow.Table): can_run_locally = False break sql = transformed_ast.sql(dialect="duckdb", pretty=True) - planned_duration = time.perf_counter() - start_time try: self.duckdb_emulator.execute(sql) - timedelta = datetime.timedelta(seconds=planned_duration) - logger.info("[%s] Duckdb environment is prepared. (%s)\n%s" % (self.token, timedelta, sql)) + logger.info(f"[{self.token}] executing DuckDB query:\n{prepend_to_lines(sql)}") # except duckdb.Error as e: except DatabaseError as e: local_error_message = f"Unable to run the query locally on DuckDB. {e.msg}" @@ -126,8 +126,7 @@ def _do_query(self, raw_query: str) -> (str, List, pyarrow.Table): break if can_run_locally and not run_snowflake_already and should_run_locally: - formatting = (self.token, datetime.timedelta(seconds=time.perf_counter() - start_time)) - logger.info(f"[{self.token}] Run locally 🚀 ({formatting[1]})") + logger.info(f"[{self.token}] Run locally 🚀 ({get_friendly_time_since(start_time)})") return self.get_duckdb_result() else: if local_error_message: @@ -140,15 +139,15 @@ def _do_query(self, raw_query: str) -> (str, List, pyarrow.Table): def do_snowflake_query(self, queries, raw_query, start_time, local_error_message): try: self.snowflake.execute(queries, raw_query) - formatting = (self.token, datetime.timedelta(seconds=time.perf_counter() - start_time)) - logger.info(f"[{self.token}] Query is done. 
({formatting[1]})") + logger.info(f"[{self.token}] Query is done. ({get_friendly_time_since(start_time)})") except SnowflakeError as e: final_error = f"{local_error_message}. {e.message}" cloud_logger.error(f"[{self.token}] {final_error}") raise SnowflakeError(self.token, final_error, e.sql_state) + def do_query(self, raw_query: str) -> (str, List, pyarrow.Table): - logger.info("[%s] Executing \n%s" % (self.token, raw_query)) + logger.info(f"[{self.token}] Executing \n{prepend_to_lines(raw_query)}") self.processing = True try: return self._do_query(raw_query)