Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,52 @@ pub struct MqttConnection {
/// Allow insecure TLS connections
#[arg(long, global = true)]
pub insecure: bool,

/// Path to a TLS CA certificate file.
///
/// Used to verify the broker's certificate instead of using the system's certificate store.
/// Useful for self-signed certificates or private CA (like AWS `IoT` Core).
/// The file should contain PEM-encoded CA certificates.
#[arg(
long,
env = "MQTTUI_CA",
value_hint = ValueHint::FilePath,
value_name = "FILEPATH",
global = true,
)]
pub ca: Option<std::path::PathBuf>,

/// TLS ALPN protocol to use.
///
/// Required by some brokers like AWS `IoT` Core.
/// Common values: "mqtt" for standard MQTT, "x-amzn-mqtt-ca" for AWS `IoT` custom auth.
/// Can be specified multiple times for multiple protocols (first is preferred).
#[arg(
long,
env = "MQTTUI_ALPN",
value_hint = ValueHint::Other,
value_name = "PROTOCOL",
global = true,
)]
pub alpn: Vec<String>,

/// MQTT `QoS` level for subscriptions and publishing.
///
/// 0 = At most once (fire and forget)
/// 1 = At least once (acknowledged delivery)
/// 2 = Exactly once (assured delivery)
///
/// Note: AWS `IoT` Core does not support `QoS` 2. Use --qos 1 for AWS `IoT`.
#[arg(
long,
env = "MQTTUI_QOS",
value_hint = ValueHint::Other,
value_name = "LEVEL",
default_value_t = 2,
value_parser = clap::value_parser!(u8).range(0..=2),
global = true,
)]
pub qos: u8,
}

