Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 91 additions & 85 deletions src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ impl WebSocketContext {

/// Try to decode one message frame. May return None.
fn read_message_frame(&mut self, stream: &mut impl Read) -> Result<Option<Message>> {
if let Some(frame) = self
let frame = match self
Copy link
Contributor Author

@akonradi-signal akonradi-signal Sep 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if this could be written as let Some(frame) = ... else { ... }; but that's not available until Rust 1.65 and the MSRV here is 1.63.

.frame
.read_frame(
stream,
Expand All @@ -613,108 +613,114 @@ impl WebSocketContext {
)
.check_connection_reset(self.state)?
{
if !self.state.can_read() {
return Err(Error::Protocol(ProtocolError::ReceivedAfterClosing));
}
// MUST be 0 unless an extension is negotiated that defines meanings
// for non-zero values. If a nonzero value is received and none of
// the negotiated extensions defines the meaning of such a nonzero
// value, the receiving endpoint MUST _Fail the WebSocket
// Connection_.
{
let hdr = frame.header();
if hdr.rsv1 || hdr.rsv2 || hdr.rsv3 {
return Err(Error::Protocol(ProtocolError::NonZeroReservedBits));
}
None => {
// Connection closed by peer
return match replace(&mut self.state, WebSocketState::Terminated) {
WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
Err(Error::ConnectionClosed)
}
_ => Err(Error::Protocol(ProtocolError::ResetWithoutClosingHandshake)),
};
}
Some(frame) => frame,
};

if self.role == Role::Client && frame.is_masked() {
// A client MUST close a connection if it detects a masked frame. (RFC 6455)
return Err(Error::Protocol(ProtocolError::MaskedFrameFromServer));
if !self.state.can_read() {
return Err(Error::Protocol(ProtocolError::ReceivedAfterClosing));
}
// MUST be 0 unless an extension is negotiated that defines meanings
// for non-zero values. If a nonzero value is received and none of
// the negotiated extensions defines the meaning of such a nonzero
// value, the receiving endpoint MUST _Fail the WebSocket
// Connection_.
{
let hdr = frame.header();
if hdr.rsv1 || hdr.rsv2 || hdr.rsv3 {
return Err(Error::Protocol(ProtocolError::NonZeroReservedBits));
}
}

match frame.header().opcode {
OpCode::Control(ctl) => {
match ctl {
// All control frames MUST have a payload length of 125 bytes or less
// and MUST NOT be fragmented. (RFC 6455)
_ if !frame.header().is_final => {
Err(Error::Protocol(ProtocolError::FragmentedControlFrame))
}
_ if frame.payload().len() > 125 => {
Err(Error::Protocol(ProtocolError::ControlFrameTooBig))
}
OpCtl::Close => Ok(self.do_close(frame.into_close()?).map(Message::Close)),
OpCtl::Reserved(i) => {
Err(Error::Protocol(ProtocolError::UnknownControlFrameType(i)))
}
OpCtl::Ping => {
let data = frame.into_payload();
// No ping processing after we sent a close frame.
if self.state.is_active() {
self.set_additional(Frame::pong(data.clone()));
}
Ok(Some(Message::Ping(data)))
if self.role == Role::Client && frame.is_masked() {
// A client MUST close a connection if it detects a masked frame. (RFC 6455)
return Err(Error::Protocol(ProtocolError::MaskedFrameFromServer));
}

match frame.header().opcode {
OpCode::Control(ctl) => {
match ctl {
// All control frames MUST have a payload length of 125 bytes or less
// and MUST NOT be fragmented. (RFC 6455)
_ if !frame.header().is_final => {
Err(Error::Protocol(ProtocolError::FragmentedControlFrame))
}
_ if frame.payload().len() > 125 => {
Err(Error::Protocol(ProtocolError::ControlFrameTooBig))
}
OpCtl::Close => Ok(self.do_close(frame.into_close()?).map(Message::Close)),
OpCtl::Reserved(i) => {
Err(Error::Protocol(ProtocolError::UnknownControlFrameType(i)))
}
OpCtl::Ping => {
let data = frame.into_payload();
// No ping processing after we sent a close frame.
if self.state.is_active() {
self.set_additional(Frame::pong(data.clone()));
}
OpCtl::Pong => Ok(Some(Message::Pong(frame.into_payload()))),
Ok(Some(Message::Ping(data)))
}
OpCtl::Pong => Ok(Some(Message::Pong(frame.into_payload()))),
}
}

OpCode::Data(data) => {
let fin = frame.header().is_final;
match data {
OpData::Continue => {
if let Some(ref mut msg) = self.incomplete {
msg.extend(frame.into_payload(), self.config.max_message_size)?;
} else {
return Err(Error::Protocol(
ProtocolError::UnexpectedContinueFrame,
));
}
if fin {
Ok(Some(self.incomplete.take().unwrap().complete()?))
} else {
Ok(None)
}
}
c if self.incomplete.is_some() => {
Err(Error::Protocol(ProtocolError::ExpectedFragment(c)))
}
OpData::Text if fin => {
check_max_size(frame.payload().len(), self.config.max_message_size)?;
Ok(Some(Message::Text(frame.into_text()?)))
OpCode::Data(data) => {
enum FrameType {
Continue,
Initial(IncompleteMessageType),
}

let fin = frame.header().is_final;
let frame_type = match data {
OpData::Continue => Ok(FrameType::Continue),
_ if self.incomplete.is_some() => Err(ProtocolError::ExpectedFragment(data)),
OpData::Text => Ok(FrameType::Initial(IncompleteMessageType::Text)),
OpData::Binary => Ok(FrameType::Initial(IncompleteMessageType::Binary)),
OpData::Reserved(i) => Err(ProtocolError::UnknownDataFrameType(i)),
}?;

match frame_type {
FrameType::Continue => {
let msg = self
.incomplete
.as_mut()
.ok_or(ProtocolError::UnexpectedContinueFrame)?;
msg.extend(frame.into_payload(), self.config.max_message_size)?;

if fin {
Ok(Some(self.incomplete.take().unwrap().complete()?))
} else {
Ok(None)
}
OpData::Binary if fin => {
}
FrameType::Initial(data_type) => {
if fin {
check_max_size(frame.payload().len(), self.config.max_message_size)?;
Ok(Some(Message::Binary(frame.into_payload())))
}
OpData::Text | OpData::Binary => {
let message_type = match data {
OpData::Text => IncompleteMessageType::Text,
OpData::Binary => IncompleteMessageType::Binary,
_ => panic!("Bug: message is not text nor binary"),
};
let mut incomplete = IncompleteMessage::new(message_type);
Ok(Some(match data_type {
IncompleteMessageType::Text => Message::Text(frame.into_text()?),
IncompleteMessageType::Binary => {
Message::Binary(frame.into_payload())
}
}))
} else {
let mut incomplete = IncompleteMessage::new(data_type);
incomplete
.extend(frame.into_payload(), self.config.max_message_size)?;
self.incomplete = Some(incomplete);
Ok(None)
}
OpData::Reserved(i) => {
Err(Error::Protocol(ProtocolError::UnknownDataFrameType(i)))
}
}
}
} // match opcode
} else {
// Connection closed by peer
match replace(&mut self.state, WebSocketState::Terminated) {
WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
Err(Error::ConnectionClosed)
}
_ => Err(Error::Protocol(ProtocolError::ResetWithoutClosingHandshake)),
}
}
} // match opcode
}

/// Received a close frame. Tells if we need to return a close frame to the user.
Expand Down