diff --git a/pkg/media/opus/opus.go b/pkg/media/opus/opus.go index 4066e6a6..5ca6afa2 100644 --- a/pkg/media/opus/opus.go +++ b/pkg/media/opus/opus.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "io" + "sync" "time" "gopkg.in/hraban/opus.v2" @@ -76,21 +77,26 @@ func Encode(w Writer, channels int, log logger.Logger) (msdk.PCM16Writer, error) log.Errorw("cannot initialize opus encoder", err) return nil, err } + samples := w.SampleRate() / rtp.DefFramesPerSec return &encoder{ - w: w, - p: p, - enc: enc, - buf: make([]byte, w.SampleRate()/rtp.DefFramesPerSec), - log: log, + w: w, + p: p, + enc: enc, + samples: samples, + inbuf: make(msdk.PCM16Sample, 0, samples), + buf: make([]byte, 4*channels*samples), + log: log, }, nil } type decoder struct { - w msdk.PCM16Writer + log logger.Logger p params + + mu sync.Mutex + w msdk.PCM16Writer dec *opus.Decoder buf msdk.PCM16Sample - log logger.Logger successiveErrorCount int } @@ -107,6 +113,8 @@ func (d *decoder) WriteSample(in Sample) error { if len(in) == 0 { return nil } + d.mu.Lock() + defer d.mu.Unlock() n, err := d.dec.Decode(in, d.buf) if err != nil { // Some workflows (concatenating opus files) can cause a suprious decoding error, so ignore small amount of corruption errors @@ -123,15 +131,21 @@ func (d *decoder) WriteSample(in Sample) error { } func (d *decoder) Close() error { + d.mu.Lock() + defer d.mu.Unlock() return d.w.Close() } type encoder struct { - w Writer - p params - enc *opus.Encoder - buf Sample - log logger.Logger + log logger.Logger + p params + samples int + + mu sync.Mutex + w Writer + enc *opus.Encoder + inbuf msdk.PCM16Sample + buf Sample successiveErrorCount int } @@ -145,20 +159,51 @@ func (e *encoder) SampleRate() int { } func (e *encoder) WriteSample(in msdk.PCM16Sample) error { - n, err := e.enc.Encode(in, e.buf) - if err != nil { - if e.successiveErrorCount < 5 { - e.log.Errorw("error encoding opus sample", err, "len", len(in), "n", n) - e.successiveErrorCount++ + e.mu.Lock() + defer e.mu.Unlock() + e.inbuf = append(e.inbuf, in...) + for len(e.inbuf) >= e.samples { + i := e.samples + + n, err := e.enc.Encode(e.inbuf[:i], e.buf) + + sz := copy(e.inbuf, e.inbuf[i:]) + e.inbuf = e.inbuf[:sz] + if err != nil { + if e.successiveErrorCount < 5 { + e.log.Errorw("error encoding opus sample", err, "len", len(in), "buf", len(e.buf), "n", n) + e.successiveErrorCount++ + } + return err + } + e.successiveErrorCount = 0 + if err = e.w.WriteSample(e.buf[:n]); err != nil { + return err } + } + return nil +} + +func (e *encoder) flush() error { + if len(e.inbuf) == 0 { + return nil + } + n, err := e.enc.Encode(e.inbuf, e.buf) + if err != nil { return err } - e.successiveErrorCount = 0 return e.w.WriteSample(e.buf[:n]) } func (e *encoder) Close() error { - return e.w.Close() + e.mu.Lock() + defer e.mu.Unlock() + err1 := e.flush() + err2 := e.w.Close() + if err1 != nil { + return err1 + } + return err2 } func NewWebmWriter(w io.WriteCloser, sampleRate int, sampleDur time.Duration) msdk.WriteCloser[Sample] {