SpinePort是一个可插拔的数据处理骨架
-
入口是 Port(CLI/HTTP/Async)
-
格式是 Codec(用Wire来统一建模,目前支持JSON/MsgPack)
-
算法是 Core(Top-K 聚合、日志归一去重)。
-
所有路径都走同一条不变量链路:
bytes → Wire → DTO/Domain → Core → DTO → Wire → bytes入口(Port)可以换,格式(Codec)可以换,逻辑不换(我们可以为同一段核心业务逻辑兼容不同的外围)
输出稳定、可快照回放、可对拍验证。
1. 当前版本已提供的功能:
-
两条业务算子(交易Top-K,日志归一化去重)
① Top-K的使用场景
统计用户/账户/产品在某段时间内的交易额/频率/均值等,并提取前K。
适用于经营分析(谁是大客户?哪个商品销量好?热榜榜单的前Top10?),
风控预警(交易异常突增)等核心指标面板, 该算子几乎是现代“数据监控平台”必备。
② 日志归一化去重的使用场景
对系统日志、访问日志等进行给定时间窗口内的去重合并。
应对日志爆量场景:如客户端上传过多重复日志;减少存储压力,提高聚合效率。
我的内心OS: 这俩算子是现代数据系统最常见的预处理原语,之所以选择先实现这两个算子,是因为它们足够基础——是工程基础算子, 具备典型的滑窗处理、聚合归并、聚簇过滤等特征, 这在任何现代数据系统的“数据预处理 + 归档 + 展示”流程中都可复用。
-
已打通三种入口(同步 CLI、异步 CLI、HTTP)
同步CLI:适用于本地文件处理/脚本管道,
异步CLI:高速标准输入输出的异步处理流
目前的异步主要是演示Codec/Core与async的自然结合, 为未来对接消息队列等async流做准备
HTTP:Web API, 可以对接前端/平台。支持在线处理, 对接浏览器, 用Curl调试等等。
实现接口级复用与拓展能力,可部署为内网服务/微服务, 兼容MsgPack格式,能够适配高性能场景。
-
与两种编码(JSON、MsgPack)
Json: 可读性强, 适合调试/接口文档/人机交互, 配合
Jq可直接观察校验。MsgPack: 高性能二进制协议, 适合高频低延迟服务或嵌入式设备, 比Json小很多, 序列化/反序列化快,支持二进制字段, 适用于服务间高频调用或IoT场景。
2. 通过目录来看系统架构
spineports/
├─ Cargo.toml
├─ examples/ # 可直接跑的演示输入
│ ├─ txn_input.json
│ ├─ logs_input.json
│ └─ curl_http.json # HTTP 路由/参数示例
├─ tests/
│ ├─ fixtures/ # 集成快照契约(输入/期望输出)
│ │ ├─ txn_input.json
│ │ ├─ txn_expected_output.json
│ │ ├─ logs_input.json
│ │ └─ logs_expected_output.json
│ ├─ txn_snapshot.rs # 交易Top-K快照测试(端到端)
│ └─ dedup_snapshot.rs # 日志去重快照测试(端到端)
├─ src/
│ ├─ lib.rs # Facade:统一re-export,外部只引这个
│ ├─ error.rs # AppError(IO/JSON/MsgPack/Core配置等)
│ ├─ core/ # 纯业务内核(无serde/HTTP)
│ │ ├─ mod.rs
│ │ ├─ txn.rs # 交易Top-K:Domain + transform()
│ │ └─ logs.rs # 日志去重:Domain + dedup_logs()
│ └─ codec/ # 只处理Bytes ↔ DTO ↔ Domain(挂serde/rmp)
│ ├─ mod.rs
│ ├─ dto.rs # DTO 结构 + From/Into 映射
│ ├─ json.rs # JSON 编解码:decode_* / encode_*
│ └─ msgpack.rs # MsgPack 编解码:decode_* / encode_*
└─ src/bin/ # Ports:不同入口(只做编排与协议)
├─ txn.rs # CLI:交易 Top-K(JSON)
├─ dedup.rs # CLI:日志去重(JSON)
├─ async_cli.rs # Async CLI:Dedup(对拍验证 async==sync)
└─ http.rs # HTTP:/txn/topk(JSON);/logs/dedup(JSON/MsgPack)
-
src/bin/txn.rs:Port=CLI,同步;Codec=JSON;Core=Top-K;链路透视: 输入
TxnDto[]→Txn[]→transform→AggRowDto[] -
src/bin/dedup.rs:Port=CLI,同步;Codec=JSON;Core=Dedup;链路透视: 输入
LogDto[]→Log[]→dedup_logs→DedupRowDto[] -
src/bin/async_cli.rs:Port=CLI,Tokio异步;Codec/Core复用(LogDedup);链路透视:
stdin→ bytes → decode → core → encode →stdout -
src/bin/http.rs:Port=HTTP(axum);Codec=JSON和MsgPack;Core复用(两个算子)链路透视: 请求体(Request) → DTO → Domain → Core → DTO → 响应体(Response)
入口换了三种,但编排始终是同一条链路:decode → transform → encode。
先编译:cargo build
同步 CLI(交易 Top-K,JSON):
cargo run --bin txn -- examples/txn_input.json -同步 CLI(日志去重,JSON):
cargo run --bin dedup -- examples/logs_input.json -异步 CLI(Dedup,对拍 sync):
cat examples/logs_input.json | cargo run --bin async_cli -- - - | jq -S . > /tmp/async.json
cargo run --bin dedup -- examples/logs_input.json - | jq -S . > /tmp/sync.json
diff -q /tmp/async.json /tmp/sync.json && echo "✅ async == sync"HTTP 服务(127.0.0.1:8080):
# 先开一个终端执行:
cargo run --bin http# 另开终端:
# JSON 路径(交易 Top-K)
curl -sS -H 'Content-Type: application/json' -H 'Accept: application/json' \
-X POST 'http://127.0.0.1:8080/txn/topk?top_k=2&metric=total' \
--data-binary @examples/txn_input.json | jq -S .
# MsgPack 路径(日志去重)
python3 - <<'PY'
import json,msgpack; open("/tmp/req.msgp","wb").write(msgpack.packb(json.load(open("examples/logs_input.json")), use_bin_type=True))
PY
curl -sS -H 'Content-Type: application/msgpack' -H 'Accept: application/msgpack' \
--data-binary @/tmp/req.msgp http://127.0.0.1:8080/logs/dedup > /tmp/resp.msgp
python3 - <<'PY'
import msgpack,json; print(json.dumps(msgpack.unpackb(open("/tmp/resp.msgp","rb").read(), raw=False), indent=2, sort_keys=True))
PY如果没有安装 msgpack 的 Python 包,可以临时只验证响应头:
curl -i -H 'Content-Type: application/json' -H 'Accept: application/msgpack' \
--data-binary @examples/logs_input.json http://127.0.0.1:8080/logs/dedup | head -20
# 看到 Content-Type: application/msgpack 即可配置开关(仅 Top-K):通过环境变量控制(CLI),或 Query 控制(HTTP)
TOP_K(默认 3)METRIC:total|count|avg:<scale>(如avg:1000)WINDOW:时间窗"a..b"(毫秒,含端点)
示例:
TOP_K=2 METRIC=avg:1000 WINDOW=1000..2000 cargo run --bin txn -- examples/txn_input.json -这套测试把整条链路当黑盒:固定输入 → 跑 Core+Codec → JSON 结构与期望快照相等。
先填充(已对齐当前 DTO 字段):
tests/fixtures/txn_input.json
[
{"account": "A", "amount": 100, "ts_ms": 1000},
{"account": "A", "amount": 200, "ts_ms": 1100},
{"account": "B", "amount": 150, "ts_ms": 1200},
{"account": "B", "amount": 150, "ts_ms": 1300},
{"account": "C", "amount": 50, "ts_ms": 1400}
]tests/fixtures/logs_input.json
[
{"service":"auth","host":"h1","level":"info","ts_ms":1000,"message":"login user=jack id=123 ok"},
{"service":"auth","host":"h2","level":"info","ts_ms":1010,"message":"login user=jack id=456 ok"}
]用程序生成期望快照:
# 与测试配置对齐:TOP_K=3, METRIC=total
TOP_K=3 METRIC=total cargo run -q --bin txn -- tests/fixtures/txn_input.json - | jq -S . \
> tests/fixtures/txn_expected_output.json
cargo run -q --bin dedup -- tests/fixtures/logs_input.json - | jq -S . \
> tests/fixtures/logs_expected_output.json跑测试:
cargo test -q