diff --git a/Cargo.toml b/Cargo.toml index c89058c..6f1543f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,4 +21,4 @@ tokio-stream = "0.1" [dev-dependencies] env_logger = "0.9" rand = "0.8" -tokio = { version = "1.17", features = ["macros", "net", "rt", "sync"] } +tokio = { version = "1.17", features = ["macros", "net", "rt-multi-thread", "sync"] } diff --git a/src/client.rs b/src/client.rs index 1e3ab17..bccd14a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,6 @@ use std::{ffi::OsString, path::Path}; -use crate::{Connection, Result}; +use crate::{Connection, Result, SyncConnection}; /// A bird client instance. You need to create a [Connection] from this /// client, using [Client::connect], to make requests. @@ -32,4 +32,13 @@ impl Client { pub async fn connect(&self) -> Result { Connection::new(&self.unix_socket).await } + + /// Open a new [SyncConnection] to this client. You can open multiple + /// connections to the same client. + /// + /// Note that this can fail if the unix socket is closed, or if the + /// initial hello negotiation with the server fails. + pub fn connect_sync(&self) -> Result { + SyncConnection::new(&self.unix_socket) + } } diff --git a/src/connection.rs b/src/connection.rs index a1f4f34..f07e90f 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -2,8 +2,11 @@ //! //! Refer to documentation of [Connection] for more details. -use std::{collections::VecDeque, io::ErrorKind, path::Path}; -use tokio::net::UnixStream; +use std::{ + collections::VecDeque, + io::{ErrorKind, Read, Write}, + path::Path, +}; use crate::{ Error, Interface, InterfaceAddress, InterfaceProperties, InterfaceSummary, Message, Protocol, @@ -17,7 +20,7 @@ use crate::{ /// before the response from the previous one has been fully received, you'll /// get a [Error::OperationInProgress] error. pub struct Connection { - stream: UnixStream, + stream: tokio::net::UnixStream, unparsed_bytes: Vec, unsent_messages: VecDeque, request_in_progress: bool, @@ -28,7 +31,7 @@ impl Connection { /// introductory welcome message before returning the [Connection] pub(crate) async fn new>(unix_socket: P) -> Result { // connect to the unix socket - let stream = UnixStream::connect(unix_socket).await?; + let stream = tokio::net::UnixStream::connect(unix_socket).await?; let mut connection = Connection { stream, @@ -40,7 +43,7 @@ impl Connection { // process greeting and return if let Message::Welcome(ref greeting) = connection.next_message().await? { - log::trace!("received greeting {}", greeting); + log::trace!("received greeting {greeting}"); // we need to do this because the message processor automatically adds an Ok if let Message::Ok = connection.next_message().await? { log::trace!("handshake completed. connection active"); @@ -118,80 +121,7 @@ impl Connection { /// list of [ShowInterfacesMessage] entries, one each for an interface. pub async fn show_interfaces(&mut self) -> Result> { let messages = self.send_request("show interfaces").await?; - let mut result = vec![]; - - // we expect messages to show up as a series of triplets: 1001, 1004 and 1003 - let mut idx = 0; - loop { - // each iteration here means fully going through all of 1001, 1004 and 1003 - - // if we're already at end, return - if idx >= messages.len() { - return Ok(result); - } - - // start processing - let first_msg = &messages[idx]; - idx += 1; - - // process only if we find the first entry to be a 1001 - if let Some(msg_1001) = Interface::from_enum(first_msg) { - // get the position of the next 1001 - let next_1001_idx = messages[idx..] - .iter() - .position(|x| matches!(x, Message::InterfaceList(_))) - .unwrap_or(messages.len() - idx) - + idx; - let delta = next_1001_idx - idx; - if delta == 0 || delta > 2 { - log::error!( - "conn: parse failed: a 1001 entry without (or more than one) 1003/1004", - ); - return Err(Error::ParseError(messages)); - } - let mut msg_1004: Option = None; - let mut msg_1003: Option> = None; - while idx < next_1001_idx { - let cur_msg = &messages[idx]; - idx += 1; - match cur_msg { - Message::InterfaceFlags(_) => { - if let Some(props) = InterfaceProperties::from_enum(cur_msg) { - msg_1004 = Some(props); - } else { - return Err(Error::ParseError(messages)); - } - } - Message::InterfaceAddress(_) => { - if let Some(addrs) = InterfaceAddress::from_enum(cur_msg) { - msg_1003 = Some(addrs); - } else { - return Err(Error::ParseError(messages)); - } - } - _ => { - log::error!( - "conn: parse failed: found invalid code {}", - messages[idx].code() - ); - return Err(Error::ParseError(messages)); - } - } - } - if let Some(msg_1004) = msg_1004 { - result.push(ShowInterfacesMessage { - interface: msg_1001, - properties: msg_1004, - addresses: msg_1003.unwrap_or_default(), - }); - } else { - log::error!("conn: parse failed: found a 1001 without a 1004"); - return Err(Error::ParseError(messages)); - } - } else { - return Err(Error::ParseError(messages)); - } - } + handle_show_interfaces(messages) } /// Sends a `show protocols []` request and returns the parsed response as a @@ -201,26 +131,12 @@ impl Connection { /// match the pattern. pub async fn show_protocols(&mut self, pattern: Option<&str>) -> Result> { let cmd = if let Some(pattern) = pattern { - format!("show protocols \"{}\"", pattern) + format!("show protocols \"{pattern}\"") } else { "show protocols".into() }; let messages = self.send_request(&cmd).await?; - - // we ignore the 2002 message, and focus only on 1005 - for message in &messages { - // if we get a 1002, we process it and return it - if let Message::ProtocolList(_) = message { - return if let Some(protocols) = Protocol::from_enum(message) { - Ok(protocols) - } else { - Err(Error::ParseError(messages)) - }; - } - } - - // if we didn't encounter any 1002, we return a ParseError - Err(Error::ParseError(messages)) + handle_show_protocols(messages) } /// Sends a `show protocols all []` request and returns the parsed response as a @@ -233,62 +149,12 @@ impl Connection { pattern: Option<&str>, ) -> Result> { let cmd = if let Some(pattern) = pattern { - format!("show protocols all \"{}\"", pattern) + format!("show protocols all \"{pattern}\"") } else { "show protocols all".into() }; - let mut result = vec![]; let messages = self.send_request(&cmd).await?; - - // we expect messages to show up as a series of doublets: 1002 and 1006 - if let Some(mut idx) = messages - .iter() - .position(|x| matches!(x, Message::ProtocolList(_))) - { - while idx < messages.len() { - // each iteration here means fully going through the pair of 1002 and 1006 - - if let Some(protocol) = Protocol::from_enum(&messages[idx]) { - // move index to the next message - idx += 1; - // if we have a valid 1002 ... - if let Some(protocol) = protocol.first() { - // ... check for 1006 - if idx == messages.len() - || !matches!(messages[idx], Message::ProtocolDetails(_)) - { - // if we're already at the end, or if there's no 1006 after the 1002 we saw - // just a step before, we push the current 1002 without any 1006, and continue - result.push(ShowProtocolDetailsMessage { - protocol: protocol.clone(), - detail: None, - }); - continue; - } - // looks like we have a valid 1006, so let's process it - if let Some(detail) = ProtocolDetail::from_enum(&messages[idx]) { - // looks like we got a valid 1006 - idx += 1; - result.push(ShowProtocolDetailsMessage { - protocol: protocol.clone(), - detail: Some(detail), - }); - } else { - log::error!("conn: failed to parse 1006 message"); - return Err(Error::ParseError(messages)); - } - } - } else { - log::error!("conn: failed to parse 1002 message: {:?}", messages[idx]); - return Err(Error::ParseError(messages)); - } - } - - Ok(result) - } else { - // No 1002 entries, so empty result - Ok(Vec::new()) - } + handle_show_protocols_details(messages) } /// Sends a `show status` request, and returns a semantically parsed response @@ -355,7 +221,12 @@ impl Connection { return Err(Error::eof("premature EOF")); } Ok(count) => { - if self.enqueue_messages(&frame[..count])? == 0 { + if enqueue_messages( + &frame[..count], + &mut self.unparsed_bytes, + &mut self.unsent_messages, + )? == 0 + { // we continue to fetch more if amount of data // was insufficient to parse response continue; @@ -371,230 +242,585 @@ impl Connection { } } } +} - /// Process raw bytes to parse and enqueue messages. On success returns the - /// number of messages enqueued. - /// - /// If we have pending unparsed bytes from previous iterations, we create a - /// new buffer that combines the old one with the new `frame`, and then - /// processes it. - /// - /// However, if we don't have any pending unparsed bytes, then it would be - /// an overhead to do so, so we just process the frame directly. - /// - /// In both cases, pending bytes from this iteration are added to - /// `unparsed_bytes` +/// A non-async version of [Connection] +pub struct SyncConnection { + stream: std::os::unix::net::UnixStream, + unparsed_bytes: Vec, + unsent_messages: VecDeque, + request_in_progress: bool, +} + +impl SyncConnection { + /// Open a new connection to this `unix_socket`, and consumes the + /// introductory welcome message before returning the [Connection] + pub(crate) fn new>(unix_socket: P) -> Result { + let stream = std::os::unix::net::UnixStream::connect(unix_socket)?; + + let mut connection = SyncConnection { + stream, + unparsed_bytes: Vec::with_capacity(2 * READ_FRAME_SIZE), + unsent_messages: VecDeque::with_capacity(20), + request_in_progress: true, + }; + + if let Message::Welcome(ref greeting) = connection.next_message()? { + log::trace!("received greeting {greeting}"); + if let Message::Ok = connection.next_message()? { + log::trace!("handshake completed. connection active"); + connection.allow_new_requests(); + return Ok(connection); + } + } + Err(Error::InvalidToken("did not find greeting".into())) + } + + /// Mark current request/response session as completed, so that new requests can + /// be made on this connection. #[inline] - fn enqueue_messages(&mut self, frame: &[u8]) -> Result { - let num_unparsed = self.unparsed_bytes.len(); - let has_unparsed = num_unparsed > 0; - if has_unparsed { - // if we had previously unparsed bytes, we use them in combination with - // the new frame - let mut new_vec: Vec = Vec::with_capacity(num_unparsed + frame.len()); - new_vec.extend_from_slice(&self.unparsed_bytes); - new_vec.extend_from_slice(frame); - self.enqueue_messages_from_buffer(&new_vec) + fn allow_new_requests(&mut self) { + self.request_in_progress = false; + } + + /// Sends a request to the server and gets a vec of response messages. The + /// terminating [Message::Ok] is not included. + pub fn send_request(&mut self, request: &str) -> Result> { + if self.request_in_progress { + return Err(Error::OperationInProgress); + } + + self.unparsed_bytes.clear(); + self.unsent_messages.clear(); + + let request = if request.ends_with('\n') { + request.to_owned() } else { - // if we didn't have any previously unparsed bytes, we can process this - // frame directly, gaining a tiny bit of efficiency. This helps in dealing - // with most messages that will tend to be quite small. - self.enqueue_messages_from_buffer(frame) + format!("{}\n", &request) + }; + let mut result: Vec = Vec::new(); + self.write_to_server(&request)?; + self.request_in_progress = true; + + loop { + let message = self.next_message()?; + if let Message::Ok = message { + self.allow_new_requests(); + return Ok(result); + } else { + result.push(message); + } } } - /// Processes raw data to parse and enqeueue Messages. - /// - /// The logic is straighforward, even if cumbersome to look at. We run a loop, where - /// at each iteration, we process a new line. In each line, we encounter one of the - /// following scenarios (xxxx is a 4 digit code): - /// 1. `xxxx` - this is the last line in this response - /// 2. `xxxx` - this is NOT the last line in this response - /// 3. `` - same as (2) but the xxxx code is implicitly = previous one + /// Sends a `show interfaces summary` request and returns the parsed response as a + /// list of [InterfaceSummary] entries, one each for an interface. + pub fn show_interfaces_summary(&mut self) -> Result> { + let messages = self.send_request("show interfaces summary")?; + + for message in &messages { + if let Message::InterfaceSummary(_) = message { + return if let Some(ifcs) = InterfaceSummary::from_enum(message) { + Ok(ifcs) + } else { + Err(Error::ParseError(messages)) + }; + } + } + + Err(Error::ParseError(messages)) + } + + /// Sends a `show interfaces` request and returns the parsed response as a + /// list of [ShowInterfacesMessage] entries, one each for an interface. + pub fn show_interfaces(&mut self) -> Result> { + let messages = self.send_request("show interfaces")?; + handle_show_interfaces(messages) + } + + /// Sends a `show protocols []` request and returns the parsed response as a + /// list of [InterfaceSummary] entries, one for each protocol. /// - /// More details about the protocol can be found [here](https://gitlab.nic.cz/labs/bird/-/blob/master/nest/cli.c) + /// If `pattern` is specified, results of only those protocols is returned, which + /// match the pattern. + pub fn show_protocols(&mut self, pattern: Option<&str>) -> Result> { + let cmd = if let Some(pattern) = pattern { + format!("show protocols \"{pattern}\"") + } else { + "show protocols".into() + }; + let messages = self.send_request(&cmd)?; + handle_show_protocols(messages) + } + + /// Sends a `show protocols all []` request and returns the parsed response as a + /// list of [ShowProtocolDetailsMessage] entries, one for each protocol instance. /// - /// While processing each line, we can return an `Ok(0)` to indicate we need more - /// data ([Connection::fetch_new_messages] takes care of that). - #[inline] - fn enqueue_messages_from_buffer(&mut self, buffer: &[u8]) -> Result { - let bsize = buffer.len(); - let mut num_messages = 0; - let mut pos: usize = 0; - let mut code: u32 = 0; - let mut msg_start_pos = 0; - let mut message_size: usize = 0; - let mut last_msg_added_epos = 0; + /// If `pattern` is specified, results of only those protocols is returned, which + /// match the pattern. + pub fn show_protocols_details( + &mut self, + pattern: Option<&str>, + ) -> Result> { + let cmd = if let Some(pattern) = pattern { + format!("show protocols all \"{pattern}\"") + } else { + "show protocols all".into() + }; + let messages = self.send_request(&cmd)?; + handle_show_protocols_details(messages) + } + + /// Sends a `show status` request, and returns a semantically parsed response + /// in the form of [ShowStatusMessage] + pub fn show_status(&mut self) -> Result { + let messages = self.send_request("show status")?; - // process things line by line. each iteration of this loop constitutes - // a new line + match ShowStatusMessage::from_messages(&messages) { + Some(ssm) => Ok(ssm), + None => Err(Error::ParseError(messages)), + } + } + + /// Reads a full [Message] from the server, and returns it + fn next_message(&mut self) -> Result { + if let Some(pending_message) = self.unsent_messages.pop_front() { + return Ok(pending_message); + } + + self.fetch_new_messages()?; + if let Some(new_message) = self.unsent_messages.pop_front() { + Ok(new_message) + } else { + Err(Error::eof("was expecting more messages")) + } + } + + /// Writes `request` to the server, returning only after it has been written + /// fully. + fn write_to_server(&mut self, request: &str) -> Result<()> { + let data = request.as_bytes(); + let total_size = data.len(); + let mut written_size = 0; loop { - let line_start_pos = pos; - log::trace!("conn: checking if we can start processing a new line"); - // break or ask for more data if we're at the end, but expected to parse - if pos >= bsize { - if num_messages > 0 { - log::trace!( - " need more data, exiting loop as already enqueued {} messages", - num_messages - ); - break; - } else { - log::trace!(" need more data"); - return Ok(0); // we need more data + match self.stream.write(data) { + Ok(n) => { + written_size += n; + if written_size >= total_size { + return Ok(()); + } + } + Err(err) => { + if err.kind() != ErrorKind::WouldBlock { + return Err(Error::from(err)); + } } } + } + } - // if we don't have visibility into the next newline, break or ask - // for more data - let nl_pos: usize; - match buffer[pos..].iter().position(|it| *it == b'\n') { - Some(it) => nl_pos = pos + it, - None => { - if num_messages > 0 { - log::trace!( - " need more data, exiting loop as already enqueued {} messages", - num_messages - ); - break; + /// Fetches and add news messages to the queue. + #[inline] + fn fetch_new_messages(&mut self) -> Result<()> { + loop { + let mut frame = [0_u8; READ_FRAME_SIZE]; + match self.stream.read(&mut frame) { + Ok(0) => { + return Err(Error::eof("premature EOF")); + } + Ok(count) => { + if enqueue_messages( + &frame[..count], + &mut self.unparsed_bytes, + &mut self.unsent_messages, + )? == 0 + { + continue; } else { - log::trace!(" need more data"); - return Ok(0); // we need more data + return Ok(()); } } - }; - let next_line_pos = nl_pos + 1; + Err(err) => { + if err.kind() != ErrorKind::WouldBlock { + return Err(Error::IoError(err)); + } + } + } + } + } +} - log::trace!( - "conn: processing line: {}", - String::from_utf8_lossy(&buffer[pos..nl_pos]) - ); +fn handle_show_interfaces(messages: Vec) -> Result> { + let mut result = vec![]; - if buffer[pos] == b' ' { - log::trace!(" no code present, we're a continuation of prev line"); - pos += 1; // we're now at start of data in this line - message_size += nl_pos - pos + 1; // +1 for newline - } else { - log::trace!(" line has a code, need to check if same as prev or not"); - // the line does not start with a space, so we MUST see a code - // and a continuation/final marker - if pos + 5 >= bsize { - if num_messages > 0 { - log::trace!( - " need more data, exiting loop as already enqueued {} messages", - num_messages + // we expect messages to show up as a series of triplets: 1001, 1004 and 1003 + let mut idx = 0; + loop { + // each iteration here means fully going through all of 1001, 1004 and 1003 + + // if we're already at end, return + if idx >= messages.len() { + return Ok(result); + } + + // start processing + let first_msg = &messages[idx]; + idx += 1; + + // process only if we find the first entry to be a 1001 + if let Some(msg_1001) = Interface::from_enum(first_msg) { + // get the position of the next 1001 + let next_1001_idx = messages[idx..] + .iter() + .position(|x| matches!(x, Message::InterfaceList(_))) + .unwrap_or(messages.len() - idx) + + idx; + let delta = next_1001_idx - idx; + if delta == 0 || delta > 2 { + log::error!( + "conn: parse failed: a 1001 entry without (or more than one) 1003/1004", + ); + return Err(Error::ParseError(messages)); + } + let mut msg_1004: Option = None; + let mut msg_1003: Option> = None; + while idx < next_1001_idx { + let cur_msg = &messages[idx]; + idx += 1; + match cur_msg { + Message::InterfaceFlags(_) => { + if let Some(props) = InterfaceProperties::from_enum(cur_msg) { + msg_1004 = Some(props); + } else { + return Err(Error::ParseError(messages)); + } + } + Message::InterfaceAddress(_) => { + if let Some(addrs) = InterfaceAddress::from_enum(cur_msg) { + msg_1003 = Some(addrs); + } else { + return Err(Error::ParseError(messages)); + } + } + _ => { + log::error!( + "conn: parse failed: found invalid code {}", + messages[idx].code() ); - break; + return Err(Error::ParseError(messages)); + } + } + } + if let Some(msg_1004) = msg_1004 { + result.push(ShowInterfacesMessage { + interface: msg_1001, + properties: msg_1004, + addresses: msg_1003.unwrap_or_default(), + }); + } else { + log::error!("conn: parse failed: found a 1001 without a 1004"); + return Err(Error::ParseError(messages)); + } + } else { + return Err(Error::ParseError(messages)); + } + } +} + +fn handle_show_protocols(messages: Vec) -> Result> { + // we ignore the 2002 message, and focus only on 1005 + for message in &messages { + // if we get a 1002, we process it and return it + if let Message::ProtocolList(_) = message { + return if let Some(protocols) = Protocol::from_enum(message) { + Ok(protocols) + } else { + Err(Error::ParseError(messages)) + }; + } + } + + // if we didn't encounter any 1002, we return a ParseError + Err(Error::ParseError(messages)) +} + +fn handle_show_protocols_details( + messages: Vec, +) -> Result> { + let mut result = vec![]; + // we expect messages to show up as a series of doublets: 1002 and 1006 + if let Some(mut idx) = messages + .iter() + .position(|x| matches!(x, Message::ProtocolList(_))) + { + while idx < messages.len() { + // each iteration here means fully going through the pair of 1002 and 1006 + + if let Some(protocol) = Protocol::from_enum(&messages[idx]) { + // move index to the next message + idx += 1; + // if we have a valid 1002 ... + if let Some(protocol) = protocol.first() { + // ... check for 1006 + if idx == messages.len() + || !matches!(messages[idx], Message::ProtocolDetails(_)) + { + // if we're already at the end, or if there's no 1006 after the 1002 we saw + // just a step before, we push the current 1002 without any 1006, and continue + result.push(ShowProtocolDetailsMessage { + protocol: protocol.clone(), + detail: None, + }); + continue; + } + // looks like we have a valid 1006, so let's process it + if let Some(detail) = ProtocolDetail::from_enum(&messages[idx]) { + // looks like we got a valid 1006 + idx += 1; + result.push(ShowProtocolDetailsMessage { + protocol: protocol.clone(), + detail: Some(detail), + }); } else { - log::trace!(" need more data"); - return Ok(0); + log::error!("conn: failed to parse 1006 message"); + return Err(Error::ParseError(messages)); } } - let new_code = parse_code(&buffer[pos..(pos + 4)])?; - let separator = buffer[pos + 4]; + } else { + log::error!("conn: failed to parse 1002 message: {:?}", messages[idx]); + return Err(Error::ParseError(messages)); + } + } + + Ok(result) + } else { + // No 1002 entries, so empty result + Ok(Vec::new()) + } +} + +/// Process raw bytes to parse and enqueue messages. On success returns the +/// number of messages enqueued. +/// +/// If we have pending unparsed bytes from previous iterations, we create a +/// new buffer that combines the old one with the new `frame`, and then +/// processes it. +/// +/// However, if we don't have any pending unparsed bytes, then it would be +/// an overhead to do so, so we just process the frame directly. +/// +/// In both cases, pending bytes from this iteration are added to +/// `unparsed_bytes` +#[inline] +fn enqueue_messages( + frame: &[u8], + unparsed_bytes: &mut Vec, + unsent_messages: &mut VecDeque, +) -> Result { + let num_unparsed = unparsed_bytes.len(); + let has_unparsed = num_unparsed > 0; + if has_unparsed { + // if we had previously unparsed bytes, we use them in combination with + // the new frame + let mut new_vec: Vec = Vec::with_capacity(num_unparsed + frame.len()); + new_vec.extend_from_slice(unparsed_bytes); + new_vec.extend_from_slice(frame); + enqueue_messages_from_buffer(&new_vec, unparsed_bytes, unsent_messages) + } else { + // if we didn't have any previously unparsed bytes, we can process this + // frame directly, gaining a tiny bit of efficiency. This helps in dealing + // with most messages that will tend to be quite small. + enqueue_messages_from_buffer(frame, unparsed_bytes, unsent_messages) + } +} + +/// Processes raw data to parse and enqeueue Messages. +/// +/// The logic is straighforward, even if cumbersome to look at. We run a loop, where +/// at each iteration, we process a new line. In each line, we encounter one of the +/// following scenarios (xxxx is a 4 digit code): +/// 1. `xxxx` - this is the last line in this response +/// 2. `xxxx` - this is NOT the last line in this response +/// 3. `` - same as (2) but the xxxx code is implicitly = previous one +/// +/// More details about the protocol can be found [here](https://gitlab.nic.cz/labs/bird/-/blob/master/nest/cli.c) +/// +/// While processing each line, we can return an `Ok(0)` to indicate we need more +/// data ([Connection::fetch_new_messages] takes care of that). +#[inline] +fn enqueue_messages_from_buffer( + buffer: &[u8], + unparsed_bytes: &mut Vec, + unsent_messages: &mut VecDeque, +) -> Result { + let bsize = buffer.len(); + let mut num_messages = 0; + let mut pos: usize = 0; + let mut code: u32 = 0; + let mut msg_start_pos = 0; + let mut message_size: usize = 0; + let mut last_msg_added_epos = 0; + + // process things line by line. each iteration of this loop constitutes + // a new line + loop { + let line_start_pos = pos; + log::trace!("conn: checking if we can start processing a new line"); + // break or ask for more data if we're at the end, but expected to parse + if pos >= bsize { + if num_messages > 0 { log::trace!( - " encountered code {} and separator '{}'", - new_code, - separator as char + " need more data, exiting loop as already enqueued {num_messages} messages" ); - let is_last = match separator { - b' ' => true, - b'-' => false, - _ => { - return Err(Error::InvalidToken(format!( - "unknown separator {} after code {}", - separator, new_code - ))) - } - }; - pos += 5; // we're now at the start of data in this line - - let mut ok_added = false; - if is_last { - // if this is the last line - if new_code == code { - log::trace!( - " determined to be the last line, but has same code as before {}", - code - ); - // treat it as continuation of the previous message - message_size += nl_pos - pos + 1; - let message = parse_message(code, buffer, msg_start_pos, message_size)?; - log::trace!(" pushing last message {:?}", message); - self.unsent_messages.push_back(message); - num_messages += 1; - last_msg_added_epos = nl_pos + 1; - } else { - log::trace!(" determined to be the last line, has new code {}", code); - // treat this as a new message - // we first push the prev message, if present - if message_size > 0 { - let message = parse_message(code, buffer, msg_start_pos, message_size)?; - log::trace!(" pushing prev to last message {:?}", message); - self.unsent_messages.push_back(message); - num_messages += 1; - // last_msg_added_epos = nl_pos + 1; // not needed as we do this at the end anyway - } - // now we process the new message - code = new_code; - msg_start_pos = pos; + break; + } else { + log::trace!(" need more data"); + return Ok(0); // we need more data + } + } + + // if we don't have visibility into the next newline, break or ask + // for more data + let nl_pos: usize; + match buffer[pos..].iter().position(|it| *it == b'\n') { + Some(it) => nl_pos = pos + it, + None => { + if num_messages > 0 { + log::trace!( + " need more data, exiting loop as already enqueued {num_messages} messages" + ); + break; + } else { + log::trace!(" need more data"); + return Ok(0); // we need more data + } + } + }; + let next_line_pos = nl_pos + 1; + + log::trace!( + "conn: processing line: {}", + String::from_utf8_lossy(&buffer[pos..nl_pos]) + ); + + if buffer[pos] == b' ' { + log::trace!(" no code present, we're a continuation of prev line"); + pos += 1; // we're now at start of data in this line + message_size += nl_pos - pos + 1; // +1 for newline + } else { + log::trace!(" line has a code, need to check if same as prev or not"); + // the line does not start with a space, so we MUST see a code + // and a continuation/final marker + if pos + 5 >= bsize { + if num_messages > 0 { + log::trace!( + " need more data, exiting loop as already enqueued {num_messages} messages" + ); + break; + } else { + log::trace!(" need more data"); + return Ok(0); + } + } + let new_code = parse_code(&buffer[pos..(pos + 4)])?; + let separator = buffer[pos + 4]; + log::trace!( + " encountered code {} and separator '{}'", + new_code, + separator as char + ); + let is_last = match separator { + b' ' => true, + b'-' => false, + _ => { + return Err(Error::InvalidToken(format!( + "unknown separator {separator} after code {new_code}" + ))) + } + }; + pos += 5; // we're now at the start of data in this line + + let mut ok_added = false; + if is_last { + // if this is the last line + if new_code == code { + log::trace!( + " determined to be the last line, but has same code as before {code}" + ); + // treat it as continuation of the previous message + message_size += nl_pos - pos + 1; + let message = parse_message(code, buffer, msg_start_pos, message_size)?; + log::trace!(" pushing last message {message:?}"); + unsent_messages.push_back(message); + num_messages += 1; + last_msg_added_epos = nl_pos + 1; + } else { + log::trace!(" determined to be the last line, has new code {code}"); + // treat this as a new message + // we first push the prev message, if present + if message_size > 0 { let message = parse_message(code, buffer, msg_start_pos, message_size)?; - log::trace!(" pushing new message {:?}", message); - if let Message::Ok = message { - ok_added = true; - } - self.unsent_messages.push_back(message); + log::trace!(" pushing prev to last message {message:?}"); + unsent_messages.push_back(message); num_messages += 1; - last_msg_added_epos = nl_pos + 1; + // last_msg_added_epos = nl_pos + 1; // not needed as we do this at the end anyway } - if !ok_added { - self.unsent_messages.push_back(Message::Ok); + // now we process the new message + code = new_code; + msg_start_pos = pos; + let message = parse_message(code, buffer, msg_start_pos, message_size)?; + log::trace!(" pushing new message {message:?}"); + if let Message::Ok = message { + ok_added = true; } - break; + unsent_messages.push_back(message); + num_messages += 1; + last_msg_added_epos = nl_pos + 1; + } + if !ok_added { + unsent_messages.push_back(Message::Ok); + } + break; + } else { + // if this is not the last line + // if this line is a continuation of the previous one + if new_code == code { + log::trace!(" not the last line, continuing from prev line"); + // we just mark this line as extension of previous one + message_size += nl_pos - pos + 1; } else { - // if this is not the last line - // if this line is a continuation of the previous one - if new_code == code { - log::trace!(" not the last line, continuing from prev line"); - // we just mark this line as extension of previous one - message_size += nl_pos - pos + 1; - } else { - log::trace!(" not the last line, but new code"); - // treat this as a new message - // we first push the prev message, if present - if message_size > 0 { - let message = parse_message(code, buffer, msg_start_pos, message_size)?; - log::trace!(" pushing new message {:?}", message); - self.unsent_messages.push_back(message); - num_messages += 1; - last_msg_added_epos = line_start_pos; - } - // now we process the new message - log::trace!( - " resetting markers for a new message with code {}", - new_code - ); - code = new_code; - message_size = nl_pos - pos; - msg_start_pos = pos; + log::trace!(" not the last line, but new code"); + // treat this as a new message + // we first push the prev message, if present + if message_size > 0 { + let message = parse_message(code, buffer, msg_start_pos, message_size)?; + log::trace!(" pushing new message {message:?}"); + unsent_messages.push_back(message); + num_messages += 1; + last_msg_added_epos = line_start_pos; } + // now we process the new message + log::trace!(" resetting markers for a new message with code {new_code}"); + code = new_code; + message_size = nl_pos - pos; + msg_start_pos = pos; } } - pos = next_line_pos; // move to the next line - } - - // push all unprocessed bytes to self.unparsed_bytes - let remaining = buffer.len() - last_msg_added_epos; - log::trace!("conn: found {} pending bytes", remaining); - if remaining > 0 { - self.unparsed_bytes.clear(); - let src = &buffer[(buffer.len() - remaining)..]; - log::trace!("conn: enqueuing pending: {}", &String::from_utf8_lossy(src)); - self.unparsed_bytes.extend_from_slice(src); } + pos = next_line_pos; // move to the next line + } - Ok(num_messages) + // push all unprocessed bytes to self.unparsed_bytes + let remaining = buffer.len() - last_msg_added_epos; + log::trace!("conn: found {remaining} pending bytes"); + if remaining > 0 { + unparsed_bytes.clear(); + let src = &buffer[(buffer.len() - remaining)..]; + log::trace!("conn: enqueuing pending: {}", &String::from_utf8_lossy(src)); + unparsed_bytes.extend_from_slice(src); } + + Ok(num_messages) } /// Parse the 4 digit code at the front of a bird response @@ -720,7 +946,7 @@ mod tests { if let Message::InterfaceList(s) = message { assert_eq!(s, needle); } else { - panic!("incorrect message type {:?}", message); + panic!("incorrect message type {message:?}"); } } @@ -761,7 +987,7 @@ mod tests { assert!(s.ends_with("fe80::4490::72/64 (scope univ)")); assert!(!s.ends_with('\n')); } else { - panic!("incorrect message type {:?}", message); + panic!("incorrect message type {message:?}"); } } @@ -793,7 +1019,7 @@ mod tests { assert!(s.contains("\n\tfe80:169:254:199::2/126 (scope link)")); assert!(!s.ends_with('\n')); } else { - panic!("incorrect message type {:?}", message); + panic!("incorrect message type {message:?}"); } } } diff --git a/src/lib.rs b/src/lib.rs index 72e4bc1..d026d42 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -//! Library for async communication with the Bird BGP server. +//! Library for communication with the Bird BGP server, supporting both sync and async. //! //! ## Examples //! ```no_run diff --git a/src/models/interface.rs b/src/models/interface.rs index a681ec9..b947f4c 100644 --- a/src/models/interface.rs +++ b/src/models/interface.rs @@ -23,7 +23,7 @@ impl Interface { let name = if let Some(s) = it.next() { s } else { - log::error!("ifc: unable to determine name in {}", content); + log::error!("ifc: unable to determine name in {content}"); return None; }; @@ -33,12 +33,12 @@ impl Interface { "up" => true, "down" => false, _ => { - log::error!("ifc: unknown state {}", s); + log::error!("ifc: unknown state {s}"); return None; } } } else { - log::error!("ifc: unable to determine state in {}", content); + log::error!("ifc: unable to determine state in {content}"); return None; }; @@ -54,7 +54,7 @@ impl Interface { } } if index < 0 { - log::error!("ifc: did not find an appropriate index in {}", content); + log::error!("ifc: did not find an appropriate index in {content}"); return None; } @@ -98,7 +98,7 @@ impl InterfaceProperties { _ => InterfaceType::Unknown(s.to_owned()), } } else { - log::error!("ifc: did not find any iftype in {}", content); + log::error!("ifc: did not find any iftype in {content}"); return None; }; for token in content.split_ascii_whitespace() { @@ -106,7 +106,7 @@ impl InterfaceProperties { if let Ok(m) = _mtu.parse::() { mtu = m; } else { - log::error!("ifc: found invalid mtu in line {}", content); + log::error!("ifc: found invalid mtu in line {content}"); return None; } } else { @@ -125,7 +125,7 @@ impl InterfaceProperties { } if mtu == 0 { - log::error!("ifc: did not find any iftype in {}", content); + log::error!("ifc: did not find any iftype in {content}"); } Some(InterfaceProperties { iftype, flags, mtu }) @@ -208,7 +208,7 @@ impl InterfaceAddress { let ip = if let Some(s) = it.next() { s } else { - log::error!("ifc: failed to find ip address in {}", line); + log::error!("ifc: failed to find ip address in {line}"); return None; }; @@ -220,7 +220,7 @@ impl InterfaceAddress { if let Some(sc) = it.next() { scope = sc.trim_matches(bc).trim_matches(','); } else { - log::error!("ifc: encountered scope but not value in {}", line); + log::error!("ifc: encountered scope but not value in {line}"); return None; } } else { @@ -300,6 +300,19 @@ impl InterfaceSummary { } } +/// Valid broadcast address set +const IF_FLAG_BROADCAST: u32 = 1 << 2; +/// Supports multicast +const IF_FLAG_MULTICAST: u32 = 1 << 3; +/// Is a loopback device +const IF_FLAG_LOOPBACK: u32 = 1 << 5; +/// Not to be used by routing protocols (loopbacks etc.) +const IF_FLAG_IGNORED: u32 = 1 << 6; +/// Interface is running +const IF_FLAG_ADMIN_UP: u32 = 1 << 7; +/// L1 layer is up +const IF_FLAG_LINK_UP: u32 = 1 << 8; + #[cfg(test)] mod tests { use super::*; @@ -413,16 +426,3 @@ mod tests { } } } - -/// Valid broadcast address set -const IF_FLAG_BROADCAST: u32 = 1 << 2; -/// Supports multicast -const IF_FLAG_MULTICAST: u32 = 1 << 3; -/// Is a loopback device -const IF_FLAG_LOOPBACK: u32 = 1 << 5; -/// Not to be used by routing protocols (loopbacks etc.) -const IF_FLAG_IGNORED: u32 = 1 << 6; -/// Interface is running -const IF_FLAG_ADMIN_UP: u32 = 1 << 7; -/// L1 layer is up -const IF_FLAG_LINK_UP: u32 = 1 << 8; diff --git a/src/models/status.rs b/src/models/status.rs index 0e80ecb..8f95e92 100644 --- a/src/models/status.rs +++ b/src/models/status.rs @@ -40,21 +40,21 @@ impl ShowStatusMessage { if let Ok(dt) = NaiveDateTime::parse_from_str(x.trim(), tfmt) { server_time = Some(dt); } else { - log::error!("failed to parse timestamp {}", x); + log::error!("failed to parse timestamp {x}"); return None; } } else if let Some(x) = line.strip_prefix("Last reboot on ") { if let Ok(dt) = NaiveDateTime::parse_from_str(x.trim(), tfmt) { last_reboot_on = Some(dt); } else { - log::error!("failed to parse timestamp {}", x); + log::error!("failed to parse timestamp {x}"); return None; } } else if let Some(x) = line.strip_prefix("Last reconfiguration on ") { if let Ok(dt) = NaiveDateTime::parse_from_str(x.trim(), tfmt) { last_reconfigured_on = Some(dt); } else { - log::error!("failed to parse timestamp {}", x); + log::error!("failed to parse timestamp {x}"); return None; } } diff --git a/tests/server.rs b/tests/server.rs index 719c75d..29d37eb 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -90,7 +90,7 @@ impl MockServer { ); if delay_ms > 0 { for ref c in split_content(response) { - log::trace!("sending chunk: {}", c); + log::trace!("sending chunk: {c}"); Self::write_to_client(&stream, c).await; tokio::time::sleep(Duration::from_millis(delay_ms)).await; } @@ -103,7 +103,7 @@ impl MockServer { } Err(err) => { if err.kind() != std::io::ErrorKind::WouldBlock { - panic!("server: encountered IO error: {}", err); + panic!("server: encountered IO error: {err}"); } } } @@ -136,8 +136,7 @@ impl MockServer { let expired = Instant::now().duration_since(start) > duration; assert!( !expired, - "timed out waiting for {} client connections", - num_clients + "timed out waiting for {num_clients} client connections" ); tokio::time::sleep(Duration::from_millis(100)).await; } diff --git a/tests/test_birdc.rs b/tests/test_birdc.rs index 2f67a18..407ff49 100644 --- a/tests/test_birdc.rs +++ b/tests/test_birdc.rs @@ -8,9 +8,43 @@ use birdc::*; mod server; use server::*; +macro_rules! test_sync_async_request { + ($id:ident($mock:expr, $cmd:ident($( $params:expr ),*), $response:ident, $delay:literal) $test:block) => { + #[tokio::test(flavor = "multi_thread")] + async fn $id() { + let _ = env_logger::try_init(); + let server = MockServer::start_server($mock, $delay) + .await + .expect("failed to start server"); + let client = Client::for_unix_socket(&server.unix_socket); + let mut async_conn = client.connect().await.expect("failed to connect client"); + let $response = async_conn.$cmd($($params),*).await.expect("failed to send request"); + $test; + + let mut sync_conn = client.connect_sync().expect("failed to connect sync client"); + let $response = sync_conn.$cmd($($params),*).expect("failed to send sync request"); + $test; + + server.wait_until(1, 3).await; + } + }; + + ($id:ident($mock:expr, $cmd:ident($( $params:expr ),*), $response:ident) $test:block) => { + test_sync_async_request!($id($mock, $cmd($($params),*), $response, 0) $test); + }; + + ($id:ident($mock:expr, $request:literal, $response:ident, $delay:literal) $test:block) => { + test_sync_async_request!($id($mock, send_request($request), $response, $delay) $test); + }; + + ($id:ident($mock:expr, $request:literal, $response:ident) $test:block) => { + test_sync_async_request!($id($mock, $request, $response, 0) $test); + } +} + /// This tests if the client open, and the greeting exchange works correctly /// for a single client. -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_single_client_open() { let _ = env_logger::try_init(); let server = MockServer::start_server("", 0) @@ -18,6 +52,9 @@ async fn test_single_client_open() { .expect("failed to start server"); let client = Client::for_unix_socket(&server.unix_socket); client.connect().await.expect("failed to connect client"); + client + .connect_sync() + .expect("failed to connect sync client"); server.wait_until(1, 3).await; } @@ -37,342 +74,259 @@ async fn test_multiple_client_open() { server.wait_until(1, 3).await; } -/// This tests if we receive the right sequence of response [Message]s from the -/// server, upon a `show interfaces` command -#[tokio::test] -async fn test_raw_protocol() { - let _ = env_logger::try_init(); - let server = MockServer::start_server(&get_test_text(), 0) - .await - .expect("failed to start server"); - let client = Client::for_unix_socket(&server.unix_socket); - let mut connection = client.connect().await.expect("failed to connect client"); - let response = connection - .send_request("show interfaces") - .await - .expect("failed to send request"); - validate_show_interfaces_response(&response); - - server.wait_until(1, 3).await; -} - -/// Tests for raw protocol, just like [test_raw_protocol], but the server -/// sends response in delayed batches, to test buffering. -#[tokio::test] -async fn test_raw_protocol_with_delays() { - let _ = env_logger::try_init(); - let server = MockServer::start_server(&get_test_text(), 100) - .await - .expect("failed to start server"); - let client = Client::for_unix_socket(&server.unix_socket); - let mut connection = client.connect().await.expect("failed to connect client"); - let response = connection - .send_request("show interfaces") - .await - .expect("failed to send request"); - validate_show_interfaces_response(&response); - - server.wait_until(1, 3).await; -} - -#[tokio::test] -async fn test_show_interfaces() { - let _ = env_logger::try_init(); - let server = MockServer::start_server(&get_test_text(), 0) - .await - .expect("failed to start server"); - let client = Client::for_unix_socket(&server.unix_socket); - let mut connection = client.connect().await.expect("failed to connect client"); - let response = connection - .show_interfaces() - .await - .expect("failed to parse response"); - assert_eq!(response.len(), 3); - - server.wait_until(1, 3).await; -} - -#[tokio::test] -async fn test_show_interfaces_summary() { - let _ = env_logger::try_init(); - let server = MockServer::start_server(&get_interfaces_summary(), 0) - .await - .expect("failed to start server"); - let client = Client::for_unix_socket(&server.unix_socket); - let mut connection = client.connect().await.expect("failed to connect client"); - let response = connection - .show_interfaces_summary() - .await - .expect("failed to parse response"); - assert_eq!(response.len(), 3); - - assert_eq!(response[0].name, "lo"); - assert_eq!(response[0].state, "up"); - assert!(matches!(&response[0].ipv4_address, Some(x) if x == "127.0.0.1/8")); - assert!(matches!(&response[0].ipv6_address, Some(x) if x == "::1/128")); - - assert_eq!(response[1].name, "eth0"); - assert_eq!(response[1].state, "up"); - assert!(matches!(&response[1].ipv4_address, Some(x) if x == "172.30.0.12/16")); - assert!(matches!(&response[1].ipv6_address, Some(x) if x == "fe80::4495:80ff:fe71:a791/64")); - - assert_eq!(response[2].name, "eth1"); - assert_eq!(response[2].state, "up"); - assert!(matches!(&response[2].ipv4_address, Some(x) if x == "169.254.199.2/30")); - assert!(matches!(&response[2].ipv6_address, Some(x) if x == "fe80::a06f:7ff:fea7:c662/64")); - - server.wait_until(1, 3).await; -} - -#[tokio::test] -async fn test_show_protocols() { - let _ = env_logger::try_init(); - let server = MockServer::start_server(&get_protocols(), 0) - .await - .expect("failed to start server"); - let client = Client::for_unix_socket(&server.unix_socket); - let mut connection = client.connect().await.expect("failed to connect client"); - let protocol = connection - .show_protocols(None) - .await - .expect("failed to parse response"); - - assert_eq!(protocol.len(), 7); - - assert_eq!(protocol[0].name, "device1"); - assert_eq!(protocol[0].proto, "Device"); - assert!(protocol[0].table.is_none()); - assert_eq!(protocol[0].state, "up"); - assert_eq!(protocol[0].since, "2022-04-14"); - assert!(protocol[0].info.is_none()); - - assert_eq!(protocol[1].name, "direct_eth0"); - assert_eq!(protocol[1].proto, "Direct"); - assert!(protocol[1].table.is_none()); - assert_eq!(protocol[1].state, "up"); - assert_eq!(protocol[1].since, "2022-04-14"); - assert!(protocol[1].info.is_none()); - - assert_eq!(protocol[2].name, "kernel_v4"); - assert_eq!(protocol[2].proto, "Kernel"); - assert_eq!(protocol[2].table.as_ref().unwrap(), "master4"); - assert_eq!(protocol[2].state, "up"); - assert_eq!(protocol[2].since, "2022-04-14"); - assert!(protocol[2].info.is_none()); - - assert_eq!(protocol[3].name, "kernel_v6"); - assert_eq!(protocol[3].proto, "Kernel"); - assert_eq!(protocol[3].table.as_ref().unwrap(), "master6"); - assert_eq!(protocol[3].state, "up"); - assert_eq!(protocol[3].since, "2022-04-14"); - assert!(protocol[3].info.is_none()); - - assert_eq!(protocol[4].name, "bfd1"); - assert_eq!(protocol[4].proto, "BFD"); - assert!(protocol[4].table.is_none()); - assert_eq!(protocol[4].state, "up"); - assert_eq!(protocol[4].since, "2022-04-14"); - assert!(protocol[4].info.is_none()); - - assert_eq!(protocol[5].name, "bgp_local4"); - assert_eq!(protocol[5].proto, "BGP"); - assert!(protocol[5].table.is_none()); - assert_eq!(protocol[5].state, "up"); - assert_eq!(protocol[5].since, "2022-04-16"); - assert_eq!(protocol[5].info.as_ref().unwrap(), "Established"); - - assert_eq!(protocol[6].name, "bgp_local6"); - assert_eq!(protocol[6].proto, "BGP"); - assert!(protocol[6].table.is_none()); - assert_eq!(protocol[6].state, "up"); - assert_eq!(protocol[6].since, "2022-04-16"); - assert_eq!(protocol[6].info.as_ref().unwrap(), "Established"); -} - -#[tokio::test] -async fn test_show_protocols_pattern() { - let _ = env_logger::try_init(); - let server = MockServer::start_server(&get_protocols_only_kernel(), 0) - .await - .expect("failed to start server"); - let client = Client::for_unix_socket(&server.unix_socket); - let mut connection = client.connect().await.expect("failed to connect client"); - let protocol = connection - .show_protocols(Some("kernel*")) - .await - .expect("failed to parse response"); - - assert_eq!(protocol.len(), 2); - - assert_eq!(protocol[0].name, "kernel_v4"); - assert_eq!(protocol[0].proto, "Kernel"); - assert_eq!(protocol[0].table.as_ref().unwrap(), "master4"); - assert_eq!(protocol[0].state, "up"); - assert_eq!(protocol[0].since, "2022-04-14"); - assert!(protocol[0].info.is_none()); - - assert_eq!(protocol[1].name, "kernel_v6"); - assert_eq!(protocol[1].proto, "Kernel"); - assert_eq!(protocol[1].table.as_ref().unwrap(), "master6"); - assert_eq!(protocol[1].state, "up"); - assert_eq!(protocol[1].since, "2022-04-14"); - assert!(protocol[1].info.is_none()); -} +// This tests if we receive the right sequence of response [Message]s from the +// server, upon a `show interfaces` command +test_sync_async_request!( + test_raw_protocol(&get_test_text(), "show interfaces", response) { + validate_show_interfaces_response(&response); + } +); -#[tokio::test] -async fn test_show_protocols_all() { - let _ = env_logger::try_init(); - let server = MockServer::start_server(&get_protocols_all(), 0) - .await - .expect("failed to start server"); - let client = Client::for_unix_socket(&server.unix_socket); - let mut connection = client.connect().await.expect("failed to connect client"); - let protocols = connection - .show_protocols_details(None) - .await - .expect("failed to parse response"); - - assert_eq!(protocols.len(), 7); - - assert_eq!(protocols[0].protocol.name, "device1"); - assert_eq!(protocols[0].protocol.proto, "Device"); - assert_eq!(protocols[0].protocol.state, "up"); - assert_eq!(protocols[0].protocol.since, "2022-04-14"); - assert!(protocols[0].detail.is_none()); - - assert_eq!(protocols[1].protocol.name, "direct_eth0"); - assert_eq!(protocols[1].protocol.proto, "Direct"); - assert_eq!(protocols[1].protocol.state, "up"); - assert_eq!(protocols[1].protocol.since, "2022-04-14"); - let details = protocols[1] - .detail - .as_ref() - .expect("detail should've been present"); - assert!(details.proto_info.is_none()); - assert_eq!(details.channels[0].name, "ipv4"); - assert_eq!(details.channels[0].state, "UP"); - assert_eq!(details.channels[0].table, "master4"); - assert_eq!(details.channels[1].name, "ipv6"); - assert_eq!(details.channels[1].state, "UP"); - assert_eq!(details.channels[1].table, "master6"); - - assert_eq!(protocols[2].protocol.name, "kernel_v4"); - assert_eq!(protocols[2].protocol.proto, "Kernel"); - assert_eq!(protocols[2].protocol.state, "up"); - assert_eq!(protocols[2].protocol.since, "2022-04-14"); - let details = protocols[2] - .detail - .as_ref() - .expect("detail should've been present"); - assert!(details.proto_info.is_none()); - assert_eq!(details.channels[0].name, "ipv4"); - assert_eq!(details.channels[0].state, "UP"); - assert_eq!(details.channels[0].table, "master4"); - - assert_eq!(protocols[3].protocol.name, "kernel_v6"); - assert_eq!(protocols[3].protocol.proto, "Kernel"); - assert_eq!(protocols[3].protocol.state, "up"); - assert_eq!(protocols[3].protocol.since, "2022-04-14"); - let details = protocols[3] - .detail - .as_ref() - .expect("detail should've been present"); - assert!(details.proto_info.is_none()); - assert_eq!(details.channels[0].name, "ipv6"); - assert_eq!(details.channels[0].state, "UP"); - assert_eq!(details.channels[0].table, "master6"); - - assert_eq!(protocols[4].protocol.name, "bfd1"); - assert_eq!(protocols[4].protocol.proto, "BFD"); - assert_eq!(protocols[4].protocol.state, "up"); - assert_eq!(protocols[4].protocol.since, "2022-04-14"); - assert!(protocols[4].detail.is_none()); - - assert_eq!(protocols[5].protocol.name, "bgp_r1_v4"); - assert_eq!(protocols[5].protocol.proto, "BGP"); - assert_eq!(protocols[5].protocol.state, "up"); - assert_eq!(protocols[5].protocol.since, "2022-04-14"); - let details = protocols[5] - .detail - .as_ref() - .expect("detail should've been present"); - assert!(matches!(&details.description, Some(x) if x == "IPv4 BGP with internal router")); - let ProtoSpecificInfo::Bgp(bgp_info) = details - .proto_info - .as_ref() - .expect("proto info should've been present"); - assert_eq!(bgp_info.local_as, 64560); - assert_eq!(bgp_info.neighbor_as, 64561); - let bgp_session = bgp_info - .session - .as_ref() - .expect("expected bgp session to be present"); - assert_eq!(bgp_session.neighbor_id, "172.29.0.1"); - assert_eq!(bgp_session.hold_time, 240); - assert_eq!(bgp_session.keepalive_time, 80); - assert_eq!(details.channels[0].name, "ipv4"); - assert_eq!(details.channels[0].state, "UP"); - let route_stats = details.channels[0] - .route_stats - .as_ref() - .expect("route stats should've been present"); - assert_eq!(route_stats.imported, 1); - assert_eq!(route_stats.exported, 0); - - assert_eq!(protocols[6].protocol.name, "bgp_r1_v6"); - assert_eq!(protocols[6].protocol.proto, "BGP"); - assert_eq!(protocols[6].protocol.state, "up"); - assert_eq!(protocols[6].protocol.since, "2022-04-14"); - let details = protocols[6] - .detail - .as_ref() - .expect("detail should've been present"); - assert!(matches!(&details.description, Some(x) if x == "IPv6 BGP with internal router")); - let ProtoSpecificInfo::Bgp(bgp_info) = details - .proto_info - .as_ref() - .expect("proto info should've been present"); - assert_eq!(bgp_info.local_as, 64560); - assert_eq!(bgp_info.neighbor_as, 64561); - let bgp_session = bgp_info - .session - .as_ref() - .expect("expected bgp session to be present"); - assert_eq!(bgp_session.neighbor_id, "172.29.0.1"); - assert_eq!(bgp_session.hold_time, 240); - assert_eq!(bgp_session.keepalive_time, 80); - assert_eq!(details.channels[0].name, "ipv6"); - assert_eq!(details.channels[0].state, "UP"); - let route_stats = details.channels[0] - .route_stats - .as_ref() - .expect("route stats should've been present"); - assert_eq!(route_stats.imported, 1); - assert_eq!(route_stats.exported, 0); -} +// Tests for raw protocol, just like [test_raw_protocol], but the server +// sends response in delayed batches, to test buffering. +test_sync_async_request!( + test_raw_protocol_with_delays(&get_test_text(), "show interfaces", response, 100) { + validate_show_interfaces_response(&response); + } +); -#[tokio::test] -async fn test_show_status() { - let _ = env_logger::try_init(); - let server = MockServer::start_server(&get_show_status(), 0) - .await - .expect("failed to start server"); - let client = Client::for_unix_socket(&server.unix_socket); - let mut connection = client.connect().await.expect("failed to connect client"); - let status = connection - .show_status() - .await - .expect("failed to parse ShowStatusMessage"); - assert_eq!(status.version_line, "BIRD 2.0.7"); - assert_eq!(status.router_id, "172.29.0.12"); - assert_eq!(status.server_time.to_string(), "2022-05-08 10:14:23.381"); - assert_eq!(status.last_reboot_on.to_string(), "2022-04-14 22:23:28.096"); - assert_eq!( - status.last_reconfigured_on.to_string(), - "2022-04-15 00:00:46.707", - ); - assert_eq!(status.status, "Daemon is up and running"); -} +test_sync_async_request!( + test_show_interfaces(&get_test_text(), show_interfaces(), response, 0) { + assert_eq!(response.len(), 3); + } +); + +test_sync_async_request!( + test_show_interfaces_summary(&get_interfaces_summary(), show_interfaces_summary(), response, 0) { + assert_eq!(response.len(), 3); + + assert_eq!(response[0].name, "lo"); + assert_eq!(response[0].state, "up"); + assert!(matches!(&response[0].ipv4_address, Some(x) if x == "127.0.0.1/8")); + assert!(matches!(&response[0].ipv6_address, Some(x) if x == "::1/128")); + + assert_eq!(response[1].name, "eth0"); + assert_eq!(response[1].state, "up"); + assert!(matches!(&response[1].ipv4_address, Some(x) if x == "172.30.0.12/16")); + assert!(matches!(&response[1].ipv6_address, Some(x) if x == "fe80::4495:80ff:fe71:a791/64")); + + assert_eq!(response[2].name, "eth1"); + assert_eq!(response[2].state, "up"); + assert!(matches!(&response[2].ipv4_address, Some(x) if x == "169.254.199.2/30")); + assert!(matches!(&response[2].ipv6_address, Some(x) if x == "fe80::a06f:7ff:fea7:c662/64")); + } +); + +test_sync_async_request!( + test_show_protocols(&get_protocols(), show_protocols(None), protocol) { + assert_eq!(protocol.len(), 7); + + assert_eq!(protocol[0].name, "device1"); + assert_eq!(protocol[0].proto, "Device"); + assert!(protocol[0].table.is_none()); + assert_eq!(protocol[0].state, "up"); + assert_eq!(protocol[0].since, "2022-04-14"); + assert!(protocol[0].info.is_none()); + + assert_eq!(protocol[1].name, "direct_eth0"); + assert_eq!(protocol[1].proto, "Direct"); + assert!(protocol[1].table.is_none()); + assert_eq!(protocol[1].state, "up"); + assert_eq!(protocol[1].since, "2022-04-14"); + assert!(protocol[1].info.is_none()); + + assert_eq!(protocol[2].name, "kernel_v4"); + assert_eq!(protocol[2].proto, "Kernel"); + assert_eq!(protocol[2].table.as_ref().unwrap(), "master4"); + assert_eq!(protocol[2].state, "up"); + assert_eq!(protocol[2].since, "2022-04-14"); + assert!(protocol[2].info.is_none()); + + assert_eq!(protocol[3].name, "kernel_v6"); + assert_eq!(protocol[3].proto, "Kernel"); + assert_eq!(protocol[3].table.as_ref().unwrap(), "master6"); + assert_eq!(protocol[3].state, "up"); + assert_eq!(protocol[3].since, "2022-04-14"); + assert!(protocol[3].info.is_none()); + + assert_eq!(protocol[4].name, "bfd1"); + assert_eq!(protocol[4].proto, "BFD"); + assert!(protocol[4].table.is_none()); + assert_eq!(protocol[4].state, "up"); + assert_eq!(protocol[4].since, "2022-04-14"); + assert!(protocol[4].info.is_none()); + + assert_eq!(protocol[5].name, "bgp_local4"); + assert_eq!(protocol[5].proto, "BGP"); + assert!(protocol[5].table.is_none()); + assert_eq!(protocol[5].state, "up"); + assert_eq!(protocol[5].since, "2022-04-16"); + assert_eq!(protocol[5].info.as_ref().unwrap(), "Established"); + + assert_eq!(protocol[6].name, "bgp_local6"); + assert_eq!(protocol[6].proto, "BGP"); + assert!(protocol[6].table.is_none()); + assert_eq!(protocol[6].state, "up"); + assert_eq!(protocol[6].since, "2022-04-16"); + assert_eq!(protocol[6].info.as_ref().unwrap(), "Established"); + } +); + +test_sync_async_request!( + test_show_protocols_pattern(&get_protocols_only_kernel(), show_protocols(Some("kernel*")), protocol) { + assert_eq!(protocol.len(), 2); + + assert_eq!(protocol[0].name, "kernel_v4"); + assert_eq!(protocol[0].proto, "Kernel"); + assert_eq!(protocol[0].table.as_ref().unwrap(), "master4"); + assert_eq!(protocol[0].state, "up"); + assert_eq!(protocol[0].since, "2022-04-14"); + assert!(protocol[0].info.is_none()); + + assert_eq!(protocol[1].name, "kernel_v6"); + assert_eq!(protocol[1].proto, "Kernel"); + assert_eq!(protocol[1].table.as_ref().unwrap(), "master6"); + assert_eq!(protocol[1].state, "up"); + assert_eq!(protocol[1].since, "2022-04-14"); + assert!(protocol[1].info.is_none()); + } +); + +test_sync_async_request!( + test_show_protocols_all(&get_protocols_all(), show_protocols_details(None), protocols) { + assert_eq!(protocols.len(), 7); + + assert_eq!(protocols[0].protocol.name, "device1"); + assert_eq!(protocols[0].protocol.proto, "Device"); + assert_eq!(protocols[0].protocol.state, "up"); + assert_eq!(protocols[0].protocol.since, "2022-04-14"); + assert!(protocols[0].detail.is_none()); + + assert_eq!(protocols[1].protocol.name, "direct_eth0"); + assert_eq!(protocols[1].protocol.proto, "Direct"); + assert_eq!(protocols[1].protocol.state, "up"); + assert_eq!(protocols[1].protocol.since, "2022-04-14"); + let details = protocols[1] + .detail + .as_ref() + .expect("detail should've been present"); + assert!(details.proto_info.is_none()); + assert_eq!(details.channels[0].name, "ipv4"); + assert_eq!(details.channels[0].state, "UP"); + assert_eq!(details.channels[0].table, "master4"); + assert_eq!(details.channels[1].name, "ipv6"); + assert_eq!(details.channels[1].state, "UP"); + assert_eq!(details.channels[1].table, "master6"); + + assert_eq!(protocols[2].protocol.name, "kernel_v4"); + assert_eq!(protocols[2].protocol.proto, "Kernel"); + assert_eq!(protocols[2].protocol.state, "up"); + assert_eq!(protocols[2].protocol.since, "2022-04-14"); + let details = protocols[2] + .detail + .as_ref() + .expect("detail should've been present"); + assert!(details.proto_info.is_none()); + assert_eq!(details.channels[0].name, "ipv4"); + assert_eq!(details.channels[0].state, "UP"); + assert_eq!(details.channels[0].table, "master4"); + + assert_eq!(protocols[3].protocol.name, "kernel_v6"); + assert_eq!(protocols[3].protocol.proto, "Kernel"); + assert_eq!(protocols[3].protocol.state, "up"); + assert_eq!(protocols[3].protocol.since, "2022-04-14"); + let details = protocols[3] + .detail + .as_ref() + .expect("detail should've been present"); + assert!(details.proto_info.is_none()); + assert_eq!(details.channels[0].name, "ipv6"); + assert_eq!(details.channels[0].state, "UP"); + assert_eq!(details.channels[0].table, "master6"); + + assert_eq!(protocols[4].protocol.name, "bfd1"); + assert_eq!(protocols[4].protocol.proto, "BFD"); + assert_eq!(protocols[4].protocol.state, "up"); + assert_eq!(protocols[4].protocol.since, "2022-04-14"); + assert!(protocols[4].detail.is_none()); + + assert_eq!(protocols[5].protocol.name, "bgp_r1_v4"); + assert_eq!(protocols[5].protocol.proto, "BGP"); + assert_eq!(protocols[5].protocol.state, "up"); + assert_eq!(protocols[5].protocol.since, "2022-04-14"); + let details = protocols[5] + .detail + .as_ref() + .expect("detail should've been present"); + assert!(matches!(&details.description, Some(x) if x == "IPv4 BGP with internal router")); + let ProtoSpecificInfo::Bgp(bgp_info) = details + .proto_info + .as_ref() + .expect("proto info should've been present"); + assert_eq!(bgp_info.local_as, 64560); + assert_eq!(bgp_info.neighbor_as, 64561); + let bgp_session = bgp_info + .session + .as_ref() + .expect("expected bgp session to be present"); + assert_eq!(bgp_session.neighbor_id, "172.29.0.1"); + assert_eq!(bgp_session.hold_time, 240); + assert_eq!(bgp_session.keepalive_time, 80); + assert_eq!(details.channels[0].name, "ipv4"); + assert_eq!(details.channels[0].state, "UP"); + let route_stats = details.channels[0] + .route_stats + .as_ref() + .expect("route stats should've been present"); + assert_eq!(route_stats.imported, 1); + assert_eq!(route_stats.exported, 0); + + assert_eq!(protocols[6].protocol.name, "bgp_r1_v6"); + assert_eq!(protocols[6].protocol.proto, "BGP"); + assert_eq!(protocols[6].protocol.state, "up"); + assert_eq!(protocols[6].protocol.since, "2022-04-14"); + let details = protocols[6] + .detail + .as_ref() + .expect("detail should've been present"); + assert!(matches!(&details.description, Some(x) if x == "IPv6 BGP with internal router")); + let ProtoSpecificInfo::Bgp(bgp_info) = details + .proto_info + .as_ref() + .expect("proto info should've been present"); + assert_eq!(bgp_info.local_as, 64560); + assert_eq!(bgp_info.neighbor_as, 64561); + let bgp_session = bgp_info + .session + .as_ref() + .expect("expected bgp session to be present"); + assert_eq!(bgp_session.neighbor_id, "172.29.0.1"); + assert_eq!(bgp_session.hold_time, 240); + assert_eq!(bgp_session.keepalive_time, 80); + assert_eq!(details.channels[0].name, "ipv6"); + assert_eq!(details.channels[0].state, "UP"); + let route_stats = details.channels[0] + .route_stats + .as_ref() + .expect("route stats should've been present"); + assert_eq!(route_stats.imported, 1); + assert_eq!(route_stats.exported, 0); + } +); + +test_sync_async_request!( + test_show_status(&get_show_status(), show_status(), status) { + assert_eq!(status.version_line, "BIRD 2.0.7"); + assert_eq!(status.router_id, "172.29.0.12"); + assert_eq!(status.server_time.to_string(), "2022-05-08 10:14:23.381"); + assert_eq!(status.last_reboot_on.to_string(), "2022-04-14 22:23:28.096"); + assert_eq!( + status.last_reconfigured_on.to_string(), + "2022-04-15 00:00:46.707", + ); + assert_eq!(status.status, "Daemon is up and running"); + } +); /// Validates response of `show interfaces` command fn validate_show_interfaces_response(response: &[Message]) {