diff --git a/airbyte/_processors/sql/bigquery.py b/airbyte/_processors/sql/bigquery.py index ffbd5aac..ff4edd94 100644 --- a/airbyte/_processors/sql/bigquery.py +++ b/airbyte/_processors/sql/bigquery.py @@ -117,6 +117,10 @@ def to_sql_type( return self.get_string_type() if isinstance(sql_type, sqlalchemy.types.BIGINT): return sqlalchemy_types.Integer() # All integers are 64-bit in BigQuery + if isinstance(sql_type, sqlalchemy.types.DECIMAL): + # Convert SQLAlchemy DECIMAL to BigQuery NUMERIC type + # BigQuery NUMERIC type has precision of 38 and scale of 9 by default + return sqlalchemy_types.Numeric(precision=38, scale=9) return sql_type @@ -287,8 +291,7 @@ def _swap_temp_table_with_final_table( deletion_name = f"{final_table_name}_deleteme" commands = "\n".join( [ - f"ALTER TABLE {self._fully_qualified(final_table_name)} " - f"RENAME TO {deletion_name};", + f"ALTER TABLE {self._fully_qualified(final_table_name)} RENAME TO {deletion_name};", f"ALTER TABLE {self._fully_qualified(temp_table_name)} " f"RENAME TO {final_table_name};", f"DROP TABLE {self._fully_qualified(deletion_name)};", diff --git a/tests/integration_tests/test_bigquery_cache.py b/tests/integration_tests/test_bigquery_cache.py index 2039399e..9779d2f8 100644 --- a/tests/integration_tests/test_bigquery_cache.py +++ b/tests/integration_tests/test_bigquery_cache.py @@ -4,8 +4,11 @@ from __future__ import annotations import pytest +from sqlalchemy import types as sqlalchemy_types import airbyte as ab +from airbyte._processors.sql.bigquery import BigQueryTypeConverter +from airbyte._util import text_util @pytest.mark.requires_creds @@ -24,3 +27,57 @@ def test_bigquery_props( assert new_bigquery_cache.get_database_name() == new_bigquery_cache.project_name, ( "Database name should be the same as project name." ) + + +@pytest.mark.requires_creds +def test_decimal_type_conversion( + new_bigquery_cache: ab.BigQueryCache, +) -> None: + """Test that DECIMAL(38,9) types are correctly converted to BigQuery NUMERIC types.""" + table_name = f"test_decimal_{text_util.generate_random_suffix()}" + + try: + # Verify type conversion + converter = BigQueryTypeConverter() + converted_type = converter.to_sql_type({"type": "number", "format": "decimal"}) + + # Check that the converted type is a NUMERIC type with correct precision and scale + assert isinstance(converted_type, sqlalchemy_types.Numeric), ( + "DECIMAL type should be converted to NUMERIC" + ) + assert converted_type.precision == 38, "Precision should be 38" + assert converted_type.scale == 9, "Scale should be 9" + + # Ensure schema exists before creating table + new_bigquery_cache._ensure_schema_exists() + + # Create a test table with a DECIMAL column + sql = f""" + CREATE TABLE {new_bigquery_cache.schema_name}.{table_name} ( + id INT64, + amount NUMERIC(38, 9) + ) + """ + new_bigquery_cache.execute_sql(sql) + + # Insert test data + sql = f""" + INSERT INTO {new_bigquery_cache.schema_name}.{table_name} (id, amount) + VALUES (1, 123.456789) + """ + new_bigquery_cache.execute_sql(sql) + + # Verify we can read the data back + sql = f"SELECT amount FROM {new_bigquery_cache.schema_name}.{table_name} WHERE id = 1" + result = new_bigquery_cache.execute_sql(sql).fetchone() + assert result is not None, "Should be able to read NUMERIC data" + assert isinstance(result[0], (float, int, str)), ( + "NUMERIC data should be readable" + ) + + finally: + # Clean up + cleanup_sql = ( + f"DROP TABLE IF EXISTS {new_bigquery_cache.schema_name}.{table_name}" + ) + new_bigquery_cache.execute_sql(cleanup_sql)