Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ simplejson==3.20.1
flask==3.1.1
pyyaml==6.0.2
pyiceberg[sql-sqlite,pyarrow]==0.8.1
deltalake==0.25.5
deltalake==1.1.3
azure-servicebus==7.14.2
45 changes: 32 additions & 13 deletions servc/svc/com/storage/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,29 @@ def __init__(self, config: Config, table: LakeTable):
"aws_conditional_put": "etag",
}

def _connect(self):
def _connect(self) -> bool | None:
if self.isOpen:
return None

tablename = self._get_table_name()
uri = os.path.join(self._location_prefix, tablename)
self._conn = DeltaTable.create(
DeltaTable.create(
table_uri=uri,
name=tablename,
schema=self._table["schema"],
partition_by=self._table["partitions"],
mode="ignore",
storage_options=self._storageOptions,
)
# Now load the created table
self._conn = DeltaTable(uri, storage_options=self._storageOptions)
Comment on lines +60 to +68
Copy link
Preview

Copilot AI Jul 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The pattern of creating a table and then immediately loading it seems inefficient. Consider checking if the DeltaTable.create() method in version 1.1.3 returns a DeltaTable instance that can be used directly, which would eliminate the need for the separate instantiation step.

Copilot uses AI. Check for mistakes.


return super()._connect()

def optimize(self):
table = self.getConn()

print("Optimizing", self._get_table_name(), flush=True)
table.optimize.compact()
table.optimize().compact()
table.vacuum()
table.cleanup_metadata()
table.create_checkpoint()
Expand All @@ -81,7 +82,7 @@ def getPartitions(self) -> Dict[str, List[Any]] | None:
table = self.getConn()

partitions: Dict[str, List[Any]] = {}
for obj in table.partitions():
for obj in table.get_partitions():
for key, value in obj.items():
if key not in partitions:
partitions[key] = []
Expand All @@ -92,7 +93,7 @@ def getPartitions(self) -> Dict[str, List[Any]] | None:

def getCurrentVersion(self) -> str | None:
table = self.getConn()
return str(table.version())
return str(table.get_version())

def getVersions(self) -> List[str] | None:
return [str(self.getCurrentVersion())]
Expand All @@ -107,6 +108,25 @@ def insert(self, data: List[Any]) -> bool:
)
return True

def _escape_value(self, val: Any) -> str:
"""Properly escape values for predicate building to prevent injection issues."""
if isinstance(val, str):
# Escape single quotes in strings and wrap in quotes
return f"'{val.replace(\"'\", \"''\")}"
elif isinstance(val, (int, float)):
return str(val)
elif isinstance(val, bool):
return str(val).lower()
elif val is None:
return "null"
elif isinstance(val, list):
# Handle list values for 'in' operations
escaped_items = [self._escape_value(item) for item in val]
return f"({', '.join(escaped_items)})"
else:
# Fallback to string representation with quotes
return f"'{str(val).replace(\"'\", \"''\")}"

def _filters(
self,
partitions: Dict[str, List[Any]] | None = None,
Expand All @@ -129,15 +149,14 @@ def overwrite(
predicate: str | None = None
filter = self._filters(partitions)
if filter is not None:
predicate = " & ".join([" ".join(x) for x in filter])
predicate = " & ".join([f"{col} {op} {self._escape_value(val)}" for col, op, val in filter])

write_deltalake(
table,
data=pa.Table.from_pylist(data, self.getSchema()),
storage_options=self._storageOptions,
mode="overwrite",
predicate=predicate,
engine="rust",
)
return True

Expand All @@ -150,7 +169,7 @@ def readRaw(
) -> Table:
table = self.getConn()
if version is not None:
table.load_as_version(int(version))
table.load_version(int(version))

if options is None or not isinstance(options, dict):
options = {}
Expand All @@ -159,14 +178,14 @@ def readRaw(

if options.get("filter", None) is not None:
return table.to_pyarrow_dataset(
partitions=self._filters(partitions),
filters=self._filters(partitions),
).to_table(
filter=options.get("filter"),
columns=rcolumns,
)
return table.to_pyarrow_table(
columns=rcolumns,
partitions=self._filters(partitions),
filters=self._filters(partitions),
)

def read(
Expand All @@ -181,9 +200,9 @@ def read(
def getSchema(self) -> Schema | None:
table = self.getConn()

return table.schema().to_pyarrow()
return table.schema.to_pyarrow()

def _close(self):
def _close(self) -> bool:
if self._isOpen:
self._isReady = False
self._isOpen = False
Expand Down
Loading