This repository was archived by the owner on Oct 10, 2023. It is now read-only.
forked from AFathi/live-webrtcsignaling
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpipeline.node.splitrtcpav.go
100 lines (91 loc) · 2.39 KB
/
pipeline.node.splitrtcpav.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package main
import (
"context"
"strings"
plogger "github.com/heytribe/go-plogger"
"github.com/heytribe/live-webrtcsignaling/rtcp"
"github.com/heytribe/live-webrtcsignaling/srtp"
)
type PipelineNodeSplitRTCPAV struct {
PipelineNode
// I/O
In chan *srtp.PacketRTCP
OutPacketRTCPAudio chan *srtp.PacketRTCP
OutPacketRTCPVideo chan *srtp.PacketRTCP
// private
audio []uint32
video []uint32
exit chan struct{}
}
func NewPipelineNodeSplitRTCPAV(audio, video []uint32) *PipelineNodeSplitRTCPAV {
n := new(PipelineNodeSplitRTCPAV)
n.In = make(chan *srtp.PacketRTCP, 128)
n.OutPacketRTCPAudio = make(chan *srtp.PacketRTCP, 128)
n.OutPacketRTCPVideo = make(chan *srtp.PacketRTCP, 128)
//
n.audio = audio
n.video = video
return n
}
func (n *PipelineNodeSplitRTCPAV) IsAudio(ssrcId uint32) bool {
for i := 0; i < len(n.audio); i++ {
if n.audio[i] == ssrcId {
return true
}
}
return false
}
func (n *PipelineNodeSplitRTCPAV) IsVideo(ssrcId uint32) bool {
for i := 0; i < len(n.video); i++ {
if n.video[i] == ssrcId {
return true
}
}
return false
}
func (n *PipelineNodeSplitRTCPAV) Run(ctx context.Context) {
n.Running = true
n.emitStart()
log := plogger.FromContextSafe(ctx).Prefix("SplitAV").Prefix("RTCP")
// tempfix
rtcpParser := rtcp.NewParser(rtcp.Dependencies{Logger: log})
for {
select {
case <-ctx.Done():
n.onStop(ctx)
return
case packetRTCP := <-n.In:
if packetRTCP.GetSize() < 12 {
log.Warnf("udp packet length should be > 12")
continue
}
ssrcId := packetRTCP.GetSSRCid()
switch {
case n.IsAudio(ssrcId):
select {
case n.OutPacketRTCPAudio <- packetRTCP:
default:
log.Warnf("OutPacketRTCPAudio is full, dropping packet from In")
}
case n.IsVideo(ssrcId):
select {
case n.OutPacketRTCPVideo <- packetRTCP:
default:
log.Warnf("OutPacketRTCPVideo is full, dropping packet from In")
}
default:
audiosIds := strings.Join(uint32sToStrings(n.audio), ",")
videosIds := strings.Join(uint32sToStrings(n.video), ",")
log.Warnf("packet format unknown, pt=%d ssrcId=%d VS audioList=%s & videoList=%s", packetRTCP.GetPT(), ssrcId, audiosIds, videosIds)
packets, err := rtcpParser.Parse(packetRTCP)
if err != nil {
log.Errorf(err.Error())
} else {
for i := 0; i < len(packets); i++ {
log.Warnf("UNKNOWN PACKET %d: %v", i, packets[i])
}
}
}
}
}
}