From d57618aaab3e026ad7135c8ec86fb32694235f67 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <209825114+claude[bot]@users.noreply.github.com> Date: Thu, 31 Jul 2025 13:10:26 +0000 Subject: [PATCH 1/3] fix: update delta.py for deltalake 1.1.3 API compatibility - Update DeltaTable.create() method signature and instantiation - Change optimize.compact() to optimize().compact() - Replace partitions() with get_partitions() - Replace version() with get_version() - Replace load_as_version() with load_version() - Remove deprecated engine='rust' parameter from write_deltalake() - Change 'partitions=' to 'filters=' in to_pyarrow methods - Update schema().to_pyarrow() to schema.to_pyarrow() - Fix string concatenation in predicate building for type safety - Add Optional import for better type annotation support Fixes type checking issues after deltalake upgrade from 0.25.5 to 1.1.3 Co-authored-by: Yusuf Ali --- requirements.txt | 2 +- servc/svc/com/storage/delta.py | 24 ++++++++++++------------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/requirements.txt b/requirements.txt index 7ff1cef..c804cb8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,5 +5,5 @@ simplejson==3.20.1 flask==3.1.1 pyyaml==6.0.2 pyiceberg[sql-sqlite,pyarrow]==0.8.1 -deltalake==0.25.5 +deltalake==1.1.3 azure-servicebus==7.14.2 diff --git a/servc/svc/com/storage/delta.py b/servc/svc/com/storage/delta.py index c2c4f40..0472a48 100644 --- a/servc/svc/com/storage/delta.py +++ b/servc/svc/com/storage/delta.py @@ -1,5 +1,5 @@ import os -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Optional, Tuple import pyarrow as pa from deltalake import DeltaTable, write_deltalake @@ -57,14 +57,15 @@ def _connect(self): tablename = self._get_table_name() uri = os.path.join(self._location_prefix, tablename) - self._conn = DeltaTable.create( + DeltaTable.create( table_uri=uri, - name=tablename, schema=self._table["schema"], partition_by=self._table["partitions"], 
mode="ignore", storage_options=self._storageOptions, ) + # Now load the created table + self._conn = DeltaTable(uri, storage_options=self._storageOptions) return super()._connect() @@ -72,7 +73,7 @@ def optimize(self): table = self.getConn() print("Optimizing", self._get_table_name(), flush=True) - table.optimize.compact() + table.optimize().compact() table.vacuum() table.cleanup_metadata() table.create_checkpoint() @@ -81,7 +82,7 @@ def getPartitions(self) -> Dict[str, List[Any]] | None: table = self.getConn() partitions: Dict[str, List[Any]] = {} - for obj in table.partitions(): + for obj in table.get_partitions(): for key, value in obj.items(): if key not in partitions: partitions[key] = [] @@ -92,7 +93,7 @@ def getPartitions(self) -> Dict[str, List[Any]] | None: def getCurrentVersion(self) -> str | None: table = self.getConn() - return str(table.version()) + return str(table.get_version()) def getVersions(self) -> List[str] | None: return [str(self.getCurrentVersion())] @@ -129,7 +130,7 @@ def overwrite( predicate: str | None = None filter = self._filters(partitions) if filter is not None: - predicate = " & ".join([" ".join(x) for x in filter]) + predicate = " & ".join([f"{col} {op} {str(val)}" for col, op, val in filter]) write_deltalake( table, @@ -137,7 +138,6 @@ def overwrite( storage_options=self._storageOptions, mode="overwrite", predicate=predicate, - engine="rust", ) return True @@ -150,7 +150,7 @@ def readRaw( ) -> Table: table = self.getConn() if version is not None: - table.load_as_version(int(version)) + table.load_version(int(version)) if options is None or not isinstance(options, dict): options = {} @@ -159,14 +159,14 @@ def readRaw( if options.get("filter", None) is not None: return table.to_pyarrow_dataset( - partitions=self._filters(partitions), + filters=self._filters(partitions), ).to_table( filter=options.get("filter"), columns=rcolumns, ) return table.to_pyarrow_table( columns=rcolumns, - partitions=self._filters(partitions), + 
filters=self._filters(partitions), ) def read( @@ -181,7 +181,7 @@ def read( def getSchema(self) -> Schema | None: table = self.getConn() - return table.schema().to_pyarrow() + return table.schema.to_pyarrow() def _close(self): if self._isOpen: From ed0d1940c788755b7b2f235ab40096d6a6167309 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <209825114+claude[bot]@users.noreply.github.com> Date: Thu, 31 Jul 2025 13:16:42 +0000 Subject: [PATCH 2/3] fix: improve type safety and address security concerns in delta.py - Replace string concatenation with proper value escaping to prevent injection issues - Add _escape_value method for safe predicate building - Standardize type annotations to use Optional instead of | None syntax - Fix all typing issues for deltalake 1.1.3 compatibility Co-authored-by: Yusuf Ali --- servc/svc/com/storage/delta.py | 49 +++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/servc/svc/com/storage/delta.py b/servc/svc/com/storage/delta.py index 0472a48..116ac59 100644 --- a/servc/svc/com/storage/delta.py +++ b/servc/svc/com/storage/delta.py @@ -78,7 +78,7 @@ def optimize(self): table.cleanup_metadata() table.create_checkpoint() - def getPartitions(self) -> Dict[str, List[Any]] | None: + def getPartitions(self) -> Optional[Dict[str, List[Any]]]: table = self.getConn() partitions: Dict[str, List[Any]] = {} @@ -91,11 +91,11 @@ def getPartitions(self) -> Dict[str, List[Any]] | None: return partitions - def getCurrentVersion(self) -> str | None: + def getCurrentVersion(self) -> Optional[str]: table = self.getConn() return str(table.get_version()) - def getVersions(self) -> List[str] | None: + def getVersions(self) -> Optional[List[str]]: return [str(self.getCurrentVersion())] def insert(self, data: List[Any]) -> bool: @@ -108,10 +108,29 @@ def insert(self, data: List[Any]) -> bool: ) return True + def _escape_value(self, val: Any) -> str: + """Properly escape values for predicate building to prevent injection 
issues.""" + if isinstance(val, str): + # Escape single quotes in strings and wrap in quotes + return "'" + val.replace("'", "''") + "'" + elif isinstance(val, bool): + return str(val).lower() + elif isinstance(val, (int, float)): + return str(val) + elif val is None: + return "null" + elif isinstance(val, list): + # Handle list values for 'in' operations + escaped_items = [self._escape_value(item) for item in val] + return f"({', '.join(escaped_items)})" + else: + # Fallback to string representation with quotes + return "'" + str(val).replace("'", "''") + "'" + def _filters( self, - partitions: Dict[str, List[Any]] | None = None, - ) -> List[Tuple[str, str, Any]] | None: + partitions: Optional[Dict[str, List[Any]]] = None, + ) -> Optional[List[Tuple[str, str, Any]]]: filters: List[Tuple[str, str, Any]] = [] if partitions is None: return None @@ -123,14 +142,14 @@ def _filters( return filters if len(filters) > 0 else None def overwrite( - self, data: List[Any], partitions: Dict[str, List[Any]] | None = None + self, data: List[Any], partitions: Optional[Dict[str, List[Any]]] = None ) -> bool: table = self.getConn() - predicate: str | None = None + predicate: Optional[str] = None filter = self._filters(partitions) if filter is not None: - predicate = " & ".join([f"{col} {op} {str(val)}" for col, op, val in filter]) + predicate = " & ".join([f"{col} {op} {self._escape_value(val)}" for col, op, val in filter]) write_deltalake( table, @@ -144,9 +163,9 @@ def overwrite( def readRaw( self, columns: List[str], - partitions: Dict[str, List[Any]] | None = None, - version: str | None = None, - options: Any | None = None, + partitions: Optional[Dict[str, List[Any]]] = None, + version: Optional[str] = None, + options: Optional[Any] = None, ) -> Table: table = self.getConn() if version is not None: @@ -172,13 +191,13 @@ def readRaw( def read( self, columns: List[str], - partitions: Dict[str, List[Any]] | None = None, - version: str | None = None, - options: Any | None = None, + 
partitions: Optional[Dict[str, List[Any]]] = None, + version: Optional[str] = None, + options: Optional[Any] = None, ) -> Table: return self.readRaw(columns, partitions, version, options) - def getSchema(self) -> Schema | None: + def getSchema(self) -> Optional[Schema]: table = self.getConn() return table.schema.to_pyarrow() From 415068b03f261d4f663210e6ebd741e6d9ef96b4 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <209825114+claude[bot]@users.noreply.github.com> Date: Thu, 31 Jul 2025 14:23:36 +0000 Subject: [PATCH 3/3] fix: standardize type annotations in delta.py for mypy compatibility - Replace Optional[T] with T | None syntax for Python 3.11+ compatibility - Add missing return type annotations for _connect() and _close() methods - Ensure consistency with parent Lake class type annotations - All union types now use modern syntax compatible with mypy strict checking Co-authored-by: Yusuf Ali --- servc/svc/com/storage/delta.py | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/servc/svc/com/storage/delta.py b/servc/svc/com/storage/delta.py index 116ac59..4786a6b 100644 --- a/servc/svc/com/storage/delta.py +++ b/servc/svc/com/storage/delta.py @@ -1,5 +1,5 @@ import os -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Tuple import pyarrow as pa from deltalake import DeltaTable, write_deltalake @@ -51,7 +51,7 @@ def __init__(self, config: Config, table: LakeTable): "aws_conditional_put": "etag", } - def _connect(self): + def _connect(self) -> bool | None: if self.isOpen: return None @@ -78,7 +78,7 @@ def optimize(self): table.cleanup_metadata() table.create_checkpoint() - def getPartitions(self) -> Optional[Dict[str, List[Any]]]: + def getPartitions(self) -> Dict[str, List[Any]] | None: table = self.getConn() partitions: Dict[str, List[Any]] = {} @@ -91,11 +91,11 @@ def getPartitions(self) -> Optional[Dict[str, List[Any]]]: return partitions - def getCurrentVersion(self) -> 
Optional[str]: + def getCurrentVersion(self) -> str | None: table = self.getConn() return str(table.get_version()) - def getVersions(self) -> Optional[List[str]]: + def getVersions(self) -> List[str] | None: return [str(self.getCurrentVersion())] def insert(self, data: List[Any]) -> bool: @@ -129,8 +129,8 @@ def _escape_value(self, val: Any) -> str: def _filters( self, - partitions: Optional[Dict[str, List[Any]]] = None, - ) -> Optional[List[Tuple[str, str, Any]]]: + partitions: Dict[str, List[Any]] | None = None, + ) -> List[Tuple[str, str, Any]] | None: filters: List[Tuple[str, str, Any]] = [] if partitions is None: return None @@ -142,11 +142,11 @@ def _filters( return filters if len(filters) > 0 else None def overwrite( - self, data: List[Any], partitions: Optional[Dict[str, List[Any]]] = None + self, data: List[Any], partitions: Dict[str, List[Any]] | None = None ) -> bool: table = self.getConn() - predicate: Optional[str] = None + predicate: str | None = None filter = self._filters(partitions) if filter is not None: predicate = " & ".join([f"{col} {op} {self._escape_value(val)}" for col, op, val in filter]) @@ -163,9 +163,9 @@ def overwrite( def readRaw( self, columns: List[str], - partitions: Optional[Dict[str, List[Any]]] = None, - version: Optional[str] = None, - options: Optional[Any] = None, + partitions: Dict[str, List[Any]] | None = None, + version: str | None = None, + options: Any | None = None, ) -> Table: table = self.getConn() if version is not None: @@ -191,18 +191,18 @@ def readRaw( def read( self, columns: List[str], - partitions: Optional[Dict[str, List[Any]]] = None, - version: Optional[str] = None, - options: Optional[Any] = None, + partitions: Dict[str, List[Any]] | None = None, + version: str | None = None, + options: Any | None = None, ) -> Table: return self.readRaw(columns, partitions, version, options) - def getSchema(self) -> Optional[Schema]: + def getSchema(self) -> Schema | None: table = self.getConn() return 
table.schema.to_pyarrow() - def _close(self): + def _close(self) -> bool: if self._isOpen: self._isReady = False self._isOpen = False