Skip to content

Commit 86ce575

Browse files
committed
feat: renamed the module to avmuxer
1 parent 3767018 commit 86ce575

13 files changed

+268
-33
lines changed

g711_stream.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package multiplexer
1+
package avmuxer
22

33
import (
44
"bytes"

go.mod

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
module github.com/itzmanish/multiplexer
1+
module github.com/itzmanish/avmuxer
22

33
go 1.22.1
44

@@ -32,6 +32,7 @@ require (
3232
github.com/pion/transport/v3 v3.0.6 // indirect
3333
github.com/pion/turn/v3 v3.0.3 // indirect
3434
github.com/pmezard/go-difflib v1.0.0 // indirect
35+
github.com/stretchr/objx v0.5.2 // indirect
3536
github.com/wlynxg/anet v0.0.3 // indirect
3637
golang.org/x/crypto v0.25.0 // indirect
3738
golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56 // indirect

go.sum

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
5555
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
5656
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
5757
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
58+
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
5859
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
5960
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
6061
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=

mutliplexer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package multiplexer
1+
package avmuxer
22

33
import (
44
"errors"

mutliplexer_test.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package multiplexer
1+
package avmuxer
22

33
import (
44
"encoding/binary"
@@ -223,3 +223,13 @@ func TestMultiplexer_ReadPCM16(t *testing.T) {
223223
time.Sleep(20 * time.Millisecond)
224224
}
225225
}
226+
227+
func generateTestPCM(sampleCount, repeatCount int) []int16 {
228+
pcm := make([]int16, sampleCount*repeatCount)
229+
for i := 0; i < repeatCount; i++ {
230+
for j := 0; j < sampleCount; j++ {
231+
pcm[i*sampleCount+j] = int16((j % 100) * 100)
232+
}
233+
}
234+
return pcm
235+
}

opus_stream.go

+44-23
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
package multiplexer
1+
package avmuxer
22

33
import (
44
"errors"
55
"io"
66
"log"
77
)
88

9+
// OpusStream interface defines the methods for both encoding and decoding Opus streams
910
type OpusStream interface {
1011
Stream
1112

@@ -17,8 +18,10 @@ type OpusStream interface {
1718
ChannelCount() int
1819
SampleDurationMs() int
1920
SampleCount() int
21+
ID() string
2022
}
2123

24+
// opusEncodingStream implements OpusStream for encoding
2225
type opusEncodingStream struct {
2326
id string
2427
sampleRate int
@@ -31,6 +34,7 @@ type opusEncodingStream struct {
3134
encoder *OpusEncoder
3235
}
3336

37+
// opusDecodingStream implements OpusStream for decoding
3438
type opusDecodingStream struct {
3539
id string
3640
sampleRate int
@@ -43,12 +47,18 @@ type opusDecodingStream struct {
4347
decoder *OpusDecoder
4448
}
4549

50+
// NewDecodingOpusStream creates a new OpusStream for decoding
4651
func NewDecodingOpusStream(id string, sampleRate, sampleDuration, channel int) (OpusStream, error) {
52+
// Calculate sample size based on duration and rate
4753
sampleSize := sampleDuration * sampleRate / 1000
54+
55+
// Create a new OpusDecoder
4856
dec, err := NewOpusDecoder(sampleRate, channel, sampleSize)
4957
if err != nil {
5058
return nil, err
5159
}
60+
61+
// Return a new opusDecodingStream
5262
return &opusDecodingStream{
5363
id: id,
5464
sampleRate: sampleRate,
@@ -60,7 +70,10 @@ func NewDecodingOpusStream(id string, sampleRate, sampleDuration, channel int) (
6070
}, err
6171
}
6272

73+
// NewEncodingOpusStream creates a new OpusStream for encoding
6374
func NewEncodingOpusStream(id string, sampleRate, sampleDuration, channel int) (OpusStream, error) {
75+
// Similar to NewDecodingOpusStream, but for encoding
76+
// ... existing code ...
6477
sampleSize := sampleDuration * sampleRate / 1000
6578
enc, err := NewOpusEncoder(sampleRate, channel, sampleSize)
6679
if err != nil {
@@ -89,38 +102,46 @@ func (ods *opusDecodingStream) SampleRate() int {
89102
func (ods *opusDecodingStream) SampleDurationMs() int {
90103
return ods.sampleDurationMs
91104
}
105+
func (ods *opusDecodingStream) ID() string {
106+
return ods.id
107+
}
92108

93109
func (ods *opusDecodingStream) Decode(src []byte, dst []int16) (int, error) {
94110
return ods.decoder.Decode(src, dst)
95111
}
96112

97-
// It takes encoded opus data and decode then store in the buffer
113+
// Write decodes Opus data and writes PCM to the sink
98114
func (ods *opusDecodingStream) Write(data []byte) (int, error) {
115+
// Decode Opus data to PCM
99116
pcm := make([]int16, ods.size*ods.channel)
100117
n, err := ods.Decode(data, pcm)
101118
if err != nil {
102119
return 0, err
103120
}
121+
104122
log.Printf("samples decoded: %v, os.size: %v, data size: %v\n", n, ods.size, len(data))
123+
124+
// Write decoded PCM to sink if available
105125
if ods.sink != nil {
106126
_, err := ods.sink.Write(int16ToByteSlice(pcm[:n*ods.channel]))
107127
if err != nil {
108128
return 0, err
109129
}
110130
}
111-
// FIXME(itzmanish): do we need to store the pcm data?
131+
132+
// Write PCM data to buffer
112133
return ods.decoder.buffer.Write(pcm[:n*ods.channel])
113134
}
114135

115-
// This reads the raw PCM data from Decoding OPUS stream
136+
// ReadPCM reads raw PCM data from the decoding buffer
116137
func (ods *opusDecodingStream) ReadPCM(dst []int16) (int, error) {
117138
if ods.decoder == nil {
118139
return 0, errors.New("stream is not decoding supported")
119140
}
120141
return ods.decoder.buffer.Read(dst)
121142
}
122143

123-
// This reads the raw PCM data from Decoding OPUS stream and convert them to byte array
144+
// Read reads raw PCM data and converts it to a byte array
124145
func (ods *opusDecodingStream) Read(dst []byte) (int, error) {
125146
if ods.decoder == nil {
126147
return 0, errors.New("stream is not decoding supported")
@@ -167,6 +188,10 @@ func (oes *opusEncodingStream) SampleDurationMs() int {
167188
return oes.sampleDurationMs
168189
}
169190

191+
func (oes *opusEncodingStream) ID() string {
192+
return oes.id
193+
}
194+
170195
func (*opusEncodingStream) ReadPCM([]int16) (int, error) {
171196
return 0, errors.New("encoding stream doesn't support reading pcm")
172197
}
@@ -191,25 +216,21 @@ func (oes *opusEncodingStream) WritePCM(data []int16) (int, error) {
191216
return oes.encoder.buffer.Write(byteData[:n])
192217
}
193218

194-
// This reads encoded data after pulling and encoding from the reader
219+
// Read reads encoded Opus data from the encoder's buffer
195220
func (oes *opusEncodingStream) Read(dst []byte) (int, error) {
196-
// if oes.encoder == nil {
197-
// return 0, errors.New("stream is not encoding supported")
198-
// }
199-
// if oes.reader == nil {
200-
// return 0, errors.New("reader is not connected")
201-
// }
202-
// byteArr := make([]byte, oes.size*oes.channel*2)
203-
// n, err := oes.reader.Read(byteArr)
204-
// if err != nil {
205-
// return n, err
206-
// }
207-
// pcm := byteSliceToInt16(byteArr[:n])
208-
// _, err = oes.WritePCM(pcm)
209-
// if err != nil {
210-
// return 0, err
211-
// }
212-
return oes.encoder.buffer.Read(dst)
221+
if oes.encoder == nil {
222+
return 0, errors.New("encoder is not initialized")
223+
}
224+
225+
n, err := oes.encoder.buffer.Read(dst)
226+
if err != nil {
227+
if err == ErrEmptyBuffer {
228+
return 0, io.EOF
229+
}
230+
return n, err
231+
}
232+
233+
return n, nil
213234
}
214235

215236
func (oes *opusEncodingStream) Encode(src []int16, dst []byte) (int, error) {

opus_stream_test.go

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package avmuxer
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestNewDecodingOpusStream(t *testing.T) {
10+
stream, err := NewDecodingOpusStream("stream1", 48000, 20, 2)
11+
assert.NoError(t, err)
12+
assert.NotNil(t, stream)
13+
14+
assert.Equal(t, 48000, stream.SampleRate())
15+
assert.Equal(t, 2, stream.ChannelCount())
16+
assert.Equal(t, 20, stream.SampleDurationMs())
17+
assert.Equal(t, 960, stream.SampleCount()) // sampleDuration * sampleRate / 1000
18+
}
19+
20+
func TestNewEncodingOpusStream(t *testing.T) {
21+
stream, err := NewEncodingOpusStream("stream1", 48000, 20, 2)
22+
assert.NoError(t, err)
23+
assert.NotNil(t, stream)
24+
25+
assert.Equal(t, 48000, stream.SampleRate())
26+
assert.Equal(t, 2, stream.ChannelCount())
27+
assert.Equal(t, 20, stream.SampleDurationMs())
28+
assert.Equal(t, 960, stream.SampleCount()) // sampleDuration * sampleRate / 1000
29+
}
30+
31+
func TestDecodingStream_Write(t *testing.T) {
32+
stream, err := NewDecodingOpusStream("stream1", 48000, 20, 2)
33+
assert.NoError(t, err)
34+
assert.NotNil(t, stream)
35+
36+
encodedData := make([]byte, 960)
37+
n, err := stream.Write(encodedData)
38+
assert.NoError(t, err)
39+
assert.Equal(t, len(encodedData), n)
40+
}
41+
42+
func TestEncodingStream_WritePCM(t *testing.T) {
43+
stream, err := NewEncodingOpusStream("stream1", 48000, 20, 2)
44+
assert.NoError(t, err)
45+
assert.NotNil(t, stream)
46+
47+
pcmData := make([]int16, 960) // Example PCM data
48+
n, err := stream.WritePCM(pcmData)
49+
assert.NoError(t, err)
50+
assert.Equal(t, 960, n)
51+
}
52+
53+
func TestDecodingStream_ReadPCM(t *testing.T) {
54+
stream, err := NewDecodingOpusStream("stream1", 48000, 20, 2)
55+
assert.NoError(t, err)
56+
assert.NotNil(t, stream)
57+
58+
dst := make([]int16, 960)
59+
n, err := stream.ReadPCM(dst)
60+
assert.NoError(t, err)
61+
assert.Equal(t, 960, n)
62+
}
63+
64+
func TestEncodingStream_Read(t *testing.T) {
65+
stream, err := NewEncodingOpusStream("stream1", 48000, 20, 2)
66+
assert.NoError(t, err)
67+
assert.NotNil(t, stream)
68+
69+
dst := make([]byte, 4096)
70+
n, err := stream.Read(dst)
71+
assert.NoError(t, err)
72+
assert.True(t, n > 0)
73+
}

ringbuffer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package multiplexer
1+
package avmuxer
22

33
import (
44
"errors"

ringbuffer_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
package multiplexer
1+
package avmuxer

testdata/muxed.pcm

-26.3 KB
Binary file not shown.

transcoder.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package multiplexer
1+
package avmuxer
22

33
import (
44
"errors"
@@ -16,15 +16,15 @@ func NewTranscoder() *Transcoder {
1616
return &Transcoder{}
1717
}
1818

19-
func (tc *Transcoder) AddSource(id string, stream Stream) error {
19+
func (tc *Transcoder) AddSource(stream Stream) error {
2020
if tc.input != nil {
2121
return errors.New("source is already present")
2222
}
2323
tc.input = stream
2424
return nil
2525
}
2626

27-
func (tc *Transcoder) AddEncoder(id string, enc Encoder) error {
27+
func (tc *Transcoder) AddEncoder(enc Encoder) error {
2828
if tc.encoder != nil {
2929
return errors.New("encoder is already present")
3030
}

0 commit comments

Comments
 (0)