Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions worker/.clangd
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
CompileFlags:
CompilationDatabase: "out/Release/build"

CompletionOptions:
HeaderInsertion: Never
12 changes: 12 additions & 0 deletions worker/include/RTC/SCTP/TODO_SCTP.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

## Related to mediasoup SCTP implementation

- `DataChunk`, IDataChunk`and`AnyDataChunk`: Add `SetUserData(UserData)`.

- Remove all default values of class memners in .hpp of all classes in case the constructor must give them initial value.

- Lot of stuff missing in `TransmissionControlBock` class and I forgot to add "TODO: SCTP" in them.

- dcsctp uses µs (webrtc::Timestamp::Micros()) internally, while mediasoup uses ms (`DepLibUV::GetTimeMs()`). When porting dcsctp timeout/duration logic, make sure to convert accordingly. Do not mix units in the same field.

- `Association`: When transitioning to CLOSED (due to failure while connecting or closure) we should emit a new event "stcpclosed" in all `DataProducers/Consumers`.

- When receiving SCTP RE-CONFIG, we should emit "streamclosed" in those `DataProducers/DataConsumers` whose stream ID have been closed.
Expand All @@ -19,6 +27,10 @@
- We must also remove `device.sctpCapabilities` getter from mediasoup-client because anyway we are making up those values!
- Also must update the website documentation.

- Replicate `retransmission_queue_test.cc` of dcsctp.

- When we invoke `close()` on a `DataProducer/Consumer` in server, we must end calling `sctpAssociation->ResetStream([streamId])` so it sends `ReConfig` to peer.

- In `transport.dump()` (maybe also in `getStats()`) we must properly obtain `OS` and `MIS` according to the number of SCTP streams negotiated via INIT + INIT_ACK. And if SCTP is not yet established, then... not sure.
- In `Association::FillBuffer()` we should not pass `this->sctpOptions.negotiatedMaxOutboundStreams/negotiatedMaxInboundStreams` but the current values.

Expand Down
49 changes: 30 additions & 19 deletions worker/include/RTC/SCTP/association/StreamResetHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "RTC/SCTP/packet/parameters/OutgoingSsnResetRequestParameter.hpp"
#include "RTC/SCTP/packet/parameters/ReconfigurationResponseParameter.hpp"
#include "RTC/SCTP/public/AssociationListener.hpp"
#include "RTC/SCTP/tx/RetransmissionQueue.hpp"
#include "handles/BackoffTimerHandle.hpp"
#include <span>
#include <vector>
Expand Down Expand Up @@ -49,7 +50,7 @@ namespace RTC
* not-yet-sent messages will be discarded, but that may change in the future.
* RFC8831 allows both behaviors.
*/
class StreamResetHandler : public TCBContext, public BackoffTimerHandle::Listener
class StreamResetHandler : public BackoffTimerHandle::Listener
{
private:
enum class ReqSeqNbrValidationResult : uint8_t
Expand Down Expand Up @@ -164,12 +165,12 @@ namespace RTC

public:
StreamResetHandler(
AssociationListener& associationListener, TCBContext* tcbContext
AssociationListener& associationListener,
TCBContext* tcbContext,
// TODO: SCTP: Implement
// DataTracker* dataTracker,
// ReassemblyQueue* reassemblyQueue,
// RetransmissionQueue* retransmissionQueue
);
RetransmissionQueue* retransmissionQueue);

~StreamResetHandler() override;

Expand All @@ -183,6 +184,23 @@ namespace RTC
*/
void ResetStreams(std::span<const uint16_t> outgoingStreamIds);

/**
* Whether a Reset Streams request should be send. Will return `false` if
* there is no need to create a request (no streams to reset) or if there
* already is an ongoing stream reset request that hasn't completed yet.
*/
bool ShouldCreateStreamResetRequest() const;

/**
* Creates a Reset Streams request that must be sent if returned. Will
* start the reconfig timer.
*
* @remarks
* - The caller must check `ShouldCreateStreamResetRequest()` first and
* only invoke this method if the former returns `true`.
*/
void CreateStreamResetRequest(Packet* packet);

/**
* Called when handling and incoming RE-CONFIG chunk. Processes a stream
* reconfiguration chunk and may send a RE-CONFIG back to the peer with
Expand All @@ -197,18 +215,10 @@ namespace RTC
bool ValidateReceivedReConfigChunk(const ReConfigChunk* receivedReConfigChunk);

/**
* Creates a Reset Streams request that must be sent if returned. Will
* start the reconfig timer. Will return `nullptr` if there is no need
* to create a request (no streams to reset) or if there already is an
* ongoing stream reset request that hasn't completed yet.
*/
ReConfigChunk* CreateStreamResetRequest();

/**
* Creates the actual RE-CONFIG chunk. A request (which set
* `currentRequest`) must have been created prior.
* Adds the actual RE-CONFIG chunk to the given Packet. A request (which
* set `this->currentRequest`) must have been created prior.
*/
ReConfigChunk* CreateReconfigChunk();
void CreateReConfigChunk(Packet* packet);

/**
* Called to validate the `reqSeqNbr`, that it's the next in sequence.
Expand Down Expand Up @@ -250,11 +260,12 @@ namespace RTC

private:
AssociationListener& associationListener;
TCBContext* tcbContext{ nullptr };
TCBContext* tcbContext;
// TODO: SCTP: Implement
// DataTracker* dataTracker;,
// TODO: SCTP: Implement
// DataTracker* dataTracker{ nullptr };,
// ReassemblyQueue* reassemblyQueue{ nullptr };,
// RetransmissionQueue* retransmissionQueue{ nullptr };
// ReassemblyQueue* reassemblyQueue;,
RetransmissionQueue* retransmissionQueue;
UnwrappedReConfigRequestSn::Unwrapper incomingReConfigRequestSnUnwrapper;
const std::unique_ptr<BackoffTimerHandle> reConfigTimer;
// The next sequence number for outgoing stream requests.
Expand Down
5 changes: 3 additions & 2 deletions worker/include/RTC/SCTP/association/TCBContext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ namespace RTC
virtual uint32_t GetRemoteInitialTsn() const = 0;

/**
* To be called when a RTT has been measured, to update the RTO value.
* To be called when a RTT (ms) has been measured, to update the RTO
* value.
*/
virtual void ObserveRtt(uint64_t rtt) = 0;
virtual void ObserveRttMs(uint64_t rttMs) = 0;

/**
* Returns the Retransmission Timeout (RTO) value.
Expand Down
76 changes: 73 additions & 3 deletions worker/include/RTC/SCTP/association/TransmissionControlBlock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
#include "RTC/SCTP/association/HeartbeatHandler.hpp"
#include "RTC/SCTP/association/NegotiatedCapabilities.hpp"
#include "RTC/SCTP/association/PacketSender.hpp"
#include "RTC/SCTP/association/StreamResetHandler.hpp"
#include "RTC/SCTP/association/TCBContext.hpp"
#include "RTC/SCTP/packet/Packet.hpp"
#include "RTC/SCTP/public/AssociationListener.hpp"
#include "RTC/SCTP/public/SctpOptions.hpp"
#include "RTC/SCTP/tx/RetransmissionErrorCounter.hpp"
#include "RTC/SCTP/tx/RetransmissionQueue.hpp"
#include "RTC/SCTP/tx/RetransmissionTimeout.hpp"
#include "handles/BackoffTimerHandle.hpp"
#include <string_view>
Expand All @@ -25,12 +27,16 @@ namespace RTC
*
* @see https://datatracker.ietf.org/doc/html/rfc9260#section-14
*/
class TransmissionControlBlock : public TCBContext, public BackoffTimerHandle::Listener
class TransmissionControlBlock : public TCBContext,
public RetransmissionQueue::Listener,
public BackoffTimerHandle::Listener
{
public:
TransmissionControlBlock(
AssociationListener& associationListener,
const SctpOptions& sctpOptions,
// TODO: SCTP: Implement it.
// SendQueue& sendQueue,
PacketSender& packetSender,
uint32_t localVerificationTag,
uint32_t remoteVerificationTag,
Expand Down Expand Up @@ -128,7 +134,12 @@ namespace RTC
* @remarks
* - Implements TCBContext interface.
*/
void ObserveRtt(uint64_t rtt) override;
void ObserveRttMs(uint64_t rttMs) override;

size_t GetCwnd() const
{
return this->retransmissionQueue.GetCwnd();
}

/**
* @remarks
Expand Down Expand Up @@ -158,22 +169,68 @@ namespace RTC
*/
void Send(Packet* packet) override;

// TODO: SCTP: Implement it.
// DataTracker& GetDataTracker()
// {
// return this->dataTracker;
// }

// TODO: SCTP: Implement it.
// ReassemblyQueue& GetReassemblyQueue()
// {
// return this->reassemblyQueue;
// }

RetransmissionQueue& GetRetransmissionQueue()
{
return this->retransmissionQueue;
}

StreamResetHandler& GetStreamResetHandler()
{
return this->streamResetHandler;
}

HeartbeatHandler& GetHeartbeatHandler()
{
return this->heartbeatHandler;
}

/**
* Will be set while the Association is in COOKIE_ECHOED state. In this
* state, there can only be a single Packet outstanding, and it must
* contain the COOKIE_ECHO Chunk as the first Chunk in that Packet, until
* the COOKIE_ACK has been received, which will make the socket call
* `ClearRemoteStateCookie()`.
*/
void SetRemoteStateCookie(std::vector<uint8_t> remoteStateCookie);

/**
* Called when the COOKIE_ACK Chunk has been received, to allow further
* Packets to be sent.
*/
void ClearRemoteStateCookie();

bool HasRemoteStateCookie() const
{
return this->remoteStateCookie.has_value();
}

/**
* Sends a SACK Chunk, if there is a need to.
*/
void MaySendSackChunk();

/**
* Sends a FORWARD-TSN or I-FORWARD-TSN Chunk if it is needed and allowed
* (rate-limited).
*/
void MaybeSendForwardTsnChunk(Packet* packet, uint64_t nowMs);

void MaySendFastRetransmit();

// TODO: SCTP: Mamy more methods.

/**
* @remarks
* - Implements TCBContext interface.
Expand Down Expand Up @@ -206,6 +263,12 @@ namespace RTC

void OnDelayedAckTimer(uint64_t& baseTimeoutMs, bool& stop);

/* Pure virtual methods inherited from RetransmissionQueue::Listener. */
public:
void OnRetransmissionQueueNewRttMs(uint64_t newRttMs) override;
void OnRetransmissionQueueClearRetransmissionCounter() override;
;

/* Pure virtual methods inherited from BackoffTimerHandle::Listener. */
public:
void OnTimer(BackoffTimerHandle* backoffTimer, uint64_t& baseTimeoutMs, bool& stop) override;
Expand All @@ -230,11 +293,18 @@ namespace RTC
const std::unique_ptr<BackoffTimerHandle> delayedAckTimer;
RetransmissionTimeout rto;
RetransmissionErrorCounter txErrorCounter;
// TODO: SCTP: Implement.
// DataTracker dataTracker;
// TODO: SCTP: Implement.
// ReassemblyQueue reassemblyQueue;
// TODO: SCTP: Implement.
RetransmissionQueue retransmissionQueue;
StreamResetHandler streamResetHandler;
HeartbeatHandler heartbeatHandler;
// Rate limiting of FORWARD_TSN. Next can be sent at or after this
// timestamp.
// TODO: SCTP: Uncomment.
// uint64_t limitForwardTsnUntilMs{ 0 };
uint64_t limitForwardTsnUntilMs{ 0 };
// Only valid when state is State::COOKIE_ECHOED. In this state, the
// Association must wait for COOKIE_ACK to continue sending any packets (not
// including a COOKIE_ECHO). So if this state cookie is present, the
Expand Down
36 changes: 36 additions & 0 deletions worker/include/RTC/SCTP/packet/UserData.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define MS_RTC_SCTP_USER_DATA_HPP

#include "common.hpp"
#include <ostream>
#include <vector>

namespace RTC
Expand Down Expand Up @@ -39,6 +40,15 @@ namespace RTC
// Disable copy assignment.
UserData& operator=(const UserData&) = delete;

bool operator==(const UserData& other) const
{
return (
this->streamId == other.streamId && this->ssn == other.ssn && this->mid == other.mid &&
this->fsn == other.fsn && this->ppid == other.ppid && this->payload == other.payload &&
this->isBeginning == other.isBeginning && this->isEnd == other.isEnd &&
this->isUnordered == other.isUnordered);
}

~UserData();

public:
Expand Down Expand Up @@ -91,6 +101,20 @@ namespace RTC
return this->payload.size();
}

UserData Clone() const
{
return UserData(
this->streamId,
this->ssn,
this->mid,
this->fsn,
this->ppid,
this->payload,
this->isBeginning,
this->isEnd,
this->isUnordered);
}

/**
* Useful to extract the payload and its ownership when destructing the
* Message.
Expand Down Expand Up @@ -135,6 +159,18 @@ namespace RTC
bool isEnd{ false };
bool isUnordered{ false };
};

/**
* For Catch2 to print it nicely.
*/
inline std::ostream& operator<<(std::ostream& os, const UserData& d)
{
return os << "{streamId:" << d.GetStreamId() << ", ssn:" << d.GetStreamSequenceNumber()
<< ", mid:" << d.GetMessageId() << ", fsn:" << d.GetFragmentSequenceNumber()
<< ", ppid:" << d.GetPayloadProtocolId() << ", payloadLen:" << d.GetPayloadLength()
<< ", B:" << d.IsBeginning() << ", E:" << d.IsEnd() << ", U:" << d.IsUnordered()
<< "}";
}
} // namespace SCTP
} // namespace RTC

Expand Down
2 changes: 1 addition & 1 deletion worker/include/RTC/SCTP/packet/chunks/ForwardTsnChunk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ namespace RTC

std::vector<AnyForwardTsnChunk::SkippedStream> GetSkippedStreams() const final;

void AddStream(uint16_t stream, uint16_t streamSequence);
void AddStream(uint16_t streamId, uint16_t streamSequence);

protected:
ForwardTsnChunk* SoftClone(const uint8_t* buffer) const final;
Expand Down
2 changes: 1 addition & 1 deletion worker/include/RTC/SCTP/packet/chunks/IForwardTsnChunk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ namespace RTC

std::vector<AnyForwardTsnChunk::SkippedStream> GetSkippedStreams() const final;

void AddStream(uint16_t stream, bool uFlag, uint32_t messageIdentifier);
void AddStream(uint16_t streamId, bool uFlag, uint32_t messageIdentifier);

protected:
IForwardTsnChunk* SoftClone(const uint8_t* buffer) const final;
Expand Down
3 changes: 2 additions & 1 deletion worker/include/RTC/SCTP/public/SctpOptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define MS_RTC_SCTP_OPTIONS_HPP

#include "common.hpp"
#include "Utils.hpp"
#include "RTC/Consts.hpp"
#include "RTC/SCTP/packet/parameters/ZeroChecksumAcceptableParameter.hpp"

Expand Down Expand Up @@ -44,7 +45,7 @@ namespace RTC
* Maximum size of an SCTP Packet. It doesn't include any overhead of
* DTLS, TURN, UDP or IP headers.
*/
size_t mtu{ RTC::Consts::MaxSafeMtuSizeForSctp };
size_t mtu{ Utils::Byte::PadDownTo4Bytes(RTC::Consts::MaxSafeMtuSizeForSctp) };

/**
* The largest allowed message payload to be sent. Messages will be rejected
Expand Down
Loading
Loading