// Source file: pipeline.node.jitter.publisher.go (91 lines, 82 loc, 2.22 KB).
// Taken from a fork of AFathi/live-webrtcsignaling; the repository was archived
// by its owner on Oct 10, 2023 and is now read-only.
package main
import (
"context"
"net"
plogger "github.com/heytribe/go-plogger"
"github.com/heytribe/live-webrtcsignaling/srtp"
)
// PipelineNodeJitterPublisher is a pipeline node wrapping the JitterBuffer
// returned by NewJitterBufferPublisher. Its Run loop pushes packets received
// on In into the buffer and relays whatever the buffer emits to Out, OutRTP,
// OutRTCP and the node's Bus.
type PipelineNodeJitterPublisher struct {
	PipelineNode
	// I/O
	// In carries the RTP packets that Run pushes into the jitter buffer.
	In chan *srtp.PacketRTP
	// Publisher
	// Out receives the packets Run reads from the buffer's out channel.
	Out chan *srtp.PacketRTP
	// Listener
	// OutRTP receives the packets Run reads from the buffer's outRTP channel.
	OutRTP chan *srtp.PacketRTP
	// OutRTCP receives the packets Run reads from the buffer's outRTCP channel.
	OutRTCP chan *RtpUdpPacket
	// private
	// buffer is the jitter buffer that every method of the node delegates to.
	buffer *JitterBuffer
}
// NewPipelineNodeJitterPublisher allocates a jitter publisher node, creates
// its jitter buffer through NewJitterBufferPublisher (forwarding every
// argument plus the node itself) and creates the In, Out, OutRTP and OutRTCP
// channels, each buffered with a capacity of 128.
//
// The function has no error result, so a failure of NewJitterBufferPublisher
// is reported through the context logger instead of being silently dropped.
// The node is still returned in that case, exactly as before, with whatever
// buffer value the constructor handed back.
func NewPipelineNodeJitterPublisher(ctx context.Context, codecOption CodecOptions, pt uint16, ptRtx uint16,
	freq uint32, ssrc uint32, rtxSsrc uint32,
	jst JitterStreamTypeOptions, bitrate Bitrate, rtt *int64) *PipelineNodeJitterPublisher {
	n := new(PipelineNodeJitterPublisher)

	buffer, err := NewJitterBufferPublisher(ctx, codecOption, pt, ptRtx, freq, ssrc, rtxSsrc, jst, bitrate, rtt, n)
	if err != nil {
		// Surface the failure: callers cannot receive it through the return
		// value, and a missing buffer would otherwise only show up later.
		plogger.FromContextSafe(ctx).Warnf("could not create jitter buffer publisher: %v", err)
	}
	n.buffer = buffer

	n.In = make(chan *srtp.PacketRTP, 128)
	n.Out = make(chan *srtp.PacketRTP, 128)
	n.OutRTP = make(chan *srtp.PacketRTP, 128)
	n.OutRTCP = make(chan *RtpUdpPacket, 128)
	return n
}
// SetRaddr hands the remote UDP address over to the underlying jitter buffer.
// The ctx argument is accepted but not used by this method.
func (n *PipelineNodeJitterPublisher) SetRaddr(ctx context.Context, rAddr *net.UDPAddr) {
	jb := n.buffer
	jb.SetRaddr(rAddr)
}
// Run flags the node as running, emits its start notification and then
// services the node's channels until ctx is cancelled, at which point onStop
// is called and Run returns.
//
// While running, packets read from In are pushed into the jitter buffer, and
// everything the buffer emits (out, outRTP, outRTCP, event) is relayed to
// Out, OutRTP, OutRTCP and Bus respectively. Relays never block: when the
// destination channel is full the item is dropped and a warning is logged.
func (n *PipelineNodeJitterPublisher) Run(ctx context.Context) {
	n.Running = true
	n.emitStart()
	logger := plogger.FromContextSafe(ctx)
	// Successive calls to Done return the same channel, so fetch it once.
	done := ctx.Done()

	for {
		select {
		case <-done:
			n.onStop(ctx)
			return

		case incoming := <-n.In:
			n.buffer.PushPacket(incoming)

		case pkt := <-n.buffer.out:
			// Non-blocking relay towards the publisher output.
			select {
			case n.Out <- pkt:
			default:
				logger.Warnf("Out is full, dropping packet from buffer.out")
			}

		case rtp := <-n.buffer.outRTP:
			// Non-blocking relay towards the listener RTP output.
			select {
			case n.OutRTP <- rtp:
			default:
				logger.Warnf("OutRTP is full, dropping packet from buffer.outRTP")
			}

		case rtcp := <-n.buffer.outRTCP:
			// Non-blocking relay towards the listener RTCP output.
			select {
			case n.OutRTCP <- rtcp:
			default:
				logger.Warnf("OutRTCP is full, dropping packet from buffer.outRTCP")
			}

		case evt := <-n.buffer.event:
			// Non-blocking relay of buffer events onto the node bus.
			select {
			case n.Bus <- evt:
			default:
				logger.Warnf("Bus is full, dropping packet from buffer.event")
			}
		}
	}
}
// SetJitterSize passes the requested size straight through to the underlying
// jitter buffer's SetJitterSize.
func (n *PipelineNodeJitterPublisher) SetJitterSize(size uint64) {
	jb := n.buffer
	jb.SetJitterSize(size)
}
// SendPLI asks the underlying jitter buffer to send a PLI by calling its
// SendPLI method.
func (n *PipelineNodeJitterPublisher) SendPLI() {
	jb := n.buffer
	jb.SendPLI()
}
// SendFIR currently behaves exactly like SendPLI: it ends up calling the
// jitter buffer's SendPLI.
//
// TODO: build and send a real RTCP FIR packet instead of a PLI.
func (n *PipelineNodeJitterPublisher) SendFIR() {
	n.SendPLI()
}