Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion internal/test/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc
if !ok {
return 0, nil, io.EOF
}

marshaled, err := p.Marshal()
if err != nil {
return 0, nil, io.EOF
Expand Down
25 changes: 21 additions & 4 deletions pkg/jitterbuffer/jitter_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type (
// order, and allows removing in either sequence number order or via a
// provided timestamp.
type JitterBuffer struct {
packets *PriorityQueue
packets *RBTree
minStartCount uint16
overflowLen uint16
lastSequence uint16
Expand Down Expand Up @@ -98,7 +98,7 @@ func New(opts ...Option) *JitterBuffer {
stats: Stats{0, 0, 0},
minStartCount: 50,
overflowLen: 100,
packets: NewQueue(),
packets: NewTree(),
listeners: make(map[Event][]EventListener),
}

Expand Down Expand Up @@ -132,7 +132,15 @@ func (jb *JitterBuffer) PlayoutHead() uint16 {
return jb.playoutHead
}

// SetPlayoutHead allows you to manually specify the packet you wish to pop next
// Length returns the current number of packets in the buffer.
func (jb *JitterBuffer) Length() uint16 {
jb.mutex.Lock()
defer jb.mutex.Unlock()

return jb.packets.Length()
}

// SetPlayoutHead allows you to manually specify the packet you wish to pop next.
// If you have encountered a packet that hasn't resolved you can skip it.
func (jb *JitterBuffer) SetPlayoutHead(playoutHead uint16) {
jb.mutex.Lock()
Expand Down Expand Up @@ -171,7 +179,7 @@ func (jb *JitterBuffer) Push(packet *rtp.Packet) {
}

jb.updateStats(packet.SequenceNumber)
jb.packets.Push(packet, packet.SequenceNumber)
jb.packets.Push(packet)
jb.updateState()
}

Expand Down Expand Up @@ -255,6 +263,7 @@ func (jb *JitterBuffer) PopAtSequence(sq uint16) (*rtp.Packet, error) {
func (jb *JitterBuffer) PeekAtSequence(sq uint16) (*rtp.Packet, error) {
jb.mutex.Lock()
defer jb.mutex.Unlock()

packet, err := jb.packets.Find(sq)
if err != nil {
return nil, err
Expand Down Expand Up @@ -296,3 +305,11 @@ func (jb *JitterBuffer) Clear(resetState bool) {
jb.minStartCount = 50
}
}

// State returns the current state of the jitter buffer.
func (jb *JitterBuffer) State() State {
jb.mutex.Lock()
defer jb.mutex.Unlock()

return jb.state
}
Loading
Loading