-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprotocol_pi0.py
More file actions
38 lines (26 loc) · 1.31 KB
/
protocol_pi0.py
File metadata and controls
38 lines (26 loc) · 1.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
"""Serialization for π₀ observations and actions.
Uses openpi_client.msgpack_numpy so the same module works:
- in the openpi Python environment (bench.py)
- inside the Modal container (server_pi0.py via add_local_python_source)
Wire format (flat msgpack dict, numpy arrays encoded as dicts):
pack_obs({"cam_high": <uint8 CHW>, "state": <float32>}) → bytes
unpack_obs(bytes) → dict
pack_response(action_array, inference_ms_float) → bytes
unpack_response(bytes) → (action_array, inference_ms_float)
"""
import msgpack
import numpy as np
from openpi_client import msgpack_numpy as _mnp
def pack_obs(obs: dict[str, np.ndarray]) -> bytes:
"""Serialize flat observation dict (str → ndarray) to msgpack bytes."""
return _mnp.packb(obs)
def unpack_obs(data: bytes) -> dict[str, np.ndarray]:
"""Deserialize msgpack bytes to flat observation dict."""
return _mnp.unpackb(data)
def pack_response(action: np.ndarray, inference_ms: float) -> bytes:
"""Serialize action array + server-side GPU timing."""
return _mnp.packb({"action": action, "inference_ms": inference_ms})
def unpack_response(data: bytes) -> tuple[np.ndarray, float]:
"""Deserialize action response. Returns (action_array, inference_ms)."""
resp = _mnp.unpackb(data)
return resp["action"], resp["inference_ms"]