Skip to content

Commit

Permalink
initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
buremba committed Jan 27, 2025
1 parent feae7ff commit 49f7b8d
Show file tree
Hide file tree
Showing 8 changed files with 809 additions and 36 deletions.
24 changes: 24 additions & 0 deletions resources/example.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
property,property_type,property_value,property_default,
TYPE,String,CSV,CSV,""""
RECORD_DELIMITER,String,\n,\n,""""
FIELD_DELIMITER,String,",",",",""""
FILE_EXTENSION,String,,,""""
SKIP_HEADER,Integer,0,0,""""
PARSE_HEADER,Boolean,FALSE,FALSE,""""
DATE_FORMAT,String,AUTO,AUTO,""""
TIME_FORMAT,String,AUTO,AUTO,""""
TIMESTAMP_FORMAT,String,AUTO,AUTO,""""
BINARY_FORMAT,String,HEX,HEX,""""
ESCAPE,String,NONE,NONE,""""
ESCAPE_UNENCLOSED_FIELD,String,\\,\\,""""
TRIM_SPACE,Boolean,FALSE,FALSE,""""
FIELD_OPTIONALLY_ENCLOSED_BY,String,NONE,NONE,""""
NULL_IF,List,"[\\N, NONE, NaN]",[\\N],""""
COMPRESSION,String,AUTO,AUTO,""""
ERROR_ON_COLUMN_COUNT_MISMATCH,Boolean,TRUE,TRUE,""""
VALIDATE_UTF8,Boolean,TRUE,TRUE,""""
SKIP_BLANK_LINES,Boolean,FALSE,FALSE,""""
REPLACE_INVALID_CHARACTERS,Boolean,FALSE,FALSE,""""
EMPTY_FIELD_AS_NULL,Boolean,TRUE,TRUE,""""
SKIP_BYTE_ORDER_MARK,Boolean,TRUE,TRUE,""""
ENCODING,String,UTF8,UTF8,""""
6 changes: 5 additions & 1 deletion tests/integration/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ def test_union(self):
result = execute_query(conn, "select 1 union all select 2")
assert result.num_rows == 2

def test_stage(self):
    """Selecting directly from a Snowflake stage path should yield rows."""
    stage_query = "select * from @iceberg_db.public.landing_stage/initial_objects/device_metadata.csv"
    with universql_connection(warehouse=None) as conn:
        stage_rows = execute_query(conn, stage_query)
        # The staged CSV is expected to be non-empty.
        assert stage_rows.num_rows > 0

def test_copy_into(self):
with universql_connection(warehouse=None) as conn:
result = execute_query(conn, """
Expand Down Expand Up @@ -196,5 +201,4 @@ def test_copy_into(self):
""")

result = execute_query(conn, "select count(*) from hits2")

assert result.num_rows == 10
19 changes: 19 additions & 0 deletions tests/plugins/snow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import os
import uuid

from sqlglot import parse_one

from universql.plugins.snow import TableSampleUniversqlPlugin, SnowflakeStageUniversqlPlugin
from universql.protocol.session import UniverSQLSession
from universql.warehouse.duckdb import DuckDBExecutor, DuckDBCatalog
from universql.warehouse.snowflake import SnowflakeExecutor, SnowflakeCatalog

# Derive paths from the current user's home directory instead of
# hard-coding a developer-specific absolute path ('/Users/bkabak'),
# so this scratch script runs on any machine.
_HOME = os.path.expanduser("~")

session = UniverSQLSession(
    {'account': 'dhb43249.us-east-1',
     'cache_directory': os.path.join(_HOME, '.universql', 'cache'),
     'catalog': 'snowflake',
     'home_directory': _HOME}, uuid.uuid4(), {"warehouse": "duckdb()"}, {})
compute = {"warehouse": "duckdb()"}
plugin = SnowflakeStageUniversqlPlugin(SnowflakeExecutor(SnowflakeCatalog(session, compute)))

# Rewrite a Snowflake stage reference (@stagename) into a form DuckDB can run,
# then print the transpiled SQL.
sql = (parse_one("select * from @stagename", dialect="snowflake")
       .transform(plugin.transform_sql, DuckDBExecutor(DuckDBCatalog(session, compute))))
print(sql.sql(dialect="duckdb"))
17 changes: 9 additions & 8 deletions tests/scratch/sqlglot_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@

class FixTimestampTypes(UniversqlPlugin):

def transform_sql(self, expression, target_executor: Executor):
if isinstance(target_executor, DuckDBExecutor) and isinstance(expression, sqlglot.exp.DataType):
if expression.this.value in ["TIMESTAMPLTZ", "TIMESTAMPTZ"]:
return sqlglot.exp.DataType.build("TIMESTAMPTZ")
if expression.this.value in ["VARIANT"]:
return sqlglot.exp.DataType.build("JSON")

return expression
def transform_sql(self, ast, target_executor: Executor):
    """Rewrite Snowflake-specific data types in *ast* for the target engine.

    When targeting DuckDB, TIMESTAMPLTZ/TIMESTAMPTZ become TIMESTAMPTZ and
    VARIANT becomes JSON; all other nodes are left untouched.
    """
    def fix_timestamp_types(expression):
        if isinstance(target_executor, DuckDBExecutor) and isinstance(expression, sqlglot.exp.DataType):
            if expression.this.value in ["TIMESTAMPLTZ", "TIMESTAMPTZ"]:
                return sqlglot.exp.DataType.build("TIMESTAMPTZ")
            if expression.this.value in ["VARIANT"]:
                return sqlglot.exp.DataType.build("JSON")
        # sqlglot's Expression.transform removes nodes whose visitor returns a
        # falsy value, so every non-matching node must be returned unchanged —
        # otherwise the whole AST is stripped.
        return expression

    return ast.transform(fix_timestamp_types)


class RewriteCreateAsIceberg(UniversqlPlugin):
Expand Down
2 changes: 1 addition & 1 deletion universql/catalog/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from pyiceberg.exceptions import NoSuchTableError, OAuthError
from pyiceberg.io import PY_IO_IMPL

from universql.plugin import Locations, ICatalog
from universql.protocol.session import UniverSQLSession
from universql.warehouse import ICatalog, Executor, Locations
from universql.lake.cloud import CACHE_DIRECTORY_KEY
from universql.util import SnowflakeError

Expand Down
27 changes: 27 additions & 0 deletions universql/lake/fsspec_util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import inspect
import logging
import os
import shutil
from datetime import timedelta, datetime
from functools import wraps

Expand Down Expand Up @@ -103,6 +104,32 @@ def _check_file(self, path):
# def glob(self, path):
# return [self._strip_protocol(path)]

def get_file(self, path, lpath, **kwargs):
    """
    Download the remote file at *path* to the local path *lpath*.

    The data is streamed to ``lpath + '.tmp'`` first and then renamed into
    place, so readers never observe a partially written file.  If *lpath*
    already exists the download is skipped.
    """

    # If the final file already exists, skip the download.
    if os.path.exists(lpath):
        return

    tmp_path = lpath + ".tmp"

    # A previously failed download may have left a stale tmp file behind.
    if os.path.exists(tmp_path):
        os.remove(tmp_path)

    # Ensure the target directory exists.  dirname is "" when lpath is a
    # bare filename in the current directory — makedirs("") would raise.
    target_dir = os.path.dirname(lpath)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    try:
        # Stream the remote file into the temporary local file.
        with self.fs.open(path, 'rb') as source, open(tmp_path, 'wb') as target:
            shutil.copyfileobj(source, target)
    except BaseException:
        # Don't leave a partial tmp file behind on a failed download.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

    # Rename within the same directory is atomic on POSIX filesystems, so
    # lpath is either absent or complete.
    os.rename(tmp_path, lpath)

def size(self, path):
cached_file = self._check_file(self._strip_protocol(path))
if cached_file is None:
Expand Down
Loading

0 comments on commit 49f7b8d

Please sign in to comment.