diff --git a/requirements.txt b/requirements.txt index 7ff1cef..c804cb8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,5 +5,5 @@ simplejson==3.20.1 flask==3.1.1 pyyaml==6.0.2 pyiceberg[sql-sqlite,pyarrow]==0.8.1 -deltalake==0.25.5 +deltalake==1.1.3 azure-servicebus==7.14.2 diff --git a/servc/svc/com/storage/delta.py b/servc/svc/com/storage/delta.py index c2c4f40..4786a6b 100644 --- a/servc/svc/com/storage/delta.py +++ b/servc/svc/com/storage/delta.py @@ -51,20 +51,21 @@ def __init__(self, config: Config, table: LakeTable): "aws_conditional_put": "etag", } - def _connect(self): + def _connect(self) -> bool | None: if self.isOpen: return None tablename = self._get_table_name() uri = os.path.join(self._location_prefix, tablename) - self._conn = DeltaTable.create( + DeltaTable.create( table_uri=uri, - name=tablename, schema=self._table["schema"], partition_by=self._table["partitions"], mode="ignore", storage_options=self._storageOptions, ) + # Now load the created table + self._conn = DeltaTable(uri, storage_options=self._storageOptions) return super()._connect() @@ -72,7 +73,7 @@ def optimize(self): table = self.getConn() print("Optimizing", self._get_table_name(), flush=True) - table.optimize.compact() + table.optimize().compact() table.vacuum() table.cleanup_metadata() table.create_checkpoint() @@ -81,7 +82,7 @@ def getPartitions(self) -> Dict[str, List[Any]] | None: table = self.getConn() partitions: Dict[str, List[Any]] = {} - for obj in table.partitions(): + for obj in table.get_partitions(): for key, value in obj.items(): if key not in partitions: partitions[key] = [] @@ -92,7 +93,7 @@ def getPartitions(self) -> Dict[str, List[Any]] | None: def getCurrentVersion(self) -> str | None: table = self.getConn() - return str(table.version()) + return str(table.get_version()) def getVersions(self) -> List[str] | None: return [str(self.getCurrentVersion())] @@ -107,6 +108,25 @@ def insert(self, data: List[Any]) -> bool: ) return True + def _escape_value(self, val: Any) -> str: + """Properly escape values for predicate building to prevent injection issues.""" + if isinstance(val, str): + # Escape single quotes in strings and wrap in quotes + return f"'{val.replace(\"'\", \"''\")}" + elif isinstance(val, (int, float)): + return str(val) + elif isinstance(val, bool): + return str(val).lower() + elif val is None: + return "null" + elif isinstance(val, list): + # Handle list values for 'in' operations + escaped_items = [self._escape_value(item) for item in val] + return f"({', '.join(escaped_items)})" + else: + # Fallback to string representation with quotes + return f"'{str(val).replace(\"'\", \"''\")}" + def _filters( self, partitions: Dict[str, List[Any]] | None = None, @@ -129,7 +149,7 @@ def overwrite( predicate: str | None = None filter = self._filters(partitions) if filter is not None: - predicate = " & ".join([" ".join(x) for x in filter]) + predicate = " & ".join([f"{col} {op} {self._escape_value(val)}" for col, op, val in filter]) write_deltalake( table, @@ -137,7 +157,6 @@ def overwrite( storage_options=self._storageOptions, mode="overwrite", predicate=predicate, - engine="rust", ) return True @@ -150,7 +169,7 @@ def readRaw( ) -> Table: table = self.getConn() if version is not None: - table.load_as_version(int(version)) + table.load_version(int(version)) if options is None or not isinstance(options, dict): options = {} @@ -159,14 +178,14 @@ def readRaw( if options.get("filter", None) is not None: return table.to_pyarrow_dataset( - partitions=self._filters(partitions), + filters=self._filters(partitions), ).to_table( filter=options.get("filter"), columns=rcolumns, ) return table.to_pyarrow_table( columns=rcolumns, - partitions=self._filters(partitions), + filters=self._filters(partitions), ) def read( @@ -181,9 +200,9 @@ def read( def getSchema(self) -> Schema | None: table = self.getConn() - return table.schema().to_pyarrow() + return table.schema.to_pyarrow() - def _close(self): + def _close(self) -> bool: if self._isOpen: self._isReady = False self._isOpen = False