[PECOBLR-201] add variant support #560
base: main
@@ -675,7 +675,7 @@ def convert_col(t_column_desc):
         return pyarrow.schema([convert_col(col) for col in t_table_schema.columns])

     @staticmethod
-    def _col_to_description(col):
+    def _col_to_description(col, field):
         type_entry = col.typeDesc.types[0]

         if type_entry.primitiveEntry:
@@ -702,12 +702,36 @@ def _col_to_description(col):
         else:
             precision, scale = None, None

+        # Extract variant type from field if available
+        if field is not None:
+            try:
+                # Check for variant type in metadata
+                if field.metadata and b"Spark:DataType:SqlName" in field.metadata:
+                    sql_type = field.metadata.get(b"Spark:DataType:SqlName")
+                    if sql_type == b"VARIANT":
+                        cleaned_type = "variant"
+            except Exception as e:
+                logger.debug(f"Could not extract variant type from field: {e}")
+
Comment on lines +706 to +715:

Comment: Please check with eng-sqlgateway if there is a way to get this from Thrift metadata. The Python connector uses Thrift metadata for getting metadata.

Comment: Is there some documentation/contract around it, or is it purely from empirical evidence?
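(For reference, the empirical check discussed here can be reproduced by parsing the Arrow IPC schema the server returns alongside the Thrift result-set metadata. A minimal sketch, assuming schema_bytes holds that serialized schema; dump_field_metadata is a name made up for illustration:)

import pyarrow

def dump_field_metadata(schema_bytes: bytes) -> None:
    """Print each field's name, Arrow type, and field-level metadata."""
    arrow_schema = pyarrow.ipc.read_schema(pyarrow.py_buffer(schema_bytes))
    for field in arrow_schema:
        # For a VARIANT column the Arrow type is plain string; the SQL type
        # only appears in the field metadata, e.g.
        # {b'Spark:DataType:SqlName': b'VARIANT', b'Spark:DataType:JsonType': b'"variant"'}
        print(field.name, field.type, field.metadata)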
         return col.columnName, cleaned_type, None, None, precision, scale, None

     @staticmethod
-    def _hive_schema_to_description(t_table_schema):
+    def _hive_schema_to_description(t_table_schema, schema_bytes=None):
+        # Create a field lookup dictionary for efficient column access
+        field_dict = {}
+        if pyarrow and schema_bytes:
+            try:
+                arrow_schema = pyarrow.ipc.read_schema(pyarrow.py_buffer(schema_bytes))
+                # Build a dictionary mapping column names to fields
+                for field in arrow_schema:
+                    field_dict[field.name] = field
+            except Exception as e:
+                logger.debug(f"Could not parse arrow schema: {e}")
+
+        # Process each column with its corresponding Arrow field (if available)
         return [
-            ThriftBackend._col_to_description(col) for col in t_table_schema.columns
+            ThriftBackend._col_to_description(col, field_dict.get(col.columnName))
+            for col in t_table_schema.columns
         ]

     def _results_message_to_execute_response(self, resp, operation_state):
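(For context: each item _col_to_description returns is a PEP 249 description tuple of (name, type_code, display_size, internal_size, precision, scale, null_ok), so with this change a VARIANT column's entry would look like, for example:)

('variant_col', 'variant', None, None, None, None, None)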
@@ -736,9 +760,6 @@ def _results_message_to_execute_response(self, resp, operation_state):
             or (not direct_results.resultSet)
             or direct_results.resultSet.hasMoreRows
         )
-        description = self._hive_schema_to_description(
-            t_result_set_metadata_resp.schema
-        )

         if pyarrow:
             schema_bytes = (
@@ -750,6 +771,10 @@ def _results_message_to_execute_response(self, resp, operation_state):
         else:
             schema_bytes = None

+        description = self._hive_schema_to_description(
+            t_result_set_metadata_resp.schema, schema_bytes
+        )
+
         lz4_compressed = t_result_set_metadata_resp.lz4Compressed
         is_staging_operation = t_result_set_metadata_resp.isStagingOperation
         if direct_results and direct_results.resultSet:
@@ -803,9 +828,6 @@ def get_execution_result(self, op_handle, cursor):
         lz4_compressed = t_result_set_metadata_resp.lz4Compressed
         is_staging_operation = t_result_set_metadata_resp.isStagingOperation
         has_more_rows = resp.hasMoreRows
-        description = self._hive_schema_to_description(
-            t_result_set_metadata_resp.schema
-        )

         if pyarrow:
             schema_bytes = (
@@ -817,6 +839,10 @@ def get_execution_result(self, op_handle, cursor):
         else:
             schema_bytes = None

+        description = self._hive_schema_to_description(
+            t_result_set_metadata_resp.schema, schema_bytes
+        )
+
         queue = ResultSetQueueFactory.build_queue(
             row_set_type=resp.resultSetMetadata.resultFormat,
             t_row_set=resp.results,

(In both call sites above, the description computation moves below the schema_bytes extraction so the serialized Arrow schema is available when column types are mapped.)
@@ -0,0 +1,80 @@
import pytest
from datetime import datetime
import json

try:
    import pyarrow
except ImportError:
    pyarrow = None

from tests.e2e.test_driver import PySQLPytestTestCase
from tests.e2e.common.predicates import pysql_supports_arrow


class TestVariantTypes(PySQLPytestTestCase):
    """Tests for the proper detection and handling of VARIANT type columns"""

    @pytest.fixture(scope="class")
    def variant_table(self, connection_details):
        """A pytest fixture that creates a test table and cleans up after tests"""
        self.arguments = connection_details.copy()
        table_name = "pysql_test_variant_types_table"

        with self.cursor() as cursor:
            try:
                # Create the table with variant columns
                cursor.execute(
                    """
                    CREATE TABLE IF NOT EXISTS pysql_test_variant_types_table (
                        id INTEGER,
                        variant_col VARIANT,
                        regular_string_col STRING
                    )
                    """
                )

                # Insert test records with different variant values
                cursor.execute(
                    """
                    INSERT INTO pysql_test_variant_types_table
                    VALUES
                    (1, PARSE_JSON('{"name": "John", "age": 30}'), 'regular string'),
                    (2, PARSE_JSON('[1, 2, 3, 4]'), 'another string')
                    """
                )
                yield table_name
            finally:
                cursor.execute(f"DROP TABLE IF EXISTS {table_name}")

    @pytest.mark.skipif(not pysql_supports_arrow(), reason="Requires arrow support")
    def test_variant_type_detection(self, variant_table):
        """Test that VARIANT type columns are properly detected in schema"""
        with self.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {variant_table} LIMIT 0")

            # Verify column types in description
            assert cursor.description[0][1] == 'int', "Integer column type not correctly identified"
            assert cursor.description[1][1] == 'variant', "VARIANT column type not correctly identified"
            assert cursor.description[2][1] == 'string', "String column type not correctly identified"

    @pytest.mark.skipif(not pysql_supports_arrow(), reason="Requires arrow support")
    def test_variant_data_retrieval(self, variant_table):
        """Test that VARIANT data is properly retrieved and can be accessed as JSON"""
        with self.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {variant_table} ORDER BY id")
            rows = cursor.fetchall()

            # First row should have a JSON object
            json_obj = rows[0][1]
            assert isinstance(json_obj, str), "VARIANT column should be returned as string"

            parsed = json.loads(json_obj)
            assert parsed.get('name') == 'John'
            assert parsed.get('age') == 30

            # Second row should have a JSON array
            json_array = rows[1][1]
            assert isinstance(json_array, str), "VARIANT array should be returned as string"

            # Parse to verify it's a valid JSON array
            parsed_array = json.loads(json_array)
            assert isinstance(parsed_array, list)
            assert parsed_array == [1, 2, 3, 4]
Comment:
Are you sure this is correct? I tried and was getting metadata as null when the column type is variant. Also, in my testing the pyarrow schema just shows string for variant; shouldn't the server return a variant type?

Reply:
Yes. Debug output:

[SHIVAM] field pyarrow.Field<CAST(1 AS VARIANT): string>
[SHIVAM] field metadata {b'Spark:DataType:SqlName': b'VARIANT', b'Spark:DataType:JsonType': b'"variant"'}
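(For anyone wanting to reproduce this check end to end, a minimal sketch; the connection parameters are placeholders, and the expected type code assumes this PR's change is applied:)

from databricks import sql

with sql.connect(server_hostname="...", http_path="...", access_token="...") as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT CAST(1 AS VARIANT) AS v")
        # With this change applied, the reported type code is 'variant';
        # the underlying Arrow type remains string, as in the debug output above.
        print(cursor.description[0][:2])  # e.g. ('v', 'variant')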
Comment:
@shivam2680 I am getting this as the arrow_schema, where metadata is null (see attached screenshot). Is this some transient behaviour, or am I missing something?