diff --git a/.github/workflows/code-samples-check.yml b/.github/workflows/code-samples-check.yml index dde9905b14c..1fc11c2e7e1 100644 --- a/.github/workflows/code-samples-check.yml +++ b/.github/workflows/code-samples-check.yml @@ -5,6 +5,8 @@ on: branches: [ main ] types: [ opened ] paths: + - "components/audioin/audioin.go" + - "components/audioout/audioout.go" - "components/base/base.go" - "components/board/board.go" - "components/camera/camera.go" @@ -12,7 +14,6 @@ on: - "components/sensor/sensor.go" - "components/servo/servo.go" - "components/arm/arm.go" - - "components/audioinput/audio_input.go" - "components/gantry/gantry.go" - "components/gripper/gripper.go" - "components/input/input.go" @@ -20,7 +21,6 @@ on: - "components/posetracker/pose_tracker.go" - "services/motion/motion.go" - "services/vision/vision.go" - jobs: comment: name: "Post Comment on PR" @@ -40,4 +40,4 @@ jobs: header: Code-Samples Warning number: ${{ env.PR_NUMBER }} recreate: true - path: code-samples-warning.md \ No newline at end of file + path: code-samples-warning.md diff --git a/.github/workflows/test-module-generation.yml b/.github/workflows/test-module-generation.yml index fa41c5d1757..aea0379cb37 100644 --- a/.github/workflows/test-module-generation.yml +++ b/.github/workflows/test-module-generation.yml @@ -14,7 +14,8 @@ jobs: resource: [ { subtype: "arm" }, - { subtype: "audio_input" }, + { subtype: "audio_in" }, + { subtype: "audio_out" }, { subtype: "base" }, { subtype: "board" }, { subtype: "camera" }, diff --git a/cli/module_generate/modulegen/inputs.go b/cli/module_generate/modulegen/inputs.go index 768854aed4a..8fd7d272ff0 100644 --- a/cli/module_generate/modulegen/inputs.go +++ b/cli/module_generate/modulegen/inputs.go @@ -44,7 +44,8 @@ type ModuleInputs struct { // Resources is a list of all the available resources in Viam. 
var Resources = []string{ "arm component", - "audio_input component", + "audio_in component", + "audio_out component", "base component", "board component", "button component", diff --git a/components/audioinput/audio_input.go b/components/audioinput/audio_input.go deleted file mode 100644 index 03075b5112d..00000000000 --- a/components/audioinput/audio_input.go +++ /dev/null @@ -1,159 +0,0 @@ -// Package audioinput defines an audio capturing device. -package audioinput - -import ( - "context" - "errors" - - "github.com/pion/mediadevices/pkg/prop" - pb "go.viam.com/api/component/audioinput/v1" - - "go.viam.com/rdk/data" - "go.viam.com/rdk/gostream" - "go.viam.com/rdk/resource" - "go.viam.com/rdk/robot" -) - -func init() { - resource.RegisterAPI(API, resource.APIRegistration[AudioInput]{ - RPCServiceServerConstructor: NewRPCServiceServer, - RPCServiceHandler: pb.RegisterAudioInputServiceHandlerFromEndpoint, - RPCServiceDesc: &pb.AudioInputService_ServiceDesc, - RPCClient: NewClientFromConn, - }) - data.RegisterCollector(data.MethodMetadata{ - API: API, - MethodName: doCommand.String(), - }, newDoCommandCollector) -} - -// SubtypeName is a constant that identifies the audio input resource subtype string. -const SubtypeName = "audio_input" - -// API is a variable that identifies the audio input resource API. -var API = resource.APINamespaceRDK.WithComponentType(SubtypeName) - -// Named is a helper for getting the named audio inputs's typed resource name. -func Named(name string) resource.Name { - return resource.NewName(API, name) -} - -// An AudioInput is a resource that can capture audio. -type AudioInput interface { - resource.Resource - AudioSource -} - -// An AudioSource represents anything that can capture audio. -type AudioSource interface { - gostream.AudioSource - gostream.AudioPropertyProvider -} - -// A LivenessMonitor is responsible for monitoring the liveness of an audio input. An example -// is connectivity. 
Since the model itself knows best about how to maintain this state, -// the reconfigurable offers a safe way to notify if a state needs to be reset due -// to some exceptional event (like a reconnect). -// It is expected that the monitoring code is tied to the lifetime of the resource -// and once the resource is closed, so should the monitor. That is, it should -// no longer send any resets once a Close on its associated resource has returned. -type LivenessMonitor interface { - Monitor(notifyReset func()) -} - -// Deprecated: FromDependencies is a helper for getting the named audio input from a collection of -// dependencies. Use FromProvider instead. -// -//nolint:revive // ignore exported comment check. -func FromDependencies(deps resource.Dependencies, name string) (AudioInput, error) { - return resource.FromDependencies[AudioInput](deps, Named(name)) -} - -// Deprecated: FromRobot is a helper for getting the named audio input from the given Robot. -// Use FromProvider instead. -// -//nolint:revive // ignore exported comment check -func FromRobot(r robot.Robot, name string) (AudioInput, error) { - return robot.ResourceFromRobot[AudioInput](r, Named(name)) -} - -// FromProvider is a helper for getting the named AudioInput from a resource Provider (collection of Dependencies or a Robot). -func FromProvider(provider resource.Provider, name string) (AudioInput, error) { - return resource.FromProvider[AudioInput](provider, Named(name)) -} - -// NamesFromRobot is a helper for getting all audio input names from the given Robot. -func NamesFromRobot(r robot.Robot) []string { - return robot.NamesByAPI(r, API) -} - -type audioPropertiesFunc func(ctx context.Context) (prop.Audio, error) - -func (apf audioPropertiesFunc) MediaProperties(ctx context.Context) (prop.Audio, error) { - return apf(ctx) -} - -// NewAudioSourceFromReader creates an AudioSource from a reader. 
-func NewAudioSourceFromReader(reader gostream.AudioReader, props prop.Audio) (AudioSource, error) { - if reader == nil { - return nil, errors.New("cannot have a nil reader") - } - as := gostream.NewAudioSource(reader, props) - return &audioSource{ - as: as, - prov: audioPropertiesFunc(func(ctx context.Context) (prop.Audio, error) { - return props, nil - }), - }, nil -} - -// FromAudioSource creates an AudioInput resource either from a AudioSource. -func FromAudioSource(name resource.Name, src AudioSource) (AudioInput, error) { - return &sourceBasedInput{ - Named: name.AsNamed(), - AudioSource: src, - }, nil -} - -type sourceBasedInput struct { - resource.Named - resource.AlwaysRebuild - AudioSource -} - -// NewAudioSourceFromGostreamSource creates an AudioSource from a gostream.AudioSource. -func NewAudioSourceFromGostreamSource(audSrc gostream.AudioSource) (AudioSource, error) { - if audSrc == nil { - return nil, errors.New("cannot have a nil audio source") - } - provider, ok := audSrc.(gostream.AudioPropertyProvider) - if !ok { - return nil, errors.New("source must have property provider") - } - return &audioSource{ - as: audSrc, - prov: provider, - }, nil -} - -// AudioSource implements an AudioInput with a gostream.AudioSource. -type audioSource struct { - as gostream.AudioSource - prov gostream.AudioPropertyProvider -} - -func (as *audioSource) Stream( - ctx context.Context, - errHandlers ...gostream.ErrorHandler, -) (gostream.AudioStream, error) { - return as.as.Stream(ctx, errHandlers...) -} - -func (as *audioSource) MediaProperties(ctx context.Context) (prop.Audio, error) { - return as.prov.MediaProperties(ctx) -} - -// Close closes the underlying AudioSource. 
-func (as *audioSource) Close(ctx context.Context) error { - return as.as.Close(ctx) -} diff --git a/components/audioinput/client.go b/components/audioinput/client.go deleted file mode 100644 index 321aa25fd23..00000000000 --- a/components/audioinput/client.go +++ /dev/null @@ -1,230 +0,0 @@ -package audioinput - -import ( - "context" - "io" - "math" - "sync" - - "github.com/pion/mediadevices/pkg/prop" - "github.com/pion/mediadevices/pkg/wave" - "github.com/pkg/errors" - pb "go.viam.com/api/component/audioinput/v1" - "go.viam.com/utils" - "go.viam.com/utils/rpc" - - "go.viam.com/rdk/gostream" - "go.viam.com/rdk/logging" - "go.viam.com/rdk/protoutils" - "go.viam.com/rdk/resource" -) - -// client is an audio input client. -type client struct { - resource.Named - resource.TriviallyReconfigurable - conn rpc.ClientConn - client pb.AudioInputServiceClient - logger logging.Logger - mu sync.Mutex - name string - activeBackgroundWorkers sync.WaitGroup - healthyClientCh chan struct{} -} - -// NewClientFromConn constructs a new Client from connection passed in. -func NewClientFromConn( - ctx context.Context, - conn rpc.ClientConn, - remoteName string, - name resource.Name, - logger logging.Logger, -) (AudioInput, error) { - c := pb.NewAudioInputServiceClient(conn) - return &client{ - Named: name.PrependRemote(remoteName).AsNamed(), - name: name.Name, - conn: conn, - client: c, - logger: logger, - }, nil -} - -func (c *client) Read(ctx context.Context) (wave.Audio, func(), error) { - stream, err := c.Stream(ctx) - if err != nil { - return nil, nil, err - } - defer func() { - if err := stream.Close(ctx); err != nil { - c.logger.CErrorw(ctx, "error closing stream", "error", err) - } - }() - return stream.Next(ctx) -} - -func (c *client) Stream( - ctx context.Context, - errHandlers ...gostream.ErrorHandler, -) (gostream.AudioStream, error) { - // RSDK-6340: The resource manager closes remote resources when the underlying - // connection goes bad. 
However, when the connection is re-established, the client - // objects these resources represent are not re-initialized/marked "healthy". - // `healthyClientCh` helps track these transitions between healthy and unhealthy - // states. - // - // When a new `client.Stream()` is created we will either use the existing - // `healthyClientCh` or create a new one. - // - // The goroutine a `Stream()` method spins off will listen to its version of the - // `healthyClientCh` to be notified when the connection has died so it can gracefully - // terminate. - // - // When a connection becomes unhealthy, the resource manager will call `Close` on the - // audioinput client object. Closing the client will: - // 1. close its `client.healthyClientCh` channel - // 2. wait for existing "stream" goroutines to drain - // 3. nil out the `client.healthyClientCh` member variable - // - // New streams concurrent with closing cannot start until this drain completes. There - // will never be stream goroutines from the old "generation" running concurrently - // with those from the new "generation". 
- c.mu.Lock() - if c.healthyClientCh == nil { - c.healthyClientCh = make(chan struct{}) - } - healthyClientCh := c.healthyClientCh - c.mu.Unlock() - - streamCtx, stream, chunkCh := gostream.NewMediaStreamForChannel[wave.Audio](context.Background()) - - chunksClient, err := c.client.Chunks(ctx, &pb.ChunksRequest{ - Name: c.name, - SampleFormat: pb.SampleFormat_SAMPLE_FORMAT_FLOAT32_INTERLEAVED, - }) - if err != nil { - return nil, err - } - - infoResp, err := chunksClient.Recv() - if err != nil { - return nil, err - } - infoProto := infoResp.GetInfo() - - c.mu.Lock() - if err := streamCtx.Err(); err != nil { - c.mu.Unlock() - return nil, err - } - c.activeBackgroundWorkers.Add(1) - c.mu.Unlock() - - utils.PanicCapturingGo(func() { - defer c.activeBackgroundWorkers.Done() - defer close(chunkCh) - - for { - if streamCtx.Err() != nil { - return - } - - var nextErr error - - chunkResp, err := chunksClient.Recv() - - var chunk wave.Audio - if err != nil { - if errors.Is(err, io.EOF) { - return - } - for _, handler := range errHandlers { - handler(streamCtx, err) - } - nextErr = err - } else { - chunkProto := chunkResp.GetChunk() - info := wave.ChunkInfo{ - Len: int(chunkProto.Length), - Channels: int(infoProto.Channels), - SamplingRate: int(infoProto.SamplingRate), - } - - switch infoProto.SampleFormat { - case pb.SampleFormat_SAMPLE_FORMAT_INT16_INTERLEAVED: - chunkActual := wave.NewInt16Interleaved(info) - for i := 0; i < info.Len; i++ { - chunkActual.Data[i] = int16(HostEndian.Uint16(chunkProto.Data[i*2:])) - } - chunk = chunkActual - case pb.SampleFormat_SAMPLE_FORMAT_FLOAT32_INTERLEAVED: - chunkActual := wave.NewFloat32Interleaved(info) - for i := 0; i < info.Len; i++ { - chunkActual.Data[i] = math.Float32frombits(HostEndian.Uint32(chunkProto.Data[i*4:])) - } - chunk = chunkActual - case pb.SampleFormat_SAMPLE_FORMAT_UNSPECIFIED: - fallthrough - default: - nextErr = errors.Errorf("unknown type of audio sample format %v", infoProto.SampleFormat) - } - } - - select { 
- case <-streamCtx.Done(): - return - case <-healthyClientCh: - if err := stream.Close(context.Background()); err != nil { - c.logger.Warn("error closing stream", err) - } - return - case chunkCh <- gostream.MediaReleasePairWithError[wave.Audio]{ - Media: chunk, - Release: func() {}, - Err: nextErr, - }: - } - } - }) - - return stream, nil -} - -func (c *client) MediaProperties(ctx context.Context) (prop.Audio, error) { - resp, err := c.client.Properties(ctx, &pb.PropertiesRequest{ - Name: c.name, - }) - if err != nil { - return prop.Audio{}, err - } - return prop.Audio{ - ChannelCount: int(resp.ChannelCount), - Latency: resp.Latency.AsDuration(), - SampleRate: int(resp.SampleRate), - SampleSize: int(resp.SampleSize), - IsBigEndian: resp.IsBigEndian, - IsFloat: resp.IsFloat, - IsInterleaved: resp.IsInterleaved, - }, nil -} - -func (c *client) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) { - return protoutils.DoFromResourceClient(ctx, c.client, c.name, cmd) -} - -// TODO(RSDK-6433): This method can be called more than once during a client's lifecycle. -// For example, consider a case where a remote audioinput goes offline and then back -// online. We will call `Close` on the audioinput client when we detect the disconnection -// to remove active streams but then reuse the client when the connection is -// re-established. 
-func (c *client) Close(ctx context.Context) error { - c.mu.Lock() - defer c.mu.Unlock() - - if c.healthyClientCh != nil { - close(c.healthyClientCh) - } - c.activeBackgroundWorkers.Wait() - c.healthyClientCh = nil - return nil -} diff --git a/components/audioinput/client_test.go b/components/audioinput/client_test.go deleted file mode 100644 index b32f9c33054..00000000000 --- a/components/audioinput/client_test.go +++ /dev/null @@ -1,223 +0,0 @@ -package audioinput_test - -import ( - "context" - "errors" - "net" - "testing" - - "github.com/pion/mediadevices/pkg/prop" - "github.com/pion/mediadevices/pkg/wave" - "go.viam.com/test" - "go.viam.com/utils/rpc" - - "go.viam.com/rdk/components/audioinput" - "go.viam.com/rdk/gostream" - viamgrpc "go.viam.com/rdk/grpc" - "go.viam.com/rdk/logging" - "go.viam.com/rdk/resource" - "go.viam.com/rdk/testutils" - "go.viam.com/rdk/testutils/inject" -) - -var ( - testAudioInputName = "audio1" - failAudioInputName = "audio2" -) - -func TestClient(t *testing.T) { - logger := logging.NewTestLogger(t) - listener1, err := net.Listen("tcp", "localhost:0") - test.That(t, err, test.ShouldBeNil) - rpcServer, err := rpc.NewServer(logger, rpc.WithUnauthenticated()) - test.That(t, err, test.ShouldBeNil) - - audioData := &wave.Float32Interleaved{ - Data: []float32{ - 0.1, -0.5, 0.2, -0.6, 0.3, -0.7, 0.4, -0.8, 0.5, -0.9, 0.6, -1.0, 0.7, -1.1, 0.8, -1.2, - }, - Size: wave.ChunkInfo{8, 2, 48000}, - } - - injectAudioInput := &inject.AudioInput{} - - // good audio input - injectAudioInput.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.AudioStream, error) { - return gostream.NewEmbeddedAudioStreamFromReader(gostream.AudioReaderFunc(func(ctx context.Context) (wave.Audio, func(), error) { - return audioData, func() {}, nil - })), nil - } - expectedProps := prop.Audio{ - ChannelCount: 1, - SampleRate: 2, - IsBigEndian: true, - IsInterleaved: true, - Latency: 5, - } - injectAudioInput.MediaPropertiesFunc = func(ctx 
context.Context) (prop.Audio, error) { - return expectedProps, nil - } - // bad audio input - injectAudioInput2 := &inject.AudioInput{} - injectAudioInput2.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.AudioStream, error) { - return nil, errors.New("can't generate stream") - } - - resources := map[resource.Name]audioinput.AudioInput{ - audioinput.Named(testAudioInputName): injectAudioInput, - audioinput.Named(failAudioInputName): injectAudioInput2, - } - audioInputSvc, err := resource.NewAPIResourceCollection(audioinput.API, resources) - test.That(t, err, test.ShouldBeNil) - resourceAPI, ok, err := resource.LookupAPIRegistration[audioinput.AudioInput](audioinput.API) - test.That(t, err, test.ShouldBeNil) - test.That(t, ok, test.ShouldBeTrue) - test.That(t, resourceAPI.RegisterRPCService(context.Background(), rpcServer, audioInputSvc), test.ShouldBeNil) - - injectAudioInput.DoFunc = testutils.EchoFunc - - go rpcServer.Serve(listener1) - defer rpcServer.Stop() - - t.Run("Failing client", func(t *testing.T) { - cancelCtx, cancel := context.WithCancel(context.Background()) - cancel() - _, err := viamgrpc.Dial(cancelCtx, listener1.Addr().String(), logger) - test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, "canceled") - }) - - t.Run("audio input client 1", func(t *testing.T) { - conn, err := viamgrpc.Dial(context.Background(), listener1.Addr().String(), logger) - test.That(t, err, test.ShouldBeNil) - audioInput1Client, err := audioinput.NewClientFromConn(context.Background(), conn, "", audioinput.Named(testAudioInputName), logger) - test.That(t, err, test.ShouldBeNil) - chunk, _, err := gostream.ReadAudio(context.Background(), audioInput1Client) - test.That(t, err, test.ShouldBeNil) - - audioData := wave.NewFloat32Interleaved(chunk.ChunkInfo()) - // convert - for i := 0; i < chunk.ChunkInfo().Len; i++ { - for j := 0; j < chunk.ChunkInfo().Channels; j++ { - audioData.Set(i, j, 
chunk.At(i, j)) - } - } - - test.That(t, chunk, test.ShouldResemble, audioData) - - props, err := audioInput1Client.MediaProperties(context.Background()) - test.That(t, err, test.ShouldBeNil) - test.That(t, props, test.ShouldResemble, expectedProps) - - // DoCommand - resp, err := audioInput1Client.DoCommand(context.Background(), testutils.TestCommand) - test.That(t, err, test.ShouldBeNil) - test.That(t, resp["command"], test.ShouldEqual, testutils.TestCommand["command"]) - test.That(t, resp["data"], test.ShouldEqual, testutils.TestCommand["data"]) - - test.That(t, audioInput1Client.Close(context.Background()), test.ShouldBeNil) - test.That(t, conn.Close(), test.ShouldBeNil) - }) - - t.Run("audio input client 2", func(t *testing.T) { - conn, err := viamgrpc.Dial(context.Background(), listener1.Addr().String(), logger) - test.That(t, err, test.ShouldBeNil) - client2, err := resourceAPI.RPCClient(context.Background(), conn, "", audioinput.Named(failAudioInputName), logger) - test.That(t, err, test.ShouldBeNil) - - _, _, err = gostream.ReadAudio(context.Background(), client2) - test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, "can't generate stream") - - test.That(t, conn.Close(), test.ShouldBeNil) - }) -} - -func TestClientStreamAfterClose(t *testing.T) { - // Set up gRPC server - logger := logging.NewTestLogger(t) - listener, err := net.Listen("tcp", "localhost:0") - test.That(t, err, test.ShouldBeNil) - rpcServer, err := rpc.NewServer(logger, rpc.WithUnauthenticated()) - test.That(t, err, test.ShouldBeNil) - - // Set up audioinput that can stream audio - - audioData := &wave.Float32Interleaved{ - Data: []float32{ - 0.1, -0.5, 0.2, -0.6, 0.3, -0.7, 0.4, -0.8, 0.5, -0.9, 0.6, -1.0, 0.7, -1.1, 0.8, -1.2, - }, - Size: wave.ChunkInfo{8, 2, 48000}, - } - - injectAudioInput := &inject.AudioInput{} - - // good audio input - injectAudioInput.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) 
(gostream.AudioStream, error) { - return gostream.NewEmbeddedAudioStreamFromReader(gostream.AudioReaderFunc(func(ctx context.Context) (wave.Audio, func(), error) { - return audioData, func() {}, nil - })), nil - } - - expectedProps := prop.Audio{ - ChannelCount: 1, - SampleRate: 2, - IsBigEndian: true, - IsInterleaved: true, - Latency: 5, - } - injectAudioInput.MediaPropertiesFunc = func(ctx context.Context) (prop.Audio, error) { - return expectedProps, nil - } - - // Register AudioInputService API in our gRPC server. - resources := map[resource.Name]audioinput.AudioInput{ - audioinput.Named(testAudioInputName): injectAudioInput, - } - audioinputSvc, err := resource.NewAPIResourceCollection(audioinput.API, resources) - test.That(t, err, test.ShouldBeNil) - resourceAPI, ok, err := resource.LookupAPIRegistration[audioinput.AudioInput](audioinput.API) - test.That(t, err, test.ShouldBeNil) - test.That(t, ok, test.ShouldBeTrue) - test.That(t, resourceAPI.RegisterRPCService(context.Background(), rpcServer, audioinputSvc), test.ShouldBeNil) - - // Start serving requests. 
- go rpcServer.Serve(listener) - defer rpcServer.Stop() - - // Make client connection - conn, err := viamgrpc.Dial(context.Background(), listener.Addr().String(), logger) - test.That(t, err, test.ShouldBeNil) - client, err := audioinput.NewClientFromConn(context.Background(), conn, "", audioinput.Named(testAudioInputName), logger) - test.That(t, err, test.ShouldBeNil) - - // Get a stream - stream, err := client.Stream(context.Background()) - test.That(t, stream, test.ShouldNotBeNil) - test.That(t, err, test.ShouldBeNil) - - // Read from stream - media, _, err := stream.Next(context.Background()) - test.That(t, media, test.ShouldNotBeNil) - test.That(t, err, test.ShouldBeNil) - - // Close client and read from stream - test.That(t, client.Close(context.Background()), test.ShouldBeNil) - media, _, err = stream.Next(context.Background()) - test.That(t, media, test.ShouldBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, "context canceled") - - // Get a new stream - stream, err = client.Stream(context.Background()) - test.That(t, stream, test.ShouldNotBeNil) - test.That(t, err, test.ShouldBeNil) - - // Read from the new stream - media, _, err = stream.Next(context.Background()) - test.That(t, media, test.ShouldNotBeNil) - test.That(t, err, test.ShouldBeNil) - - // Close client and connection - test.That(t, client.Close(context.Background()), test.ShouldBeNil) - test.That(t, conn.Close(), test.ShouldBeNil) -} diff --git a/components/audioinput/collectors.go b/components/audioinput/collectors.go deleted file mode 100644 index 93931cbc576..00000000000 --- a/components/audioinput/collectors.go +++ /dev/null @@ -1,38 +0,0 @@ -package audioinput - -import ( - "go.viam.com/rdk/data" -) - -type method int64 - -const ( - doCommand method = iota -) - -func (m method) String() string { - if m == doCommand { - return "DoCommand" - } - return "Unknown" -} - -// newDoCommandCollector returns a collector to register a doCommand action. 
If one is already registered -// with the same MethodMetadata it will panic. -func newDoCommandCollector(resource interface{}, params data.CollectorParams) (data.Collector, error) { - audioinput, err := assertAudioInput(resource) - if err != nil { - return nil, err - } - - cFunc := data.NewDoCommandCaptureFunc(audioinput, params) - return data.NewCollector(cFunc, params) -} - -func assertAudioInput(resource interface{}) (AudioInput, error) { - audioinput, ok := resource.(AudioInput) - if !ok { - return nil, data.InvalidInterfaceErr(API) - } - return audioinput, nil -} diff --git a/components/audioinput/collectors_test.go b/components/audioinput/collectors_test.go deleted file mode 100644 index 4d242d39323..00000000000 --- a/components/audioinput/collectors_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package audioinput_test - -import ( - "context" - "testing" - "time" - - audioinput "go.viam.com/rdk/components/audioinput" - datatu "go.viam.com/rdk/data/testutils" - "go.viam.com/rdk/testutils/inject" -) - -const ( - componentName = "audioinput" - captureInterval = time.Millisecond -) - -var doCommandMap = map[string]any{"readings": "random-test"} - -func TestDoCommandCollector(t *testing.T) { - datatu.TestDoCommandCollector(t, datatu.DoCommandTestConfig{ - ComponentName: componentName, - CaptureInterval: captureInterval, - DoCommandMap: doCommandMap, - Collector: audioinput.NewDoCommandCollector, - ResourceFactory: func() interface{} { return newAudioInput() }, - }) -} - -func newAudioInput() audioinput.AudioInput { - ai := &inject.AudioInput{} - ai.DoFunc = func(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) { - return doCommandMap, nil - } - return ai -} diff --git a/components/audioinput/export_collectors_test.go b/components/audioinput/export_collectors_test.go deleted file mode 100644 index dee56ec177f..00000000000 --- a/components/audioinput/export_collectors_test.go +++ /dev/null @@ -1,7 +0,0 @@ -// export_collectors_test.go adds 
functionality to the package that we only want to use and expose during testing. -package audioinput - -// Exported variables for testing collectors, see unexported collectors for implementation details. -var ( - NewDoCommandCollector = newDoCommandCollector -) diff --git a/components/audioinput/fake/audio_input.go b/components/audioinput/fake/audio_input.go deleted file mode 100644 index 860aed83a59..00000000000 --- a/components/audioinput/fake/audio_input.go +++ /dev/null @@ -1,161 +0,0 @@ -//go:build !no_cgo - -// Package fake implements a fake audio input. -package fake - -import ( - "context" - "encoding/binary" - "math" - "sync" - "sync/atomic" - "time" - - "github.com/pion/mediadevices/pkg/prop" - "github.com/pion/mediadevices/pkg/wave" - "go.viam.com/utils" - - "go.viam.com/rdk/components/audioinput" - "go.viam.com/rdk/gostream" - "go.viam.com/rdk/logging" - "go.viam.com/rdk/resource" -) - -func init() { - resource.RegisterComponent( - audioinput.API, - resource.DefaultModelFamily.WithModel("fake"), - resource.Registration[audioinput.AudioInput, resource.NoNativeConfig]{Constructor: func( - _ context.Context, - _ resource.Dependencies, - conf resource.Config, - logger logging.Logger, - ) (audioinput.AudioInput, error) { - cancelCtx, cancelFunc := context.WithCancel(context.Background()) - var condMu sync.RWMutex - cond := sync.NewCond(condMu.RLocker()) - input := &audioInput{ - Named: conf.ResourceName().AsNamed(), - toneHz: 440, - cancel: cancelFunc, - cancelCtx: cancelCtx, - cond: cond, - } - input.activeBackgroundWorkers.Add(1) - utils.ManagedGo(func() { - ticker := time.NewTicker(latencyMillis * time.Millisecond) - for { - if !utils.SelectContextOrWaitChan(cancelCtx, ticker.C) { - return - } - atomic.AddInt64(&input.step, 1) - cond.Broadcast() - } - }, input.activeBackgroundWorkers.Done) - as := gostream.NewAudioSource(gostream.AudioReaderFunc(input.Read), prop.Audio{ - ChannelCount: channelCount, - SampleRate: samplingRate, - IsBigEndian: 
audioinput.HostEndian == binary.BigEndian, - IsInterleaved: true, - Latency: time.Millisecond * latencyMillis, - }) - input.AudioSource = as - return audioinput.FromAudioSource(conf.ResourceName(), input) - }}) -} - -// audioInput is a fake audioinput that always returns the same chunk. -type audioInput struct { - resource.Named - resource.TriviallyReconfigurable - gostream.AudioSource - mu sync.RWMutex - step int64 - toneHz float64 - cancel func() - cancelCtx context.Context - activeBackgroundWorkers sync.WaitGroup - cond *sync.Cond -} - -const ( - latencyMillis = 20 - samplingRate = 48000 - channelCount = 1 -) - -func (i *audioInput) Read(ctx context.Context) (wave.Audio, func(), error) { - select { - case <-i.cancelCtx.Done(): - return nil, nil, i.cancelCtx.Err() - case <-ctx.Done(): - return nil, nil, ctx.Err() - default: - } - - i.cond.L.Lock() - i.cond.Wait() - i.cond.L.Unlock() - - select { - case <-i.cancelCtx.Done(): - return nil, nil, i.cancelCtx.Err() - case <-ctx.Done(): - return nil, nil, ctx.Err() - default: - } - - const length = samplingRate * latencyMillis / 1000 - const numChunks = samplingRate / length - angle := math.Pi * 2 / (float64(length) * numChunks) - - i.mu.RLock() - toneHz := i.toneHz - i.mu.RUnlock() - - step := int(atomic.LoadInt64(&i.step) % numChunks) - chunk := wave.NewFloat32Interleaved(wave.ChunkInfo{ - Len: length, - Channels: channelCount, - SamplingRate: samplingRate, - }) - - for sample := 0; sample < length; sample++ { - val := wave.Float32Sample(math.Sin(angle * toneHz * (float64((length * step) + sample)))) - chunk.Set(sample, 0, val) - } - return chunk, func() {}, nil -} - -func (i *audioInput) MediaProperties(_ context.Context) (prop.Audio, error) { - return prop.Audio{ - ChannelCount: channelCount, - SampleRate: samplingRate, - IsBigEndian: audioinput.HostEndian == binary.BigEndian, - IsInterleaved: true, - Latency: time.Millisecond * latencyMillis, - }, nil -} - -// DoCommand allows setting of tone. 
-func (i *audioInput) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) { - i.mu.Lock() - defer i.mu.Unlock() - newTone, ok := cmd["set_tone_hz"].(float64) - if !ok { - return map[string]interface{}{}, nil - } - oldTone := i.toneHz - i.toneHz = newTone - return map[string]interface{}{"prev_tone_hz": oldTone}, nil -} - -// Close stops the generator routine. -func (i *audioInput) Close(ctx context.Context) error { - i.cancel() - i.activeBackgroundWorkers.Wait() - i.cond.L.Lock() - i.cond.Signal() - i.cond.L.Unlock() - return i.AudioSource.Close(ctx) -} diff --git a/components/audioinput/microphone/microphone.go b/components/audioinput/microphone/microphone.go deleted file mode 100644 index d305dc92a95..00000000000 --- a/components/audioinput/microphone/microphone.go +++ /dev/null @@ -1,131 +0,0 @@ -//go:build !no_cgo - -// Package microphone implements a microphone audio input. Really the microphone -// is any audio input device that can be found via gostream. -package microphone - -import ( - "context" - "errors" - "path/filepath" - "regexp" - - "github.com/pion/mediadevices" - - "go.viam.com/rdk/components/audioinput" - "go.viam.com/rdk/gostream" - "go.viam.com/rdk/logging" - "go.viam.com/rdk/resource" -) - -var model = resource.DefaultModelFamily.WithModel("microphone") - -func init() { - resource.RegisterComponent( - audioinput.API, - model, - resource.Registration[audioinput.AudioInput, *Config]{ - Constructor: func( - _ context.Context, - _ resource.Dependencies, - conf resource.Config, - logger logging.Logger, - ) (audioinput.AudioInput, error) { - newConf, err := resource.NativeConfig[*Config](conf) - if err != nil { - return nil, err - } - src, err := newMicrophoneSource(newConf, logger) - if err != nil { - return nil, err - } - // This always rebuilds on reconfiguration right now. A better system - // would be to reuse the monitored webcam code. 
- return audioinput.FromAudioSource(conf.ResourceName(), src) - }, - }) -} - -// Config is the attribute struct for microphones. -type Config struct { - resource.TriviallyValidateConfig - Path string `json:"audio_path"` - PathPattern string `json:"audio_path_pattern"` - Debug bool `json:"debug"` -} - -// newMicrophoneSource returns a new source based on a microphone discovered from the given attributes. -func newMicrophoneSource(conf *Config, logger logging.Logger) (audioinput.AudioSource, error) { - var err error - - debug := conf.Debug - - constraints := mediadevices.MediaTrackConstraints{} - audioConstraints := constraints.MediaConstraints.AudioConstraints - audioOption := func(trackConstraints *mediadevices.MediaTrackConstraints) { - trackConstraints.AudioConstraints = audioConstraints - } - audioStreamConstraints := mediadevices.MediaStreamConstraints{ - Audio: audioOption, - } - - if conf.Path != "" { - return tryMicrophoneOpen(conf.Path, audioStreamConstraints, logger) - } - - var pattern *regexp.Regexp - if conf.PathPattern != "" { - pattern, err = regexp.Compile(conf.PathPattern) - if err != nil { - return nil, err - } - } - all := gostream.QueryAudioDevices() - - for _, info := range all { - logger.Debugf("%s", info.ID) - logger.Debugf("\t labels: %v", info.Labels) - for _, label := range info.Labels { - if pattern != nil && !pattern.MatchString(label) { - if debug { - logger.Debug("\t skipping because of pattern") - } - continue - } - for _, p := range info.Properties { - logger.Debugf("\t %+v", p.Audio) - if p.Audio.ChannelCount == 0 { - if debug { - logger.Debug("\t skipping because audio channels are empty") - } - continue - } - s, err := tryMicrophoneOpen(label, audioStreamConstraints, logger) - if err == nil { - if debug { - logger.Debug("\t USING") - } - return s, nil - } - if debug { - logger.Debugw("cannot open driver with properties", "properties", p, - "error", err) - } - } - } - } - return nil, errors.New("found no microphones") -} - -func 
tryMicrophoneOpen( - path string, - constraints mediadevices.MediaStreamConstraints, - logger logging.Logger, -) (audioinput.AudioSource, error) { - source, err := gostream.GetNamedAudioSource(filepath.Base(path), constraints, logger) - if err != nil { - return nil, err - } - // TODO(XXX): implement LivenessMonitor - return audioinput.NewAudioSourceFromGostreamSource(source) -} diff --git a/components/audioinput/register/register.go b/components/audioinput/register/register.go deleted file mode 100644 index 7dee5cf2d6b..00000000000 --- a/components/audioinput/register/register.go +++ /dev/null @@ -1,10 +0,0 @@ -//go:build !no_cgo - -// Package register registers all relevant audio inputs and also API specific functions -package register - -import ( - // for audio inputs. - _ "go.viam.com/rdk/components/audioinput/fake" - _ "go.viam.com/rdk/components/audioinput/microphone" -) diff --git a/components/audioinput/server.go b/components/audioinput/server.go deleted file mode 100644 index 6eb81472066..00000000000 --- a/components/audioinput/server.go +++ /dev/null @@ -1,320 +0,0 @@ -package audioinput - -import ( - "bytes" - "context" - "encoding/binary" - "fmt" - "io" - "time" - "unsafe" - - "github.com/go-audio/audio" - "github.com/go-audio/transforms" - "github.com/go-audio/wav" - "github.com/pion/mediadevices/pkg/wave" - "github.com/pkg/errors" - commonpb "go.viam.com/api/common/v1" - pb "go.viam.com/api/component/audioinput/v1" - "go.viam.com/utils" - "go.viam.com/utils/trace" - "google.golang.org/genproto/googleapis/api/httpbody" - "google.golang.org/protobuf/types/known/durationpb" - "gopkg.in/src-d/go-billy.v4/memfs" - - "go.viam.com/rdk/protoutils" - "go.viam.com/rdk/resource" -) - -// HostEndian indicates the byte ordering this host natively uses. 
-var HostEndian binary.ByteOrder - -func init() { - // from github.com/pion/mediadevices/pkg/wave/decoder.go - //nolint:gosec - switch v := *(*uint16)(unsafe.Pointer(&([]byte{0x12, 0x34}[0]))); v { - case 0x1234: - HostEndian = binary.BigEndian - case 0x3412: - HostEndian = binary.LittleEndian - default: - panic(fmt.Sprintf("failed to determine host endianness: %x", v)) - } -} - -// serviceServer implements the AudioInputService from audioinput.proto. -type serviceServer struct { - pb.UnimplementedAudioInputServiceServer - coll resource.APIResourceGetter[AudioInput] -} - -// NewRPCServiceServer constructs an audio input gRPC service server. -// It is intentionally untyped to prevent use outside of tests. -func NewRPCServiceServer(coll resource.APIResourceGetter[AudioInput]) interface{} { - return &serviceServer{coll: coll} -} - -// Chunks returns audio chunks (samples) forever from an audio input of the underlying robot. A specific sampling -// format can be requested but may not necessarily be the same one returned. 
-func (s *serviceServer) Chunks(req *pb.ChunksRequest, server pb.AudioInputService_ChunksServer) error { - audioInput, err := s.coll.Resource(req.Name) - if err != nil { - return err - } - - chunkStream, err := audioInput.Stream(server.Context()) - if err != nil { - return err - } - defer func() { - utils.UncheckedError(chunkStream.Close(server.Context())) - }() - - firstChunk, release, err := chunkStream.Next(server.Context()) - if err != nil { - return err - } - info := firstChunk.ChunkInfo() - release() - - var sf wave.SampleFormat - sfProto := req.SampleFormat - switch req.SampleFormat { - case pb.SampleFormat_SAMPLE_FORMAT_UNSPECIFIED: - sfProto = pb.SampleFormat_SAMPLE_FORMAT_INT16_INTERLEAVED - fallthrough - case pb.SampleFormat_SAMPLE_FORMAT_INT16_INTERLEAVED: - sf = wave.Int16SampleFormat - case pb.SampleFormat_SAMPLE_FORMAT_FLOAT32_INTERLEAVED: - sf = wave.Float32SampleFormat - default: - return errors.Errorf("unknown type of audio sample format %v", req.SampleFormat) - } - - if err := server.Send(&pb.ChunksResponse{ - Type: &pb.ChunksResponse_Info{ - Info: &pb.AudioChunkInfo{ - SampleFormat: sfProto, - Channels: uint32(info.Channels), - SamplingRate: int64(info.SamplingRate), - }, - }, - }); err != nil { - return err - } - - sendNextChunk := func() error { - chunk, release, err := chunkStream.Next(server.Context()) - if err != nil { - return err - } - defer release() - - var outBytes []byte - switch c := chunk.(type) { - case *wave.Int16Interleaved: - outBytes = make([]byte, len(c.Data)*2) - buf := bytes.NewBuffer(outBytes[:0]) - chunkCopy := c - if sf != chunk.SampleFormat() { - chunkCopy = wave.NewInt16Interleaved(info) - // convert - for i := 0; i < c.Size.Len; i++ { - for j := 0; j < c.Size.Channels; j++ { - chunkCopy.Set(i, j, chunk.At(i, j)) - } - } - } - if err := binary.Write(buf, HostEndian, chunkCopy.Data); err != nil { - return err - } - case *wave.Float32Interleaved: - outBytes = make([]byte, len(c.Data)*4) - buf := 
bytes.NewBuffer(outBytes[:0]) - chunkCopy := c - if sf != chunk.SampleFormat() { - chunkCopy = wave.NewFloat32Interleaved(info) - // convert - for i := 0; i < c.Size.Len; i++ { - for j := 0; j < c.Size.Channels; j++ { - chunkCopy.Set(i, j, chunk.At(i, j)) - } - } - } - if err := binary.Write(buf, HostEndian, chunkCopy.Data); err != nil { - return err - } - default: - return errors.Errorf("unknown type of audio buffer %T", chunk) - } - - return server.Send(&pb.ChunksResponse{ - Type: &pb.ChunksResponse_Chunk{ - Chunk: &pb.AudioChunk{ - Data: outBytes, - Length: uint32(info.Len), - }, - }, - }) - } - - for { - if err := sendNextChunk(); err != nil { - return err - } - } -} - -// Properties returns properties of an audio input of the underlying robot. -func (s *serviceServer) Properties( - ctx context.Context, - req *pb.PropertiesRequest, -) (*pb.PropertiesResponse, error) { - audioInput, err := s.coll.Resource(req.Name) - if err != nil { - return nil, err - } - - props, err := audioInput.MediaProperties(ctx) - if err != nil { - return nil, err - } - - return &pb.PropertiesResponse{ - ChannelCount: uint32(props.ChannelCount), - Latency: durationpb.New(props.Latency), - SampleRate: uint32(props.SampleRate), - SampleSize: uint32(props.SampleSize), - IsBigEndian: props.IsBigEndian, - IsFloat: props.IsFloat, - IsInterleaved: props.IsInterleaved, - }, nil -} - -// Record renders an audio chunk from an audio input of the underlying robot -// to an HTTP response. A specific MIME type cannot be requested and may not necessarily -// be the same one returned each time. 
-func (s *serviceServer) Record( - ctx context.Context, - req *pb.RecordRequest, -) (*httpbody.HttpBody, error) { - ctx, span := trace.StartSpan(ctx, "audioinput::server::Record") - defer span.End() - audioInput, err := s.coll.Resource(req.Name) - if err != nil { - return nil, err - } - - chunkStream, err := audioInput.Stream(ctx) - if err != nil { - return nil, err - } - defer func() { - utils.UncheckedError(chunkStream.Close(ctx)) - }() - - firstChunk, release, err := chunkStream.Next(ctx) - if err != nil { - return nil, err - } - info := firstChunk.ChunkInfo() - release() - - duration := req.Duration.AsDuration() - if duration == 0 { - duration = time.Second - } - if duration > 5*time.Second { - return nil, errors.New("can only record up to 5 seconds") - } - - ms := memfs.New() - fd, err := ms.Create("dummy") - if err != nil { - return nil, err - } - - wavEnc := wav.NewEncoder(fd, - info.SamplingRate, - 24, - info.Channels, - 1, // PCM - ) - - nextChunk := func() error { - chunk, release, err := chunkStream.Next(ctx) - if err != nil { - return err - } - defer release() - - switch c := chunk.(type) { - case *wave.Int16Interleaved: - cData := make([]int, len(c.Data)) - for i := 0; i < len(c.Data); i++ { - cData[i] = int(c.Data[i]) - } - buf := &audio.IntBuffer{ - Format: &audio.Format{ - NumChannels: info.Channels, - SampleRate: info.SamplingRate, - }, - Data: cData, - SourceBitDepth: 16, - } - - return wavEnc.Write(buf) - case *wave.Float32Interleaved: - dataCopy := make([]float32, len(c.Data)) - copy(dataCopy, c.Data) - buf := &audio.Float32Buffer{ - Format: &audio.Format{ - NumChannels: info.Channels, - SampleRate: info.SamplingRate, - }, - Data: dataCopy, - SourceBitDepth: 32, - } - if err := transforms.PCMScaleF32(buf, 24); err != nil { - return err - } - - return wavEnc.Write(buf.AsIntBuffer()) - default: - return errors.Errorf("unknown type of audio buffer %T", chunk) - } - } - numChunks := int(duration.Seconds() * float64(info.SamplingRate/info.Len)) - for 
i := 0; i < numChunks; i++ { - if err := nextChunk(); err != nil { - return nil, err - } - } - - if err := wavEnc.Close(); err != nil { - return nil, err - } - if _, err := fd.Seek(0, io.SeekStart); err != nil { - return nil, err - } - rd, err := io.ReadAll(fd) - if err != nil { - return nil, err - } - - return &httpbody.HttpBody{ - ContentType: "audio/wav", - Data: rd, - }, nil -} - -// DoCommand receives arbitrary commands. -func (s *serviceServer) DoCommand(ctx context.Context, - req *commonpb.DoCommandRequest, -) (*commonpb.DoCommandResponse, error) { - audioInput, err := s.coll.Resource(req.GetName()) - if err != nil { - return nil, err - } - return protoutils.DoFromResourceServer(ctx, audioInput, req) -} diff --git a/components/audioinput/verify_main_test.go b/components/audioinput/verify_main_test.go deleted file mode 100644 index beafd196237..00000000000 --- a/components/audioinput/verify_main_test.go +++ /dev/null @@ -1,12 +0,0 @@ -package audioinput - -import ( - "testing" - - testutilsext "go.viam.com/utils/testutils/ext" -) - -// TestMain is used to control the execution of all tests run within this package (including _test packages). -func TestMain(m *testing.M) { - testutilsext.VerifyTestMain(m) -} diff --git a/components/camera/client_test.go b/components/camera/client_test.go index 7c539c24b20..9de10cdc19b 100644 --- a/components/camera/client_test.go +++ b/components/camera/client_test.go @@ -24,7 +24,6 @@ import ( "go.viam.com/rdk/config" "go.viam.com/rdk/data" "go.viam.com/rdk/gostream" - "go.viam.com/rdk/gostream/codec/opus" "go.viam.com/rdk/gostream/codec/x264" viamgrpc "go.viam.com/rdk/grpc" "go.viam.com/rdk/logging" @@ -717,7 +716,6 @@ func setupRealRobot( // We initialize with a stream config such that the stream server is capable of creating video stream and // audio stream data. 
webSvc := web.New(robot, logger, web.WithStreamConfig(gostream.StreamConfig{ - AudioEncoderFactory: opus.NewEncoderFactory(), VideoEncoderFactory: x264.NewEncoderFactory(), })) options, _, addr := robottestutils.CreateBaseOptionsAndListener(t) @@ -742,7 +740,6 @@ func setupRealRobotWithOptions( // We initialize with a stream config such that the stream server is capable of creating video stream and // audio stream data. webSvc := web.New(robot, logger, web.WithStreamConfig(gostream.StreamConfig{ - AudioEncoderFactory: opus.NewEncoderFactory(), VideoEncoderFactory: x264.NewEncoderFactory(), })) err = webSvc.Start(ctx, options) diff --git a/components/register/all_cgo.go b/components/register/all_cgo.go index 5395a9552ec..5f59312468b 100644 --- a/components/register/all_cgo.go +++ b/components/register/all_cgo.go @@ -5,5 +5,4 @@ package register import ( // blank import registration pattern. _ "go.viam.com/rdk/components/arm/register" - _ "go.viam.com/rdk/components/audioinput/register" ) diff --git a/etc/lint_register_apis.sh b/etc/lint_register_apis.sh index 82b91152621..853bb084b2a 100755 --- a/etc/lint_register_apis.sh +++ b/etc/lint_register_apis.sh @@ -15,7 +15,7 @@ pkgs=(components services) # package can be used by modules and other client code without imposing a # dependency on cgo, further requiring installed libraries and properly # configured CGO_... environment variables for Go compilation to succeed. 
-cgo_paths=(services/motion services/vision components/camera components/audioinput) +cgo_paths=(services/motion services/vision components/camera) for p in "${pkgs[@]}"; do pushd $p > /dev/null diff --git a/etc/setup.sh b/etc/setup.sh index 6d1e1e10654..e3f5ae261d2 100755 --- a/etc/setup.sh +++ b/etc/setup.sh @@ -138,7 +138,7 @@ mod_profiles(){ check_gcloud_auth(){ APP_CREDENTIALS_DIR="$HOME/.config/gcloud" mkdir -p $APP_CREDENTIALS_DIR - APP_CREDENTIALS_FILE="$APP_CREDENTIALS_DIR/application_default_credentials.json" + APP_CREDENTIALS_FILE="$APP_CREDENTIALS_DIR/application_default_credentials.json" if [ ! -f "$APP_CREDENTIALS_FILE" ]; then echo "Missing gcloud application default credentials, this can cause goroutines to leak if not configured. Creating with empty config at $APP_CREDENTIALS_FILE" echo '{"client_id":"XXXX","client_secret":"XXXX","refresh_token":"XXXX","type":"authorized_user"}' > $APP_CREDENTIALS_FILE diff --git a/go.mod b/go.mod index d84fab010bd..049542c7bad 100644 --- a/go.mod +++ b/go.mod @@ -31,9 +31,6 @@ require ( github.com/fogleman/gg v1.3.0 github.com/fsnotify/fsnotify v1.9.0 github.com/fullstorydev/grpcurl v1.8.6 - github.com/go-audio/audio v1.0.0 - github.com/go-audio/transforms v0.0.0-20180121090939-51830ccc35a5 - github.com/go-audio/wav v1.1.0 github.com/go-co-op/gocron/v2 v2.18.0 github.com/go-git/go-git/v5 v5.16.2 github.com/go-gl/mathgl v1.0.0 @@ -118,7 +115,6 @@ require ( google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 google.golang.org/protobuf v1.36.10 gopkg.in/natefinch/lumberjack.v2 v2.2.1 - gopkg.in/src-d/go-billy.v4 v4.3.2 gorgonia.org/tensor v0.9.24 gotest.tools/gotestsum v1.12.2 periph.io/x/conn/v3 v3.7.0 @@ -205,7 +201,6 @@ require ( github.com/felixge/httpsnoop v1.0.4 // indirect github.com/gen2brain/malgo v0.11.24 // indirect github.com/gin-gonic/gin v1.9.1 // indirect - github.com/go-audio/riff v1.0.0 // indirect github.com/go-chi/chi/v5 v5.2.2 // indirect github.com/go-git/gcfg 
v1.5.1-0.20230307220236-3a3c6141e376 // indirect github.com/go-git/go-billy/v5 v5.6.2 // indirect diff --git a/go.sum b/go.sum index 5a3612799fe..d0e5a60ee2b 100644 --- a/go.sum +++ b/go.sum @@ -352,14 +352,6 @@ github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg= github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU= -github.com/go-audio/audio v1.0.0 h1:zS9vebldgbQqktK4H0lUqWrG8P0NxCJVqcj7ZpNnwd4= -github.com/go-audio/audio v1.0.0/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= -github.com/go-audio/riff v1.0.0 h1:d8iCGbDvox9BfLagY94fBynxSPHO80LmZCaOsmKxokA= -github.com/go-audio/riff v1.0.0/go.mod h1:l3cQwc85y79NQFCRB7TiPoNiaijp6q8Z0Uv38rVG498= -github.com/go-audio/transforms v0.0.0-20180121090939-51830ccc35a5 h1:acgZxkn6oSJCh/snMQdZYuOeroSbZHdOinIa1n251Wk= -github.com/go-audio/transforms v0.0.0-20180121090939-51830ccc35a5/go.mod h1:z9ahC4nc9/kxKfl1BnTZ/D2Cm5TbhjR2LeuUpepL9zI= -github.com/go-audio/wav v1.1.0 h1:jQgLtbqBzY7G+BM8fXF7AHUk1uHUviWS4X39d5rsL2g= -github.com/go-audio/wav v1.1.0/go.mod h1:mpe9qfwbScEbkd8uybLuIpTgHyrISw/OTuvjUW2iGtE= github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618= github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= github.com/go-co-op/gocron/v2 v2.18.0 h1:DS3Uhru66q1jy/5f9V0itmi3cLXcn2b7N+duGfgT7gU= @@ -646,7 +638,6 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod 
h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= @@ -1543,8 +1534,6 @@ gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= -gopkg.in/src-d/go-billy.v4 v4.3.2 h1:0SQA1pRztfTFx2miS8sA97XvooFeNOmvUenF4o0EcVg= -gopkg.in/src-d/go-billy.v4 v4.3.2/go.mod h1:nDjArDMp+XMs1aFAESLRjfGSgfvoYN0hDfzEk0GjC98= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= diff --git a/gostream/audio.go b/gostream/audio.go deleted file mode 100644 index 207a128ec05..00000000000 --- a/gostream/audio.go +++ /dev/null @@ -1,59 +0,0 @@ -package gostream - -import ( - "context" - - "github.com/pion/mediadevices/pkg/driver" - "github.com/pion/mediadevices/pkg/prop" - "github.com/pion/mediadevices/pkg/wave" -) - -type ( - // An AudioReader is anything that can read and recycle audio data. - AudioReader = MediaReader[wave.Audio] - - // An AudioReaderFunc is a helper to turn a function into an AudioReader. - AudioReaderFunc = MediaReaderFunc[wave.Audio] - - // An AudioSource is responsible for producing audio chunks when requested. A source - // should produce the chunk as quickly as possible and introduce no rate limiting - // of its own as that is handled internally. - AudioSource = MediaSource[wave.Audio] - - // An AudioStream streams audio forever until closed. 
- AudioStream = MediaStream[wave.Audio] - - // AudioPropertyProvider providers information about an audio source. - AudioPropertyProvider = MediaPropertyProvider[prop.Audio] -) - -// NewAudioSource instantiates a new audio read closer. -func NewAudioSource(r AudioReader, p prop.Audio) AudioSource { - return newMediaSource(nil, r, p) -} - -// NewAudioSourceForDriver instantiates a new audio read closer and references the given -// driver. -func NewAudioSourceForDriver(d driver.Driver, r AudioReader, p prop.Audio) AudioSource { - return newMediaSource(d, r, p) -} - -// ReadAudio gets a single audio wave from an audio source. Using this has less of a guarantee -// than AudioSource.Stream that the Nth wave follows the N-1th wave. -func ReadAudio(ctx context.Context, source AudioSource) (wave.Audio, func(), error) { - return ReadMedia(ctx, source) -} - -// NewEmbeddedAudioStream returns an audio stream from an audio source that is -// intended to be embedded/composed by another source. It defers the creation -// of its stream. -func NewEmbeddedAudioStream(src AudioSource) AudioStream { - return NewEmbeddedMediaStream[wave.Audio, prop.Audio](src) -} - -// NewEmbeddedAudioStreamFromReader returns an audio stream from an audio reader that is -// intended to be embedded/composed by another source. It defers the creation -// of its stream. -func NewEmbeddedAudioStreamFromReader(reader AudioReader) AudioStream { - return NewEmbeddedMediaStreamFromReader(reader, prop.Audio{}) -} diff --git a/gostream/codec/audio_encoder.go b/gostream/codec/audio_encoder.go deleted file mode 100644 index 91e94864cbc..00000000000 --- a/gostream/codec/audio_encoder.go +++ /dev/null @@ -1,24 +0,0 @@ -package codec - -import ( - "context" - "time" - - "github.com/pion/mediadevices/pkg/wave" - - "go.viam.com/rdk/logging" -) - -// An AudioEncoder is anything that can encode audo chunks into bytes. 
This means that -// the encoder must follow some type of format dictated by a type (see AudioEncoderFactory.MimeType). -// An encoder that produces bytes of different encoding formats per call is invalid. -type AudioEncoder interface { - Encode(ctx context.Context, chunk wave.Audio) ([]byte, bool, error) - Close() -} - -// An AudioEncoderFactory produces AudioEncoders and provides information about the underlying encoder itself. -type AudioEncoderFactory interface { - New(sampleRate, channelCount int, latency time.Duration, logger logging.Logger) (AudioEncoder, error) - MIMEType() string -} diff --git a/gostream/codec/opus/encoder.go b/gostream/codec/opus/encoder.go deleted file mode 100644 index 6f44c05f17a..00000000000 --- a/gostream/codec/opus/encoder.go +++ /dev/null @@ -1,142 +0,0 @@ -// Package opus contains the opus video codec. -package opus - -import ( - "context" - "io" - "sync" - "time" - - "github.com/pion/mediadevices/pkg/codec" - "github.com/pion/mediadevices/pkg/codec/opus" - "github.com/pion/mediadevices/pkg/prop" - "github.com/pion/mediadevices/pkg/wave" - "go.viam.com/utils" - - ourcodec "go.viam.com/rdk/gostream/codec" - "go.viam.com/rdk/logging" -) - -type encoder struct { - codec codec.ReadCloser - chunkCh chan wave.Audio - encodedCh chan encodedData - logger logging.Logger - cancelCtx context.Context - cancelFunc func() - activeBackgroundWorkers sync.WaitGroup -} - -// Gives suitable results. Probably want to make this configurable this in the future. -const bitrate = 32000 - -type encodedData struct { - data []byte - err error -} - -// NewEncoder returns an Opus encoder that can encode images of the given width and height. It will -// also ensure that it produces key frames at the given interval. 
-func NewEncoder(sampleRate, channelCount int, latency time.Duration, logger logging.Logger) (ourcodec.AudioEncoder, error) { - cancelCtx, cancelFunc := context.WithCancel(context.Background()) - enc := &encoder{ - chunkCh: make(chan wave.Audio, 1), - encodedCh: make(chan encodedData, 1), - logger: logger, - cancelCtx: cancelCtx, - cancelFunc: cancelFunc, - } - - var builder codec.AudioEncoderBuilder - params, err := opus.NewParams() - if err != nil { - return nil, err - } - builder = ¶ms - params.BitRate = bitrate - params.Latency = opus.Latency(latency) - - codec, err := builder.BuildAudioEncoder(enc, prop.Media{ - Audio: prop.Audio{ - Latency: latency, - SampleRate: sampleRate, - ChannelCount: channelCount, - }, - }) - if err != nil { - return nil, err - } - enc.codec = codec - - enc.activeBackgroundWorkers.Add(1) - utils.ManagedGo(func() { - for { - if cancelCtx.Err() != nil { - return - } - data, release, err := enc.codec.Read() - dataCopy := make([]byte, len(data)) - copy(dataCopy, data) - release() - - select { - case <-cancelCtx.Done(): - return - case enc.encodedCh <- encodedData{dataCopy, err}: - } - } - }, func() { - defer enc.activeBackgroundWorkers.Done() - close(enc.encodedCh) - }) - - return enc, nil -} - -// Read returns an audio chunk for codec to process. -func (a *encoder) Read() (chunk wave.Audio, release func(), err error) { - if err := a.cancelCtx.Err(); err != nil { - return nil, func() {}, err - } - - select { - case <-a.cancelCtx.Done(): - return nil, func() {}, io.EOF - case chunk := <-a.chunkCh: - return chunk, func() {}, nil - } -} - -// Encode asks the codec to process the given audio chunk. 
-func (a *encoder) Encode(ctx context.Context, chunk wave.Audio) ([]byte, bool, error) { - defer func() { - select { - case <-ctx.Done(): - return - case <-a.cancelCtx.Done(): - return - case a.chunkCh <- chunk: - } - }() - if err := a.cancelCtx.Err(); err != nil { - return nil, false, err - } - select { - case <-ctx.Done(): - return nil, false, ctx.Err() - case <-a.cancelCtx.Done(): - return nil, false, a.cancelCtx.Err() - case encoded := <-a.encodedCh: - if encoded.err != nil { - return nil, false, encoded.err - } - return encoded.data, true, nil - default: - return nil, false, nil - } -} - -func (a *encoder) Close() { - a.cancelFunc() - a.activeBackgroundWorkers.Wait() -} diff --git a/gostream/codec/opus/utils.go b/gostream/codec/opus/utils.go deleted file mode 100644 index 7d42eadb103..00000000000 --- a/gostream/codec/opus/utils.go +++ /dev/null @@ -1,31 +0,0 @@ -package opus - -import ( - "time" - - "go.viam.com/rdk/gostream" - "go.viam.com/rdk/gostream/codec" - "go.viam.com/rdk/logging" -) - -// DefaultStreamConfig configures Opus as the audio encoder for a stream. -var DefaultStreamConfig gostream.StreamConfig - -func init() { - DefaultStreamConfig.AudioEncoderFactory = NewEncoderFactory() -} - -// NewEncoderFactory returns an Opus audio encoder factory. -func NewEncoderFactory() codec.AudioEncoderFactory { - return &factory{} -} - -type factory struct{} - -func (f *factory) New(sampleRate, channelCount int, latency time.Duration, logger logging.Logger) (codec.AudioEncoder, error) { - return NewEncoder(sampleRate, channelCount, latency, logger) -} - -func (f *factory) MIMEType() string { - return "audio/opus" -} diff --git a/gostream/codec/video_encoder.go b/gostream/codec/video_encoder.go index 6f479cb49bc..15f14c736f1 100644 --- a/gostream/codec/video_encoder.go +++ b/gostream/codec/video_encoder.go @@ -1,4 +1,4 @@ -// Package codec defines the encoder and factory interfaces for encoding video frames and audio chunks. 
+// Package codec defines the encoder and factory interfaces for encoding video frames. package codec import ( diff --git a/gostream/query.go b/gostream/query.go index 9e1049bcf79..bdc351fa045 100644 --- a/gostream/query.go +++ b/gostream/query.go @@ -13,7 +13,6 @@ import ( "github.com/pion/mediadevices/pkg/driver/availability" "github.com/pion/mediadevices/pkg/frame" "github.com/pion/mediadevices/pkg/prop" - "github.com/pion/mediadevices/pkg/wave" "github.com/pkg/errors" "go.viam.com/rdk/logging" @@ -121,44 +120,6 @@ func GetAnyVideoSource( return newVideoSourceFromDriver(d, selectedMedia) } -// GetAnyAudioSource attempts to find any suitable audio device. -func GetAnyAudioSource( - constraints mediadevices.MediaStreamConstraints, - logger logging.Logger, -) (MediaSource[wave.Audio], error) { - d, selectedMedia, err := getUserAudioDriver(constraints, nil, logger) - if err != nil { - return nil, err - } - return newAudioSourceFromDriver(d, selectedMedia, logger) -} - -// GetNamedAudioSource attempts to find an audio device by the given name. -func GetNamedAudioSource( - name string, - constraints mediadevices.MediaStreamConstraints, - logger logging.Logger, -) (MediaSource[wave.Audio], error) { - d, selectedMedia, err := getUserAudioDriver(constraints, &name, logger) - if err != nil { - return nil, err - } - return newAudioSourceFromDriver(d, selectedMedia, logger) -} - -// GetPatternedAudioSource attempts to find an audio device by the given label pattern. -func GetPatternedAudioSource( - labelPattern *regexp.Regexp, - constraints mediadevices.MediaStreamConstraints, - logger logging.Logger, -) (MediaSource[wave.Audio], error) { - d, selectedMedia, err := getUserAudioDriverPattern(constraints, labelPattern, logger) - if err != nil { - return nil, err - } - return newAudioSourceFromDriver(d, selectedMedia, logger) -} - // DeviceInfo describes a driver. 
type DeviceInfo struct { ID string @@ -178,11 +139,6 @@ func QueryScreenDevices() []DeviceInfo { return getDriverInfo(driver.GetManager().Query(getScreenFilterBase()), true) } -// QueryAudioDevices lists all known audio devices. -func QueryAudioDevices() []DeviceInfo { - return getDriverInfo(driver.GetManager().Query(getAudioFilterBase()), true) -} - func getDriverInfo(drivers []driver.Driver, useSep bool) []DeviceInfo { infos := make([]DeviceInfo, len(drivers)) for i, d := range drivers { @@ -213,11 +169,6 @@ func QueryVideoDeviceLabels() []string { return getDriversLabels(driver.GetManager().Query(getVideoFilterBase()), true) } -// QueryAudioDeviceLabels lists all known audio devices. -func QueryAudioDeviceLabels() []string { - return getDriversLabels(driver.GetManager().Query(getAudioFilterBase()), true) -} - func getDriversLabels(drivers []driver.Driver, useSep bool) []string { var labels []string for _, d := range drivers { @@ -306,56 +257,6 @@ func newVideoSourceFromDriver( return newMediaSource[image.Image](videoDriver, mediaReaderFuncNoCtx[image.Image](reader.Read), mediaProp.Video), nil } -func getUserAudioDriver( - constraints mediadevices.MediaStreamConstraints, - label *string, - logger logging.Logger, -) (driver.Driver, prop.Media, error) { - var audioConstraints mediadevices.MediaTrackConstraints - if constraints.Audio != nil { - constraints.Audio(&audioConstraints) - } - return selectAudio(audioConstraints, label, logger) -} - -func getUserAudioDriverPattern( - constraints mediadevices.MediaStreamConstraints, - labelPattern *regexp.Regexp, - logger logging.Logger, -) (driver.Driver, prop.Media, error) { - var audioConstraints mediadevices.MediaTrackConstraints - if constraints.Audio != nil { - constraints.Audio(&audioConstraints) - } - return selectVideoPattern(audioConstraints, labelPattern, logger) -} - -func newAudioSourceFromDriver( - audioDriver driver.Driver, - mediaProp prop.Media, - logger logging.Logger, -) (MediaSource[wave.Audio], error) { 
- recorder, ok := audioDriver.(driver.AudioRecorder) - if !ok { - return nil, errors.New("driver not a driver.AudioRecorder") - } - - if driverStatus := audioDriver.Status(); driverStatus != driver.StateClosed { - logger.Warnw("audio driver is not closed, attempting to close and reopen", "status", driverStatus) - if err := audioDriver.Close(); err != nil { - logger.Errorw("error closing driver", "error", err) - } - } - if err := audioDriver.Open(); err != nil { - return nil, err - } - reader, err := recorder.AudioRecord(mediaProp) - if err != nil { - return nil, err - } - return newMediaSource[wave.Audio](audioDriver, mediaReaderFuncNoCtx[wave.Audio](reader.Read), mediaProp.Audio), nil -} - func labelFilter(target string, useSep bool) driver.FilterFn { return driver.FilterFn(func(d driver.Driver) bool { if !useSep { @@ -408,14 +309,6 @@ func selectScreenPattern( return selectBestDriver(getScreenFilterBase(), getScreenFilterPattern(labelPattern), constraints, logger) } -func selectAudio( - constraints mediadevices.MediaTrackConstraints, - label *string, - logger logging.Logger, -) (driver.Driver, prop.Media, error) { - return selectBestDriver(getAudioFilterBase(), getAudioFilter(label), constraints, logger) -} - func getVideoFilterBase() driver.FilterFn { typeFilter := driver.FilterVideoRecorder() notScreenFilter := driver.FilterNot(driver.FilterDeviceType(driver.Screen)) @@ -456,18 +349,6 @@ func getScreenFilterPattern(labelPattern *regexp.Regexp) driver.FilterFn { return filter } -func getAudioFilterBase() driver.FilterFn { - return driver.FilterAudioRecorder() -} - -func getAudioFilter(label *string) driver.FilterFn { - filter := getAudioFilterBase() - if label != nil { - filter = driver.FilterAnd(filter, labelFilter(*label, true)) - } - return filter -} - // select implements SelectSettings algorithm. 
// Reference: https://w3c.github.io/mediacapture-main/#dfn-selectsettings func selectBestDriver( diff --git a/gostream/source_stream_utils.go b/gostream/source_stream_utils.go index cd4b1a2b1df..ae26622adec 100644 --- a/gostream/source_stream_utils.go +++ b/gostream/source_stream_utils.go @@ -15,13 +15,6 @@ func StreamVideoSource(ctx context.Context, vs VideoSource, stream Stream, logge }, stream.InputVideoFrames, logger) } -// StreamAudioSource streams the given video source to the stream forever until context signals cancellation. -func StreamAudioSource(ctx context.Context, as AudioSource, stream Stream, logger logging.Logger) error { - return streamMediaSource(ctx, as, stream, func(ctx context.Context, frameErr error) { - logger.Debugw("error getting frame", "error", frameErr) - }, stream.InputAudioChunks, logger) -} - // StreamVideoSourceWithErrorHandler streams the given video source to the stream forever // until context signals cancellation, frame errors are sent via the error handler. func StreamVideoSourceWithErrorHandler( @@ -30,14 +23,6 @@ func StreamVideoSourceWithErrorHandler( return streamMediaSource(ctx, vs, stream, errHandler, stream.InputVideoFrames, logger) } -// StreamAudioSourceWithErrorHandler streams the given audio source to the stream forever -// until context signals cancellation, audio errors are sent via the error handler. -func StreamAudioSourceWithErrorHandler( - ctx context.Context, as AudioSource, stream Stream, errHandler ErrorHandler, logger logging.Logger, -) error { - return streamMediaSource(ctx, as, stream, errHandler, stream.InputAudioChunks, logger) -} - // streamMediaSource will stream a source of media forever to the stream until the given context tells it to cancel. 
func streamMediaSource[T, U any]( ctx context.Context, diff --git a/gostream/stream.go b/gostream/stream.go index d7fef0fc40f..ce1abefd717 100644 --- a/gostream/stream.go +++ b/gostream/stream.go @@ -11,7 +11,6 @@ import ( "github.com/google/uuid" "github.com/pion/mediadevices/pkg/prop" - "github.com/pion/mediadevices/pkg/wave" "github.com/pion/rtp" "github.com/viamrobotics/webrtc/v3" "go.viam.com/utils" @@ -44,15 +43,12 @@ type Stream interface { InputVideoFrames(props prop.Video) (chan<- MediaReleasePair[image.Image], error) - InputAudioChunks(props prop.Audio) (chan<- MediaReleasePair[wave.Audio], error) - // Stop stops further processing of frames. Stop() } type internalStream interface { VideoTrackLocal() (webrtc.TrackLocal, bool) - AudioTrackLocal() (webrtc.TrackLocal, bool) } // MediaReleasePair associates a media with a corresponding @@ -66,8 +62,8 @@ type MediaReleasePair[T any] struct { // NewStream returns a newly configured stream that can begin to handle // new connections. 
func NewStream(config StreamConfig, logger logging.Logger) (Stream, error) { - if config.VideoEncoderFactory == nil && config.AudioEncoderFactory == nil { - return nil, errors.New("at least one audio or video encoder factory must be set") + if config.VideoEncoderFactory == nil { + return nil, errors.New("at least one video encoder factory must be set") } if config.TargetFrameRate == 0 { config.TargetFrameRate = defaultTargetFrameRate @@ -87,15 +83,6 @@ func NewStream(config StreamConfig, logger logging.Logger) (Stream, error) { ) } - var audioTrackLocal *trackLocalStaticSample - if config.AudioEncoderFactory != nil { - audioTrackLocal = newAudioTrackLocalStaticSample( - webrtc.RTPCodecCapability{MimeType: config.AudioEncoderFactory.MIMEType()}, - "audio", - name, - ) - } - ctx, cancelFunc := context.WithCancel(context.Background()) bs := &basicStream{ name: name, @@ -106,10 +93,6 @@ func NewStream(config StreamConfig, logger logging.Logger) (Stream, error) { inputImageChan: make(chan MediaReleasePair[image.Image]), outputVideoChan: make(chan []byte), - audioTrackLocal: audioTrackLocal, - inputAudioChan: make(chan MediaReleasePair[wave.Audio]), - outputAudioChan: make(chan []byte), - logger: logger, shutdownCtx: ctx, shutdownCtxCancel: cancelFunc, @@ -130,16 +113,6 @@ type basicStream struct { outputVideoChan chan []byte videoEncoder codec.VideoEncoder - audioTrackLocal *trackLocalStaticSample - inputAudioChan chan MediaReleasePair[wave.Audio] - outputAudioChan chan []byte - audioEncoder codec.AudioEncoder - - // audioLatency specifies how long in between audio samples. This must be guaranteed - // by all streamed audio. 
- audioLatency time.Duration - audioLatencySet bool - shutdownCtx context.Context shutdownCtxCancel func() activeBackgroundWorkers sync.WaitGroup @@ -159,11 +132,9 @@ func (bs *basicStream) Start() { } bs.started = true close(bs.streamingReadyCh) - bs.activeBackgroundWorkers.Add(4) + bs.activeBackgroundWorkers.Add(2) utils.ManagedGo(bs.processInputFrames, bs.activeBackgroundWorkers.Done) utils.ManagedGo(bs.processOutputFrames, bs.activeBackgroundWorkers.Done) - utils.ManagedGo(bs.processInputAudioChunks, bs.activeBackgroundWorkers.Done) - utils.ManagedGo(bs.processOutputAudioChunks, bs.activeBackgroundWorkers.Done) } // NOTE: (Nick S) This only writes video RTP packets @@ -184,9 +155,6 @@ func (bs *basicStream) Stop() { bs.started = false bs.shutdownCtxCancel() bs.activeBackgroundWorkers.Wait() - if bs.audioEncoder != nil { - bs.audioEncoder.Close() - } if bs.videoEncoder != nil { if err := bs.videoEncoder.Close(); err != nil { bs.logger.Error(err) @@ -195,7 +163,6 @@ func (bs *basicStream) Stop() { // reset bs.outputVideoChan = make(chan []byte) - bs.outputAudioChan = make(chan []byte) ctx, cancelFunc := context.WithCancel(context.Background()) bs.shutdownCtx = ctx bs.shutdownCtxCancel = cancelFunc @@ -215,28 +182,10 @@ func (bs *basicStream) InputVideoFrames(props prop.Video) (chan<- MediaReleasePa return bs.inputImageChan, nil } -func (bs *basicStream) InputAudioChunks(props prop.Audio) (chan<- MediaReleasePair[wave.Audio], error) { - if bs.config.AudioEncoderFactory == nil { - return nil, errors.New("no audio in stream") - } - bs.mu.Lock() - if bs.audioLatencySet && bs.audioLatency != props.Latency { - return nil, errors.New("cannot stream audio source with different latencies") - } - bs.audioLatencySet = true - bs.audioLatency = props.Latency - bs.mu.Unlock() - return bs.inputAudioChan, nil -} - func (bs *basicStream) VideoTrackLocal() (webrtc.TrackLocal, bool) { return bs.videoTrackLocal, bs.videoTrackLocal != nil } -func (bs *basicStream) AudioTrackLocal() 
(webrtc.TrackLocal, bool) { - return bs.audioTrackLocal, bs.audioTrackLocal != nil -} - func (bs *basicStream) processInputFrames() { frameLimiterDur := time.Second / time.Duration(bs.config.TargetFrameRate) defer close(bs.outputVideoChan) @@ -334,63 +283,6 @@ func (bs *basicStream) processInputFrames() { } } -func (bs *basicStream) processInputAudioChunks() { - defer close(bs.outputAudioChan) - var samplingRate, channels int - for { - select { - case <-bs.shutdownCtx.Done(): - return - default: - } - var audioChunkPair MediaReleasePair[wave.Audio] - select { - case audioChunkPair = <-bs.inputAudioChan: - case <-bs.shutdownCtx.Done(): - return - } - if audioChunkPair.Media == nil { - continue - } - var initErr bool - func() { - if audioChunkPair.Release != nil { - defer audioChunkPair.Release() - } - - info := audioChunkPair.Media.ChunkInfo() - newSamplingRate, newChannels := info.SamplingRate, info.Channels - if samplingRate != newSamplingRate || channels != newChannels { - samplingRate, channels = newSamplingRate, newChannels - bs.logger.Infow("detected new audio info", "sampling_rate", samplingRate, "channels", channels) - - bs.audioTrackLocal.setAudioLatency(bs.audioLatency) - if err := bs.initAudioCodec(samplingRate, channels); err != nil { - bs.logger.Error(err) - initErr = true - return - } - } - - encodedChunk, ready, err := bs.audioEncoder.Encode(bs.shutdownCtx, audioChunkPair.Media) - if err != nil { - bs.logger.Error(err) - return - } - if ready && encodedChunk != nil { - select { - case <-bs.shutdownCtx.Done(): - return - case bs.outputAudioChan <- encodedChunk: - } - } - }() - if initErr { - return - } - } -} - func (bs *basicStream) processOutputFrames() { framesSent := 0 for outputFrame := range bs.outputVideoChan { @@ -410,36 +302,8 @@ func (bs *basicStream) processOutputFrames() { } } -func (bs *basicStream) processOutputAudioChunks() { - chunksSent := 0 - for outputChunk := range bs.outputAudioChan { - select { - case <-bs.shutdownCtx.Done(): - 
return - default: - } - now := time.Now() - if err := bs.audioTrackLocal.WriteData(outputChunk); err != nil { - bs.logger.Errorw("error writing audio chunk", "error", err) - } - chunksSent++ - if Debug { - bs.logger.Debugw("wrote sample", "chunks_sent", chunksSent, "write_time", time.Since(now)) - } - } -} - func (bs *basicStream) initVideoCodec(width, height int) error { var err error bs.videoEncoder, err = bs.config.VideoEncoderFactory.New(width, height, bs.config.TargetFrameRate, bs.logger) return err } - -func (bs *basicStream) initAudioCodec(sampleRate, channelCount int) error { - var err error - if bs.audioEncoder != nil { - bs.audioEncoder.Close() - } - bs.audioEncoder, err = bs.config.AudioEncoderFactory.New(sampleRate, channelCount, bs.audioLatency, bs.logger) - return err -} diff --git a/gostream/stream_config.go b/gostream/stream_config.go index 63aa9694451..49282445ea3 100644 --- a/gostream/stream_config.go +++ b/gostream/stream_config.go @@ -8,7 +8,6 @@ import ( type StreamConfig struct { Name string VideoEncoderFactory codec.VideoEncoderFactory - AudioEncoderFactory codec.AudioEncoderFactory // TargetFrameRate will hint to the stream to try to maintain this frame rate. TargetFrameRate int diff --git a/gostream/swapper.go b/gostream/swapper.go index 08d62b51ac4..e8e9a5b4943 100644 --- a/gostream/swapper.go +++ b/gostream/swapper.go @@ -6,7 +6,6 @@ import ( "sync" "github.com/pion/mediadevices/pkg/prop" - "github.com/pion/mediadevices/pkg/wave" "github.com/pkg/errors" "go.viam.com/utils" ) @@ -23,10 +22,6 @@ type ( // A HotSwappableVideoSource allows for continuous streaming of video of // swappable underlying video sources. HotSwappableVideoSource = HotSwappableMediaSource[image.Image, prop.Video] - - // A HotSwappableAudioSource allows for continuous streaming of audio of - // swappable underlying audio sources. 
- HotSwappableAudioSource = HotSwappableMediaSource[wave.Audio, prop.Audio] ) type hotSwappableMediaSource[T, U any] struct { @@ -48,11 +43,6 @@ func NewHotSwappableVideoSource(src VideoSource) HotSwappableVideoSource { return NewHotSwappableMediaSource[image.Image, prop.Video](src) } -// NewHotSwappableAudioSource returns a hot swappable audio source. -func NewHotSwappableAudioSource(src AudioSource) HotSwappableAudioSource { - return NewHotSwappableMediaSource[wave.Audio, prop.Audio](src) -} - var errSwapperClosed = errors.New("hot swapper closed or uninitialized") // Stream returns a stream that is tolerant to the underlying media source changing. diff --git a/gostream/webrtc_track.go b/gostream/webrtc_track.go index 8e26d6550a2..8445933ec57 100644 --- a/gostream/webrtc_track.go +++ b/gostream/webrtc_track.go @@ -156,12 +156,10 @@ func (s *trackLocalStaticRTP) Write(b []byte) (n int, err error) { // trackLocalStaticSample is a TrackLocal that has a pre-set codec and accepts Samples. // If you wish to send a RTP Packet use trackLocalStaticRTP. type trackLocalStaticSample struct { - packetizer rtp.Packetizer - rtpTrack *trackLocalStaticRTP - sampler samplerFunc - isAudio bool - clockRate uint32 - audioLatency time.Duration + packetizer rtp.Packetizer + rtpTrack *trackLocalStaticRTP + sampler samplerFunc + clockRate uint32 } // newVideoTrackLocalStaticSample returns a trackLocalStaticSample for video. @@ -171,17 +169,6 @@ func newVideoTrackLocalStaticSample(c webrtc.RTPCodecCapability, id, streamID st } } -// newAudioTrackLocalStaticSample returns a trackLocalStaticSample for audio. -func newAudioTrackLocalStaticSample( - c webrtc.RTPCodecCapability, - id, streamID string, -) *trackLocalStaticSample { - return &trackLocalStaticSample{ - rtpTrack: newtrackLocalStaticRTP(c, id, streamID), - isAudio: true, - } -} - // ID is the unique identifier for this Track. This should be unique for the // stream, but doesn't have to globally unique. 
A common example would be 'audio' or 'video' // and StreamID would be 'desktop' or 'webcam'. @@ -193,14 +180,14 @@ func (s *trackLocalStaticSample) StreamID() string { return s.rtpTrack.StreamID( // RID is the RTP stream identifier. func (s *trackLocalStaticSample) RID() string { return s.rtpTrack.RID() } -// Kind controls if this TrackLocal is audio or video. -func (s *trackLocalStaticSample) Kind() webrtc.RTPCodecType { return s.rtpTrack.Kind() } - // Codec gets the Codec of the track. func (s *trackLocalStaticSample) Codec() webrtc.RTPCodecCapability { return s.rtpTrack.Codec() } +// Kind controls if this TrackLocal is audio or video. +func (s *trackLocalStaticSample) Kind() webrtc.RTPCodecType { return s.rtpTrack.Kind() } + const rtpOutboundMTU = 1200 // Bind is called by the PeerConnection after negotiation is complete @@ -240,12 +227,6 @@ func (s *trackLocalStaticSample) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCod return codec, nil } -func (s *trackLocalStaticSample) setAudioLatency(latency time.Duration) { - s.rtpTrack.mu.Lock() - defer s.rtpTrack.mu.Unlock() - s.audioLatency = latency -} - // Unbind implements the teardown logic when the track is no longer needed. This happens // because a track has been stopped. 
func (s *trackLocalStaticSample) Unbind(t webrtc.TrackLocalContext) error { @@ -263,16 +244,9 @@ func (s *trackLocalStaticSample) WriteData(frame []byte) error { s.rtpTrack.mu.Unlock() return nil } - if s.isAudio && s.audioLatency == 0 { - return nil - } sampler := s.sampler if sampler == nil { - if s.isAudio { - s.sampler = newAudioSampler(s.clockRate, s.audioLatency) - } else { - s.sampler = newVideoSampler(s.clockRate) - } + s.sampler = newVideoSampler(s.clockRate) } s.rtpTrack.mu.Unlock() @@ -318,8 +292,6 @@ func payloaderForCodec(codec webrtc.RTPCodecCapability) (rtp.Payloader, error) { switch strings.ToLower(codec.MimeType) { case strings.ToLower(webrtc.MimeTypeH264): return &codecs.H264Payloader{}, nil - case strings.ToLower(webrtc.MimeTypeOpus): - return &codecs.OpusPayloader{}, nil case strings.ToLower(webrtc.MimeTypeVP8): return &codecs.VP8Payloader{}, nil case strings.ToLower(webrtc.MimeTypeVP9): @@ -349,12 +321,3 @@ func newVideoSampler(clockRate uint32) samplerFunc { return samples }) } - -// newAudioSampler creates a audio sampler that uses a fixed latency and -// the codec's clock rate to come up with a duration for each sample. 
-func newAudioSampler(clockRate uint32, latency time.Duration) samplerFunc { - samples := uint32(math.Round(float64(clockRate) * latency.Seconds())) - return samplerFunc(func() uint32 { - return samples - }) -} diff --git a/robot/impl/data/fake.json b/robot/impl/data/fake.json index e194e27b3cf..f77315d598a 100644 --- a/robot/impl/data/fake.json +++ b/robot/impl/data/fake.json @@ -8,11 +8,6 @@ "parent": "pieceArm" } }, - { - "name": "mic1", - "type": "audio_input", - "model": "fake" - }, { "name": "cameraOver", "type": "camera", diff --git a/robot/impl/data/fake_jobs.json b/robot/impl/data/fake_jobs.json index 46dff550b95..47abf268b11 100644 --- a/robot/impl/data/fake_jobs.json +++ b/robot/impl/data/fake_jobs.json @@ -8,11 +8,6 @@ "parent": "pieceArm" } }, - { - "name": "mic1", - "type": "audio_input", - "model": "fake" - }, { "name": "cameraOver", "type": "camera", @@ -71,15 +66,19 @@ "attributes": { "relative": true } + }, + { + "name": "audioin1", + "type": "audio_in", + "model": "fake" + }, + { + "name": "audioout1", + "type": "audio_out", + "model": "fake" } ], "jobs" : [ - { - "name" : "my_mic_job", - "schedule" : "4s", - "resource" : "mic1", - "method" : "Properties" - }, { "name" : "my_arm_job", "schedule" : "*/4 * * * * *", @@ -91,6 +90,18 @@ "schedule" : "5s", "resource" : "movement_sensor1", "method" : "GetReadings" + }, + { + "name" : "my_audioin_job", + "schedule" : "5s", + "resource" : "audioin1", + "method" : "GetProperties" + }, + { + "name" : "my_audioout_job", + "schedule" : "5s", + "resource" : "audioout1", + "method" : "GetProperties" } ] } diff --git a/robot/impl/jobmanager_test.go b/robot/impl/jobmanager_test.go index f90711ce005..efa5d7a24b2 100644 --- a/robot/impl/jobmanager_test.go +++ b/robot/impl/jobmanager_test.go @@ -10,14 +10,14 @@ import ( "testing" "time" - "github.com/pion/mediadevices/pkg/prop" "go.uber.org/zap/zapcore" "go.viam.com/test" "go.viam.com/utils" "go.viam.com/utils/testutils" "go.viam.com/rdk/components/arm" - 
"go.viam.com/rdk/components/audioinput" + "go.viam.com/rdk/components/audioin" + "go.viam.com/rdk/components/audioout" "go.viam.com/rdk/components/base" "go.viam.com/rdk/components/board" "go.viam.com/rdk/components/button" @@ -56,6 +56,7 @@ import ( "go.viam.com/rdk/testutils/inject" injectmotion "go.viam.com/rdk/testutils/inject/motion" "go.viam.com/rdk/testutils/robottestutils" + rutils "go.viam.com/rdk/utils" ) func TestJobManagerDurationAndCronFromJson(t *testing.T) { @@ -826,26 +827,38 @@ func TestJobManagerComponents(t *testing.T) { return dummyArm, nil }}) - // audioinput - dummyAudioInput := inject.NewAudioInput("audio") - dummyAudioInput.MediaPropertiesFunc = func(ctx context.Context) (prop.Audio, error) { - audio := prop.Audio{ - ChannelCount: 10, - Latency: 3 * time.Second, - SampleRate: 128, - } - return audio, nil + // audioin + dummyAudioIn := inject.NewAudioIn("audioin") + dummyAudioIn.PropertiesFunc = func(ctx context.Context, extra map[string]interface{}) (rutils.Properties, error) { + return rutils.Properties{}, nil + } + resource.RegisterComponent( + audioin.API, + model, + resource.Registration[audioin.AudioIn, resource.NoNativeConfig]{Constructor: func( + ctx context.Context, + deps resource.Dependencies, + conf resource.Config, + logger logging.Logger, + ) (audioin.AudioIn, error) { + return dummyAudioIn, nil + }}) + + // audioout + dummyAudioOut := inject.NewAudioOut("audioout") + dummyAudioOut.PropertiesFunc = func(ctx context.Context, extra map[string]interface{}) (rutils.Properties, error) { + return rutils.Properties{}, nil } resource.RegisterComponent( - audioinput.API, + audioout.API, model, - resource.Registration[audioinput.AudioInput, resource.NoNativeConfig]{Constructor: func( + resource.Registration[audioout.AudioOut, resource.NoNativeConfig]{Constructor: func( ctx context.Context, deps resource.Dependencies, conf resource.Config, logger logging.Logger, - ) (audioinput.AudioInput, error) { - return dummyAudioInput, nil + ) 
(audioout.AudioOut, error) { + return dummyAudioOut, nil }}) // base @@ -1142,8 +1155,13 @@ func TestJobManagerComponents(t *testing.T) { }, { Model: model, - Name: "audio", - API: audioinput.API, + Name: "audioin", + API: audioin.API, + }, + { + Model: model, + Name: "audioout", + API: audioout.API, }, { Model: model, @@ -1237,10 +1255,18 @@ func TestJobManagerComponents(t *testing.T) { }, { config.JobConfigData{ - Name: "audio input job", + Name: "audioin job", + Schedule: "3s", + Resource: "audioin", + Method: "GetProperties", + }, + }, + { + config.JobConfigData{ + Name: "audioout job", Schedule: "3s", - Resource: "audio", - Method: "Properties", + Resource: "audioout", + Method: "GetProperties", }, }, { @@ -1378,7 +1404,8 @@ func TestJobManagerComponents(t *testing.T) { } defer func() { resource.Deregister(arm.API, model) - resource.Deregister(audioinput.API, model) + resource.Deregister(audioin.API, model) + resource.Deregister(audioout.API, model) resource.Deregister(base.API, model) resource.Deregister(board.API, model) resource.Deregister(button.API, model) @@ -1405,9 +1432,9 @@ func TestJobManagerComponents(t *testing.T) { // we will test for succeeded jobs to be the amount we started, // and that there are no failed jobs test.That(tb, logs.FilterMessage("Job triggered").Len(), - test.ShouldBeGreaterThanOrEqualTo, 18) + test.ShouldBeGreaterThanOrEqualTo, 19) test.That(tb, logs.FilterMessage("Job succeeded").Len(), - test.ShouldBeGreaterThanOrEqualTo, 18) + test.ShouldBeGreaterThanOrEqualTo, 19) test.That(tb, logs.FilterMessage("Job failed").Len(), test.ShouldBeLessThanOrEqualTo, 0) }) diff --git a/robot/impl/local_robot_test.go b/robot/impl/local_robot_test.go index 607fa56d176..45e917e70fb 100644 --- a/robot/impl/local_robot_test.go +++ b/robot/impl/local_robot_test.go @@ -30,7 +30,6 @@ import ( "go.viam.com/rdk/cloud" "go.viam.com/rdk/components/arm" "go.viam.com/rdk/components/arm/fake" - "go.viam.com/rdk/components/audioinput" 
"go.viam.com/rdk/components/base" "go.viam.com/rdk/components/board" "go.viam.com/rdk/components/camera" @@ -175,7 +174,6 @@ func TestConfigRemote(t *testing.T) { base.Named("foo"), base.Named("myParentIsRemote"), camera.Named("foo:cameraOver"), - audioinput.Named("foo:mic1"), movementsensor.Named("foo:movement_sensor1"), movementsensor.Named("foo:movement_sensor2"), gripper.Named("foo:pieceGripper"), @@ -367,8 +365,6 @@ func TestConfigRemoteWithPrefixes(t *testing.T) { base.Named("myParentIsRemote"), camera.Named("foo:foocameraOver"), camera.Named("bar:barcameraOver"), - audioinput.Named("foo:foomic1"), - audioinput.Named("bar:barmic1"), movementsensor.Named("foo:foomovement_sensor1"), movementsensor.Named("bar:barmovement_sensor1"), movementsensor.Named("foo:foomovement_sensor2"), @@ -430,8 +426,6 @@ func TestConfigRemoteWithPrefixes(t *testing.T) { base.Named("myParentIsRemote"), camera.Named("foocameraOver"), camera.Named("barcameraOver"), - audioinput.Named("foomic1"), - audioinput.Named("barmic1"), movementsensor.Named("foomovement_sensor1"), movementsensor.Named("barmovement_sensor1"), movementsensor.Named("foomovement_sensor2"), @@ -576,8 +570,6 @@ func TestConfigRemoteWithAuth(t *testing.T) { expected := []resource.Name{ arm.Named("bar:barpieceArm"), arm.Named("foo:foopieceArm"), - audioinput.Named("bar:barmic1"), - audioinput.Named("foo:foomic1"), camera.Named("bar:barcameraOver"), camera.Named("foo:foocameraOver"), movementsensor.Named("bar:barmovement_sensor1"), @@ -713,7 +705,6 @@ func TestConfigRemoteWithTLSAuth(t *testing.T) { expected := []resource.Name{ arm.Named("foo:pieceArm"), - audioinput.Named("foo:mic1"), camera.Named("foo:cameraOver"), movementsensor.Named("foo:movement_sensor1"), movementsensor.Named("foo:movement_sensor2"), @@ -731,7 +722,7 @@ func TestConfigRemoteWithTLSAuth(t *testing.T) { statuses, err := r2.MachineStatus(context.Background()) test.That(t, err, test.ShouldBeNil) - test.That(t, len(statuses.Resources), test.ShouldEqual, 
11) + test.That(t, len(statuses.Resources), test.ShouldEqual, 10) test.That(t, statuses, test.ShouldNotBeNil) } @@ -935,13 +926,12 @@ func TestMetadataUpdate(t *testing.T) { resources := r.ResourceNames() test.That(t, err, test.ShouldBeNil) - test.That(t, len(resources), test.ShouldEqual, 6) + test.That(t, len(resources), test.ShouldEqual, 5) test.That(t, err, test.ShouldBeNil) // 5 declared resources + default motion resourceNames := []resource.Name{ arm.Named("pieceArm"), - audioinput.Named("mic1"), camera.Named("cameraOver"), gripper.Named("pieceGripper"), movementsensor.Named("movement_sensor1"), @@ -1038,7 +1028,6 @@ func TestGetRemoteResourceAndGrandFather(t *testing.T) { arm.Named("remote:arm1"), arm.Named("remote:arm2"), arm.Named("remote:pieceArm"), - audioinput.Named("remote:mic1"), camera.Named("remote:cameraOver"), movementsensor.Named("remote:movement_sensor1"), movementsensor.Named("remote:movement_sensor2"), diff --git a/robot/jobmanager/jobmanager.go b/robot/jobmanager/jobmanager.go index 0ecbf99a979..6e2f0eb958d 100644 --- a/robot/jobmanager/jobmanager.go +++ b/robot/jobmanager/jobmanager.go @@ -184,7 +184,7 @@ func (jm *JobManager) createDescriptorSourceAndgRPCMethod( reflSource := grpcurl.DescriptorSourceFromServer(jm.ctx, refClient) descSource := reflSource resourceType := res.Name().API.SubtypeName - // some subtypes have an underscore in their name, like audio_input, input_controller, + // some subtypes have an underscore in their name, like audio_in, audio_out, input_controller, // or pose_tracker, while their APIs do not - so we have to remove the underscore. 
resourceType = strings.ReplaceAll(resourceType, "_", "") services, err := descSource.ListServices() diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index 285b5f8ce72..79b3704e061 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -15,7 +15,6 @@ import ( "go.viam.com/utils/rpc" "go.viam.com/utils/trace" - "go.viam.com/rdk/components/audioinput" "go.viam.com/rdk/components/camera" "go.viam.com/rdk/gostream" "go.viam.com/rdk/logging" @@ -43,7 +42,7 @@ type peerState struct { senders []*webrtc.RTPSender } -// Server implements the gRPC audio/video streaming service. +// Server implements the gRPC video streaming service. type Server struct { streampb.UnimplementedStreamServiceServer logger logging.Logger @@ -59,7 +58,6 @@ type Server struct { streamConfig gostream.StreamConfig videoSources map[string]gostream.HotSwappableVideoSource - audioSources map[string]gostream.HotSwappableAudioSource } // Resolution holds the width and height of a video stream. @@ -86,7 +84,6 @@ func NewServer( isAlive: true, streamConfig: streamConfig, videoSources: map[string]gostream.HotSwappableVideoSource{}, - audioSources: map[string]gostream.HotSwappableAudioSource{}, } server.startMonitorCameraAvailable() return server @@ -169,11 +166,10 @@ func (server *Server) AddStream(ctx context.Context, req *streampb.AddStreamRequ return nil, err } - // return error if resource is neither a camera nor audioinput + // return error if resource is not a camera _, isCamErr := camerautils.Camera(server.robot, streamStateToAdd.Stream) - _, isAudioErr := audioinput.FromProvider(server.robot, streamStateToAdd.Stream.Name()) - if isCamErr != nil && isAudioErr != nil { - return nil, errors.Errorf("stream is neither a camera nor audioinput. streamName: %v", streamStateToAdd.Stream) + if isCamErr != nil { + return nil, errors.Errorf("stream is not a camera. 
streamName: %v", streamStateToAdd.Stream) } var nameToPeerState map[string]*peerState @@ -232,13 +228,6 @@ func (server *Server) AddStream(ctx context.Context, req *streampb.AddStreamRequ return nil, err } } - // if the stream supports audio, add the audio track - if trackLocal, haveTrackLocal := streamStateToAdd.Stream.AudioTrackLocal(); haveTrackLocal { - if err := addTrack(trackLocal); err != nil { - server.logger.Error(err.Error()) - return nil, err - } - } if err := streamStateToAdd.Increment(); err != nil { server.logger.Error(err.Error()) return nil, err @@ -268,12 +257,9 @@ func (server *Server) RemoveStream(ctx context.Context, req *streampb.RemoveStre if !ok { return &streampb.RemoveStreamResponse{}, nil } - - streamName := streamToRemove.Stream.Name() - _, isAudioResourceErr := audioinput.FromProvider(server.robot, streamName) _, isCameraResourceErr := camerautils.Camera(server.robot, streamToRemove.Stream) - if isAudioResourceErr != nil && isCameraResourceErr != nil { + if isCameraResourceErr != nil { return &streampb.RemoveStreamResponse{}, nil } @@ -461,19 +447,18 @@ func (server *Server) resetVideoSource(ctx context.Context, name string) error { return nil } -// AddNewStreams adds new video and audio streams to the server using the updated set of video and -// audio sources. It refreshes the sources, checks for a valid stream configuration, and starts -// the streams if applicable. +// AddNewStreams adds new video streams to the server using the updated set of video sources. +// It refreshes the sources, checks for a valid stream configuration, and starts the streams +// if applicable. func (server *Server) AddNewStreams(ctx context.Context) error { - // Refreshing sources will walk the robot resources for anything implementing the camera and - // audioinput APIs and mutate the `svc.videoSources` and `svc.audioSources` maps. 
+ // Refreshing sources will walk the robot resources for anything implementing the camera APIs + // and mutate the `svc.videoSources` map. server.refreshVideoSources(ctx) - server.refreshAudioSources() if server.streamConfig == (gostream.StreamConfig{}) { - // The `streamConfig` dictates the video and audio encoder libraries to use. We can't do - // much if none are present. - if len(server.videoSources) != 0 || len(server.audioSources) != 0 { + // The `streamConfig` dictates the video encoder library to use. We can't do + // much if none is present. + if len(server.videoSources) != 0 { server.logger.Warn("not starting streams due to no stream config being set") } return nil @@ -511,23 +496,6 @@ func (server *Server) AddNewStreams(ctx context.Context) error { server.startVideoStream(ctx, server.videoSources[name], stream) } - for name := range server.audioSources { - // Similarly, we walk the updated set of `audioSources` and ensure all of the audio sources - // are "created" and "started". `createStream` and `startAudioStream` have the same - // behaviors as described above for video streams. - config := gostream.StreamConfig{ - Name: name, - AudioEncoderFactory: server.streamConfig.AudioEncoderFactory, - } - stream, alreadyRegistered, err := server.createStream(config, name) - if err != nil { - return err - } else if alreadyRegistered { - continue - } - server.startAudioStream(ctx, server.audioSources[name], stream) - } - return nil } @@ -583,11 +551,6 @@ func (server *Server) removeMissingStreams() { // Stream names are slightly modified versions of the resource short name camName := streamState.Stream.Name() shortName := resource.SDPTrackNameToShortName(camName) - if _, err := audioinput.FromProvider(server.robot, shortName); err == nil { - // `nameToStreamState` can contain names for both camera and audio resources. Leave the - // stream in place if its an audio resource. 
- continue - } _, err := camera.FromProvider(server.robot, shortName) if !resource.IsNotFoundError(err) { @@ -687,23 +650,6 @@ func (server *Server) refreshVideoSources(ctx context.Context) { } } -// refreshAudioSources checks and initializes every possible audio source that could be viewed from the robot. -func (server *Server) refreshAudioSources() { - for _, name := range audioinput.NamesFromRobot(server.robot) { - input, err := audioinput.FromProvider(server.robot, name) - if err != nil { - continue - } - existing, ok := server.audioSources[input.Name().Name] - if ok { - existing.Swap(input) - continue - } - newSwapper := gostream.NewHotSwappableAudioSource(input) - server.audioSources[input.Name().Name] = newSwapper - } -} - func (server *Server) createStream(config gostream.StreamConfig, name string) (gostream.Stream, bool, error) { stream, err := server.NewStream(config) // Skip if stream is already registered, otherwise raise any other errors @@ -742,14 +688,6 @@ func (server *Server) startVideoStream(ctx context.Context, source gostream.Vide }) } -func (server *Server) startAudioStream(ctx context.Context, source gostream.AudioSource, stream gostream.Stream) { - server.startStream(func(opts *BackoffTuningOptions) error { - // Merge ctx that may be coming from a Reconfigure. 
- streamAudioCtx, _ := utils.MergeContext(server.closedCtx, ctx) - return streamAudioSource(streamAudioCtx, source, stream, opts, server.logger) - }) -} - func (server *Server) getFramerateFromCamera(name string) (int, error) { cam, err := camera.FromProvider(server.robot, name) if err != nil { diff --git a/robot/web/stream/stream_c.go b/robot/web/stream/stream_c.go index 09cfe5693c5..eaad01dc160 100644 --- a/robot/web/stream/stream_c.go +++ b/robot/web/stream/stream_c.go @@ -20,14 +20,3 @@ func streamVideoSource( ) error { return gostream.StreamVideoSourceWithErrorHandler(ctx, source, stream, backoffOpts.getErrorThrottledHandler(logger, stream.Name()), logger) } - -// streamAudioSource starts a stream from an audio source with a throttled error handler. -func streamAudioSource( - ctx context.Context, - source gostream.AudioSource, - stream gostream.Stream, - backoffOpts *BackoffTuningOptions, - logger logging.Logger, -) error { - return gostream.StreamAudioSourceWithErrorHandler(ctx, source, stream, backoffOpts.getErrorThrottledHandler(logger, "audio"), logger) -} diff --git a/robot/web/stream/stream_notc.go b/robot/web/stream/stream_notc.go index 3ce7a20b8df..c359b80daf9 100644 --- a/robot/web/stream/stream_notc.go +++ b/robot/web/stream/stream_notc.go @@ -21,14 +21,3 @@ func streamVideoSource( ) error { return errors.New("not implemented for non-cgo") } - -// streamAudioSource starts a stream from an audio source with a throttled error handler. 
-func streamAudioSource( - ctx context.Context, - source gostream.AudioSource, - stream gostream.Stream, - backoffOpts *BackoffTuningOptions, - logger logging.Logger, -) error { - return errors.New("not implemented for non-cgo") -} diff --git a/robot/web/stream/stream_test.go b/robot/web/stream/stream_test.go index 56e44d06996..6d86c77bb1e 100644 --- a/robot/web/stream/stream_test.go +++ b/robot/web/stream/stream_test.go @@ -16,7 +16,6 @@ import ( "go.viam.com/rdk/components/camera/fake" "go.viam.com/rdk/config" "go.viam.com/rdk/gostream" - "go.viam.com/rdk/gostream/codec/opus" "go.viam.com/rdk/gostream/codec/x264" rgrpc "go.viam.com/rdk/grpc" "go.viam.com/rdk/logging" @@ -39,10 +38,8 @@ func setupRealRobot(t *testing.T, robotConfig *config.Config, logger logging.Log robot, err := robotimpl.RobotFromConfig(ctx, robotConfig, nil, logger) test.That(t, err, test.ShouldBeNil) - // We initialize with a stream config such that the stream server is capable of creating video stream and - // audio stream data. + // We initialize with a stream config such that the stream server is capable of creating video stream. 
webSvc := web.New(robot, logger, web.WithStreamConfig(gostream.StreamConfig{ - AudioEncoderFactory: opus.NewEncoderFactory(), VideoEncoderFactory: x264.NewEncoderFactory(), })) options, _, addr := robottestutils.CreateBaseOptionsAndListener(t) diff --git a/robot/web/web_test.go b/robot/web/web_test.go index 91ea146e1f9..0a0728199e5 100644 --- a/robot/web/web_test.go +++ b/robot/web/web_test.go @@ -38,12 +38,10 @@ import ( "google.golang.org/grpc/status" "go.viam.com/rdk/components/arm" - "go.viam.com/rdk/components/audioinput" "go.viam.com/rdk/components/camera" "go.viam.com/rdk/config" gizmopb "go.viam.com/rdk/examples/customresources/apis/proto/api/component/gizmo/v1" "go.viam.com/rdk/gostream" - "go.viam.com/rdk/gostream/codec/opus" "go.viam.com/rdk/gostream/codec/x264" rgrpc "go.viam.com/rdk/grpc" "go.viam.com/rdk/logging" @@ -765,7 +763,6 @@ func TestWebWithStreams(t *testing.T) { robot.LoggerFunc = func() logging.Logger { return logger } options, _, addr := robottestutils.CreateBaseOptionsAndListener(t) svc := web.New(robot, logger, web.WithStreamConfig(gostream.StreamConfig{ - AudioEncoderFactory: opus.NewEncoderFactory(), VideoEncoderFactory: x264.NewEncoderFactory(), })) err := svc.Start(ctx, options) @@ -794,21 +791,12 @@ func TestWebWithStreams(t *testing.T) { err = svc.Reconfigure(context.Background(), rs, resource.Config{}) test.That(t, err, test.ShouldBeNil) - // Add an audio stream - audio := &inject.AudioInput{} - robot.Mu.Lock() - rs[audioinput.Named(audioKey)] = audio - robot.Mu.Unlock() - robot.MockResourcesFromMap(rs) - err = svc.Reconfigure(context.Background(), rs, resource.Config{}) - test.That(t, err, test.ShouldBeNil) - // Test that new streams are available resp, err = streamClient.ListStreams(ctx, &streampb.ListStreamsRequest{}) test.That(t, err, test.ShouldBeNil) test.That(t, resp.Names, test.ShouldContain, camera1Key) test.That(t, resp.Names, test.ShouldContain, camera2Key) - test.That(t, resp.Names, test.ShouldHaveLength, 3) + 
test.That(t, resp.Names, test.ShouldHaveLength, 2) // We need to cancel otherwise we are stuck waiting for WebRTC to start streaming. cancel() diff --git a/services/datamanager/builtin/builtin_test.go b/services/datamanager/builtin/builtin_test.go index fab504f006b..4b9a321d4b6 100644 --- a/services/datamanager/builtin/builtin_test.go +++ b/services/datamanager/builtin/builtin_test.go @@ -62,7 +62,7 @@ func (p *pathologicalAssociatedConfig) Link(conf *resource.Config) func TestCollectorRegistry(t *testing.T) { collectors := data.DumpRegisteredCollectors() - test.That(t, len(collectors), test.ShouldEqual, 55) + test.That(t, len(collectors), test.ShouldEqual, 54) mds := slices.SortedFunc(maps.Keys(collectors), func(a, b data.MethodMetadata) int { return cmp.Compare(a.String(), b.String()) }) @@ -74,7 +74,6 @@ func TestCollectorRegistry(t *testing.T) { {API: resource.API{Type: rdkComponent, SubtypeName: "arm"}, MethodName: "JointPositions"}, {API: resource.API{Type: rdkComponent, SubtypeName: "audio_in"}, MethodName: "DoCommand"}, {API: resource.API{Type: rdkComponent, SubtypeName: "audio_in"}, MethodName: "GetAudio"}, - {API: resource.API{Type: rdkComponent, SubtypeName: "audio_input"}, MethodName: "DoCommand"}, {API: resource.API{Type: rdkComponent, SubtypeName: "base"}, MethodName: "DoCommand"}, {API: resource.API{Type: rdkComponent, SubtypeName: "board"}, MethodName: "Analogs"}, {API: resource.API{Type: rdkComponent, SubtypeName: "board"}, MethodName: "DoCommand"}, diff --git a/testutils/inject/audioin.go b/testutils/inject/audioin.go index d3449e2f24f..cd273a415df 100644 --- a/testutils/inject/audioin.go +++ b/testutils/inject/audioin.go @@ -16,6 +16,7 @@ type AudioIn struct { GetAudioFunc func(ctx context.Context, codec string, durationSeconds float32, previousTimestampNs int64, extra map[string]interface{}) ( chan *audioin.AudioChunk, error) PropertiesFunc func(ctx context.Context, extra map[string]interface{}) (utils.Properties, error) + CloseFunc func(ctx 
context.Context) error } // NewAudioIn returns a new injected audio in. @@ -53,3 +54,14 @@ func (a *AudioIn) DoCommand(ctx context.Context, cmd map[string]interface{}) (ma } return a.DoFunc(ctx, cmd) } + +// Close calls the injected Close or the real version. +func (a *AudioIn) Close(ctx context.Context) error { + if a.CloseFunc == nil { + if a.AudioIn == nil { + return nil + } + return a.AudioIn.Close(ctx) + } + return a.CloseFunc(ctx) +} diff --git a/testutils/inject/audioinput.go b/testutils/inject/audioinput.go deleted file mode 100644 index 18695158c6e..00000000000 --- a/testutils/inject/audioinput.go +++ /dev/null @@ -1,78 +0,0 @@ -//go:build !no_cgo - -package inject - -import ( - "context" - - "github.com/pion/mediadevices/pkg/prop" - "github.com/pkg/errors" - - "go.viam.com/rdk/components/audioinput" - "go.viam.com/rdk/gostream" - "go.viam.com/rdk/resource" -) - -// AudioInput is an injected audio input. -type AudioInput struct { - audioinput.AudioInput - name resource.Name - DoFunc func(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) - StreamFunc func( - ctx context.Context, - errHandlers ...gostream.ErrorHandler, - ) (gostream.AudioStream, error) - MediaPropertiesFunc func(ctx context.Context) (prop.Audio, error) - CloseFunc func(ctx context.Context) error -} - -// NewAudioInput returns a new injected audio input. -func NewAudioInput(name string) *AudioInput { - return &AudioInput{name: audioinput.Named(name)} -} - -// Name returns the name of the resource. -func (ai *AudioInput) Name() resource.Name { - return ai.name -} - -// Stream calls the injected Stream or the real version. -func (ai *AudioInput) Stream( - ctx context.Context, - errHandlers ...gostream.ErrorHandler, -) (gostream.AudioStream, error) { - if ai.StreamFunc == nil { - return ai.AudioInput.Stream(ctx, errHandlers...) - } - return ai.StreamFunc(ctx, errHandlers...) -} - -// MediaProperties calls the injected MediaProperties or the real version. 
-func (ai *AudioInput) MediaProperties(ctx context.Context) (prop.Audio, error) { - if ai.MediaPropertiesFunc != nil { - return ai.MediaPropertiesFunc(ctx) - } - if ai.AudioInput != nil { - return ai.AudioInput.MediaProperties(ctx) - } - return prop.Audio{}, errors.Wrap(ctx.Err(), "media properties unavailable") -} - -// Close calls the injected Close or the real version. -func (ai *AudioInput) Close(ctx context.Context) error { - if ai.CloseFunc == nil { - if ai.AudioInput == nil { - return nil - } - return ai.AudioInput.Close(ctx) - } - return ai.CloseFunc(ctx) -} - -// DoCommand calls the injected DoCommand or the real version. -func (ai *AudioInput) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) { - if ai.DoFunc == nil { - return ai.AudioInput.DoCommand(ctx, cmd) - } - return ai.DoFunc(ctx, cmd) -} diff --git a/testutils/inject/audioout.go b/testutils/inject/audioout.go index 0868afe6532..079138c7954 100644 --- a/testutils/inject/audioout.go +++ b/testutils/inject/audioout.go @@ -15,6 +15,7 @@ type AudioOut struct { DoFunc func(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) PlayFunc func(ctx context.Context, data []byte, info *utils.AudioInfo, extra map[string]interface{}) error PropertiesFunc func(ctx context.Context, extra map[string]interface{}) (utils.Properties, error) + CloseFunc func(ctx context.Context) error } // NewAudioOut returns a new injected AudioOut. @@ -50,3 +51,14 @@ func (a *AudioOut) Play(ctx context.Context, data []byte, info *utils.AudioInfo, } return a.PlayFunc(ctx, data, info, extra) } + +// Close calls the injected Close or the real version. 
+func (a *AudioOut) Close(ctx context.Context) error { + if a.CloseFunc == nil { + if a.AudioOut == nil { + return nil + } + return a.AudioOut.Close(ctx) + } + return a.CloseFunc(ctx) +} diff --git a/web/server/entrypoint_linux.go b/web/server/entrypoint_linux.go index af90af72176..688ee4fcaba 100644 --- a/web/server/entrypoint_linux.go +++ b/web/server/entrypoint_linux.go @@ -4,13 +4,11 @@ package server import ( "go.viam.com/rdk/gostream" - "go.viam.com/rdk/gostream/codec/opus" "go.viam.com/rdk/gostream/codec/x264" ) func makeStreamConfig() gostream.StreamConfig { var streamConfig gostream.StreamConfig - streamConfig.AudioEncoderFactory = opus.NewEncoderFactory() streamConfig.VideoEncoderFactory = x264.NewEncoderFactory() return streamConfig } diff --git a/web/server/entrypoint_test.go b/web/server/entrypoint_test.go index 8c9db5379f9..a000a9b5923 100644 --- a/web/server/entrypoint_test.go +++ b/web/server/entrypoint_test.go @@ -88,7 +88,7 @@ func TestEntrypoint(t *testing.T) { numResources := 21 if runtime.GOOS == "windows" { // windows build excludes builtin models that use cgo, - // including fake audioinput, builtin motion, fake arm, and builtin navigation. + // including builtin motion, fake arm, and builtin navigation. 
 numResources = 18 } @@ -112,7 +112,7 @@ func TestEntrypoint(t *testing.T) { err = json.Unmarshal(outputBytes, &registrations) test.That(t, err, test.ShouldBeNil) - numReg := 57 + numReg := 55 if runtime.GOOS == "windows" { // windows build excludes builtin models that use cgo numReg = 47 diff --git a/web/server/entrypoint_unix.go b/web/server/entrypoint_unix.go index 2c46c390207..9a906ef12c7 100644 --- a/web/server/entrypoint_unix.go +++ b/web/server/entrypoint_unix.go @@ -4,13 +4,11 @@ package server import ( "go.viam.com/rdk/gostream" - "go.viam.com/rdk/gostream/codec/opus" "go.viam.com/rdk/gostream/codec/x264" ) func makeStreamConfig() gostream.StreamConfig { var streamConfig gostream.StreamConfig - streamConfig.AudioEncoderFactory = opus.NewEncoderFactory() streamConfig.VideoEncoderFactory = x264.NewEncoderFactory() return streamConfig }