From 233f49e6097928e57084e7ef89cf6efdbecd53bc Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Mon, 29 Sep 2025 23:52:21 +0300 Subject: [PATCH 1/2] Only pass full frames to opus encoder. Increase buffer size. --- pkg/media/opus/opus.go | 66 +++++++++++++++++++++++++++++++----------- 1 file changed, 49 insertions(+), 17 deletions(-) diff --git a/pkg/media/opus/opus.go b/pkg/media/opus/opus.go index 4066e6a6..54909f02 100644 --- a/pkg/media/opus/opus.go +++ b/pkg/media/opus/opus.go @@ -76,12 +76,15 @@ func Encode(w Writer, channels int, log logger.Logger) (msdk.PCM16Writer, error) log.Errorw("cannot initialize opus encoder", err) return nil, err } + samples := w.SampleRate() / rtp.DefFramesPerSec return &encoder{ - w: w, - p: p, - enc: enc, - buf: make([]byte, w.SampleRate()/rtp.DefFramesPerSec), - log: log, + w: w, + p: p, + enc: enc, + samples: samples, + inbuf: make(msdk.PCM16Sample, 0, samples), + buf: make([]byte, 4*channels*samples), + log: log, }, nil } @@ -127,11 +130,13 @@ func (d *decoder) Close() error { } type encoder struct { - w Writer - p params - enc *opus.Encoder - buf Sample - log logger.Logger + w Writer + p params + enc *opus.Encoder + samples int + inbuf msdk.PCM16Sample + buf Sample + log logger.Logger successiveErrorCount int } @@ -145,20 +150,47 @@ func (e *encoder) SampleRate() int { } func (e *encoder) WriteSample(in msdk.PCM16Sample) error { - n, err := e.enc.Encode(in, e.buf) - if err != nil { - if e.successiveErrorCount < 5 { - e.log.Errorw("error encoding opus sample", err, "len", len(in), "n", n) - e.successiveErrorCount++ + e.inbuf = append(e.inbuf, in...) + for len(e.inbuf) >= e.samples { + i := e.samples + + n, err := e.enc.Encode(e.inbuf[:i], e.buf) + + sz := copy(e.inbuf, e.inbuf[i:]) + e.inbuf = e.inbuf[:sz] + if err != nil { + if e.successiveErrorCount < 5 { + e.log.Errorw("error encoding opus sample", err, "len", len(in), "buf", len(e.buf), "n", n) + e.successiveErrorCount++ + } + return err + } + e.successiveErrorCount = 0 + if err = e.w.WriteSample(e.buf[:n]); err != nil { + return err } + } + return nil +} + +func (e *encoder) flush() error { + if len(e.inbuf) == 0 { + return nil + } + n, err := e.enc.Encode(e.inbuf, e.buf) + if err != nil { return err } - e.successiveErrorCount = 0 return e.w.WriteSample(e.buf[:n]) } func (e *encoder) Close() error { - return e.w.Close() + err1 := e.flush() + err2 := e.w.Close() + if err1 != nil { + return err1 + } + return err2 } func NewWebmWriter(w io.WriteCloser, sampleRate int, sampleDur time.Duration) msdk.WriteCloser[Sample] { From fcf23bc94314e3d277ef3490130de15ee2be6d2f Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Tue, 28 Oct 2025 06:40:29 +0200 Subject: [PATCH 2/2] Protect encoder and decoder with a mutex. --- pkg/media/opus/opus.go | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/pkg/media/opus/opus.go b/pkg/media/opus/opus.go index 54909f02..5ca6afa2 100644 --- a/pkg/media/opus/opus.go +++ b/pkg/media/opus/opus.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "io" + "sync" "time" "gopkg.in/hraban/opus.v2" @@ -89,11 +90,13 @@ func Encode(w Writer, channels int, log logger.Logger) (msdk.PCM16Writer, error) } type decoder struct { - w msdk.PCM16Writer + log logger.Logger p params + + mu sync.Mutex + w msdk.PCM16Writer dec *opus.Decoder buf msdk.PCM16Sample - log logger.Logger successiveErrorCount int } @@ -110,6 +113,8 @@ func (d *decoder) WriteSample(in Sample) error { if len(in) == 0 { return nil } + d.mu.Lock() + defer d.mu.Unlock() n, err := d.dec.Decode(in, d.buf) if err != nil { // Some workflows (concatenating opus files) can cause a suprious decoding error, so ignore small amount of corruption errors @@ -126,17 +131,21 @@ func (d *decoder) WriteSample(in Sample) error { } func (d *decoder) Close() error { + d.mu.Lock() + defer d.mu.Unlock() return d.w.Close() } type encoder struct { - w Writer + log logger.Logger p params - enc *opus.Encoder samples int - inbuf msdk.PCM16Sample - buf Sample - log logger.Logger + + mu sync.Mutex + w Writer + enc *opus.Encoder + inbuf msdk.PCM16Sample + buf Sample successiveErrorCount int } @@ -150,6 +159,8 @@ func (e *encoder) SampleRate() int { } func (e *encoder) WriteSample(in msdk.PCM16Sample) error { + e.mu.Lock() + defer e.mu.Unlock() e.inbuf = append(e.inbuf, in...) for len(e.inbuf) >= e.samples { i := e.samples @@ -185,6 +196,8 @@ func (e *encoder) flush() error { } func (e *encoder) Close() error { + e.mu.Lock() + defer e.mu.Unlock() err1 := e.flush() err2 := e.w.Close() if err1 != nil {