Skip to content

Commit

Permalink
add user friendly logs to cli
Browse files Browse the repository at this point in the history
  • Loading branch information
buremba committed Aug 5, 2024
1 parent ed9f396 commit 519ed22
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 34 deletions.
44 changes: 38 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ You can join Snowflake data with your local datasets, **without any need for a r
UniverSQL relies on Snowflake and Polaris for access control and data catalog so it's complementary to your Snowflake workloads.

> [!WARNING]
> Any SQL client that supports Snowflake, also supports UniverSQL but we need to support Snowflake's API to support compatibility. If you run into any problem with a client, please [create an issue on Github](https://github.com/buremba/universql/issues/new).
> Any SQL client that supports Snowflake, also supports UniverSQL as we implement Snowflake's API to support compatibility. If you run into any issue using an app or client, feel free to [create a discussion](https://github.com/buremba/universql/discussions/categories/quality-testing).
# How it works?

Expand Down Expand Up @@ -54,19 +54,51 @@ Iceberg supports predicate pushdown, which helps with partitioned tables to redu

# Getting Started

Install UniverSQL from PyPI as follows:
Install UniverSQL as a Python package:

```bash
pip install universql
python3 -m pip install universql
```

You can start UniverSQL by passing your account identifier:
## Using virtual environments

We recommend using virtual environments (venv) to namespace pip modules. Create a new venv as follows:

```bash
python -m venv universql-env # create the environment
```

Activate that same virtual environment each time you create a shell window or session:

```
source universql-env/bin/activate # activate the environment for Mac and Linux OR
universql-env\Scripts\activate # activate the environment for Windows
```

Alternatively, pull the Docker image (recommended for running in the background):

```bash
universql snowflake --account-url lt51601.europe-west2.gcp
docker pull buremba/universql
```

And then:

```bash
universql
--network=host \
--mount type=bind,source=,target=/usr/app \
snowflake --account-url lt51601.europe-west2.gcp
```

For Docker:

```bash
docker run buremba/universql snowflake --account eq06461.eu-west-2.aws
```

