Skip to content

Commit 043388c

Browse files
authored
Log media call stats. (#376)
1 parent 94886b6 commit 043388c

File tree

9 files changed

+313
-22
lines changed

9 files changed

+313
-22
lines changed

pkg/mixer/mixer.go

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ package mixer
1616

1717
import (
1818
"fmt"
19+
"slices"
1920
"sync"
21+
"sync/atomic"
2022
"time"
2123

2224
"github.com/frostbyte73/core"
@@ -35,6 +37,26 @@ const (
3537
inputBufferMin = inputBufferFrames/2 + 1
3638
)
3739

40+
type Stats struct {
41+
Tracks atomic.Int64
42+
TracksTotal atomic.Uint64
43+
Restarts atomic.Uint64
44+
45+
Mixes atomic.Uint64
46+
TimedMixes atomic.Uint64
47+
JumpMixes atomic.Uint64
48+
ZeroMixes atomic.Uint64
49+
50+
InputSamples atomic.Uint64
51+
InputFrames atomic.Uint64
52+
53+
MixedSamples atomic.Uint64
54+
MixedFrames atomic.Uint64
55+
56+
OutputSamples atomic.Uint64
57+
OutputFrames atomic.Uint64
58+
}
59+
3860
type Input struct {
3961
m *Mixer
4062
sampleRate int
@@ -58,11 +80,13 @@ type Mixer struct {
5880
lastMixEndTs time.Time
5981
stopped core.Fuse
6082
mixCnt uint
83+
84+
stats *Stats
6185
}
6286

63-
func NewMixer(out msdk.Writer[msdk.PCM16Sample], bufferDur time.Duration) *Mixer {
87+
func NewMixer(out msdk.Writer[msdk.PCM16Sample], bufferDur time.Duration, st *Stats) *Mixer {
6488
mixSize := int(time.Duration(out.SampleRate()) * bufferDur / time.Second)
65-
m := newMixer(out, mixSize)
89+
m := newMixer(out, mixSize, st)
6690
m.tickerDur = bufferDur
6791
m.ticker = time.NewTicker(bufferDur)
6892

@@ -71,12 +95,16 @@ func NewMixer(out msdk.Writer[msdk.PCM16Sample], bufferDur time.Duration) *Mixer
7195
return m
7296
}
7397

74-
func newMixer(out msdk.Writer[msdk.PCM16Sample], mixSize int) *Mixer {
98+
func newMixer(out msdk.Writer[msdk.PCM16Sample], mixSize int, st *Stats) *Mixer {
99+
if st == nil {
100+
st = new(Stats)
101+
}
75102
return &Mixer{
76103
out: out,
77104
sampleRate: out.SampleRate(),
78105
mixBuf: make([]int32, mixSize),
79106
mixTmp: make(msdk.PCM16Sample, mixSize),
107+
stats: st,
80108
}
81109
}
82110

@@ -90,6 +118,10 @@ func (m *Mixer) mixInputs() {
90118
if n == 0 {
91119
continue
92120
}
121+
122+
m.stats.MixedFrames.Add(1)
123+
m.stats.MixedSamples.Add(uint64(n))
124+
93125
m.mixTmp = m.mixTmp[:n]
94126
for j, v := range m.mixTmp {
95127
// Add the samples. This can potentially lead to overflow, but is unlikely and dividing by the source
@@ -106,6 +138,7 @@ func (m *Mixer) reset() {
106138
}
107139

108140
func (m *Mixer) mixOnce() {
141+
m.stats.Mixes.Add(1)
109142
m.mixCnt++
110143
m.reset()
111144
m.mixInputs()
@@ -121,6 +154,10 @@ func (m *Mixer) mixOnce() {
121154
}
122155
out[i] = int16(v)
123156
}
157+
158+
m.stats.OutputFrames.Add(1)
159+
m.stats.OutputSamples.Add(uint64(len(out)))
160+
124161
_ = m.out.WriteSample(out)
125162
}
126163

@@ -129,6 +166,7 @@ func (m *Mixer) mixUpdate() {
129166
now := time.Now()
130167

131168
if m.lastMixEndTs.IsZero() {
169+
m.stats.TimedMixes.Add(1)
132170
m.lastMixEndTs = now
133171
n = 1
134172
} else {
@@ -137,8 +175,12 @@ func (m *Mixer) mixUpdate() {
137175
if dt := now.Sub(m.lastMixEndTs); dt > 0 {
138176
n = int(dt / m.tickerDur)
139177
m.lastMixEndTs = m.lastMixEndTs.Add(time.Duration(n) * m.tickerDur)
178+
m.stats.JumpMixes.Add(uint64(n))
140179
}
141180
}
181+
if n == 0 {
182+
m.stats.ZeroMixes.Add(1)
183+
}
142184
if n > inputBufferFrames {
143185
n = inputBufferFrames
144186
// reset
@@ -178,6 +220,9 @@ func (m *Mixer) NewInput() *Input {
178220
return nil
179221
}
180222

223+
m.stats.Tracks.Add(1)
224+
m.stats.TracksTotal.Add(1)
225+
181226
inp := &Input{
182227
m: m,
183228
sampleRate: m.sampleRate,
@@ -194,12 +239,12 @@ func (m *Mixer) RemoveInput(inp *Input) {
194239
}
195240
m.mu.Lock()
196241
defer m.mu.Unlock()
197-
for i, cur := range m.inputs {
198-
if cur == inp {
199-
m.inputs = append(m.inputs[:i], m.inputs[i+1:]...)
200-
break
201-
}
242+
i := slices.Index(m.inputs, inp)
243+
if i < 0 {
244+
return
202245
}
246+
m.inputs = slices.Delete(m.inputs, i, i+1)
247+
m.stats.Tracks.Add(-1)
203248
}
204249

205250
func (m *Mixer) String() string {
@@ -223,6 +268,7 @@ func (i *Input) readSample(bufMin int, out msdk.PCM16Sample) (int, error) {
223268
n, err := i.buf.Read(out)
224269
if n == 0 {
225270
i.buffering = true // starving; pause the input and start buffering again
271+
i.m.stats.Restarts.Add(1)
226272
}
227273
return n, err
228274
}
@@ -246,6 +292,10 @@ func (i *Input) Close() error {
246292
func (i *Input) WriteSample(sample msdk.PCM16Sample) error {
247293
i.mu.Lock()
248294
defer i.mu.Unlock()
295+
296+
i.m.stats.InputFrames.Add(1)
297+
i.m.stats.InputSamples.Add(uint64(len(sample)))
298+
249299
_, err := i.buf.Write(sample)
250300
return err
251301
}

pkg/mixer/mixer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ type testMixer struct {
6161
func newTestMixer(t testing.TB) *testMixer {
6262
m := &testMixer{t: t}
6363

64-
m.Mixer = newMixer(newTestWriter(&m.sample, 8000), 5)
64+
m.Mixer = newMixer(newTestWriter(&m.sample, 8000), 5, nil)
6565
return m
6666
}
6767

pkg/sip/inbound.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,7 @@ type inboundCall struct {
353353
forwardDTMF atomic.Bool
354354
done atomic.Bool
355355
started core.Fuse
356+
stats Stats
356357
jitterBuf bool
357358
}
358359

@@ -375,9 +376,10 @@ func (s *Server) newInboundCall(
375376
state: state,
376377
extraAttrs: extra,
377378
dtmf: make(chan dtmf.Event, 10),
378-
lkRoom: NewRoom(log), // we need it created earlier so that the audio mixer is available for pin prompts
379379
jitterBuf: SelectValueBool(s.conf.EnableJitterBuffer, s.conf.EnableJitterBufferProb),
380380
}
381+
// we need it created earlier so that the audio mixer is available for pin prompts
382+
c.lkRoom = NewRoom(log, &c.stats.Room)
381383
c.log = c.log.WithValues("jitterBuf", c.jitterBuf)
382384
c.ctx, c.cancel = context.WithCancel(context.Background())
383385
s.cmu.Lock()
@@ -596,6 +598,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, enc livekit.SIPMediaEncrypt
596598
MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial,
597599
MediaTimeout: c.s.conf.MediaTimeout,
598600
EnableJitterBuffer: c.jitterBuf,
601+
Stats: &c.stats.Port,
599602
}, RoomSampleRate)
600603
if err != nil {
601604
return nil, err
@@ -770,6 +773,9 @@ func (c *inboundCall) close(error bool, status CallStatus, reason string) {
770773
c.mon.CallTerminate(reason)
771774
sipCode, sipStatus := status.SIPStatus()
772775
log := c.log.WithValues("status", sipCode, "reason", reason)
776+
defer func() {
777+
log.Infow("call statistics", "stats", c.stats.Load())
778+
}()
773779
if error {
774780
log.Warnw("Closing inbound call with error", nil)
775781
} else {

0 commit comments

Comments
 (0)