From c2d96f5dd2b0819a94987b6bc284c4ad273a2451 Mon Sep 17 00:00:00 2001 From: Matt Mick Date: Wed, 28 Jan 2026 15:08:46 -0600 Subject: [PATCH] feat(tls): add AWS IoT Core support via --ca, --alpn, and --qos options - Add --ca option to specify CA certificate file for broker verification - Add --alpn option for TLS ALPN protocol negotiation (e.g., "mqtt") - Add --qos option to configure QoS level (0, 1, or 2) for interactive mode - Add helpful error hint when connection is closed, suggesting --qos 1 The --qos option only affects the interactive TUI mode. Other subcommands (log, read-one, publish, clean-retained) retain their original behavior. Default QoS remains 2 for backward compatibility. AWS IoT Core requires ALPN and does not support QoS 2. Typical usage: mqttui --ca root.pem --alpn mqtt --qos 1 ... Fixes "connection closed by peer" errors when connecting to AWS IoT Core. Co-Authored-By: Claude Opus 4.5 --- src/cli.rs | 46 ++++++++++++++++++++++++++++++++++ src/interactive/mod.rs | 3 ++- src/interactive/mqtt_thread.rs | 23 ++++++++++++++--- src/main.rs | 9 +++++++ src/mqtt/connect.rs | 7 ++++++ src/mqtt/encryption.rs | 34 ++++++++++++++++++++----- 6 files changed, 111 insertions(+), 11 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 1758e098..0bae1268 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -246,6 +246,52 @@ pub struct MqttConnection { /// Allow insecure TLS connections #[arg(long, global = true)] pub insecure: bool, + + /// Path to a TLS CA certificate file. + /// + /// Used to verify the broker's certificate instead of using the system's certificate store. + /// Useful for self-signed certificates or private CA (like AWS `IoT` Core). + /// The file should contain PEM-encoded CA certificates. + #[arg( + long, + env = "MQTTUI_CA", + value_hint = ValueHint::FilePath, + value_name = "FILEPATH", + global = true, + )] + pub ca: Option, + + /// TLS ALPN protocol to use. + /// + /// Required by some brokers like AWS `IoT` Core. + /// Common values: "mqtt" for standard MQTT, "x-amzn-mqtt-ca" for AWS `IoT` custom auth. + /// Can be specified multiple times for multiple protocols (first is preferred). + #[arg( + long, + env = "MQTTUI_ALPN", + value_hint = ValueHint::Other, + value_name = "PROTOCOL", + global = true, + )] + pub alpn: Vec, + + /// MQTT `QoS` level for subscriptions and publishing. + /// + /// 0 = At most once (fire and forget) + /// 1 = At least once (acknowledged delivery) + /// 2 = Exactly once (assured delivery) + /// + /// Note: AWS `IoT` Core does not support `QoS` 2. Use --qos 1 for AWS `IoT`. + #[arg( + long, + env = "MQTTUI_QOS", + value_hint = ValueHint::Other, + value_name = "LEVEL", + default_value_t = 2, + value_parser = clap::value_parser!(u8).range(0..=2), + global = true, + )] + pub qos: u8, } #[derive(Debug, Clone)] diff --git a/src/interactive/mod.rs b/src/interactive/mod.rs index e7dbbf3f..672221fb 100644 --- a/src/interactive/mod.rs +++ b/src/interactive/mod.rs @@ -62,9 +62,10 @@ pub fn show( broker: &Broker, subscribe_topic: Vec, payload_size_limit: usize, + qos: rumqttc::QoS, ) -> anyhow::Result<()> { let mqtt_thread = - mqtt_thread::MqttThread::new(client, connection, subscribe_topic, payload_size_limit)?; + mqtt_thread::MqttThread::new(client, connection, subscribe_topic, payload_size_limit, qos)?; let app = App::new(broker, mqtt_thread); let original_hook = std::panic::take_hook(); diff --git a/src/interactive/mqtt_thread.rs b/src/interactive/mqtt_thread.rs index bff1e1dd..e3d13003 100644 --- a/src/interactive/mqtt_thread.rs +++ b/src/interactive/mqtt_thread.rs @@ -15,6 +15,7 @@ pub struct MqttThread { client: Client, connection_err: ConnectionErrorArc, history: HistoryArc, + qos: QoS, } impl MqttThread { @@ -23,9 +24,10 @@ impl MqttThread { connection: Connection, subscribe_topic: Vec, payload_size_limit: usize, + qos: QoS, ) -> anyhow::Result { for topic in &subscribe_topic { - client.subscribe(topic, QoS::ExactlyOnce)?; + client.subscribe(topic, qos)?; } let connection_err = Arc::new(RwLock::new(None)); @@ -43,6 +45,7 @@ impl MqttThread { connection, &subscribe_topic, payload_size_limit, + qos, &connection_err, &history, ); @@ -54,6 +57,7 @@ impl MqttThread { client, connection_err, history, + qos, }) } @@ -62,7 +66,17 @@ impl MqttThread { .read() .expect("mqtt history thread panicked") .as_ref() - .map(ToString::to_string) + .map(|err| { + let err_str = err.to_string(); + // Provide helpful hint for common AWS IoT Core issue (QoS 2 not supported) + if err_str.contains("reset") || err_str.contains("closed") { + format!( + "{err_str}\n\nHint: AWS IoT Core doesn't support QoS 2. Try adding: --qos 1" + ) + } else { + err_str + } + }) } pub fn get_history(&self) -> RwLockReadGuard<'_, MqttHistory> { @@ -81,7 +95,7 @@ impl MqttThread { pub fn clean_below(&self, topic: &str) -> anyhow::Result<()> { let topics = self.get_history().get_topics_below(topic); for topic in topics { - self.client.publish(topic, QoS::ExactlyOnce, true, [])?; + self.client.publish(topic, self.qos, true, [])?; } Ok(()) } @@ -93,6 +107,7 @@ fn thread_logic( mut connection: Connection, subscribe_topic: &[String], payload_size_limit: usize, + qos: QoS, connection_err: &ConnectionErrorArc, history: &HistoryArc, ) { @@ -104,7 +119,7 @@ fn thread_logic( rumqttc::Event::Incoming(rumqttc::Packet::ConnAck(_)) => { for topic in subscribe_topic { client - .subscribe(topic, QoS::ExactlyOnce) + .subscribe(topic, qos) .expect("should be able to subscribe"); } } diff --git a/src/main.rs b/src/main.rs index 997111c4..664e242d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,6 +22,13 @@ fn main() -> anyhow::Result<()> { } else { None }; + // Extract and convert QoS before mqtt_connection is moved + let qos = match matches.mqtt_connection.qos { + 0 => QoS::AtMostOnce, + 1 => QoS::AtLeastOnce, + 2 => QoS::ExactlyOnce, + _ => unreachable!("QoS value should be 0, 1, or 2 (enforced by clap)"), + }; let (broker, client, connection) = mqtt::connect(matches.mqtt_connection, keep_alive)?; match matches.subcommands { @@ -70,12 +77,14 @@ fn main() -> anyhow::Result<()> { publish::eventloop(&client, connection, verbose); } None => { + // Only interactive mode uses the --qos flag (default: QoS 2) interactive::show( client.clone(), connection, &broker, matches.topic, matches.payload_size_limit, + qos, )?; client.disconnect()?; } diff --git a/src/mqtt/connect.rs b/src/mqtt/connect.rs index 32dc6e9e..4abe38f4 100644 --- a/src/mqtt/connect.rs +++ b/src/mqtt/connect.rs @@ -14,6 +14,9 @@ pub fn connect( client_cert, client_private_key, insecure, + ca, + alpn, + qos: _, // QoS is extracted separately in main.rs before this call }: MqttConnection, keep_alive: Option, ) -> anyhow::Result<(Broker, Client, Connection)> { @@ -24,6 +27,8 @@ pub fn connect( insecure, client_cert.as_deref(), client_private_key.as_deref(), + ca.as_deref(), + &alpn, )?), host.clone(), *port, @@ -35,6 +40,8 @@ pub fn connect( insecure, client_cert.as_deref(), client_private_key.as_deref(), + ca.as_deref(), + &alpn, )?), url.to_string(), 666, diff --git a/src/mqtt/encryption.rs b/src/mqtt/encryption.rs index 1775e1db..7c4c50b8 100644 --- a/src/mqtt/encryption.rs +++ b/src/mqtt/encryption.rs @@ -63,15 +63,27 @@ pub fn create_tls_configuration( insecure: bool, client_cert: Option<&Path>, client_private_key: Option<&Path>, + ca_cert: Option<&Path>, + alpn_protocols: &[String], ) -> anyhow::Result { let mut roots = rustls::RootCertStore::empty(); - let native_certs = rustls_native_certs::load_native_certs(); - for error in native_certs.errors { - eprintln!( - "Warning: might skip some native certificates because of an error while loading: {error}" - ); + + // Load CA certificates from file if provided, otherwise use the system's native cert store. + // Custom CA files are needed for brokers using self-signed certs or private CAs (e.g., AWS IoT Core). + if let Some(ca_path) = ca_cert { + let ca_certs = read_certificate_file(ca_path)?; + for cert in ca_certs { + roots.add(cert)?; + } + } else { + let native_certs = rustls_native_certs::load_native_certs(); + for error in native_certs.errors { + eprintln!( + "Warning: might skip some native certificates because of an error while loading: {error}" + ); + } + roots.add_parsable_certificates(native_certs.certs); } - roots.add_parsable_certificates(native_certs.certs); let conf = ClientConfig::builder().with_root_certificates(roots); @@ -85,6 +97,16 @@ pub fn create_tls_configuration( }; conf.key_log = Arc::new(KeyLogFile::new()); + // Set ALPN (Application-Layer Protocol Negotiation) protocols if provided. + // Required by some brokers like AWS IoT Core to identify the protocol during TLS handshake. + // Common values: "mqtt" for standard MQTT, "x-amzn-mqtt-ca" for AWS IoT custom auth. + if !alpn_protocols.is_empty() { + conf.alpn_protocols = alpn_protocols + .iter() + .map(|protocol| protocol.as_bytes().to_vec()) + .collect(); + } + if insecure { let mut danger = conf.dangerous(); danger.set_certificate_verifier(Arc::new(NoVerifier {}));