Skip to content

Commit

Permalink
support passthrough
Browse files Browse the repository at this point in the history
  • Loading branch information
buremba committed Aug 5, 2024
1 parent a65f8ee commit 0e2e326
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v3
with:
python-version: "3.10"
python-version: "3.11"

- name: Upgrade pip
run: |
Expand Down
15 changes: 8 additions & 7 deletions universql/warehouse/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def sync_duckdb_catalog(self, tables: List[sqlglot.exp.Expression], ast: sqlglot
if views:
self.duckdb.execute(views_sql)
logger.info(f"[{self.token}] Creating views for Iceberg tables: \n{views_sql}")

def replace_icebergs_with_duckdb_reference(
expression: sqlglot.exp.Expression) -> sqlglot.exp.Expression:
if isinstance(expression, sqlglot.exp.Table):
Expand All @@ -87,6 +88,7 @@ def replace_icebergs_with_duckdb_reference(
def _do_query(self, raw_query: str) -> (str, List, pyarrow.Table):
start_time = time.perf_counter()
compute = self.context.get('compute')
catalog = self.context.get('catalog')
local_error_message = ""

try:
Expand All @@ -101,7 +103,7 @@ def _do_query(self, raw_query: str) -> (str, List, pyarrow.Table):

if can_run_locally and should_run_locally:
for ast in queries:
if ast.key in queries_that_doesnt_need_warehouse:
if ast.key in queries_that_doesnt_need_warehouse and catalog == Catalog.SNOWFLAKE.value:
self.do_snowflake_query(queries, raw_query, start_time, local_error_message)
run_snowflake_already = True
else:
Expand All @@ -123,10 +125,10 @@ def _do_query(self, raw_query: str) -> (str, List, pyarrow.Table):
can_run_locally = False
break

catalog = self.context.get('catalog')
if compute == Compute.LOCAL.value or catalog == Catalog.POLARIS.value:
raise SnowflakeError(self.token, f"Can't run the query locally, {local_error_message}")

if compute == Compute.LOCAL.value:
if not should_run_locally:
raise SnowflakeError(self.token, f"Can't run the query locally, {local_error_message}")
return self.get_snowflake_result()
if can_run_locally and not run_snowflake_already and should_run_locally:
formatting = (self.token, datetime.timedelta(seconds=time.perf_counter() - start_time))
logger.info(f"[{self.token}] Run locally 🚀 ({formatting})")
Expand All @@ -135,7 +137,6 @@ def _do_query(self, raw_query: str) -> (str, List, pyarrow.Table):
self.do_snowflake_query(queries, raw_query, start_time, local_error_message)
return self.get_snowflake_result()


def do_snowflake_query(self, queries, raw_query, start_time, local_error_message):
try:
self.snowflake.execute(queries, raw_query)
Expand Down Expand Up @@ -167,7 +168,7 @@ def get_field_from_duckdb(self, column: list[str], arrow_table: Table, idx: int)
value = arrow_table[idx]

if field_type == 'NUMBER':
pa_type = pa.decimal128(getattr(value.type, 'precision', 38) , getattr(value.type, 'scale', 0))
pa_type = pa.decimal128(getattr(value.type, 'precision', 38), getattr(value.type, 'scale', 0))
value = value.cast(pa_type)
metadata["logicalType"] = "FIXED"
metadata["precision"] = "1"
Expand Down

0 comments on commit 0e2e326

Please sign in to comment.