Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions caveclient/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,5 +335,9 @@
+ "/{datastack_name}/bulk/gen_skeletons/{skeleton_version}/{root_ids}",
"gen_bulk_skeletons_via_skvn_rids_as_post": skeleton_v1
+ "/{datastack_name}/bulk/gen_skeletons",
"get_cached_skeletons_bulk_as_post": skeleton_v1
+ "/{datastack_name}/bulk/get_cached_skeletons/{skeleton_version}/{output_format}",
"get_skeleton_token_as_post": skeleton_v1
+ "/{datastack_name}/bulk/get_skeleton_token/{skeleton_version}",
}
skeletonservice_api_versions = {1: skeletonservice_endpoints_v1}
230 changes: 230 additions & 0 deletions caveclient/skeletonservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io
import json
import logging
import urllib.parse
from io import BytesIO, StringIO
from timeit import default_timer
from typing import List, Literal, Optional, Union
Expand All @@ -27,6 +28,7 @@

MAX_SKELETONS_EXISTS_QUERY_SIZE = 1000
MAX_BULK_SYNCHRONOUS_SKELETONS = 10
MAX_BULK_CACHED_SKELETONS = 500 # mirrors server-side MAX_BULK_CACHED_SKELETONS
MAX_BULK_ASYNCHRONOUS_SKELETONS = 10000
BULK_SKELETONS_BATCH_SIZE = 1000

Expand Down Expand Up @@ -956,3 +958,231 @@ def generate_bulk_skeletons_async(
)

return estimated_async_time_secs_upper_bound_sum

def get_cached_skeletons_bulk(
    self,
    root_ids: List,
    datastack_name: Optional[str] = None,
    skeleton_version: Optional[int] = 4,
    output_format: Literal["dict", "swc"] = "dict",
    generate_missing_skeletons: bool = False,
    verbose_level: Optional[int] = 0,
) -> dict:
    """Retrieve already-cached skeletons in bulk, up to MAX_BULK_CACHED_SKELETONS at a time.

    Unlike get_bulk_skeletons(), this endpoint:
    - Accepts up to MAX_BULK_CACHED_SKELETONS (500) root IDs per call (vs the
      10-skeleton limit of get_bulk_skeletons)
    - Skips per-RID CAVEclient validation, so it never blocks on chunkedgraph network calls
    - Never triggers skeleton generation inline; only returns what is already cached

    Parameters
    ----------
    root_ids : List
        Root IDs to retrieve skeletons for. Truncated to MAX_BULK_CACHED_SKELETONS if longer.
    datastack_name : str, optional
        Datastack name. Defaults to the client's configured datastack.
    skeleton_version : int, optional
        Skeleton version to retrieve. Default is 4 (latest).
    output_format : "dict" or "swc"
        Output format for skeleton data. Default is "dict".
    generate_missing_skeletons : bool
        If True, RIDs not found in the cache are queued for async generation and
        returned under the "async_queued" key. Default False.
    verbose_level : int, optional
        Verbosity level for server-side logging.

    Returns
    -------
    dict with keys:
        "skeletons"   : dict mapping root_id (str) -> decoded skeleton object
        "missing"     : list of root IDs not found in cache (and not queued)
        "async_queued": list of root IDs not in cache that were queued for generation

    Raises
    ------
    ValueError
        If output_format is not "dict" or "swc".
    """
    if datastack_name is None:
        datastack_name = self._datastack_name
    assert datastack_name is not None
    assert skeleton_version is not None

    # Validate and translate the output format before doing any other work so a
    # bad argument fails fast, before the root_ids list is inspected/truncated.
    if output_format == "dict":
        server_format = "flatdict"
    elif output_format == "swc":
        server_format = "swccompressed"
    else:
        raise ValueError(f"output_format must be 'dict' or 'swc', got '{output_format}'")

    if len(root_ids) > MAX_BULK_CACHED_SKELETONS:
        logging.warning(
            f"The number of root_ids exceeds MAX_BULK_CACHED_SKELETONS ({MAX_BULK_CACHED_SKELETONS}). "
            f"Only the first {MAX_BULK_CACHED_SKELETONS} will be requested."
        )
        root_ids = root_ids[:MAX_BULK_CACHED_SKELETONS]

    endpoint_mapping = self.default_url_mapping
    endpoint_mapping["datastack_name"] = datastack_name
    endpoint_mapping["skeleton_version"] = skeleton_version
    endpoint_mapping["output_format"] = server_format
    url = self._endpoints["get_cached_skeletons_bulk_as_post"].format_map(endpoint_mapping)
    url += f"?verbose_level={verbose_level}"

    data = {
        "root_ids": root_ids,
        "generate_missing": generate_missing_skeletons,
        "verbose_level": verbose_level,
    }
    response = self.session.post(url, json=data)
    response = handle_response(response)

    raw_skeletons = response.get("skeletons", {})
    missing = response.get("missing", [])
    async_queued = response.get("async_queued", [])

    skeletons = {}
    for rid, encoded in raw_skeletons.items():
        try:
            # The server hex-encodes each compressed payload; unhexlify already
            # returns bytes, so no BytesIO round-trip is needed.
            payload = binascii.unhexlify(encoded)
            if output_format == "dict":
                skeletons[rid] = SkeletonClient.decompressBytesToDict(payload)
            else:  # "swc" — the only other value after validation above
                skeletons[rid] = pd.read_csv(
                    StringIO(payload.decode()),
                    sep=" ",
                    names=["id", "type", "x", "y", "z", "radius", "parent"],
                )
        except Exception as e:
            # Decode failures are logged per-RID; remaining skeletons still decode.
            logging.error(f"Error decoding skeleton for root_id {rid}: {e}")

    return {"skeletons": skeletons, "missing": missing, "async_queued": async_queued}

