Skip to content

Add identity_insert materialisation option for MSSQL tablestores #172

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
non-nullable columns by default. If neither nullable nor non_nullable are specified, the default `CREATE TABLE as SELECT`
is kept unmodified except for primary key columns where some dialects require explicit `NOT NULL` statements.
- Fix that unlogged tables were created as logged tables when they were copied as cache valid
- Added `MSSQLMaterializationDetails` class for MSSQL table stores. Currently, only one option is supported: `identity_insert`. If set to true, the autoincrement attribute of all columns of a table is set to true.

## 0.7.2 (2024-03-25)
- Disable Kroki links by default. New setting disable_kroki=True allows to still default kroki_url to https://kroki.io.
Expand Down
14 changes: 14 additions & 0 deletions pipedag.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ instances:
blob_store:
blob_store_connection: no_blob

mssql_materialization_details:
instance_id: mssql_materialization_details
table_store:
table_store_connection: mssql
args:
strict_materialization_details: false
materialization_details:
__any__:
identity_insert: false
with_identity_insert:
identity_insert: true
blob_store:
blob_store_connection: no_blob

ibm_db2:
instance_id: pd_ibm_db2
stage_commit_technique: READ_VIEWS
Expand Down
2 changes: 2 additions & 0 deletions src/pydiverse/pipedag/backend/table/sql/ddl.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ def __init__(
column_types: list[str],
nullable: bool | list[bool] | None = None,
cap_varchar_max: int | None = None,
autoincrement: bool | None = None,
):
if not isinstance(nullable, list):
nullable = [nullable for _ in column_names]
Expand All @@ -338,6 +339,7 @@ def __init__(
self.column_types = column_types
self.nullable = nullable
self.cap_varchar_max = cap_varchar_max
self.autoincrement = False if autoincrement is None else autoincrement


class ChangeColumnNullable(DDLElement):
Expand Down
9 changes: 3 additions & 6 deletions src/pydiverse/pipedag/backend/table/sql/dialects/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def _execute_materialize(

# Create empty table with correct schema
cls._dialect_create_empty_table(store, df, table, schema, dtypes)
store.add_indexes_and_set_nullable(
store.add_indexes_and_set_nullable_and_set_autoincrement(
table, schema, on_empty_table=True, table_cols=df.columns
)

Expand All @@ -98,11 +98,8 @@ def _execute_materialize(
with duckdb.connect(connection_uri) as conn:
conn.execute(f"INSERT INTO {schema_name}.{table_name} SELECT * FROM df")

store.add_indexes_and_set_nullable(
table,
schema,
on_empty_table=False,
table_cols=df.columns,
store.add_indexes_and_set_nullable_and_set_autoincrement(
table, schema, on_empty_table=False, table_cols=df.columns
)


Expand Down
5 changes: 3 additions & 2 deletions src/pydiverse/pipedag/backend/table/sql/dialects/ibm_db2.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,16 @@ def get_forced_nullability_columns(
]
return nullable_cols, non_nullable_cols

def add_indexes_and_set_nullable(
def add_indexes_and_set_nullable_and_set_autoincrement(
self,
table: Table,
schema: Schema,
*,
on_empty_table: bool | None = None,
table_cols: Iterable[str] | None = None,
enable_identity_insert: bool | None = None,
):
super().add_indexes_and_set_nullable(
super().add_indexes_and_set_nullable_and_set_autoincrement(
table, schema, on_empty_table=on_empty_table, table_cols=table_cols
)
table_name = self.engine.dialect.identifier_preparer.quote(table.name)
Expand Down
102 changes: 101 additions & 1 deletion src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import re
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Any

import pandas as pd
Expand All @@ -22,6 +23,26 @@
from pydiverse.pipedag.backend.table.util import DType
from pydiverse.pipedag.materialize import Table
from pydiverse.pipedag.materialize.container import RawSql
from pydiverse.pipedag.materialize.details import (
BaseMaterializationDetails,
resolve_materialization_details_label,
)


@dataclass(frozen=True)
class MSSQLMaterializationDetails(BaseMaterializationDetails):
    """Materialization details supported by the MSSQL table store.

    :param identity_insert: Allows explicit values to be inserted
        into the identity column of a table.

    .. _identity_insert:
        https://learn.microsoft.com/en-us/sql/t-sql/statements/set-identity-insert-transact-sql?view=sql-server-ver16
    """

    identity_insert: bool = False

    def __post_init__(self):
        # Fail fast if the configured value is not a plain boolean
        # (e.g. a string coming from a YAML config file).
        assert isinstance(self.identity_insert, bool)


class MSSqlTableStore(SQLTableStore):
Expand Down Expand Up @@ -127,13 +148,14 @@ def get_forced_nullability_columns(
# the list of nullable columns as well
return self._process_table_nullable_parameters(table, table_cols)

def add_indexes_and_set_nullable(
def add_indexes_and_set_nullable_and_set_autoincrement(
self,
table: Table,
schema: Schema,
*,
on_empty_table: bool | None = None,
table_cols: Iterable[str] | None = None,
enable_identity_insert: bool | None = None,
):
if on_empty_table is None or on_empty_table:
# Set non-nullable and primary key on empty table
Expand Down Expand Up @@ -164,6 +186,7 @@ def add_indexes_and_set_nullable(
nullable_cols,
sql_types,
nullable=True,
autoincrement=enable_identity_insert,
)
)
sql_types = [types[col] for col in non_nullable_cols]
Expand All @@ -175,6 +198,7 @@ def add_indexes_and_set_nullable(
non_nullable_cols,
sql_types,
nullable=False,
autoincrement=enable_identity_insert,
)
)
if len(key_columns) > 0:
Expand All @@ -189,8 +213,10 @@ def add_indexes_and_set_nullable(
sql_types,
nullable=False,
cap_varchar_max=1024,
autoincrement=enable_identity_insert,
)
)

self.add_table_primary_key(table, schema)
if on_empty_table is None or not on_empty_table:
self.add_table_indexes(table, schema)
Expand Down Expand Up @@ -297,6 +323,28 @@ def resolve_alias(self, table: Table, stage_name: str):
table_name, schema = super().resolve_alias(table, stage_name)
return PipedagMSSqlReflection.resolve_alias(self.engine, table_name, schema)

def _set_materialization_details(
    self, materialization_details: dict[str, dict[str, Any]] | None
) -> None:
    # Convert the raw config mapping (label -> {option: value}, e.g.
    # {"with_identity_insert": {"identity_insert": True}}) into
    # MSSQLMaterializationDetails objects stored on the table store.
    # Validation behavior (warn vs. raise) is controlled by
    # self.strict_materialization_details.
    self.materialization_details = (
        MSSQLMaterializationDetails.create_materialization_details_dict(
            materialization_details,
            self.strict_materialization_details,
            self.default_materialization_details,
            self.logger,
        )
    )

def get_identity_insert(self, materialization_details_label: str | None) -> bool:
    """Return whether identity_insert is enabled for the given
    materialization details label (falling back to the store default
    when the label is ``None`` or unknown)."""
    details = self.materialization_details
    return MSSQLMaterializationDetails.get_attribute_from_dict(
        details,
        materialization_details_label,
        self.default_materialization_details,
        "identity_insert",
        self.strict_materialization_details,
        self.logger,
    )


@MSSqlTableStore.register_table(pd)
class PandasTableHook(PandasTableHook):
Expand All @@ -311,6 +359,58 @@ def _get_dialect_dtypes(cls, dtypes: dict[str, DType], table: Table[pd.DataFrame
}
)

@classmethod
def _execute_materialize(
    cls,
    df: pd.DataFrame,
    store: MSSqlTableStore,
    table: Table[pd.DataFrame],
    schema: Schema,
    dtypes: dict[str, DType],
):
    """Write *df* into *schema* as *table*, honoring MSSQL materialization details.

    Resolves the table's materialization details label, checks that the
    store supports it, and passes the resulting ``identity_insert`` flag
    through to ``add_indexes_and_set_nullable_and_set_autoincrement`` so
    identity columns can receive explicit values.

    :param df: dataframe to materialize
    :param store: MSSQL table store to write into
    :param table: table metadata (name, type_map, materialization details)
    :param schema: target schema
    :param dtypes: column name -> DType mapping, converted to dialect types
    """
    dtypes = cls._get_dialect_dtypes(dtypes, table)
    if table.type_map:
        dtypes.update(table.type_map)

    # Resolve the label only once; the original computed it twice
    # (support check + identity_insert lookup).
    details_label = resolve_materialization_details_label(table)
    store.check_materialization_details_supported(details_label)
    enable_identity_insert = store.get_identity_insert(details_label)

    if early := store.dialect_requests_empty_creation(table, is_sql=False):
        # Create the table with the correct schema first, so nullability /
        # autoincrement can be set before any rows exist.
        cls._dialect_create_empty_table(store, df, table, schema, dtypes)
        store.add_indexes_and_set_nullable_and_set_autoincrement(
            table,
            schema,
            on_empty_table=True,
            table_cols=df.columns,
            enable_identity_insert=enable_identity_insert,
        )

    with store.engine_connect() as conn:
        with conn.begin():
            if early:
                store.lock_table(table, schema, conn)
            df.to_sql(
                table.name,
                conn,
                schema=schema.get(),
                index=False,
                dtype=dtypes,
                chunksize=100_000,
                if_exists="append" if early else "fail",
            )
    store.add_indexes_and_set_nullable_and_set_autoincrement(
        table,
        schema,
        on_empty_table=False if early else None,
        table_cols=df.columns,
        enable_identity_insert=enable_identity_insert,
    )


try:
import ibis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def _execute_materialize(

# Create empty table
cls._dialect_create_empty_table(store, df, table, schema, dtypes)
store.add_indexes_and_set_nullable(
store.add_indexes_and_set_nullable_and_set_autoincrement(
table, schema, on_empty_table=True, table_cols=df.columns
)

Expand Down
14 changes: 9 additions & 5 deletions src/pydiverse/pipedag/backend/table/sql/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ def materialize(
suffix=suffix,
)
)
store.add_indexes_and_set_nullable(table, schema, on_empty_table=True)
store.add_indexes_and_set_nullable_and_set_autoincrement(
table, schema, on_empty_table=True
)
statements = store.lock_table(table, schema)
statements += store.lock_source_tables(source_tables)
statements += [
Expand All @@ -113,7 +115,9 @@ def materialize(
statements,
truncate_printed_select=True,
)
store.add_indexes_and_set_nullable(table, schema, on_empty_table=False)
store.add_indexes_and_set_nullable_and_set_autoincrement(
table, schema, on_empty_table=False
)
else:
statements = store.lock_source_tables(source_tables)
statements += [
Expand All @@ -126,7 +130,7 @@ def materialize(
)
]
store.execute(statements)
store.add_indexes_and_set_nullable(table, schema)
store.add_indexes_and_set_nullable_and_set_autoincrement(table, schema)

@classmethod
def retrieve(
Expand Down Expand Up @@ -323,7 +327,7 @@ def _execute_materialize(

if early := store.dialect_requests_empty_creation(table, is_sql=False):
cls._dialect_create_empty_table(store, df, table, schema, dtypes)
store.add_indexes_and_set_nullable(
store.add_indexes_and_set_nullable_and_set_autoincrement(
table, schema, on_empty_table=True, table_cols=df.columns
)

Expand All @@ -340,7 +344,7 @@ def _execute_materialize(
chunksize=100_000,
if_exists="append" if early else "fail",
)
store.add_indexes_and_set_nullable(
store.add_indexes_and_set_nullable_and_set_autoincrement(
table,
schema,
on_empty_table=False if early else None,
Expand Down
3 changes: 2 additions & 1 deletion src/pydiverse/pipedag/backend/table/sql/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,13 +677,14 @@ def dialect_requests_empty_creation(self, table: Table, is_sql: bool) -> bool:
_ = is_sql
return table.nullable is not None or table.non_nullable is not None

def add_indexes_and_set_nullable(
def add_indexes_and_set_nullable_and_set_autoincrement(
self,
table: Table,
schema: Schema,
*,
on_empty_table: bool | None = None,
table_cols: Iterable[str] | None = None,
enable_identity_insert: bool | None = None,
):
if on_empty_table is None or on_empty_table:
# By default, we set non-nullable on empty table
Expand Down
2 changes: 2 additions & 0 deletions tests/fixtures/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"postgres_unlogged": pytest.mark.postgres,
"mssql": pytest.mark.mssql,
"mssql_pytsql": pytest.mark.mssql,
"mssql_materialization_details": pytest.mark.mssql,
"ibm_db2": pytest.mark.ibm_db2,
"ibm_db2_avoid_schema": pytest.mark.ibm_db2,
"ibm_db2_materialization_details": pytest.mark.ibm_db2,
Expand Down Expand Up @@ -54,6 +55,7 @@
"postgres_unlogged",
"mssql",
"mssql_pytsql",
"mssql_materialization_details",
"ibm_db2",
"ibm_db2_avoid_schema",
"ibm_db2_materialization_details",
Expand Down
24 changes: 24 additions & 0 deletions tests/test_identity_insert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

from pydiverse.pipedag import Flow, Stage

# Parameterize all tests in this file with several instance_id configurations
from tests.fixtures.instances import (
DATABASE_INSTANCES,
skip_instances,
with_instances,
)
from tests.util import tasks_library as m

pytestmark = [with_instances(DATABASE_INSTANCES)]


@with_instances(DATABASE_INSTANCES, "mssql_materialization_details")
@skip_instances("ibm_db2", "postgres", "duckdb")
def test_identity_insert():
    # One stage with two tasks: a plain dataframe and a table materialized
    # with the "with_identity_insert" materialization details label.
    with Flow("flow") as flow:
        with Stage("stage"):
            m.simple_dataframe()
            m.simple_identity_insert()

    assert flow.run().successful
13 changes: 13 additions & 0 deletions tests/util/tasks_library.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,19 @@ def simple_table_default_compressed():
)


@materialize(lazy=True, version="1.0.0")
def simple_identity_insert():
    """Return a small constant table materialized with the
    "with_identity_insert" materialization details label."""
    values = [0, 1, 2, 3]
    df = pd.DataFrame(
        {
            "col1": values,
            "col2": [str(v) for v in values],
        }
    )
    return Table(
        df,
        name="identity_insert_on",
        materialization_details="with_identity_insert",
    )


@materialize(version="1.0")
def pd_dataframe(data: dict[str, list]):
return pd.DataFrame(data)
Expand Down