diff --git a/deploy/MLOG_SETUP.md b/deploy/MLOG_SETUP.md index 92c37ca3..b7c76923 100644 --- a/deploy/MLOG_SETUP.md +++ b/deploy/MLOG_SETUP.md @@ -40,15 +40,15 @@ curl https://interop-relay.cloudflare.mediaoverquic.com:443/mlog/22c73802597dcd9 The output is JSON-SEQ: each record starts with ASCII Record Separator (`0x1e`) and contains a JSON object. The first record is a header; subsequent records are events: ```json -{"time":0.179,"name":"moqt:control_message_parsed","data":{"message_type":"client_setup","supported_versions":["DRAFT_14"],...}} -{"time":0.216,"name":"moqt:control_message_created","data":{"message_type":"server_setup","selected_version":"DRAFT_14",...}} +{"time":1744243200179.0,"name":"moqt:control_message_parsed","data":{"stream_id":0,"message":{"type":"client_setup","supported_versions":["DRAFT_14"],...}}} +{"time":1744243200216.0,"name":"moqt:control_message_created","data":{"stream_id":0,"message":{"type":"server_setup","selected_version":"DRAFT_14",...}}} ``` -- `control_message_parsed` = the relay **received** a message from your client +- `control_message_parsed` = the relay **received** a message from your client (the `message.type` field identifies which control message) - `control_message_created` = the relay **sent** a message to your client -- `time` = milliseconds since connection start +- `time` = milliseconds since Unix epoch (absolute timestamp) -If you see `client_setup` parsed and `server_setup` created, your client is speaking valid MoQ Transport and the relay understood it. +If you see `client_setup` parsed and `server_setup` created (check `message.type`), your client is speaking valid MoQ Transport and the relay understood it. ## Worked Examples @@ -67,30 +67,34 @@ cargo run --bin moq-test-client -- \ ```json { - "time": 0.179, + "time": 1744243200179.0, "name": "moqt:control_message_parsed", "data": { - "event_type": "control_message_parsed", "stream_id": 0, - "message_type": "client_setup", - "number_of_supported_versions": 1, - "supported_versions": ["DRAFT_14"], - "parameters": [["2", "100"]] + "message": { + "type": "client_setup", + "number_of_supported_versions": 1, + "supported_versions": ["DRAFT_14"], + "number_of_parameters": 1, + "setup_parameters": [{"name": "path", "value": "/"}] + } } } ``` -The relay parsed your CLIENT_SETUP. It offered version DRAFT_14. The `parameters` array contains SETUP parameters as `[id, value]` pairs (here, PATH with max length 100). +The relay parsed your CLIENT_SETUP. It offered version DRAFT_14. The `setup_parameters` array contains typed parameter objects (here, a PATH parameter). ```json { - "time": 0.216, + "time": 1744243200216.0, "name": "moqt:control_message_created", "data": { - "event_type": "control_message_created", "stream_id": 0, - "message_type": "server_setup", - "selected_version": "DRAFT_14", - "parameters": [["2", "100"]] + "message": { + "type": "server_setup", + "selected_version": "DRAFT_14", + "number_of_parameters": 1, + "setup_parameters": [{"name": "max_request_id", "value": 100}] + } } } ``` @@ -115,29 +119,32 @@ cargo run --bin moq-test-client -- \ ```json { - "time": 64.779, + "time": 1744243264779.0, "name": "moqt:control_message_parsed", "data": { - "event_type": "control_message_parsed", "stream_id": 0, - "message_type": "publish_namespace", - "request_id": 0, - "track_namespace": "/moq-test/interop", - "parameters": [] + "message": { + "type": "publish_namespace", + "request_id": 0, + "track_namespace": [{"value": "moq-test"}, {"value": "interop"}], + "number_of_parameters": 0, + "parameters": [] + } } } ``` -The relay received your PUBLISH_NAMESPACE for `/moq-test/interop`. +The relay received your PUBLISH_NAMESPACE for `moq-test/interop`. ```json { - "time": 65.526, + "time": 1744243265526.0, "name": "moqt:control_message_created", "data": { - "event_type": "control_message_created", "stream_id": 0, - "message_type": "publish_namespace_ok", - "request_id": 0 + "message": { + "type": "publish_namespace_ok", + "request_id": 0 + } } } ``` @@ -183,33 +190,33 @@ curl https://interop-relay.cloudflare.mediaoverquic.com:443/mlog/08d0b03ede133f0 **Publisher mlog** (after SETUP): ```json -{"time":231.481,"name":"moqt:control_message_parsed","data":{"message_type":"publish_namespace","request_id":0,"track_namespace":"/moq-test/interop",...}} -{"time":233.084,"name":"moqt:control_message_created","data":{"message_type":"publish_namespace_ok","request_id":0}} +{"time":1744243431481.0,"name":"moqt:control_message_parsed","data":{"stream_id":0,"message":{"type":"publish_namespace","request_id":0,"track_namespace":[{"value":"moq-test"},{"value":"interop"}],...}}} +{"time":1744243433084.0,"name":"moqt:control_message_created","data":{"stream_id":0,"message":{"type":"publish_namespace_ok","request_id":0}}} ``` -The publisher announced `/moq-test/interop` and the relay accepted it. +The publisher announced `moq-test/interop` and the relay accepted it. **Subscriber mlog** (after SETUP): ```json -{"time":47.169,"name":"moqt:control_message_parsed","data":{"message_type":"subscribe","subscribe_id":0,"track_namespace":"/moq-test/interop","track_name":"test-track",...}} -{"time":48.622,"name":"moqt:control_message_created","data":{"message_type":"subscribe_ok","subscribe_id":0,"track_alias":0,...}} +{"time":1744243247169.0,"name":"moqt:control_message_parsed","data":{"stream_id":0,"message":{"type":"subscribe","request_id":0,"track_namespace":[{"value":"moq-test"},{"value":"interop"}],"track_name":{"value":"test-track"},...}}} +{"time":1744243248622.0,"name":"moqt:control_message_created","data":{"stream_id":0,"message":{"type":"subscribe_ok","request_id":0,"track_alias":0,...}}} ``` -The subscriber sent a SUBSCRIBE for track `test-track` under namespace `/moq-test/interop`, and the relay responded with SUBSCRIBE_OK. This confirms the relay successfully routed the subscription to the publisher's namespace. +The subscriber sent a SUBSCRIBE for track `test-track` under namespace `moq-test/interop`, and the relay responded with SUBSCRIBE_OK. This confirms the relay successfully routed the subscription to the publisher's namespace. **When data flows**, you'll also see data plane events in the mlog. In the publisher's mlog, `*_parsed` events show data the relay received: ```json -{"time":395.872,"name":"moqt:subgroup_header_parsed","data":{"header_type":"SubgroupIdExt","track_alias":0,"group_id":1,"publisher_priority":128}} -{"time":397.166,"name":"moqt:subgroup_object_parsed","data":{"group_id":1,"subgroup_id":0,"object_id":0,"object_payload_length":1024}} +{"time":1744243595872.0,"name":"moqt:subgroup_header_parsed","data":{"stream_id":0,"track_alias":0,"group_id":1,"publisher_priority":128}} +{"time":1744243597166.0,"name":"moqt:subgroup_object_parsed","data":{"stream_id":0,"group_id":1,"subgroup_id":0,"object_id":0,"extension_headers_length":0,"object_payload_length":1024}} ``` In the subscriber's mlog, `*_created` events show data the relay forwarded: ```json -{"time":395.872,"name":"moqt:subgroup_header_created","data":{"header_type":"SubgroupIdExt","track_alias":0,"group_id":1,"publisher_priority":128,"subgroup_id":0}} -{"time":397.166,"name":"moqt:subgroup_object_created","data":{"group_id":1,"subgroup_id":0,"object_id":0,"object_payload_length":1024}} +{"time":1744243595872.0,"name":"moqt:subgroup_header_created","data":{"stream_id":0,"track_alias":0,"group_id":1,"publisher_priority":128,"subgroup_id":0}} +{"time":1744243597166.0,"name":"moqt:subgroup_object_created","data":{"stream_id":0,"group_id":1,"subgroup_id":0,"object_id":0,"extension_headers_length":0,"object_payload_length":1024}} ``` - `object_payload_length` tells you the size of the payload the relay handled @@ -224,15 +231,21 @@ mlog uses [JSON-SEQ (RFC 7464)](https://www.rfc-editor.org/rfc/rfc7464): each re ```json { - "qlog_version": "0.3", - "qlog_format": "JSON-SEQ", + "file_schema": "urn:ietf:params:qlog:file:sequential", + "serialization_format": "JSON-SEQ", "title": "moq-relay", "description": "MoQ Transport events", "trace": { "vantage_point": { "type": "server" }, + "common_fields": { + "time_format": "relative_to_epoch", + "reference_time": { + "clock_type": "system", + "epoch": "1970-01-01T00:00:00.000Z" + } + }, "event_schemas": [ - "urn:ietf:params:qlog:events:loglevel", - "urn:ietf:params:qlog:events:moqt" + "urn:ietf:params:qlog:events:moqt-03" ] } } @@ -242,7 +255,7 @@ mlog uses [JSON-SEQ (RFC 7464)](https://www.rfc-editor.org/rfc/rfc7464): each re ```json { - "time": , + "time": , "name": "", "data": { ... } } @@ -265,7 +278,7 @@ mlog uses [JSON-SEQ (RFC 7464)](https://www.rfc-editor.org/rfc/rfc7464): each re ### Control message types -These appear in the `message_type` field of control message events: +These appear in the `message.type` field of control message events: | Message Type | Protocol Reference | |-------------|-------------------| diff --git a/moq-transport/src/mlog/events.rs b/moq-transport/src/mlog/events.rs index a93820df..5fa77e29 100644 --- a/moq-transport/src/mlog/events.rs +++ b/moq-transport/src/mlog/events.rs @@ -22,16 +22,27 @@ // - Need to plumb actual QUIC stream IDs through web_transport abstractions // - This would enable correlation between QUIC qlog and MoQ mlog events -use serde::{Deserialize, Serialize}; +use serde::Serialize; use serde_json::{json, Value as JsonValue}; +use crate::coding::Encode; use crate::{coding, data, message, setup}; +/// Hex-encode a byte slice (lowercase). +fn hex_encode(bytes: &[u8]) -> String { + let mut s = String::with_capacity(bytes.len() * 2); + for b in bytes { + use std::fmt::Write; + let _ = write!(s, "{:02x}", b); + } + s +} + /// MoQ Transport event following qlog patterns #[serde_with::skip_serializing_none] -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize)] pub struct Event { - /// Time in milliseconds since connection start + /// Time in milliseconds since Unix epoch (1970-01-01T00:00:00.000Z) pub time: f64, /// Event name in format "moqt:event_name" @@ -42,64 +53,55 @@ pub struct Event { } /// Union of all MoQ Transport event types -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "event_type")] +/// +/// Uses serde untagged representation — the event type is conveyed by +/// the `name` field on the parent Event struct, not repeated in data. +/// (Per qlog spec, the "name" field is the event type identifier.) +/// +/// Note: `Deserialize` is not derived because structurally identical +/// variant pairs (e.g. `ControlMessageParsed` / `ControlMessageCreated`) +/// are ambiguous under untagged deserialization. This enum is +/// serialization-only. Consumers must use `Event.name` to determine +/// the event type. +#[derive(Debug, Clone, Serialize)] +#[serde(untagged)] pub enum EventData { - #[serde(rename = "control_message_parsed")] ControlMessageParsed(ControlMessageParsed), - - #[serde(rename = "control_message_created")] ControlMessageCreated(ControlMessageCreated), - - #[serde(rename = "subgroup_header_parsed")] SubgroupHeaderParsed(SubgroupHeaderParsed), - - #[serde(rename = "subgroup_header_created")] SubgroupHeaderCreated(SubgroupHeaderCreated), - - #[serde(rename = "subgroup_object_parsed")] SubgroupObjectParsed(SubgroupObjectParsed), - - #[serde(rename = "subgroup_object_created")] SubgroupObjectCreated(SubgroupObjectCreated), - - #[serde(rename = "object_datagram_parsed")] ObjectDatagramParsed(ObjectDatagramParsed), - - #[serde(rename = "object_datagram_created")] ObjectDatagramCreated(ObjectDatagramCreated), - - #[serde(rename = "loglevel")] LogLevel(LogLevelEvent), } -/// Control message parsed event (Section 4.2 of draft-pardue-moq-qlog-moq-events) +/// Control message parsed event +/// Per draft-pardue-moq-qlog-moq-events-03 Section 4.2 #[serde_with::skip_serializing_none] -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize)] pub struct ControlMessageParsed { pub stream_id: u64, - pub message_type: String, - /// Message-specific fields - #[serde(flatten)] + /// Nested control message object (contains "type" discriminator) pub message: JsonValue, } -/// Control message created event (Section 4.1 of draft-pardue-moq-qlog-moq-events) +/// Control message created event +/// Per draft-pardue-moq-qlog-moq-events-03 Section 4.1 #[serde_with::skip_serializing_none] -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize)] pub struct ControlMessageCreated { pub stream_id: u64, - pub message_type: String, - /// Message-specific fields - #[serde(flatten)] + /// Nested control message object (contains "type" discriminator) pub message: JsonValue, } /// Subgroup header parsed event (data plane) #[serde_with::skip_serializing_none] -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize)] pub struct SubgroupHeaderParsed { pub stream_id: u64, @@ -110,7 +112,7 @@ pub struct SubgroupHeaderParsed { /// Subgroup header created event (data plane) #[serde_with::skip_serializing_none] -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize)] pub struct SubgroupHeaderCreated { pub stream_id: u64, @@ -121,7 +123,7 @@ pub struct SubgroupHeaderCreated { /// Subgroup object parsed event (data plane) #[serde_with::skip_serializing_none] -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize)] pub struct SubgroupObjectParsed { pub stream_id: u64, @@ -132,7 +134,7 @@ pub struct SubgroupObjectParsed { /// Subgroup object created event (data plane) #[serde_with::skip_serializing_none] -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize)] pub struct SubgroupObjectCreated { pub stream_id: u64, @@ -143,7 +145,7 @@ pub struct SubgroupObjectCreated { /// Object Datagram parsed event (data plane) #[serde_with::skip_serializing_none] -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize)] pub struct ObjectDatagramParsed { pub stream_id: u64, @@ -154,7 +156,7 @@ pub struct ObjectDatagramParsed { /// Object Datagram created event (data plane) #[serde_with::skip_serializing_none] -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize)] pub struct ObjectDatagramCreated { pub stream_id: u64, @@ -166,44 +168,151 @@ pub struct ObjectDatagramCreated { /// LogLevel event for flexible logging (qlog loglevel schema) /// See: https://www.ietf.org/archive/id/draft-ietf-quic-qlog-main-schema-12.html#name-loglevel-events #[serde_with::skip_serializing_none] -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize)] pub struct LogLevelEvent { pub message: String, } -// Helper functions to create vector of string pairs from KVPs -fn key_value_pairs_to_vec(kvps: &[coding::KeyValuePair]) -> Vec<(String, String)> { - kvps.iter() - .map(|kvp| (kvp.key.to_string(), format!("{:?}", kvp.value))) - .collect() +/// Convert a KeyValuePair to a typed MOQTSetupParameter JSON object. +/// Per draft-pardue-moq-qlog-moq-events-03 Section 5.2 +fn setup_param_to_qlog(kvp: &coding::KeyValuePair) -> JsonValue { + match (kvp.key, &kvp.value) { + // PATH (0x01) + (0x01, coding::Value::BytesValue(bytes)) => json!({ + "name": "path", + "value": String::from_utf8_lossy(bytes), + }), + // MAX_REQUEST_ID (0x02) + (0x02, coding::Value::IntValue(v)) => json!({ + "name": "max_request_id", + "value": v, + }), + (key, coding::Value::IntValue(v)) => json!({ + "name": "unknown", + "name_bytes": key, + "value": v, + }), + (key, coding::Value::BytesValue(bytes)) => json!({ + "name": "unknown", + "name_bytes": key, + "length": bytes.len(), + "value_bytes": hex_encode(bytes), + }), + } +} + +/// Convert setup parameters to qlog [*MOQTSetupParameter] array. +fn setup_params_to_qlog(kvps: &[coding::KeyValuePair]) -> JsonValue { + json!(kvps.iter().map(setup_param_to_qlog).collect::>()) +} + +/// Convert a KeyValuePair to a typed MOQTParameter JSON object. +/// Per draft-pardue-moq-qlog-moq-events-03 Section 5.3 +fn param_to_qlog(kvp: &coding::KeyValuePair) -> JsonValue { + match (kvp.key, &kvp.value) { + // AUTHORIZATION_TOKEN (0x02) — redact value, log only presence and length + (0x02, coding::Value::BytesValue(bytes)) => json!({ + "name": "authorization_token", + "alias_type": kvp.key, + "token_length": bytes.len(), + }), + // DELIVERY_TIMEOUT (0x03) + (0x03, coding::Value::IntValue(v)) => json!({ + "name": "delivery_timeout", + "value": v, + }), + // MAX_CACHE_DURATION (0x04) + (0x04, coding::Value::IntValue(v)) => json!({ + "name": "max_cache_duration", + "value": v, + }), + (key, coding::Value::IntValue(v)) => json!({ + "name": "unknown", + "name_bytes": key, + "value": v, + }), + (key, coding::Value::BytesValue(bytes)) => json!({ + "name": "unknown", + "name_bytes": key, + "length": bytes.len(), + "value_bytes": hex_encode(bytes), + }), + } +} + +/// Convert parameters to qlog [*MOQTParameter] array. +fn params_to_qlog(kvps: &[coding::KeyValuePair]) -> JsonValue { + json!(kvps.iter().map(param_to_qlog).collect::>()) +} + +/// Compute the wire byte length of extension headers by encoding to a temp buffer. +/// This matches the MoQT framing field that precedes extension headers on the wire. +fn ext_headers_wire_len(headers: &data::ExtensionHeaders) -> u64 { + let mut tmp = bytes::BytesMut::new(); + for kvp in &headers.0 { + let _ = kvp.encode(&mut tmp); + } + tmp.len() as u64 +} + +/// Convert extension headers to qlog [*MOQTExtensionHeader] array. +/// Per draft-pardue-moq-qlog-moq-events-03 Section 5.7 +fn extension_headers_to_qlog(kvps: &[coding::KeyValuePair]) -> JsonValue { + json!(kvps + .iter() + .map(|kvp| match &kvp.value { + coding::Value::IntValue(v) => json!({ + "header_type": kvp.key, + "header_value": v, + }), + coding::Value::BytesValue(bytes) => json!({ + "header_type": kvp.key, + "header_length": bytes.len(), + "payload": hex_encode(bytes), + }), + }) + .collect::>()) +} + +/// Convert a TrackNamespace to the qlog [*MOQTByteString] format. +/// Per draft-pardue-moq-qlog-moq-events-03 Section 5.4 +fn namespace_to_qlog(ns: &coding::TrackNamespace) -> JsonValue { + json!(ns + .fields + .iter() + .map(|f| json!({"value": String::from_utf8_lossy(&f.value)})) + .collect::>()) +} + +/// Convert a track name string to MOQTByteString format. +/// Per draft-pardue-moq-qlog-moq-events-03 Section 5.4 +fn track_name_to_qlog(name: &str) -> JsonValue { + json!({"value": name}) +} + +/// Convert a Location to MOQTLocation format. +/// Per draft-pardue-moq-qlog-moq-events-03 Section 5.5 +fn location_to_qlog(loc: &coding::Location) -> JsonValue { + json!({"group": loc.group_id, "object": loc.object_id}) } fn create_control_message_event( time: f64, stream_id: u64, is_parsed: bool, - msg_type: &str, message: JsonValue, ) -> Event { if is_parsed { Event { time, name: "moqt:control_message_parsed".to_string(), - data: EventData::ControlMessageParsed(ControlMessageParsed { - stream_id, - message_type: msg_type.to_string(), - message, - }), + data: EventData::ControlMessageParsed(ControlMessageParsed { stream_id, message }), } } else { Event { time, name: "moqt:control_message_created".to_string(), - data: EventData::ControlMessageCreated(ControlMessageCreated { - stream_id, - message_type: msg_type.to_string(), - message, - }), + data: EventData::ControlMessageCreated(ControlMessageCreated { stream_id, message }), } } } @@ -215,12 +324,12 @@ pub fn client_setup_parsed(time: f64, stream_id: u64, msg: &setup::Client) -> Ev time, stream_id, true, - "client_setup", - json!( - { + json!({ + "type": "client_setup", "number_of_supported_versions": msg.versions.0.len(), "supported_versions": versions, - "parameters": key_value_pairs_to_vec(&msg.params.0), + "number_of_parameters": msg.params.0.len(), + "setup_parameters": setup_params_to_qlog(&msg.params.0), }), ) } @@ -231,31 +340,34 @@ pub fn server_setup_created(time: f64, stream_id: u64, msg: &setup::Server) -> E time, stream_id, false, - "server_setup", - json!( - { + json!({ + "type": "server_setup", "selected_version": format!("{:?}", msg.version), - "parameters": key_value_pairs_to_vec(&msg.params.0), + "number_of_parameters": msg.params.0.len(), + "setup_parameters": setup_params_to_qlog(&msg.params.0), }), ) } /// Helper to convert SUBSCRIBE message to JSON +/// Per draft-pardue-moq-qlog-moq-events-03 Section 5.6.6 fn subscribe_to_json(msg: &message::Subscribe) -> JsonValue { let mut json = json!({ - "subscribe_id": msg.id, - "track_namespace": msg.track_namespace.to_string(), - "track_name": &msg.track_name, + "type": "subscribe", + "request_id": msg.id, + "track_namespace": namespace_to_qlog(&msg.track_namespace), + "track_name": track_name_to_qlog(&msg.track_name), "subscriber_priority": msg.subscriber_priority, "group_order": format!("{:?}", msg.group_order), + "forward": msg.forward, "filter_type": format!("{:?}", msg.filter_type), - "parameters": key_value_pairs_to_vec(&msg.params.0), + "number_of_parameters": msg.params.0.len(), + "parameters": params_to_qlog(&msg.params.0), }); // Add optional fields based on filter type if let Some(start_loc) = &msg.start_location { - json["start_group"] = json!(start_loc.group_id); - json["start_object"] = json!(start_loc.object_id); + json["start_location"] = location_to_qlog(start_loc); } if let Some(end_group) = msg.end_group_id { json["end_group"] = json!(end_group); @@ -266,30 +378,32 @@ fn subscribe_to_json(msg: &message::Subscribe) -> JsonValue { /// Create a control_message_parsed event for SUBSCRIBE pub fn subscribe_parsed(time: f64, stream_id: u64, msg: &message::Subscribe) -> Event { - create_control_message_event(time, stream_id, true, "subscribe", subscribe_to_json(msg)) + create_control_message_event(time, stream_id, true, subscribe_to_json(msg)) } /// Create a control_message_created event for SUBSCRIBE pub fn subscribe_created(time: f64, stream_id: u64, msg: &message::Subscribe) -> Event { - create_control_message_event(time, stream_id, false, "subscribe", subscribe_to_json(msg)) + create_control_message_event(time, stream_id, false, subscribe_to_json(msg)) } /// Helper to convert SUBSCRIBE_OK message to JSON +/// Per draft-pardue-moq-qlog-moq-events-03 Section 5.6.7 fn subscribe_ok_to_json(msg: &message::SubscribeOk) -> JsonValue { let mut json = json!({ - "subscribe_id": msg.id, + "type": "subscribe_ok", + "request_id": msg.id, "track_alias": msg.track_alias, "expires": msg.expires, "group_order": format!("{:?}", msg.group_order), "content_exists": msg.content_exists, - "parameters": key_value_pairs_to_vec(&msg.params.0), + "number_of_parameters": msg.params.0.len(), + "parameters": params_to_qlog(&msg.params.0), }); - // Add optional largest_location fields if content exists + // Add optional largest_location if content exists if msg.content_exists { if let Some(largest) = &msg.largest_location { - json["largest_group_id"] = json!(largest.group_id); - json["largest_object_id"] = json!(largest.object_id); + json["largest_location"] = location_to_qlog(largest); } } @@ -298,79 +412,54 @@ fn subscribe_ok_to_json(msg: &message::SubscribeOk) -> JsonValue { /// Create a control_message_parsed event for SUBSCRIBE_OK pub fn subscribe_ok_parsed(time: f64, stream_id: u64, msg: &message::SubscribeOk) -> Event { - create_control_message_event( - time, - stream_id, - true, - "subscribe_ok", - subscribe_ok_to_json(msg), - ) + create_control_message_event(time, stream_id, true, subscribe_ok_to_json(msg)) } /// Create a control_message_created event for SUBSCRIBE_OK pub fn subscribe_ok_created(time: f64, stream_id: u64, msg: &message::SubscribeOk) -> Event { - create_control_message_event( - time, - stream_id, - false, - "subscribe_ok", - subscribe_ok_to_json(msg), - ) + create_control_message_event(time, stream_id, false, subscribe_ok_to_json(msg)) } /// Helper to convert SUBSCRIBE_ERROR message to JSON +/// Per draft-pardue-moq-qlog-moq-events-03 Section 5.6.8 fn subscribe_error_to_json(msg: &message::SubscribeError) -> JsonValue { json!({ - "subscribe_id": msg.id, + "type": "subscribe_error", + "request_id": msg.id, "error_code": msg.error_code, - "reason_phrase": &msg.reason_phrase.0, + "reason": &msg.reason_phrase.0, }) } /// Create a control_message_parsed event for SUBSCRIBE_ERROR pub fn subscribe_error_parsed(time: f64, stream_id: u64, msg: &message::SubscribeError) -> Event { - create_control_message_event( - time, - stream_id, - true, - "subscribe_error", - subscribe_error_to_json(msg), - ) + create_control_message_event(time, stream_id, true, subscribe_error_to_json(msg)) } /// Create a control_message_created event for SUBSCRIBE_ERROR pub fn subscribe_error_created(time: f64, stream_id: u64, msg: &message::SubscribeError) -> Event { - create_control_message_event( - time, - stream_id, - false, - "subscribe_error", - subscribe_error_to_json(msg), - ) + create_control_message_event(time, stream_id, false, subscribe_error_to_json(msg)) } /// Helper to convert PUBLISH_NAMESPACE message to JSON +/// Per draft-pardue-moq-qlog-moq-events-03 Section 5.6.22 fn publish_namespace_to_json(msg: &message::PublishNamespace) -> JsonValue { json!({ + "type": "publish_namespace", "request_id": msg.id, - "track_namespace": msg.track_namespace.to_string(), - "parameters": key_value_pairs_to_vec(&msg.params.0), + "track_namespace": namespace_to_qlog(&msg.track_namespace), + "number_of_parameters": msg.params.0.len(), + "parameters": params_to_qlog(&msg.params.0), }) } -/// Create a control_message_parsed event for PUBLISH_NAMESPACE (was ANNOUNCE in earlier drafts) +/// Create a control_message_parsed event for PUBLISH_NAMESPACE pub fn publish_namespace_parsed( time: f64, stream_id: u64, msg: &message::PublishNamespace, ) -> Event { - create_control_message_event( - time, - stream_id, - true, - "publish_namespace", - publish_namespace_to_json(msg), - ) + create_control_message_event(time, stream_id, true, publish_namespace_to_json(msg)) } /// Create a control_message_created event for PUBLISH_NAMESPACE @@ -379,35 +468,25 @@ pub fn publish_namespace_created( stream_id: u64, msg: &message::PublishNamespace, ) -> Event { - create_control_message_event( - time, - stream_id, - false, - "publish_namespace", - publish_namespace_to_json(msg), - ) + create_control_message_event(time, stream_id, false, publish_namespace_to_json(msg)) } /// Helper to convert PUBLISH_NAMESPACE_OK message to JSON +/// Per draft-pardue-moq-qlog-moq-events-03 Section 5.6.23 fn publish_namespace_ok_to_json(msg: &message::PublishNamespaceOk) -> JsonValue { json!({ + "type": "publish_namespace_ok", "request_id": msg.id, }) } -/// Create a control_message_parsed event for PUBLISH_NAMESPACE_OK (was ANNOUNCE_OK) +/// Create a control_message_parsed event for PUBLISH_NAMESPACE_OK pub fn publish_namespace_ok_parsed( time: f64, stream_id: u64, msg: &message::PublishNamespaceOk, ) -> Event { - create_control_message_event( - time, - stream_id, - true, - "publish_namespace_ok", - publish_namespace_ok_to_json(msg), - ) + create_control_message_event(time, stream_id, true, publish_namespace_ok_to_json(msg)) } /// Create a control_message_created event for PUBLISH_NAMESPACE_OK @@ -416,37 +495,27 @@ pub fn publish_namespace_ok_created( stream_id: u64, msg: &message::PublishNamespaceOk, ) -> Event { - create_control_message_event( - time, - stream_id, - false, - "publish_namespace_ok", - publish_namespace_ok_to_json(msg), - ) + create_control_message_event(time, stream_id, false, publish_namespace_ok_to_json(msg)) } /// Helper to convert PUBLISH_NAMESPACE_ERROR message to JSON +/// Per draft-pardue-moq-qlog-moq-events-03 Section 5.6.24 fn publish_namespace_error_to_json(msg: &message::PublishNamespaceError) -> JsonValue { json!({ + "type": "publish_namespace_error", "request_id": msg.id, "error_code": msg.error_code, - "reason_phrase": &msg.reason_phrase.0, + "reason": &msg.reason_phrase.0, }) } -/// Create a control_message_parsed event for PUBLISH_NAMESPACE_ERROR (was ANNOUNCE_ERROR) +/// Create a control_message_parsed event for PUBLISH_NAMESPACE_ERROR pub fn publish_namespace_error_parsed( time: f64, stream_id: u64, msg: &message::PublishNamespaceError, ) -> Event { - create_control_message_event( - time, - stream_id, - true, - "publish_namespace_error", - publish_namespace_error_to_json(msg), - ) + create_control_message_event(time, stream_id, true, publish_namespace_error_to_json(msg)) } /// Create a control_message_created event for PUBLISH_NAMESPACE_ERROR @@ -455,73 +524,59 @@ pub fn publish_namespace_error_created( stream_id: u64, msg: &message::PublishNamespaceError, ) -> Event { - create_control_message_event( - time, - stream_id, - false, - "publish_namespace_error", - publish_namespace_error_to_json(msg), - ) + create_control_message_event(time, stream_id, false, publish_namespace_error_to_json(msg)) } /// Create a control_message_parsed event for UNSUBSCRIBE +/// Per draft-pardue-moq-qlog-moq-events-03 Section 5.6.10 pub fn unsubscribe_parsed(time: f64, stream_id: u64, msg: &message::Unsubscribe) -> Event { create_control_message_event( time, stream_id, true, - "unsubscribe", - json!({ - "subscribe_id": msg.id, - }), + json!({"type": "unsubscribe", "request_id": msg.id}), ) } /// Create a control_message_created event for UNSUBSCRIBE +/// Per draft-pardue-moq-qlog-moq-events-03 Section 5.6.10 pub fn unsubscribe_created(time: f64, stream_id: u64, msg: &message::Unsubscribe) -> Event { create_control_message_event( time, stream_id, false, - "unsubscribe", - json!({ - "subscribe_id": msg.id, - }), + json!({"type": "unsubscribe", "request_id": msg.id}), ) } /// Create a control_message_parsed event for GOAWAY +/// Per draft-pardue-moq-qlog-moq-events-03 Section 5.6.3 pub fn go_away_parsed(time: f64, stream_id: u64, msg: &message::GoAway) -> Event { create_control_message_event( time, stream_id, true, - "goaway", - json!({ - "new_session_uri": &msg.uri.0, - }), + json!({"type": "goaway", "new_session_uri": &msg.uri.0}), ) } /// Create a control_message_created event for GOAWAY +/// Per draft-pardue-moq-qlog-moq-events-03 Section 5.6.3 pub fn go_away_created(time: f64, stream_id: u64, msg: &message::GoAway) -> Event { create_control_message_event( time, stream_id, false, - "goaway", - json!({ - "new_session_uri": &msg.uri.0, - }), + json!({"type": "goaway", "new_session_uri": &msg.uri.0}), ) } // Data plane events /// Helper to convert SubgroupHeader to JSON +/// Per draft-pardue-moq-qlog-moq-events-03 Section 4.9 fn subgroup_header_to_json(header: &data::SubgroupHeader) -> JsonValue { let mut json = json!({ - "header_type": format!("{:?}", header.header_type), "track_alias": header.track_alias, "group_id": header.group_id, "publisher_priority": header.publisher_priority, @@ -559,6 +614,7 @@ pub fn subgroup_header_created(time: f64, stream_id: u64, header: &data::Subgrou } /// Helper to convert SubgroupObject to JSON +/// Per draft-pardue-moq-qlog-moq-events-03 Section 4.11 fn subgroup_object_to_json( group_id: u64, subgroup_id: u64, @@ -569,7 +625,8 @@ fn subgroup_object_to_json( "group_id": group_id, "subgroup_id": subgroup_id, "object_id": object_id, - // TODO send object_playload itself + "extension_headers_length": 0, + // TODO send object_payload itself "object_payload_length": object.payload_length, }); @@ -618,7 +675,8 @@ pub fn subgroup_object_created( } } -/// Helper to convert SubgroupObject to JSON +/// Helper to convert SubgroupObjectExt to JSON +/// Per draft-pardue-moq-qlog-moq-events-03 Section 4.11 fn subgroup_object_ext_to_json( group_id: u64, subgroup_id: u64, @@ -629,8 +687,9 @@ fn subgroup_object_ext_to_json( "group_id": group_id, "subgroup_id": subgroup_id, "object_id": object_id, - "extension_headers": key_value_pairs_to_vec(&object.extension_headers.0), - // TODO send object_playload itself + "extension_headers_length": ext_headers_wire_len(&object.extension_headers), + "extension_headers": extension_headers_to_qlog(&object.extension_headers.0), + // TODO send object_payload itself "object_payload_length": object.payload_length, }); @@ -692,7 +751,10 @@ fn object_datagram_to_json(datagram: &data::Datagram) -> JsonValue { }); if let Some(extension_headers) = &datagram.extension_headers { - json["extension_headers"] = json!(key_value_pairs_to_vec(&extension_headers.0)); + json["extension_headers_length"] = json!(ext_headers_wire_len(extension_headers)); + json["extension_headers"] = extension_headers_to_qlog(&extension_headers.0); + } else { + json["extension_headers_length"] = json!(0u64); } if let Some(status) = datagram.status { diff --git a/moq-transport/src/mlog/writer.rs b/moq-transport/src/mlog/writer.rs index 5dc29478..099a1e30 100644 --- a/moq-transport/src/mlog/writer.rs +++ b/moq-transport/src/mlog/writer.rs @@ -4,7 +4,7 @@ use std::fs::File; use std::io::{self, BufWriter, Write}; use std::path::Path; -use std::time::Instant; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; use super::Event; @@ -12,7 +12,8 @@ use super::Event; /// Writes JSON-SEQ format compatible with qlog aggregation pub struct MlogWriter { writer: BufWriter, - start_time: Instant, + start: Instant, + epoch_offset_ms: f64, } impl MlogWriter { @@ -21,22 +22,41 @@ impl MlogWriter { let file = File::create(path)?; let mut writer = BufWriter::new(file); - let start_time = Instant::now(); + // Capture the epoch offset once at startup, then use cheap Instant + // for per-event timing. This avoids a SystemTime syscall per event + // while still producing absolute epoch-ms timestamps. + let start = Instant::now(); + let epoch_offset_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() + * 1000.0; // Write qlog-compatible header as first record - // This follows qlog JSON-SEQ format (RFC 7464) + // This follows qlog JSON-SEQ format (RFC 7464) per + // draft-ietf-quic-qlog-main-schema-13 Section 5 + // + // Uses epoch-relative timestamps (absolute epoch-ms) so that + // consumers can use the time field directly as a native timestamp + // without needing to know the trace start time. let header = serde_json::json!({ - "qlog_version": "0.3", - "qlog_format": "JSON-SEQ", + "file_schema": "urn:ietf:params:qlog:file:sequential", + "serialization_format": "JSON-SEQ", "title": "moq-relay", "description": "MoQ Transport events", "trace": { "vantage_point": { "type": "server" }, + "common_fields": { + "time_format": "relative_to_epoch", + "reference_time": { + "clock_type": "system", + "epoch": "1970-01-01T00:00:00.000Z" + } + }, "event_schemas": [ - "urn:ietf:params:qlog:events:loglevel", - "urn:ietf:params:qlog:events:moqt" + "urn:ietf:params:qlog:events:moqt-03" ] } }); @@ -46,12 +66,21 @@ impl MlogWriter { writer.write_all(b"\n")?; writer.flush()?; - Ok(Self { writer, start_time }) + Ok(Self { + writer, + start, + epoch_offset_ms, + }) } - /// Get elapsed time in milliseconds since connection start - pub fn elapsed_ms(&self) -> f64 { - self.start_time.elapsed().as_secs_f64() * 1000.0 + /// Get current time as epoch milliseconds for event timestamps. + /// Per qlog-main-schema-13 Section 7.1, with time_format "relative_to_epoch" + /// and epoch "1970-01-01T00:00:00.000Z", time values are absolute Unix epoch ms. + /// + /// Uses a cached epoch offset from startup plus cheap monotonic elapsed time, + /// avoiding a SystemTime syscall per event. + pub fn epoch_ms(&self) -> f64 { + self.epoch_offset_ms + self.start.elapsed().as_secs_f64() * 1000.0 } /// Add an event to the log diff --git a/moq-transport/src/session/mod.rs b/moq-transport/src/session/mod.rs index a2313636..85668cdb 100644 --- a/moq-transport/src/session/mod.rs +++ b/moq-transport/src/session/mod.rs @@ -654,7 +654,7 @@ impl Session { // Emit mlog event for CLIENT_SETUP parsed if let Some(ref mut mlog) = mlog { - let event = mlog::events::client_setup_parsed(mlog.elapsed_ms(), 0, &client); + let event = mlog::events::client_setup_parsed(mlog.epoch_ms(), 0, &client); let _ = mlog.add_event(event); } @@ -682,7 +682,7 @@ impl Session { // Emit mlog event for SERVER_SETUP created if let Some(ref mut mlog) = mlog { - let event = mlog::events::server_setup_created(mlog.elapsed_ms(), 0, &server); + let event = mlog::events::server_setup_created(mlog.epoch_ms(), 0, &server); let _ = mlog.add_event(event); } @@ -728,7 +728,7 @@ impl Session { // Emit mlog event for sent control messages if let Some(ref mlog) = mlog { if let Ok(mut mlog_guard) = mlog.lock() { - let time = mlog_guard.elapsed_ms(); + let time = mlog_guard.epoch_ms(); let stream_id = 0; // Control stream is always stream 0 // Emit events based on message type @@ -792,7 +792,7 @@ impl Session { // Emit mlog event for received control messages if let Some(ref mlog) = mlog { if let Ok(mut mlog_guard) = mlog.lock() { - let time = mlog_guard.elapsed_ms(); + let time = mlog_guard.epoch_ms(); let stream_id = 0; // Control stream is always stream 0 // Emit events based on message type diff --git a/moq-transport/src/session/subscribed.rs b/moq-transport/src/session/subscribed.rs index 5847e6e4..db5f96a4 100644 --- a/moq-transport/src/session/subscribed.rs +++ b/moq-transport/src/session/subscribed.rs @@ -268,7 +268,7 @@ impl Subscribed { // Log subgroup header created/sent if let Some(ref mlog) = mlog { if let Ok(mut mlog_guard) = mlog.lock() { - let time = mlog_guard.elapsed_ms(); + let time = mlog_guard.epoch_ms(); let stream_id = 0; // TODO: Placeholder, need actual QUIC stream ID let event = mlog::subgroup_header_created(time, stream_id, &header); let _ = mlog_guard.add_event(event); @@ -304,7 +304,7 @@ impl Subscribed { // Log subgroup object created/sent if let Some(ref mlog) = mlog { if let Ok(mut mlog_guard) = mlog.lock() { - let time = mlog_guard.elapsed_ms(); + let time = mlog_guard.epoch_ms(); let stream_id = 0; // TODO: Placeholder, need actual QUIC stream ID let event = mlog::subgroup_object_ext_created( time, @@ -413,7 +413,7 @@ impl Subscribed { // Create mlog event for datagram created if let Some(ref mlog) = self.mlog { if let Ok(mut mlog_guard) = mlog.lock() { - let time = mlog_guard.elapsed_ms(); + let time = mlog_guard.epoch_ms(); let stream_id = 0; // TODO: Placeholder, need actual QUIC stream ID let _ = mlog_guard.add_event(mlog::object_datagram_created( time, diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index af3b51a4..fa6d362d 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -352,7 +352,7 @@ impl Subscriber { if let Some(ref subgroup_header) = stream_header.subgroup_header { if let Some(ref mlog) = self.mlog { if let Ok(mut mlog_guard) = mlog.lock() { - let time = mlog_guard.elapsed_ms(); + let time = mlog_guard.epoch_ms(); let stream_id = 0; // TODO: Placeholder, need actual QUIC stream ID let event = mlog::subgroup_header_parsed(time, stream_id, subgroup_header); let _ = mlog_guard.add_event(event); @@ -560,7 +560,7 @@ impl Subscriber { // Log subgroup object parsed/received if let Some(ref mlog) = mlog { if let Ok(mut mlog_guard) = mlog.lock() { - let time = mlog_guard.elapsed_ms(); + let time = mlog_guard.epoch_ms(); let stream_id = 0; // TODO: Placeholder, need actual QUIC stream ID let event = if let Some(obj_ext) = decoded_object { mlog::subgroup_object_ext_parsed( @@ -651,7 +651,7 @@ impl Subscriber { if let Some(ref mlog) = self.mlog { if let Ok(mut mlog_guard) = mlog.lock() { - let time = mlog_guard.elapsed_ms(); + let time = mlog_guard.epoch_ms(); let stream_id = 0; // TODO: Placeholder, need actual QUIC stream ID let _ = mlog_guard.add_event(mlog::object_datagram_parsed(time, stream_id, &datagram));