def get_skeleton_access_token(
    self,
    root_ids: List,
    datastack_name: Optional[str] = None,
    skeleton_version: Optional[int] = 4,
    expiration_minutes: int = 60,
    verbose_level: Optional[int] = 0,
) -> dict:
    """Obtain a short-lived GCS access token for direct skeleton downloads.

    Returns a downscoped OAuth2 Bearer token (valid for at most one hour) that
    grants read-only access to the skeleton bucket prefix for the given
    datastack and skeleton version, along with the GCS object path of every
    requested root ID present in the cache. With the token, clients can fetch
    skeleton H5 files straight from GCS — skipping this service entirely for
    the data transfer, which is much faster for bulk downloads:

        import requests, urllib.parse
        token_resp = client.skeleton.get_skeleton_access_token([rid1, rid2])
        bucket = token_resp["bucket"]
        for rid, obj_path in token_resp["object_paths"].items():
            encoded = urllib.parse.quote(obj_path, safe="")
            url = f"https://storage.googleapis.com/download/storage/v1/b/{bucket}/o/{encoded}?alt=media"
            h5_bytes = requests.get(url, headers={"Authorization": f"Bearer {token_resp['token']}"}).content

    Parameters
    ----------
    root_ids : List
        Root IDs to check and generate download paths for.
    datastack_name : str, optional
        Datastack name. Defaults to the client's configured datastack.
    skeleton_version : int, optional
        Skeleton version. Default is 4 (latest).
    expiration_minutes : int
        Token lifetime in minutes. Maximum is 60 (GCP IAM limit). Default is 60.
    verbose_level : int, optional
        Verbosity level for server-side logging.

    Returns
    -------
    dict with keys:
        "token"        : str — short-lived Bearer token
        "token_type"   : "Bearer"
        "expiry"       : str — ISO-8601 expiry datetime
        "bucket"       : str — GCS bucket name (no gs:// prefix)
        "object_paths" : dict mapping root_id (str) -> GCS object path within the bucket
        "missing"      : list of root IDs not found in the cache
    """
    if datastack_name is None:
        datastack_name = self._datastack_name
    assert datastack_name is not None
    assert skeleton_version is not None

    # Build the endpoint URL from the client's default mapping plus the
    # datastack/version for this request.
    mapping = self.default_url_mapping
    mapping["datastack_name"] = datastack_name
    mapping["skeleton_version"] = skeleton_version
    endpoint = self._endpoints["get_skeleton_token_as_post"]
    url = f"{endpoint.format_map(mapping)}?verbose_level={verbose_level}"

    payload = {
        "root_ids": root_ids,
        "expiration_minutes": expiration_minutes,
    }
    return handle_response(self.session.post(url, json=payload))

