Skip to content

Commit dfa6291

Browse files
committed
jitterbuffer: Add packet skipping in interceptor
1 parent f2f92bb commit dfa6291

File tree

5 files changed

+102
-22
lines changed

5 files changed

+102
-22
lines changed

internal/test/mock_stream.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc
103103
if !ok {
104104
return 0, nil, io.EOF
105105
}
106-
107106
marshaled, err := p.Marshal()
108107
if err != nil {
109108
return 0, nil, io.EOF

pkg/jitterbuffer/jitter_buffer.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,12 @@ func (jb *JitterBuffer) PlayoutHead() uint16 {
130130
return jb.playoutHead
131131
}
132132

133+
func (jb *JitterBuffer) Length() uint16 {
134+
jb.mutex.Lock()
135+
defer jb.mutex.Unlock()
136+
return jb.packets.Length()
137+
}
138+
133139
// SetPlayoutHead allows you to manually specify the packet you wish to pop next
134140
// If you have encountered a packet that hasn't resolved you can skip it.
135141
func (jb *JitterBuffer) SetPlayoutHead(playoutHead uint16) {
@@ -157,7 +163,7 @@ func (jb *JitterBuffer) Push(packet *rtp.Packet) {
157163
if jb.packets.Length() == 0 {
158164
jb.emit(StartBuffering)
159165
}
160-
if jb.packets.Length() > 100 {
166+
if jb.packets.Length() > 2*jb.minStartCount {
161167
jb.stats.overflowCount++
162168
jb.emit(BufferOverflow)
163169
}

pkg/jitterbuffer/option.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,10 @@ func Log(log logging.LeveledLogger) ReceiverInterceptorOption {
1818
return nil
1919
}
2020
}
21+
22+
func WithSkipMissingPackets() ReceiverInterceptorOption {
23+
return func(d *ReceiverInterceptor) error {
24+
d.skipMissingPackets = true
25+
return nil
26+
}
27+
}

pkg/jitterbuffer/receiver_interceptor.go

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package jitterbuffer
55

66
import (
7+
"errors"
78
"sync"
89

910
"github.com/pion/interceptor"
@@ -16,11 +17,14 @@ type InterceptorFactory struct {
1617
opts []ReceiverInterceptorOption
1718
}
1819

19-
// NewInterceptor constructs a new ReceiverInterceptor.
20-
func (g *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
20+
// NewInterceptor constructs a new ReceiverInterceptor
21+
func (g *InterceptorFactory) NewInterceptor(logName string) (interceptor.Interceptor, error) {
22+
if logName == "" {
23+
logName = "jitterbuffer"
24+
}
2125
i := &ReceiverInterceptor{
2226
close: make(chan struct{}),
23-
log: logging.NewDefaultLoggerFactory().NewLogger("jitterbuffer"),
27+
log: logging.NewDefaultLoggerFactory().NewLogger(logName),
2428
buffer: New(),
2529
}
2630

@@ -52,11 +56,11 @@ func (g *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor,
5256
// arriving) quickly enough.
5357
type ReceiverInterceptor struct {
5458
interceptor.NoOp
55-
buffer *JitterBuffer
56-
m sync.Mutex
57-
wg sync.WaitGroup
58-
close chan struct{}
59-
log logging.LeveledLogger
59+
buffer *JitterBuffer
60+
wg sync.WaitGroup
61+
close chan struct{}
62+
log logging.LeveledLogger
63+
skipMissingPackets bool
6064
}
6165

6266
// NewInterceptor returns a new InterceptorFactory.
@@ -76,16 +80,31 @@ func (i *ReceiverInterceptor) BindRemoteStream(
7680
return n, attr, err
7781
}
7882
packet := &rtp.Packet{}
79-
if err := packet.Unmarshal(buf); err != nil {
83+
if err := packet.Unmarshal(buf[:n]); err != nil {
8084
return 0, nil, err
8185
}
82-
i.m.Lock()
83-
defer i.m.Unlock()
8486
i.buffer.Push(packet)
8587
if i.buffer.state == Emitting {
86-
newPkt, err := i.buffer.Pop()
87-
if err != nil {
88-
return 0, nil, err
88+
for {
89+
newPkt, err := i.buffer.Pop()
90+
if err != nil {
91+
if errors.Is(err, ErrNotFound) {
92+
if i.skipMissingPackets {
93+
i.log.Warn("Skipping missing packet")
94+
i.buffer.SetPlayoutHead(i.buffer.PlayoutHead() + 1)
95+
continue
96+
}
97+
}
98+
return 0, nil, err
99+
}
100+
if newPkt != nil {
101+
nlen, err := newPkt.MarshalTo(b)
102+
return nlen, attr, err
103+
104+
}
105+
if i.buffer.Length() == 0 {
106+
break
107+
}
89108
}
90109
nlen, err := newPkt.MarshalTo(b)
91110

@@ -99,16 +118,12 @@ func (i *ReceiverInterceptor) BindRemoteStream(
99118
// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track.
100119
func (i *ReceiverInterceptor) UnbindRemoteStream(_ *interceptor.StreamInfo) {
101120
defer i.wg.Wait()
102-
i.m.Lock()
103-
defer i.m.Unlock()
104121
i.buffer.Clear(true)
105122
}
106123

107124
// Close closes the interceptor.
108125
func (i *ReceiverInterceptor) Close() error {
109126
defer i.wg.Wait()
110-
i.m.Lock()
111-
defer i.m.Unlock()
112127
i.buffer.Clear(true)
113128

114129
return nil

pkg/jitterbuffer/receiver_interceptor_test.go

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,19 +80,72 @@ func TestReceiverBuffersAndPlaysout(t *testing.T) {
8080
SenderSSRC: 123,
8181
MediaSSRC: 456,
8282
}})
83-
for s := 0; s < 61; s++ {
83+
for s := 0; s < 910; s++ {
8484
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{
8585
SequenceNumber: uint16(s), //nolint:gosec // G115
8686
}})
8787
}
8888
// Give time for packets to be handled and stream written to.
8989
time.Sleep(50 * time.Millisecond)
90-
for s := 0; s < 10; s++ {
90+
for s := 0; s < 50; s++ {
9191
read := <-stream.ReadRTP()
92+
if read.Err != nil {
93+
t.Fatal(read.Err)
94+
}
9295
seq := read.Packet.Header.SequenceNumber
9396
assert.EqualValues(t, uint16(s), seq) //nolint:gosec // G115
9497
}
9598
assert.NoError(t, stream.Close())
9699
err = testInterceptor.Close()
97100
assert.NoError(t, err)
98101
}
102+
103+
func TestReceiverBuffersAndPlaysoutSkippingMissingPackets(t *testing.T) {
104+
buf := bytes.Buffer{}
105+
106+
factory, err := NewInterceptor(
107+
Log(logging.NewDefaultLoggerFactory().NewLogger("test")),
108+
WithSkipMissingPackets(),
109+
)
110+
assert.NoError(t, err)
111+
112+
i, err := factory.NewInterceptor("jitterbuffer")
113+
assert.NoError(t, err)
114+
115+
assert.EqualValues(t, 0, buf.Len())
116+
117+
stream := test.NewMockStream(&interceptor.StreamInfo{
118+
SSRC: 123456,
119+
ClockRate: 90000,
120+
}, i)
121+
122+
for s := 0; s < 420; s++ {
123+
if s == 6 {
124+
s++
125+
}
126+
if s == 40 {
127+
s = s + 20
128+
}
129+
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{
130+
SequenceNumber: uint16(s),
131+
}})
132+
}
133+
134+
for s := 0; s < 100; s++ {
135+
read := <-stream.ReadRTP()
136+
if read.Err != nil {
137+
continue
138+
}
139+
seq := read.Packet.Header.SequenceNumber
140+
if s == 6 {
141+
s++
142+
}
143+
if s == 40 {
144+
s = s + 20
145+
}
146+
assert.EqualValues(t, uint16(s), seq)
147+
}
148+
assert.NoError(t, stream.Close())
149+
err = i.Close()
150+
assert.NoError(t, err)
151+
}

0 commit comments

Comments
 (0)