#[derive(Debug, Clone)]
Expand Down
3 changes: 2 additions & 1 deletion src/interactive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ pub fn show(
broker: &Broker,
subscribe_topic: Vec<String>,
payload_size_limit: usize,
qos: rumqttc::QoS,
) -> anyhow::Result<()> {
let mqtt_thread =
mqtt_thread::MqttThread::new(client, connection, subscribe_topic, payload_size_limit)?;
mqtt_thread::MqttThread::new(client, connection, subscribe_topic, payload_size_limit, qos)?;
let app = App::new(broker, mqtt_thread);

let original_hook = std::panic::take_hook();
Expand Down
23 changes: 19 additions & 4 deletions src/interactive/mqtt_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub struct MqttThread {
client: Client,
connection_err: ConnectionErrorArc,
history: HistoryArc,
qos: QoS,
}

impl MqttThread {
Expand All @@ -23,9 +24,10 @@ impl MqttThread {
connection: Connection,
subscribe_topic: Vec<String>,
payload_size_limit: usize,
qos: QoS,
) -> anyhow::Result<Self> {
for topic in &subscribe_topic {
client.subscribe(topic, QoS::ExactlyOnce)?;
client.subscribe(topic, qos)?;
}

let connection_err = Arc::new(RwLock::new(None));
Expand All @@ -43,6 +45,7 @@ impl MqttThread {
connection,
&subscribe_topic,
payload_size_limit,
qos,
&connection_err,
&history,
);
Expand All @@ -54,6 +57,7 @@ impl MqttThread {
client,
connection_err,
history,
qos,
})
}

Expand All @@ -62,7 +66,17 @@ impl MqttThread {
.read()
.expect("mqtt history thread panicked")
.as_ref()
.map(ToString::to_string)
.map(|err| {
let err_str = err.to_string();
// Provide helpful hint for common AWS IoT Core issue (QoS 2 not supported)
if err_str.contains("reset") || err_str.contains("closed") {
format!(
"{err_str}\n\nHint: AWS IoT Core doesn't support QoS 2. Try adding: --qos 1"
)
} else {
err_str
}
})
}

pub fn get_history(&self) -> RwLockReadGuard<'_, MqttHistory> {
Expand All @@ -81,7 +95,7 @@ impl MqttThread {
pub fn clean_below(&self, topic: &str) -> anyhow::Result<()> {
let topics = self.get_history().get_topics_below(topic);
for topic in topics {
self.client.publish(topic, QoS::ExactlyOnce, true, [])?;
self.client.publish(topic, self.qos, true, [])?;
}
Ok(())
}
Expand All @@ -93,6 +107,7 @@ fn thread_logic(
mut connection: Connection,
subscribe_topic: &[String],
payload_size_limit: usize,
qos: QoS,
connection_err: &ConnectionErrorArc,
history: &HistoryArc,
) {
Expand All @@ -104,7 +119,7 @@ fn thread_logic(
rumqttc::Event::Incoming(rumqttc::Packet::ConnAck(_)) => {
for topic in subscribe_topic {
client
.subscribe(topic, QoS::ExactlyOnce)
.subscribe(topic, qos)
.expect("should be able to subscribe");
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ fn main() -> anyhow::Result<()> {
} else {
None
};
// Extract and convert QoS before mqtt_connection is moved
let qos = match matches.mqtt_connection.qos {
0 => QoS::AtMostOnce,
1 => QoS::AtLeastOnce,
2 => QoS::ExactlyOnce,
_ => unreachable!("QoS value should be 0, 1, or 2 (enforced by clap)"),
};
let (broker, client, connection) = mqtt::connect(matches.mqtt_connection, keep_alive)?;

match matches.subcommands {
Expand Down Expand Up @@ -70,12 +77,14 @@ fn main() -> anyhow::Result<()> {
publish::eventloop(&client, connection, verbose);
}
None => {
// Only interactive mode uses the --qos flag (default: QoS 2)
interactive::show(
client.clone(),
connection,
&broker,
matches.topic,
matches.payload_size_limit,
qos,
)?;
client.disconnect()?;
}
Expand Down
7 changes: 7 additions & 0 deletions src/mqtt/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub fn connect(
client_cert,
client_private_key,
insecure,
ca,
alpn,
qos: _, // QoS is extracted separately in main.rs before this call
}: MqttConnection,
keep_alive: Option<Duration>,
) -> anyhow::Result<(Broker, Client, Connection)> {
Expand All @@ -24,6 +27,8 @@ pub fn connect(
insecure,
client_cert.as_deref(),
client_private_key.as_deref(),
ca.as_deref(),
&alpn,
)?),
host.clone(),
*port,
Expand All @@ -35,6 +40,8 @@ pub fn connect(
insecure,
client_cert.as_deref(),
client_private_key.as_deref(),
ca.as_deref(),
&alpn,
)?),
url.to_string(),
666,
Expand Down
34 changes: 28 additions & 6 deletions src/mqtt/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,27 @@ pub fn create_tls_configuration(
insecure: bool,
client_cert: Option<&Path>,
client_private_key: Option<&Path>,
ca_cert: Option<&Path>,
alpn_protocols: &[String],
) -> anyhow::Result<TlsConfiguration> {
let mut roots = rustls::RootCertStore::empty();
let native_certs = rustls_native_certs::load_native_certs();
for error in native_certs.errors {
eprintln!(
"Warning: might skip some native certificates because of an error while loading: {error}"
);

// Load CA certificates from file if provided, otherwise use the system's native cert store.
// Custom CA files are needed for brokers using self-signed certs or private CAs (e.g., AWS IoT Core).
if let Some(ca_path) = ca_cert {
let ca_certs = read_certificate_file(ca_path)?;
for cert in ca_certs {
roots.add(cert)?;
}
} else {
let native_certs = rustls_native_certs::load_native_certs();
for error in native_certs.errors {
eprintln!(
"Warning: might skip some native certificates because of an error while loading: {error}"
);
}
roots.add_parsable_certificates(native_certs.certs);
}
roots.add_parsable_certificates(native_certs.certs);

let conf = ClientConfig::builder().with_root_certificates(roots);

Expand All @@ -85,6 +97,16 @@ pub fn create_tls_configuration(
};
conf.key_log = Arc::new(KeyLogFile::new());

// Set ALPN (Application-Layer Protocol Negotiation) protocols if provided.
// Required by some brokers like AWS IoT Core to identify the protocol during TLS handshake.
// Common values: "mqtt" for standard MQTT, "x-amzn-mqtt-ca" for AWS IoT custom auth.
if !alpn_protocols.is_empty() {
conf.alpn_protocols = alpn_protocols
.iter()
.map(|protocol| protocol.as_bytes().to_vec())
.collect();
}

if insecure {
let mut danger = conf.dangerous();
danger.set_certificate_verifier(Arc::new(NoVerifier {}));
Expand Down