def download_skeletons_with_token(
    self,
    token_response: dict,
) -> dict:
    """Download and parse skeleton H5 files directly from GCS using a token.

    Takes the response from :meth:`get_skeleton_access_token` and downloads
    each skeleton file directly from GCS, bypassing the SkeletonService for
    the data transfer. Returns skeletons in the same dict format as
    :meth:`get_skeleton` with ``output_format="dict"``.

    Parameters
    ----------
    token_response : dict
        The dict returned by :meth:`get_skeleton_access_token`, containing
        ``"token"``, ``"bucket"``, and ``"object_paths"`` keys.

    Returns
    -------
    dict
        Mapping of root_id (str) to skeleton dict with numpy-array fields:
        ``vertices``, ``edges``, and optionally ``mesh_to_skel_map``,
        ``lvl2_ids``, ``radius``, ``compartment``, and ``meta``.

    Raises
    ------
    requests.HTTPError
        If any GCS download fails (e.g. expired token, missing object).
    """
    # h5py is only needed for this direct-download path, so import lazily
    # rather than at module import time.
    import h5py

    token = token_response["token"]
    bucket = token_response["bucket"]
    object_paths = token_response["object_paths"]

    # Passing Authorization per-request overrides the session's own auth
    # header for these GCS calls only (requests merges call headers over
    # session headers, key by key).
    gcs_headers = {"Authorization": f"Bearer {token}"}
    skeletons = {}

    for rid, obj_path in object_paths.items():
        encoded_path = urllib.parse.quote(obj_path, safe="")
        url = (
            f"https://storage.googleapis.com/download/storage/v1/b/"
            f"{bucket}/o/{encoded_path}?alt=media"
        )

        resp = self.session.get(url, headers=gcs_headers)
        resp.raise_for_status()

        # Stored files are gzip-compressed H5 — but if GCS serves the object
        # with Content-Encoding: gzip, requests transparently decompresses it
        # and resp.content is already raw H5. Only decompress when the gzip
        # magic bytes are still present, so both cases work.
        h5_bytes = resp.content
        if h5_bytes[:2] == b"\x1f\x8b":
            h5_bytes = gzip.decompress(h5_bytes)

        with h5py.File(io.BytesIO(h5_bytes), "r") as f:
            sk = {
                "vertices": np.array(f["vertices"][()]),
                "edges": np.array(f["edges"][()]),
            }
            if "mesh_to_skel_map" in f:
                sk["mesh_to_skel_map"] = np.array(f["mesh_to_skel_map"][()])
            if "lvl2_ids" in f:
                sk["lvl2_ids"] = np.array(f["lvl2_ids"][()])
            if "vertex_properties" in f:
                # Vertex properties (e.g. radius, compartment) are stored as
                # JSON-encoded arrays, one dataset per property.
                for vp_key in f["vertex_properties"].keys():
                    sk[vp_key] = np.array(
                        json.loads(f["vertex_properties"][vp_key][()])
                    )
            if "meta" in f:
                sk["meta"] = json.loads(f["meta"][()].tobytes())

        skeletons[rid] = sk

    return skeletons
2 changes: 1 addition & 1 deletion docs/api/skeleton.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ title: client.skeleton
options:
heading_level: 2
show_bases: false
members: ['server_version', 'get_skeleton', 'get_cache_contents', 'skeletons_exist', 'get_bulk_skeletons', 'generate_bulk_skeletons_async']
members: ['server_version', 'get_skeleton', 'get_cache_contents', 'skeletons_exist', 'get_bulk_skeletons', 'generate_bulk_skeletons_async', 'get_cached_skeletons_bulk', 'get_skeleton_access_token', 'download_skeletons_with_token']
6 changes: 6 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
---
title: Changelog
---
## 8.1.0 (March 13, 2026)
- Added `get_cached_skeletons_bulk()`: bulk retrieval of already-cached skeletons with a 500 root ID limit (vs. the 10-skeleton limit of `get_bulk_skeletons()`). Skips per-RID chunkedgraph validation for faster response. Requires server-side SkeletonService >= v0.22.51.
- Added `get_skeleton_access_token()`: returns a short-lived, downscoped GCS Bearer token and object paths so clients can download skeleton H5 files directly from the storage bucket, bypassing the service for data transfer. Requires server-side SkeletonService >= v0.22.51.
- Added `download_skeletons_with_token()`: convenience method that takes the response from `get_skeleton_access_token()` and downloads and parses all skeleton H5 files directly from GCS, returning them in the same dict format as `get_skeleton()`.
- Added `h5py` as a required dependency (needed for H5 parsing in `download_skeletons_with_token()`).

## 8.0.0 (November 2, 2025)
- Improved handling of types from SQL queries. Previously, the server side method to read data from PostGres into pandas was via csv streaming, which caused pandas to infer types. There were cases where this inference was wrong or incomplete. For example if you had a string column, but all your entries for your query happened to be numbers (i.e ["1", "2"]) the result would return those as numbers not strings, but then if your query changed so there was a mix of numbers and strings, those same rows which were numbers would go back to being strings (i.e. ["1", "apple"]). Also, boolean columns were being returned as strings "t" or "f".