```
> universql snowflake --help
Usage: universql snowflake [OPTIONS]
Options:
Expand Down Expand Up @@ -96,7 +128,7 @@ Options:
--max-memory TEXT DuckDB Max memory to use for the server
(default: 80% of total memory)
--cache-directory TEXT Data lake cache directory (default:
/Users/bkabak/.universql/cache)
~/.universql/cache)
--max-cache-size TEXT DuckDB maximum cache used in local disk
(default: 80% of total available disk)
--help Show this message and exit.
Expand Down
15 changes: 13 additions & 2 deletions tests/sqlglot_tests.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
import time
from pathlib import Path

import psutil
import pyarrow as pa

import duckdb
import sqlglot
from fakesnow.fakes import FakeSnowflakeCursor, FakeSnowflakeConnection

from universql.util import time_me
from universql.warehouse.duckdb import fix_snowflake_to_duckdb_types

# Cache directory probed below. Use the current user's home directory rather
# than a hard-coded personal path, so the script works on any machine
# (matches the `~/.universql/cache` default documented in the README).
cache = str(Path.home() / ".universql" / "cache")


@time_me
def test_cache_size():
    """Print the total size (bytes) of all files under the cache directory.

    A missing cache directory yields an empty glob, so this prints 0 rather
    than raising.
    """
    print(sum(f.stat().st_size for f in Path(cache).glob('**/*') if f.is_file()))


# Called twice on purpose: the second run shows the timing overhead once the
# filesystem metadata is warm in the OS cache.
test_cache_size()
test_cache_size()
# queries = sqlglot.parse("""
# SET tables = (SHOW TABLES);
#
Expand All @@ -28,7 +38,8 @@
pa.field("timezone", nullable=False, type=pa.int32()),
]
pa_type = pa.struct(fields)
pa.StructArray.from_arrays(arrays=[pa.array([1, 2, 3], type=pa.int64()), pa.array([1, 2, 3], type=pa.int32()), pa.array([1, 2, 3], type=pa.int32())], fields=fields)
pa.StructArray.from_arrays(arrays=[pa.array([1, 2, 3], type=pa.int64()), pa.array([1, 2, 3], type=pa.int32()),
pa.array([1, 2, 3], type=pa.int32())], fields=fields)

query = """
SELECT
Expand Down
7 changes: 5 additions & 2 deletions universql/catalog/snow/polaris.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pyiceberg
import sqlglot
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.exceptions import NoSuchTableError, OAuthError
from pyiceberg.io import PY_IO_IMPL
from pyiceberg.typedef import Identifier
from snowflake.connector.options import pyarrow
Expand Down Expand Up @@ -45,7 +45,10 @@ def __init__(self, cache_directory : str, account: str, query_id: str, credentia
"warehouse": current_database,
"scope": "PRINCIPAL_ROLE:ALL"
}
self.rest_catalog = load_catalog(None, **iceberg_rest_credentials)
try:
self.rest_catalog = load_catalog(None, **iceberg_rest_credentials)
except OAuthError as e:
raise SnowflakeError(self.query_id, e.args[0])
self.rest_catalog.properties[CACHE_DIRECTORY_KEY] = cache_directory
self.rest_catalog.properties[PY_IO_IMPL] = "universql.lake.cloud.iceberg"

Expand Down
5 changes: 4 additions & 1 deletion universql/catalog/snow/show_iceberg_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,10 @@ class SnowflakeShowIcebergTables(IcebergCatalog):
def __init__(self, account: str, query_id: str, credentials: dict):
    """Open a Snowflake connection for catalog lookups.

    Connection/authentication failures are re-raised as SnowflakeError,
    tagged with the query id, so the client receives a friendly,
    traceable message instead of a raw driver traceback.
    """
    super().__init__(query_id, credentials)
    self.databases = {}
    try:
        self.connection = snowflake.connector.connect(**credentials, account=account)
    except DatabaseError as e:
        # Preserve the driver's message and SQLSTATE for the caller.
        raise SnowflakeError(self.query_id, e.msg, e.sqlstate)

def cursor(self) -> Cursor:
    """Return a cursor that tags results with this catalog's query id."""
    raw_cursor = self.connection.cursor()
    return SnowflakeIcebergCursor(self.query_id, raw_cursor)
Expand Down
7 changes: 5 additions & 2 deletions universql/lake/fsspec_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from fsspec.implementations.cache_mapper import AbstractCacheMapper
from fsspec.implementations.cached import SimpleCacheFileSystem

from universql.util import get_total_directory_size

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("fsspec")

Expand Down Expand Up @@ -62,13 +64,14 @@ def sizeof_fmt(num, suffix="B"):
first_free = None


def pprint_disk_usage(storage: str) -> str:
def get_friendly_disk_usage(storage: str) -> str:
global last_free
global first_free
usage = psutil.disk_usage(storage)
if first_free is None:
first_free = usage.free
message = f"(free {sizeof_fmt(usage.free)} {int(100 - usage.percent)}%)"
current_usage = get_total_directory_size(storage)
message = f"(usage {sizeof_fmt(current_usage)} free {sizeof_fmt(usage.free)} {int(100 - usage.percent)}%)"
if last_free is not None:
downloaded_recently = last_free - usage.free
if downloaded_recently > 10_000_000:
Expand Down
9 changes: 3 additions & 6 deletions universql/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ def cli():
help='DuckDB maximum cache used in local disk (default: 80% of total available disk)')
def snowflake(host, port, ssl_keyfile, ssl_certfile, account, catalog, compute, **kwargs):
context__params = click.get_current_context().params
params = {k: v for k, v in context__params.items() if
v is not None and k not in ["host", "port"]}
auto_catalog_mode = catalog is None
if auto_catalog_mode:
try:
Expand All @@ -82,17 +80,15 @@ def snowflake(host, port, ssl_keyfile, ssl_certfile, account, catalog, compute,

adjective = "apparently" if auto_catalog_mode else ""
logger.info(f"UniverSQL is starting reverse proxy for {account}.snowflakecomputing.com, "
f"it's {adjective} a {context__params['catalog']} server. Happy compute!")
f"it's {adjective} a {context__params['catalog']} server.")

if compute == Compute.AUTO.value:
logger.info("The queries run on DuckDB and fallback to Snowflake if they fail.")
logger.info("The queries will run on DuckDB and fallback to Snowflake if they fail.")
elif compute == Compute.LOCAL.value:
logger.info("The queries will run locally")
elif compute == Compute.SNOWFLAKE.value:
logger.info("The queries will run directly on Snowflake")

click.secho(yaml.dump(params).strip())

if not ssl_keyfile or not ssl_certfile:
data = socket.gethostbyname_ex("localhostcomputing.com")
logger.info(f"Using the SSL keyfile and certfile for localhostcomputing.com. DNS resolves to {data}")
Expand Down Expand Up @@ -132,6 +128,7 @@ def filter(self, record: logging.LogRecord) -> bool:

uvicorn_logger = logging.getLogger("uvicorn.access")
uvicorn_logger.addFilter(EndpointFilter(path="/session/heartbeat"))
uvicorn_logger.addFilter(EndpointFilter(path="/session/delete"))
uvicorn_logger.addFilter(EndpointFilter(path="/telemetry/send"))
uvicorn_logger.addFilter(EndpointFilter(path="/queries/v1/query-request"))
uvicorn_logger.addFilter(EndpointFilter(path="/session/v1/login-request"))
Expand Down
42 changes: 30 additions & 12 deletions universql/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
import click
import psutil
import pyarrow as pa
import yaml

from fastapi import FastAPI
from pyiceberg.exceptions import OAuthError
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import JSONResponse, Response

from universql.lake.fsspec_util import pprint_disk_usage
from universql.lake.fsspec_util import get_friendly_disk_usage
from universql.util import unpack_request_body, session_from_request, SnowflakeError, parameters, \
print_dict_as_markdown_table
from fastapi.encoders import jsonable_encoder
Expand Down Expand Up @@ -62,11 +62,18 @@ async def login_request(request: Request) -> JSONResponse:
try:
session = UniverSQLSession(current_context, token, credentials, login_data.get("SESSION_PARAMETERS"))
sessions[session.token] = session
except OAuthError as e:
message = e.args[0]
except SnowflakeError as e:
message = e.message

client = f"{request.client.host}:{request.client.port}"

if message is None:
logger.info(
f"[{token}] Created local session for user {credentials.get('user')} from {client}")
else:
logger.error(
f"Rejected login request from {client} for user {credentials.get('user')}. Reason: {message}")

logger.info(
f"[{token}] Created local session for user {credentials.get('user')} from {request.client.host}:{request.client.port}")
return JSONResponse(
{
"data":
Expand Down Expand Up @@ -111,6 +118,7 @@ async def delete_session(request: Request):
return JSONResponse({"success": True})

del sessions[session.token]
logger.info(f"[{session.token}] Session closed, cleaning up resources.")
session.close()
return JSONResponse({"success": True})
return Response(status_code=404)
Expand Down Expand Up @@ -181,7 +189,7 @@ async def query_request(request: Request) -> JSONResponse:

@app.get("/")
async def home(request: Request) -> JSONResponse:
    # Lightweight liveness endpoint: always succeeds, with a human-friendly
    # status string so a browser hit confirms the proxy is up.
    return JSONResponse({"success": True, "status": "X-Duck is ducking 🐥"})


@app.get("/monitoring/queries/{query_id:str}") # type: ignore[arg-type]
Expand All @@ -193,16 +201,20 @@ async def query_monitoring_query(self, request: Request) -> JSONResponse:
return JSONResponse({"data": {"queries": [{"status": "SUCCESS"}]}, "success": True})


# When True, log resource stats on every tick even if no query is running.
ENABLE_DEBUG_WATCH_TOWER = False
# Seconds between resource-usage samples.
WATCH_TOWER_SCHEDULE_SECONDS = 3


def watch_tower(cache_directory, **kwargs):
    """Background loop logging CPU/memory/disk usage while queries run.

    Runs forever; intended to be started in a daemon thread. Stats are only
    emitted when at least one session is processing a query (or always, when
    ENABLE_DEBUG_WATCH_TOWER is set) to keep idle logs quiet.
    """
    while True:
        time.sleep(WATCH_TOWER_SCHEDULE_SECONDS)
        processing_sessions = sum(session.processing for token, session in sessions.items())
        if ENABLE_DEBUG_WATCH_TOWER or processing_sessions > 0:
            process = psutil.Process()
            percent = psutil.cpu_percent()
            cpu_percent = "%.1f" % percent
            memory_percent = "%.1f" % process.memory_percent()
            disk_info = get_friendly_disk_usage(cache_directory)
            logger.info(f"[CPU: {cpu_percent}%] [Memory: {memory_percent}%] [Disk: {disk_info}] "
                        f"Currently {len(sessions)} sessions running {processing_sessions} queries. ")

Expand Down Expand Up @@ -239,7 +251,13 @@ async def startup_event():
"Python": f"snowflake.connector.connect(host='{current_context.get('host')}', port='{current_context.get('port')}')",
"PHP": f"new PDO('snowflake:host={host_port}', '<user>', '<password>')",
"Go": f"sql.Open('snowflake', 'user:pass@{host_port}/dbname')",
".NET": f"host=;{host_port};db=testdb",
".NET": f"host={host_port};db=testdb",
"ODBC": f"Server={current_context.get('host')}; Database=dbname; Port={current_context.get('port')}",
}
click.secho(print_dict_as_markdown_table(connections, footer_message=f"For other clients and applications, see https://github.com/buremba/universql",))
params = {k: v for k, v in current_context.items() if
v is not None and k not in ["host", "port"]}
click.secho(yaml.dump(params).strip())
click.secho(print_dict_as_markdown_table(connections,
footer_message=(
"You can connect to UniverSQL with any Snowflake client using your Snowflake credentials.",
"For application support, see https://github.com/buremba/universql",)))
25 changes: 22 additions & 3 deletions universql/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, List
from pathlib import Path
from typing import Any, List, Tuple

import humanize
from pyarrow import Schema
Expand Down Expand Up @@ -326,9 +327,27 @@ def prepend_to_lines(input_string, prepend_string=" ", vertical_string='------')
return modified_string + '\n' + vertical_string


def print_dict_as_markdown_table(input_dict, footer_message: Tuple[str, ...], column_width=(8, 80)):
    """Render *input_dict* as a fixed-width text table with footer lines.

    :param input_dict: mapping of label -> value; one table row per entry.
    :param footer_message: tuple of footer lines, each rendered in its own
        bordered row below the table. (Variadic tuple — ``Tuple[str]`` would
        wrongly mean exactly one element.)
    :param column_width: (key_width, value_width) used to left-pad each row.
    :return: the assembled multi-line string (despite the ``print_`` name,
        nothing is printed; the caller decides how to emit it).
    """
    # 95 box-drawing dashes: 87 for the content area plus 8 for the key column.
    top_bottom_line = "─" * (87 + 8)
    result = top_bottom_line
    for key, value in input_dict.items():
        result += f"\n{str(key).ljust(column_width[0])}{str(value).ljust(column_width[1])} │"

    footer = '\n' + top_bottom_line + '\n' + '\n'.join(
        ["│ " + message.ljust(92) + '│' for message in footer_message]) + '\n'
    return result + footer + top_bottom_line


def time_me(func):
    """Decorator that prints the wall-clock time of each call to *func*.

    The wrapped function's return value is passed through unchanged. Uses
    ``functools.wraps`` so the decorated function keeps its ``__name__`` and
    ``__doc__`` (the printed label relies on ``func.__name__``, and tools
    like pytest and debuggers rely on accurate metadata).
    """
    from functools import wraps  # local import: keeps the module's import block untouched

    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        original_return_val = func(*args, **kwargs)
        end = time.perf_counter()
        print("time elapsed in ", func.__name__, ": ", end - start, sep='')
        return original_return_val

    return wrapper


def get_total_directory_size(directory: str):
    """Return the combined size in bytes of every file under *directory* (recursive)."""
    total_bytes = 0
    for entry in Path(directory).glob('**/*'):
        if entry.is_file():
            total_bytes += entry.stat().st_size
    return total_bytes

0 comments on commit 519ed22

Please sign in to comment.