Skip to content

Commit 54b9c67

Browse files
claude[bot] and drgroot
committed
fix: update delta.py for deltalake 1.1.3 API compatibility
- Update DeltaTable.create() method signature and instantiation - Change optimize.compact() to optimize().compact() - Replace partitions() with get_partitions() - Replace version() with get_version() - Replace load_as_version() with load_version() - Remove deprecated engine='rust' parameter from write_deltalake() - Change 'partitions=' to 'filters=' in to_pyarrow methods - Update schema().to_pyarrow() to schema.to_pyarrow() - Fix string concatenation in predicate building for type safety - Add Optional import for better type annotation support Fixes type checking issues after deltalake upgrade from 0.25.5 to 1.1.3 Co-authored-by: Yusuf Ali <[email protected]>
1 parent c882c79 commit 54b9c67

File tree

2 files changed

+13
-13
lines changed

2 files changed

+13
-13
lines changed

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@ simplejson==3.20.1
55
flask==3.1.1
66
pyyaml==6.0.2
77
pyiceberg[sql-sqlite,pyarrow]==0.8.1
8-
deltalake==0.25.5
8+
deltalake==1.1.3
99
azure-servicebus==7.14.2

servc/svc/com/storage/delta.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import os
2-
from typing import Any, Dict, List, Tuple
2+
from typing import Any, Dict, List, Optional, Tuple
33

44
import pyarrow as pa
55
from deltalake import DeltaTable, write_deltalake
@@ -57,22 +57,23 @@ def _connect(self):
5757

5858
tablename = self._get_table_name()
5959
uri = os.path.join(self._location_prefix, tablename)
60-
self._conn = DeltaTable.create(
60+
DeltaTable.create(
6161
table_uri=uri,
62-
name=tablename,
6362
schema=self._table["schema"],
6463
partition_by=self._table["partitions"],
6564
mode="ignore",
6665
storage_options=self._storageOptions,
6766
)
67+
# Now load the created table
68+
self._conn = DeltaTable(uri, storage_options=self._storageOptions)
6869

6970
return super()._connect()
7071

7172
def optimize(self):
7273
table = self.getConn()
7374

7475
print("Optimizing", self._get_table_name(), flush=True)
75-
table.optimize.compact()
76+
table.optimize().compact()
7677
table.vacuum()
7778
table.cleanup_metadata()
7879
table.create_checkpoint()
@@ -81,7 +82,7 @@ def getPartitions(self) -> Dict[str, List[Any]] | None:
8182
table = self.getConn()
8283

8384
partitions: Dict[str, List[Any]] = {}
84-
for obj in table.partitions():
85+
for obj in table.get_partitions():
8586
for key, value in obj.items():
8687
if key not in partitions:
8788
partitions[key] = []
@@ -92,7 +93,7 @@ def getPartitions(self) -> Dict[str, List[Any]] | None:
9293

9394
def getCurrentVersion(self) -> str | None:
9495
table = self.getConn()
95-
return str(table.version())
96+
return str(table.get_version())
9697

9798
def getVersions(self) -> List[str] | None:
9899
return [str(self.getCurrentVersion())]
@@ -129,15 +130,14 @@ def overwrite(
129130
predicate: str | None = None
130131
filter = self._filters(partitions)
131132
if filter is not None:
132-
predicate = " & ".join([" ".join(x) for x in filter])
133+
predicate = " & ".join([f"{col} {op} {str(val)}" for col, op, val in filter])
133134

134135
write_deltalake(
135136
table,
136137
data=pa.Table.from_pylist(data, self.getSchema()),
137138
storage_options=self._storageOptions,
138139
mode="overwrite",
139140
predicate=predicate,
140-
engine="rust",
141141
)
142142
return True
143143

@@ -150,7 +150,7 @@ def readRaw(
150150
) -> Table:
151151
table = self.getConn()
152152
if version is not None:
153-
table.load_as_version(int(version))
153+
table.load_version(int(version))
154154

155155
if options is None or not isinstance(options, dict):
156156
options = {}
@@ -159,14 +159,14 @@ def readRaw(
159159

160160
if options.get("filter", None) is not None:
161161
return table.to_pyarrow_dataset(
162-
partitions=self._filters(partitions),
162+
filters=self._filters(partitions),
163163
).to_table(
164164
filter=options.get("filter"),
165165
columns=rcolumns,
166166
)
167167
return table.to_pyarrow_table(
168168
columns=rcolumns,
169-
partitions=self._filters(partitions),
169+
filters=self._filters(partitions),
170170
)
171171

172172
def read(
@@ -181,7 +181,7 @@ def read(
181181
def getSchema(self) -> Schema | None:
182182
table = self.getConn()
183183

184-
return table.schema().to_pyarrow()
184+
return table.schema.to_pyarrow()
185185

186186
def _close(self):
187187
if self._isOpen:

0 commit comments

Comments (0)