Skip to content

Commit e366407

Browse files
committed
feat: update ai_list_files schema and upgrade opendal
1 parent: a74dcf5 · commit: e366407

File tree

5 files changed: +59 −31 lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ uv run databend-aiserver --port 8815
2323

2424
```sql
2525
CREATE OR REPLACE FUNCTION ai_list_files(stage_location STAGE_LOCATION, max_files INT)
26-
RETURNS TABLE (stage_name VARCHAR, relative_path VARCHAR, path VARCHAR, is_dir BOOLEAN, size BIGINT, mode VARCHAR, content_type VARCHAR, etag VARCHAR, truncated BOOLEAN)
26+
RETURNS TABLE (stage_name VARCHAR, path VARCHAR, fullpath VARCHAR, size UINT64, last_modified VARCHAR, etag VARCHAR, content_type VARCHAR)
2727
LANGUAGE PYTHON HANDLER = 'ai_list_files' ADDRESS = '<your-ai-server-address>';
2828

2929
CREATE OR REPLACE FUNCTION ai_embed_1024(text VARCHAR)

databend_aiserver/stages/operator.py

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,19 @@ def _build_memory_options(_: Mapping[str, Any]) -> Dict[str, Any]:
134134
return {}
135135

136136

137-
_STORAGE_BUILDERS: Dict[str, Any] = {"s3": _build_s3_options, "memory": _build_memory_options}
137+
def _build_fs_options(storage: Mapping[str, Any]) -> Dict[str, Any]:
138+
# Useful for local testing with real files.
139+
root = _first_present(storage, "root")
140+
if not root:
141+
raise StageConfigurationError("FS stage is missing root configuration")
142+
return {"root": root}
143+
144+
145+
_STORAGE_BUILDERS: Dict[str, Any] = {
146+
"s3": _build_s3_options,
147+
"memory": _build_memory_options,
148+
"fs": _build_fs_options,
149+
}
138150

139151

140152
def _cache_key(stage: StageLocation) -> str:
@@ -251,23 +263,30 @@ def stage_file_suffix(path: str) -> str:
251263
return Path(path).suffix or ".bin"
252264

253265

254-
def resolve_full_path(stage_location: StageLocation, path: str) -> str:
266+
def resolve_storage_uri(stage_location: StageLocation, path: str) -> str:
255267
"""
256-
Resolve the full path (URI) of a file in the stage.
257-
Useful for returning the full S3 path in metadata.
268+
Resolve the full URI of a path relative to the storage root.
258269
"""
259-
resolved_path = resolve_stage_subpath(stage_location, path)
260270
storage = stage_location.storage or {}
261271
storage_root = str(storage.get("root", "") or "")
262272
bucket = storage.get("bucket") or storage.get("name")
263273

264274
if storage_root.startswith("s3://"):
265275
base = storage_root.rstrip("/")
266-
return f"{base}/{resolved_path}"
276+
return f"{base}/{path}"
267277
elif bucket:
268278
base = f"s3://{bucket}"
269279
if storage_root:
270280
base = f"{base}/{storage_root.strip('/')}"
271-
return f"{base}/{resolved_path}"
281+
return f"{base}/{path}"
272282

273-
return resolved_path or path
283+
return path
284+
285+
286+
def resolve_full_path(stage_location: StageLocation, path: str) -> str:
287+
"""
288+
Resolve the full path (URI) of a file in the stage.
289+
Useful for returning the full S3 path in metadata.
290+
"""
291+
resolved_path = resolve_stage_subpath(stage_location, path)
292+
return resolve_storage_uri(stage_location, resolved_path)

databend_aiserver/udfs/stage.py

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
from typing import Any, Dict, Iterable, List, Optional
2222

2323
from databend_udf import StageLocation, udf
24-
from opendal import exceptions as opendal_exceptions
25-
2624
from databend_aiserver.stages.operator import (
2725
StageConfigurationError,
2826
get_operator,
2927
resolve_stage_subpath,
28+
resolve_storage_uri,
3029
)
30+
from opendal import exceptions as opendal_exceptions
3131

3232

3333
def _format_last_modified(value: Any) -> Optional[str]:
@@ -119,14 +119,12 @@ def _collect_stage_files(
119119
input_types=["INT"],
120120
result_type=[
121121
("stage_name", "VARCHAR"),
122-
("relative_path", "VARCHAR"),
123122
("path", "VARCHAR"),
124-
("is_dir", "BOOLEAN"),
125-
("size", "BIGINT"),
126-
("mode", "VARCHAR"),
127-
("content_type", "VARCHAR"),
123+
("fullpath", "VARCHAR"),
124+
("size", "UINT64"),
125+
("last_modified", "VARCHAR"),
128126
("etag", "VARCHAR"),
129-
("last_modified", "TIMESTAMP"),
127+
("content_type", "VARCHAR"),
130128
],
131129
name="ai_list_files",
132130
)
@@ -170,22 +168,19 @@ def ai_list_files(
170168
metadata = op.stat(entry.path)
171169
# Check if directory using mode (opendal.Metadata doesn't have is_dir())
172170
# Mode for directories typically has specific bits set, or path ends with /
173-
is_dir = entry.path.endswith('/')
174171

175172
# Convert mode to string if it exists, otherwise None
176-
mode_str = str(metadata.mode) if metadata.mode is not None else None
177-
last_modified = _format_last_modified(getattr(metadata, "last_modified", None))
178173

179174
yield {
180175
"stage_name": stage_location.stage_name,
181-
"relative_path": stage_location.relative_path,
182176
"path": entry.path,
183-
"is_dir": is_dir,
177+
"fullpath": resolve_storage_uri(stage_location, entry.path),
184178
"size": metadata.content_length,
185-
"mode": mode_str,
186-
"content_type": metadata.content_type,
179+
"last_modified": _format_last_modified(
180+
getattr(metadata, "last_modified", None)
181+
),
187182
"etag": metadata.etag,
188-
"last_modified": last_modified,
183+
"content_type": metadata.content_type,
189184
}
190185

191186
except Exception as e:

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ requires-python = ">=3.10"
1111
authors = [{ name = "Databend Labs" }]
1212
dependencies = [
1313
"databend-udf==0.2.18",
14-
"opendal==0.45.1",
14+
"opendal>=0.46.0",
1515
"pypdf==4.0.0",
1616
"python-docx==1.2.0",
1717
"uvicorn>=0.29.0",

tests/integration/test_listing_integration.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import pytest
1516
from databend_udf.client import UDFClient
1617

1718
from tests.integration.conftest import build_stage_mapping
@@ -42,18 +43,31 @@ def test_list_stage_files_content(running_server, memory_stage):
4243
def test_list_stage_files_metadata(running_server, memory_stage):
4344
rows = _get_listing(running_server, memory_stage)
4445
assert {row["stage_name"] for row in rows} == {memory_stage.stage_name}
45-
assert {row["relative_path"] for row in rows} == {memory_stage.relative_path}
46+
# Check for fullpath instead of relative_path
47+
# Memory stage fullpath might be just the path if no bucket/root
48+
assert all("fullpath" in row for row in rows)
49+
assert all(row["fullpath"].endswith(row["path"]) for row in rows)
50+
# Check that last_modified key exists (value might be None for memory backend)
51+
assert all("last_modified" in row for row in rows)
4652

4753

4854
def test_list_stage_files_schema(running_server, memory_stage):
4955
rows = _get_listing(running_server, memory_stage)
5056
for row in rows:
51-
assert "is_dir" in row
57+
assert "path" in row
58+
assert "fullpath" in row
5259
assert "size" in row
53-
assert "mode" in row
54-
assert "content_type" in row # May be None
55-
assert "etag" in row # May be None
5660
assert "last_modified" in row
61+
assert "etag" in row # May be None
62+
assert "content_type" in row # May be None
63+
64+
# Verify order implicitly by checking keys list if needed,
65+
# but for now just existence is enough as dicts are ordered in Python 3.7+
66+
keys = list(row.keys())
67+
# Expected keys: stage_name, path, fullpath, size, last_modified, etag, content_type
68+
# Note: stage_name is added by _get_listing or the UDF logic, let's check the core ones
69+
assert keys.index("path") < keys.index("fullpath")
70+
assert keys.index("last_modified") < keys.index("etag")
5771

5872

5973
def test_list_stage_files_truncation(running_server, memory_stage):

Comments (0)