diff --git a/README.md b/README.md
index 5b08173..01c0853 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@ You can join Snowflake data with your local datasets, **without any need for a r
 UniverSQL relies on Snowflake and Polaris for access control and data catalog so it's complementary to your Snowflake workloads.
 
 > [!WARNING]
-> Any SQL client that supports Snowflake, also supports UniverSQL but we need to support Snowflake's API to support compatibility. If you run into any problem with a client, please [create an issue on Github](https://github.com/buremba/universql/issues/new).
+> Any SQL client that supports Snowflake also supports UniverSQL, as we implement Snowflake's API for compatibility. If you run into any issue using an app or client, feel free to [create a discussion](https://github.com/buremba/universql/discussions/categories/quality-testing).
 
 # How it works?
 
@@ -54,19 +54,51 @@ Iceberg supports predicate pushdown, which helps with partitioned tables to redu
 # Getting Started
 
-Install UniverSQL from PyPI as follows:
+Install UniverSQL as a Python package:
 
 ```bash
-pip install universql
+python3 -m pip install universql
 ```
 
-You can start Universql with the passing your account identifier:
+## Using virtual environments
+
+We recommend using a virtual environment (venv) to keep UniverSQL's dependencies isolated from your system packages. Create a new venv as follows:
+
+```bash
+python3 -m venv universql-env  # create the environment
+```
+
+Activate the virtual environment in each new shell window or session:
+
+```
+source universql-env/bin/activate  # activate the environment on macOS and Linux
+universql-env\Scripts\activate     # activate the environment on Windows
+```
+
+Alternatively, pull the Docker image (recommended for running in the background):
 
 ```bash
-universql snowflake --account-url lt51601.europe-west2.gcp
+docker pull buremba/universql
 ```
 
+Then run it, mounting a local directory if UniverSQL should have access to your local files:
+
+```bash
+docker run \
+  --network=host \
+  --mount type=bind,source=,target=/usr/app \
+  buremba/universql snowflake --account-url lt51601.europe-west2.gcp
+```
+
+Or, without any mounts:
+
+```bash
+docker run buremba/universql snowflake --account eq06461.eu-west-2.aws
+```
+
 ```
 > universql snowflake --help
 
 Usage: universql snowflake [OPTIONS]
 
 Options:
@@ -96,7 +128,7 @@ Options:
   --max-memory TEXT        DuckDB Max memory to use for the server (default:
                            80% of total memory)
   --cache-directory TEXT   Data lake cache directory (default:
-                           /Users/bkabak/.universql/cache)
+                           ~/.universql/cache)
   --max-cache-size TEXT    DuckDB maximum cache used in local disk (default:
                            80% of total available disk)
   --help                   Show this message and exit.
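+
+Once UniverSQL is up, point any Snowflake client at it instead of at Snowflake directly. Below is a minimal sketch using the Snowflake Python connector; the host, port, and credentials are placeholders, use the connection details UniverSQL prints at startup:
+
+```python
+import snowflake.connector
+
+# Placeholder connection details: substitute the host/port printed by
+# `universql snowflake` at startup and your own Snowflake credentials.
+conn = snowflake.connector.connect(
+    host="localhostcomputing.com",
+    port=8084,
+    user="my_user",
+    password="my_password",
+    account="lt51601.europe-west2.gcp",
+)
+cursor = conn.cursor()
+cursor.execute("SELECT 1")
+print(cursor.fetchall())
+```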
diff --git a/tests/sqlglot_tests.py b/tests/sqlglot_tests.py
index d318581..359fcb8 100644
--- a/tests/sqlglot_tests.py
+++ b/tests/sqlglot_tests.py
@@ -1,12 +1,22 @@
 import time
+from pathlib import Path
+
+import psutil
 import pyarrow as pa
 import duckdb
 import sqlglot
-from fakesnow.fakes import FakeSnowflakeCursor, FakeSnowflakeConnection
+from universql.util import time_me
 from universql.warehouse.duckdb import fix_snowflake_to_duckdb_types
 
+cache = str(Path.home() / ".universql" / "cache")
+
+
+@time_me
+def test_cache_size():
+    print(sum(f.stat().st_size for f in Path(cache).glob('**/*') if f.is_file()))
+
+
+test_cache_size()
+test_cache_size()
 
 # queries = sqlglot.parse("""
 # SET tables = (SHOW TABLES);
 #
@@ -28,7 +38,8 @@
     pa.field("timezone", nullable=False, type=pa.int32()),
 ]
 pa_type = pa.struct(fields)
-pa.StructArray.from_arrays(arrays=[pa.array([1, 2, 3], type=pa.int64()), pa.array([1, 2, 3], type=pa.int32()), pa.array([1, 2, 3], type=pa.int32())], fields=fields)
+pa.StructArray.from_arrays(arrays=[pa.array([1, 2, 3], type=pa.int64()), pa.array([1, 2, 3], type=pa.int32()),
+                                   pa.array([1, 2, 3], type=pa.int32())], fields=fields)
 
 query = """
 SELECT
diff --git a/universql/catalog/snow/polaris.py b/universql/catalog/snow/polaris.py
index 67254c6..dc84fb2 100644
--- a/universql/catalog/snow/polaris.py
+++ b/universql/catalog/snow/polaris.py
@@ -5,7 +5,7 @@
 import pyiceberg
 import sqlglot
 from pyiceberg.catalog import load_catalog
-from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.exceptions import NoSuchTableError, OAuthError
 from pyiceberg.io import PY_IO_IMPL
 from pyiceberg.typedef import Identifier
 from snowflake.connector.options import pyarrow
@@ -45,7 +45,10 @@ def __init__(self, cache_directory : str, account: str, query_id: str, credentia
             "warehouse": current_database,
             "scope": "PRINCIPAL_ROLE:ALL"
         }
-        self.rest_catalog = load_catalog(None, **iceberg_rest_credentials)
+        try:
+            self.rest_catalog = load_catalog(None, **iceberg_rest_credentials)
+        except OAuthError as e:
+            raise SnowflakeError(self.query_id, e.args[0])
         self.rest_catalog.properties[CACHE_DIRECTORY_KEY] = cache_directory
         self.rest_catalog.properties[PY_IO_IMPL] = "universql.lake.cloud.iceberg"
diff --git a/universql/catalog/snow/show_iceberg_tables.py b/universql/catalog/snow/show_iceberg_tables.py
index 097be83..1be876b 100644
--- a/universql/catalog/snow/show_iceberg_tables.py
+++ b/universql/catalog/snow/show_iceberg_tables.py
@@ -183,7 +183,10 @@ class SnowflakeShowIcebergTables(IcebergCatalog):
     def __init__(self, account: str, query_id: str, credentials: dict):
         super().__init__(query_id, credentials)
         self.databases = {}
-        self.connection = snowflake.connector.connect(**credentials, account=account)
+        try:
+            self.connection = snowflake.connector.connect(**credentials, account=account)
+        except DatabaseError as e:
+            raise SnowflakeError(self.query_id, e.msg, e.sqlstate)
 
     def cursor(self) -> Cursor:
         return SnowflakeIcebergCursor(self.query_id, self.connection.cursor())
diff --git a/universql/lake/fsspec_util.py b/universql/lake/fsspec_util.py
index e57db8b..8725f20 100644
--- a/universql/lake/fsspec_util.py
+++ b/universql/lake/fsspec_util.py
@@ -8,6 +8,8 @@
 from fsspec.implementations.cache_mapper import AbstractCacheMapper
 from fsspec.implementations.cached import SimpleCacheFileSystem
 
+from universql.util import get_total_directory_size
+
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger("fsspec")
 
@@ -62,13 +64,14 @@ def sizeof_fmt(num, suffix="B"):
 first_free = None
 
 
-def pprint_disk_usage(storage: str) -> str:
+def get_friendly_disk_usage(storage: str) -> str:
     global last_free
     global first_free
     usage = psutil.disk_usage(storage)
     if first_free is None:
         first_free = usage.free
-    message = f"(free {sizeof_fmt(usage.free)} {int(100 - usage.percent)}%)"
+    current_usage = get_total_directory_size(storage)
+    message = f"(usage {sizeof_fmt(current_usage)} free {sizeof_fmt(usage.free)} {int(100 - usage.percent)}%)"
     if last_free is not None:
         downloaded_recently = last_free - usage.free
         if downloaded_recently > 10_000_000:
diff --git a/universql/main.py b/universql/main.py
index ce0c4f2..973fbf0 100644
--- a/universql/main.py
+++ b/universql/main.py
@@ -59,8 +59,6 @@ def cli():
               help='DuckDB maximum cache used in local disk (default: 80% of total available disk)')
 def snowflake(host, port, ssl_keyfile, ssl_certfile, account, catalog, compute, **kwargs):
     context__params = click.get_current_context().params
-    params = {k: v for k, v in context__params.items() if
-              v is not None and k not in ["host", "port"]}
     auto_catalog_mode = catalog is None
     if auto_catalog_mode:
         try:
@@ -82,17 +80,15 @@ def snowflake(host, port, ssl_keyfile, ssl_certfile, account, catalog, compute,
     adjective = "apparently" if auto_catalog_mode else ""
     logger.info(f"UniverSQL is starting reverse proxy for {account}.snowflakecomputing.com, "
-                f"it's {adjective} a {context__params['catalog']} server. Happy compute!")
+                f"it's {adjective} a {context__params['catalog']} server.")
 
     if compute == Compute.AUTO.value:
-        logger.info("The queries run on DuckDB and fallback to Snowflake if they fail.")
+        logger.info("The queries will run on DuckDB and fall back to Snowflake if they fail.")
     elif compute == Compute.LOCAL.value:
         logger.info("The queries will run locally")
     elif compute == Compute.SNOWFLAKE.value:
         logger.info("The queries will run directly on Snowflake")
 
-    click.secho(yaml.dump(params).strip())
-
     if not ssl_keyfile or not ssl_certfile:
         data = socket.gethostbyname_ex("localhostcomputing.com")
         logger.info(f"Using the SSL keyfile and certfile for localhostcomputing.com. DNS resolves to {data}")
DNS resolves to {data}") @@ -132,6 +128,7 @@ def filter(self, record: logging.LogRecord) -> bool: uvicorn_logger = logging.getLogger("uvicorn.access") uvicorn_logger.addFilter(EndpointFilter(path="/session/heartbeat")) +uvicorn_logger.addFilter(EndpointFilter(path="/session/delete")) uvicorn_logger.addFilter(EndpointFilter(path="/telemetry/send")) uvicorn_logger.addFilter(EndpointFilter(path="/queries/v1/query-request")) uvicorn_logger.addFilter(EndpointFilter(path="/session/v1/login-request")) diff --git a/universql/server.py b/universql/server.py index 3d9066b..c9391f8 100644 --- a/universql/server.py +++ b/universql/server.py @@ -13,14 +13,14 @@ import click import psutil import pyarrow as pa +import yaml from fastapi import FastAPI -from pyiceberg.exceptions import OAuthError from starlette.exceptions import HTTPException from starlette.requests import Request from starlette.responses import JSONResponse, Response -from universql.lake.fsspec_util import pprint_disk_usage +from universql.lake.fsspec_util import get_friendly_disk_usage from universql.util import unpack_request_body, session_from_request, SnowflakeError, parameters, \ print_dict_as_markdown_table from fastapi.encoders import jsonable_encoder @@ -62,11 +62,18 @@ async def login_request(request: Request) -> JSONResponse: try: session = UniverSQLSession(current_context, token, credentials, login_data.get("SESSION_PARAMETERS")) sessions[session.token] = session - except OAuthError as e: - message = e.args[0] + except SnowflakeError as e: + message = e.message + + client = f"{request.client.host}:{request.client.port}" + + if message is None: + logger.info( + f"[{token}] Created local session for user {credentials.get('user')} from {client}") + else: + logger.error( + f"Rejected login request from {client} for user {credentials.get('user')}. 
Reason: {message}") - logger.info( - f"[{token}] Created local session for user {credentials.get('user')} from {request.client.host}:{request.client.port}") return JSONResponse( { "data": @@ -111,6 +118,7 @@ async def delete_session(request: Request): return JSONResponse({"success": True}) del sessions[session.token] + logger.info(f"[{session.token}] Session closed, cleaning up resources.") session.close() return JSONResponse({"success": True}) return Response(status_code=404) @@ -181,7 +189,7 @@ async def query_request(request: Request) -> JSONResponse: @app.get("/") async def home(request: Request) -> JSONResponse: - return JSONResponse({"success": True}) + return JSONResponse({"success": True, "status": "X-Duck is ducking šŸ„"}) @app.get("/monitoring/queries/{query_id:str}") # type: ignore[arg-type] @@ -193,16 +201,20 @@ async def query_monitoring_query(self, request: Request) -> JSONResponse: return JSONResponse({"data": {"queries": [{"status": "SUCCESS"}]}, "success": True}) +ENABLE_DEBUG_WATCH_TOWER = False +WATCH_TOWER_SCHEDULE_SECONDS = 3 + + def watch_tower(cache_directory, **kwargs): while True: - time.sleep(3) + time.sleep(WATCH_TOWER_SCHEDULE_SECONDS) processing_sessions = sum(session.processing for token, session in sessions.items()) - if processing_sessions > 0: + if ENABLE_DEBUG_WATCH_TOWER or processing_sessions > 0: process = psutil.Process() percent = psutil.cpu_percent() cpu_percent = "%.1f" % percent memory_percent = "%.1f" % process.memory_percent() - disk_info = pprint_disk_usage(cache_directory) + disk_info = get_friendly_disk_usage(cache_directory) logger.info(f"[CPU: {cpu_percent}%] [Memory: {memory_percent}%] [Disk: {disk_info}] " f"Currently {len(sessions)} sessions running {processing_sessions} queries. ") @@ -239,7 +251,13 @@ async def startup_event(): "Python": f"snowflake.connector.connect(host='{current_context.get('host')}', port='{current_context.get('port')}')", "PHP": f"new PDO('snowflake:host={host_port}', '', '')", "Go": f"sql.Open('snowflake', 'user:pass@{host_port}/dbname')", - ".NET": f"host=;{host_port};db=testdb", + ".NET": f"host={host_port};db=testdb", "ODBC": f"Server={current_context.get('host')}; Database=dbname; Port={current_context.get('port')}", } - click.secho(print_dict_as_markdown_table(connections, footer_message=f"For other clients and applications, see https://github.com/buremba/universql",)) + params = {k: v for k, v in current_context.items() if + v is not None and k not in ["host", "port"]} + click.secho(yaml.dump(params).strip()) + click.secho(print_dict_as_markdown_table(connections, + footer_message=( + "You can connect to UniverSQL with any Snowflake client using your Snowflake credentials.", + "For application support, see https://github.com/buremba/universql",))) diff --git a/universql/util.py b/universql/util.py index 30243ad..c68d411 100644 --- a/universql/util.py +++ b/universql/util.py @@ -4,7 +4,8 @@ import time from dataclasses import dataclass from enum import Enum -from typing import Any, List +from pathlib import Path +from typing import Any, List, Tuple import humanize from pyarrow import Schema @@ -326,9 +327,27 @@ def prepend_to_lines(input_string, prepend_string=" ", vertical_string='------') return modified_string + '\n' + vertical_string -def print_dict_as_markdown_table(input_dict, footer_message : str, column_width=(8, 80)): +def print_dict_as_markdown_table(input_dict, footer_message: Tuple[str], column_width=(8, 80)): top_bottom_line = "ā”€" * (87 + 8) result = top_bottom_line for key, value in 
         result += f"\nā”‚ {str(key).ljust(column_width[0])} ā”‚ {str(value).ljust(column_width[1])} ā”‚"
-    return result + '\n' + top_bottom_line + "\nā”‚ " + footer_message.ljust(92) + 'ā”‚\n' + top_bottom_line
+
+    footer = '\n' + top_bottom_line + '\n' + '\n'.join(
+        ["ā”‚ " + message.ljust(92) + 'ā”‚' for message in footer_message]) + '\n'
+    return result + footer + top_bottom_line
+
+
+def time_me(func):
+    def wrapper(*args, **kwargs):
+        start = time.perf_counter()
+        original_return_val = func(*args, **kwargs)
+        end = time.perf_counter()
+        print("time elapsed in ", func.__name__, ": ", end - start, sep='')
+        return original_return_val
+
+    return wrapper
+
+
+def get_total_directory_size(directory: str):
+    return sum(f.stat().st_size for f in Path(directory).glob('**/*') if f.is_file())
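A usage note on the new helpers in `universql/util.py`: `time_me` wraps a function and prints its wall-clock duration, while `get_total_directory_size` recursively sums file sizes. A minimal sketch combining the two (the `scan_cache` wrapper and the cache path are illustrative, not part of this diff):

```python
from pathlib import Path

from universql.util import get_total_directory_size, time_me


@time_me
def scan_cache(directory: str) -> int:
    # Recursively sum the size of every file under the given directory.
    return get_total_directory_size(directory)


# Illustrative path: the default cache directory mentioned in the README.
print(scan_cache(str(Path.home() / ".universql" / "cache")))
```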