Expand Down
103 changes: 99 additions & 4 deletions docs/tutorials/skeletonization.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,38 @@ get_bulk_skeletons(
)
```

## Retrieving large numbers of already-cached skeletons

`get_bulk_skeletons()` has a hard limit of 10 root IDs per call because it may need to generate missing skeletons inline, which is a slow synchronous operation. If you already know your skeletons are in the cache, use `get_cached_skeletons_bulk()` instead, which accepts up to 500 root IDs per call and skips per-RID validation:

```python
result = client.skeleton.get_cached_skeletons_bulk(
root_ids=[<root_id>, <root_id>, ...],
)
skeletons = result["skeletons"] # dict of {root_id: skeleton}
missing = result["missing"] # list of root IDs not found in the cache
```

The return value is a dict with three keys:

| Key | Description |
|---|---|
| `"skeletons"` | Dict mapping root ID (str) → skeleton object |
| `"missing"` | List of root IDs not found in the cache |
| `"async_queued"` | List of root IDs queued for async generation (if `generate_missing_skeletons=True`) |

The same optional parameters apply as in `get_bulk_skeletons()`:

```python
result = client.skeleton.get_cached_skeletons_bulk(
root_ids=[<root_id>, <root_id>, ...],
datastack_name=<datastack_name>,
skeleton_version=<sk_version>,
output_format=<"dict"|"swc">,
generate_missing_skeletons=True, # queue missing ones for async generation
)
```

## Generating multiple skeletons in parallel

`get_bulk_skeletons()` is not an effective way to produce a large number of skeletons since it operates synchronously, generating one skeleton at a time. In order to generate a large number of skeletons it is better to do so in parallel. The following function dispatches numerous root ids for skeletonization without returning anything immediately. The root ids are then distributed on the server for parallel skeletonization and eventual caching. Once they are in the cache, you can retrieve them. Of course, it can be tricky to know when they are available. That is addressed further below. Here's how to dispatch asynchronous bulk skeletonization:
Expand All @@ -232,9 +264,9 @@ generate_bulk_skeletons_async(

In order to retrieve asynchronously generated skeletons, it is necessary to _poll_ the cache for the availability of the skeletons and then eventually retrieve them. Here's an example of such a workflow:

```
```python
# Dispatch multiple asynchronous, parallel skeletonization and caching processes
generate_bulk_skeletons_async(root_ids)
client.skeleton.generate_bulk_skeletons_async(root_ids)

# Repeatedly query the cache for the existence of the skeletons until they are all available
while True:
Expand All @@ -244,6 +276,69 @@ while True:
break
sleep(10) # Pause for ten seconds and check again

# Retrieve the skeletons (remember, SWC is also offered)
skeletons_json = get_bulk_skeletons(root_ids)
# Retrieve all skeletons at once — get_cached_skeletons_bulk() supports up to 500 at a time
result = client.skeleton.get_cached_skeletons_bulk(root_ids)
skeletons = result["skeletons"]
```

## Downloading skeletons directly from storage

For maximum throughput — especially when retrieving hundreds of skeletons — you can obtain a short-lived access token and have your machine download skeleton files directly from the storage bucket, bypassing the SkeletonService entirely for the data transfer.

### Step 1: Get an access token

```python
token_resp = client.skeleton.get_skeleton_access_token(
root_ids=[<root_id>, <root_id>, ...],
skeleton_version=4,
expiration_minutes=60, # maximum is 60 (GCP limit)
)
```

The response is a dict with the following keys:

| Key | Description |
|---|---|
| `"token"` | Short-lived Bearer token (valid up to 60 minutes) |
| `"token_type"` | `"Bearer"` |
| `"expiry"` | ISO-8601 expiry datetime string |
| `"bucket"` | GCS bucket name (no `gs://` prefix) |
| `"object_paths"` | Dict mapping root ID (str) → GCS object path within the bucket |
| `"missing"` | List of root IDs not found in the cache |

The token grants read-only access to the skeleton prefix for the requested datastack and version. Root IDs not in the cache appear in `"missing"` and have no entry in `"object_paths"`.

### Step 2: Download and parse skeletons

The simplest approach uses the built-in convenience method, which downloads and parses all skeletons into the same dict format as `get_skeleton()`:

```python
skeletons = client.skeleton.download_skeletons_with_token(token_resp)
# {"123456789": {"vertices": np.array(...), "edges": np.array(...), ...}, ...}
```

!!! note
`download_skeletons_with_token()` requires `h5py`, which is included as a dependency of `caveclient`.

### Manual download (for custom parallelism)

If you prefer to manage downloads yourself — for example, using `asyncio` or a thread pool for parallel fetching — the token and object paths from the response are all you need:

```python
import requests
import urllib.parse

token = token_resp["token"]
bucket = token_resp["bucket"]

for rid, obj_path in token_resp["object_paths"].items():
encoded = urllib.parse.quote(obj_path, safe="")
url = (
f"https://storage.googleapis.com/download/storage/v1/b/"
f"{bucket}/o/{encoded}?alt=media"
)
h5_gz_bytes = requests.get(
url, headers={"Authorization": f"Bearer {token}"}
).content
# h5_gz_bytes is a gzip-compressed HDF5 file
```
Loading
Loading