From 818177fe465f0fdd1edd01358e30e586135a655d Mon Sep 17 00:00:00 2001 From: hexbabe Date: Fri, 4 Apr 2025 15:31:58 -0400 Subject: [PATCH 1/4] Add TestStreamMediaBehavior and logic to support it --- robot/web/stream/server.go | 9 + robot/web/stream/stream_test.go | 363 +++++++++++++++++++++++++++++++- robot/web/web.go | 6 + 3 files changed, 372 insertions(+), 6 deletions(-) diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index 10e2db1f931..bedcada780d 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -846,3 +846,12 @@ retryLoop: } return frame.Bounds().Dx(), frame.Bounds().Dy(), nil } + +// GetVideoSourceForTest returns the hot swappable video source for the given stream name. +// This is intended for use in tests only. +func (server *Server) GetVideoSourceForTest(name string) (gostream.HotSwappableVideoSource, bool) { + server.mu.RLock() + defer server.mu.RUnlock() + source, ok := server.videoSources[name] + return source, ok +} diff --git a/robot/web/stream/stream_test.go b/robot/web/stream/stream_test.go index cdd40d6bdc1..f7f79c1db8b 100644 --- a/robot/web/stream/stream_test.go +++ b/robot/web/stream/stream_test.go @@ -6,7 +6,10 @@ import ( "context" "strings" "testing" + "time" + "github.com/pion/rtp" + viamwebrtc "github.com/viamrobotics/webrtc/v3" streampb "go.viam.com/api/stream/v1" "go.viam.com/test" "go.viam.com/utils/rpc" @@ -14,6 +17,7 @@ import ( "go.viam.com/rdk/components/camera" "go.viam.com/rdk/components/camera/fake" + "go.viam.com/rdk/components/camera/rtppassthrough" "go.viam.com/rdk/config" "go.viam.com/rdk/gostream" "go.viam.com/rdk/gostream/codec/opus" @@ -28,11 +32,16 @@ import ( "go.viam.com/rdk/testutils/robottestutils" ) +// streamServerGetter is a test-only interface to access the stream server. +type streamServerGetter interface { + GetStreamServer() *webstream.Server +} + // setupRealRobot creates a robot from the input config and starts a WebRTC server with video // streaming capabilities. // //nolint:lll -func setupRealRobot(t *testing.T, robotConfig *config.Config, logger logging.Logger) (context.Context, robot.LocalRobot, string, web.Service) { +func setupRealRobot(t *testing.T, robotConfig *config.Config, logger logging.Logger) (context.Context, robot.LocalRobot, string, web.Service, *webstream.Server) { t.Helper() ctx := context.Background() @@ -49,7 +58,12 @@ func setupRealRobot(t *testing.T, robotConfig *config.Config, logger logging.Log err = webSvc.Start(ctx, options) test.That(t, err, test.ShouldBeNil) - return ctx, robot, addr, webSvc + // Attempt to get the stream server; this might be nil if CGO is disabled. + getter, ok := webSvc.(streamServerGetter) + test.That(t, ok, test.ShouldBeTrue) + streamServer := getter.GetStreamServer() + + return ctx, robot, addr, webSvc, streamServer } // TestAudioTrackIsNotCreatedForVideoStream asserts that starting a video stream from a camera does @@ -79,8 +93,10 @@ func TestAudioTrackIsNotCreatedForVideoStream(t *testing.T) { }} // Create a robot with a single fake camera. - ctx, robot, addr, webSvc := setupRealRobot(t, origCfg, logger) - + ctx, robot, addr, webSvc, streamServer := setupRealRobot(t, origCfg, logger) + if streamServer == nil { + t.Skip("Skipping test; CGO may be disabled, stream server is nil") + } defer robot.Close(ctx) defer webSvc.Close(ctx) @@ -309,7 +325,7 @@ func TestGetStreamOptions(t *testing.T) { }, }} - ctx, robot, addr, webSvc := setupRealRobot(t, origCfg, logger) + ctx, robot, addr, webSvc, _ := setupRealRobot(t, origCfg, logger) defer robot.Close(ctx) defer webSvc.Close(ctx) conn, err := rgrpc.Dial(context.Background(), addr, logger.Sublogger("TestDial"), rpc.WithDisableDirectGRPC()) @@ -405,7 +421,7 @@ func TestSetStreamOptions(t *testing.T) { }, }} - ctx, robot, addr, webSvc := setupRealRobot(t, origCfg, logger) + ctx, robot, addr, webSvc, _ := setupRealRobot(t, origCfg, logger) defer robot.Close(ctx) defer webSvc.Close(ctx) conn, err := rgrpc.Dial(context.Background(), addr, logger.Sublogger("TestDial"), rpc.WithDisableDirectGRPC()) @@ -548,3 +564,338 @@ func TestSetStreamOptions(t *testing.T) { test.That(t, removeRes, test.ShouldNotBeNil) }) } + +// TestStreamMediaBehavior verifies that WebRTC media streams survive reconfigures and +// that the resulting images are in the expected format from stream options handling and resets. +func TestStreamMediaBehavior(t *testing.T) { + logger := logging.NewTestLogger(t).Sublogger("TestStreamMediaBehavior") + origCfg := &config.Config{Components: []resource.Config{ + { + Name: "test-camera", + API: resource.NewAPI("rdk", "component", "camera"), + Model: resource.DefaultModelFamily.WithModel("fake"), + ConvertedAttributes: &fake.Config{ + Width: 1280, + Height: 720, + RTPPassthrough: true, + Model: true, + }, + }, + }} + ctx, robot, addr, webSvc, streamServer := setupRealRobot(t, origCfg, logger) + if streamServer == nil { + t.Fatal("stream server is nil. CGO may be disabled.") + } + + conn, err := rgrpc.Dial(context.Background(), addr, logger.Sublogger("TestDial"), rpc.WithDisableDirectGRPC()) + test.That(t, err, test.ShouldBeNil) + defer conn.Close() + + camClient, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named("test-camera"), logger) + test.That(t, err, test.ShouldBeNil) + defer camClient.Close(ctx) + + props, err := camClient.Properties(ctx) + test.That(t, err, test.ShouldBeNil) + test.That(t, props.IntrinsicParams.Width, test.ShouldEqual, 1280) + test.That(t, props.IntrinsicParams.Height, test.ShouldEqual, 720) + + camResource, err := robot.ResourceByName(camera.Named("test-camera")) + test.That(t, err, test.ShouldBeNil) + camServer, ok := camResource.(camera.Camera) + test.That(t, ok, test.ShouldBeTrue) + + // Cast the server-side resource to rtppassthrough.Source to set up for a subscription + rtpSource, ok := camServer.(rtppassthrough.Source) + test.That(t, ok, test.ShouldBeTrue) + + // Create stream client (using the connection) + livestreamClient := streampb.NewStreamServiceClient(conn) + + // Verify stream is available + listResp, err := livestreamClient.ListStreams(ctx, &streampb.ListStreamsRequest{}) + test.That(t, err, test.ShouldBeNil) + test.That(t, listResp.Names, test.ShouldResemble, []string{"test-camera"}) + + // Add test-camera stream to client + _, err = livestreamClient.AddStream(ctx, &streampb.AddStreamRequest{Name: "test-camera"}) + test.That(t, err, test.ShouldBeNil) + + // Wait for video track to be added + testutils.WaitForAssertion(t, func(tb testing.TB) { + // Handle potential nil description if connection closes early + desc := conn.PeerConn().CurrentLocalDescription() + if desc == nil { + tb.Log("Peer connection local description is nil, likely closed early") + return + } + test.That(tb, desc.SDP, test.ShouldContainSubstring, "m=video") + // Check that H264 is negotiated, regardless of the dynamic payload type + test.That(tb, desc.SDP, test.ShouldContainSubstring, "H264/90000") + }) + + // Wait for connection to be stable again + testutils.WaitForAssertion(t, func(tb testing.TB) { + state := conn.PeerConn().ConnectionState() + test.That(tb, state, test.ShouldEqual, viamwebrtc.PeerConnectionStateConnected) + iceState := conn.PeerConn().ICEConnectionState() + test.That(tb, iceState, test.ShouldEqual, viamwebrtc.ICEConnectionStateConnected) + }) + + pktsChan := make(chan []*rtp.Packet, 10) + subCtx, subCancel := context.WithCancel(ctx) + defer subCancel() + sub, err := rtpSource.SubscribeRTP(subCtx, 512, func(pkts []*rtp.Packet) { + select { + case <-subCtx.Done(): + return + default: + } + + if len(pkts) > 0 { + now := time.Now() + logger.Debugf("Callback received %d packets at %s", len(pkts), now.Format(time.RFC3339Nano)) + select { + case pktsChan <- pkts: + default: + logger.Debug("Packet channel full, dropping packet") + } + } + }) + test.That(t, err, test.ShouldBeNil) + defer func() { + logger.Debug("Unsubscribing from RTP source") + unsubscribeErr := rtpSource.Unsubscribe(ctx, sub.ID) // Use original ctx for unsubscribe + test.That(t, unsubscribeErr, test.ShouldBeNil) + logger.Debug("Unsubscribed from RTP source") + }() + + // Wait for the first packet + timeout := time.After(15 * time.Second) + var firstPkts []*rtp.Packet + select { + case p, ok := <-pktsChan: + if !ok { + t.Fatal("pktsChan was closed unexpectedly before first packet") + } + if p == nil { + t.Fatal("received nil packet slice from pktsChan for first packet") + } + if len(p) == 0 { + t.Fatal("received empty packet slice from pktsChan for first packet") + } + firstPkts = p // assign received packets + logger.Infof("Received first packet(s): count=%d", len(firstPkts)) + case <-timeout: + connState := conn.PeerConn().ConnectionState() + iceState := conn.PeerConn().ICEConnectionState() + t.Fatalf("timeout waiting for the first RTP packet (conn state: %s, ICE state: %s)", connState, iceState) + case <-ctx.Done(): // ctx is the main test context + t.Fatal("test context cancelled while waiting for the first packet") + } + + // Verify first packet + test.That(t, len(firstPkts), test.ShouldBeGreaterThan, 0) + pkt := firstPkts[0] + test.That(t, pkt.PayloadType, test.ShouldEqual, 96) // H264 + test.That(t, pkt.Version, test.ShouldEqual, 2) // RTP version + test.That(t, len(pkt.Payload), test.ShouldBeGreaterThan, 0) // Verify packet has payload + logger.Infof("First packet verified: payload size=%d bytes", len(pkt.Payload)) + + logger.Info("Initial packet verified successfully.") + initialPktLen := len(pkt.Payload) // Store initial packet length + + // --- Test Reconfiguration Survival --- + t.Run("Test reconfiguration survival", func(t *testing.T) { + logger.Info("Performing robot reconfiguration...") + newCfg := origCfg + robot.Reconfigure(ctx, newCfg) + test.That(t, err, test.ShouldBeNil) + + webSvc.Reconfigure(ctx, nil, resource.Config{}) + logger.Info("Reconfiguration complete.") + + time.Sleep(500 * time.Millisecond) + + // Verify the packets continue flowing after reconfigure + timeout = time.After(5 * time.Second) + select { + case pkts := <-pktsChan: + test.That(t, len(pkts), test.ShouldBeGreaterThan, 0) + logger.Infof("Received %d packets after reconfiguration", len(pkts)) + case <-timeout: + connState := conn.PeerConn().ConnectionState() + iceState := conn.PeerConn().ICEConnectionState() + t.Fatalf("timeout waiting for RTP packets after reconfiguration (conn state: %s, ICE state: %s)", connState, iceState) + case <-subCtx.Done(): + t.Fatal("Subscription context cancelled unexpectedly during reconfiguration test") + } + }) + + // Test stream resize with multiple resolutions + t.Run("Test multiple resolution changes", func(t *testing.T) { + // Test 640x360 + _, err = livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "test-camera", + Resolution: &streampb.Resolution{ + Width: 640, + Height: 360, + }, + }) + test.That(t, err, test.ShouldBeNil) + logger.Info("Set resolution to 640x360") + + // Verify source dimensions changed via server-side inspection + vs, ok := streamServer.GetVideoSourceForTest("test-camera") + test.That(t, ok, test.ShouldBeTrue) + mediaProps, err := vs.MediaProperties(ctx) + test.That(t, err, test.ShouldBeNil) + test.That(t, mediaProps.Width, test.ShouldEqual, 640) + test.That(t, mediaProps.Height, test.ShouldEqual, 360) + + // Wait for state transition + time.Sleep(1 * time.Second) + // Test 320x240 + _, err = livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "test-camera", + Resolution: &streampb.Resolution{ + Width: 320, + Height: 240, + }, + }) + test.That(t, err, test.ShouldBeNil) + logger.Info("Set resolution to 320x240") + + // Verify source dimensions changed + vs, ok = streamServer.GetVideoSourceForTest("test-camera") + test.That(t, ok, test.ShouldBeTrue) + mediaProps, err = vs.MediaProperties(ctx) + test.That(t, err, test.ShouldBeNil) + test.That(t, mediaProps.Width, test.ShouldEqual, 320) + test.That(t, mediaProps.Height, test.ShouldEqual, 240) + + // Test invalid resolution + _, err = livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "test-camera", + Resolution: &streampb.Resolution{ + Width: 0, + Height: 0, + }, + }) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, "invalid resolution") + logger.Info("Verified invalid resolution error handling") + + // Reset to original resolution - Should switch back to passthrough + _, err = livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "test-camera", + }) + test.That(t, err, test.ShouldBeNil) + logger.Info("Reset to original resolution") + + // Verify source dimensions reset. + // NOTE: We use a workaround here (direct resource lookup) because checking the + // HotSwappableVideoSource's MediaProperties immediately after the swap in resetVideoSource + // was observed to return stale data in previous debugging sessions. + vs, ok = streamServer.GetVideoSourceForTest("test-camera") // Swapper check is unreliable here + test.That(t, ok, test.ShouldBeTrue) + _, err = vs.MediaProperties(ctx) + test.That(t, err, test.ShouldBeNil) + + // Log resources potentially helpful for debugging + logger.Infof("Resources on robot before direct Properties call: %v", robot.ResourceNames()) + camRes, err := robot.ResourceByName(camera.Named("test-camera")) + test.That(t, err, test.ShouldBeNil) + camServer, ok := camRes.(camera.Camera) + test.That(t, ok, test.ShouldBeTrue) + origProps, err := camServer.Properties(ctx) + test.That(t, err, test.ShouldBeNil) + + test.That(t, origProps.IntrinsicParams, test.ShouldNotBeNil) + test.That(t, origProps.IntrinsicParams.Width, test.ShouldEqual, 1280) + test.That(t, origProps.IntrinsicParams.Height, test.ShouldEqual, 720) + + // Wait for switch back to passthrough and packets to resume on pktsChan + resumeTimeout := time.After(5 * time.Second) + var resetPkts []*rtp.Packet + select { + case p, ok := <-pktsChan: + if !ok { + t.Fatal("pktsChan closed unexpectedly while waiting for reset packet") + } + if len(p) == 0 { + t.Fatal("Received empty packet slice after reset") + } + resetPkts = p + logger.Infof("Received packet batch after reset (count=%d)", len(resetPkts)) + case <-resumeTimeout: + connState := conn.PeerConn().ConnectionState() + iceState := conn.PeerConn().ICEConnectionState() + t.Fatalf("timeout waiting for RTP packets after reset (conn state: %s, ICE state: %s)", connState, iceState) + case <-subCtx.Done(): + t.Fatal("Subscription context cancelled unexpectedly during reset resume check") + } + resetPkt := resetPkts[0] + test.That(t, len(resetPkt.Payload), test.ShouldEqual, initialPktLen) + logger.Infof("Verified packet payload length reverted to initial (%d) after reset", initialPktLen) + }) + + t.Run("Test stream removal and re-addition", func(t *testing.T) { + _, err = livestreamClient.RemoveStream(ctx, &streampb.RemoveStreamRequest{Name: "test-camera"}) + test.That(t, err, test.ShouldBeNil) + logger.Info("Removed stream") + + // Verify stream is still listed after RemoveStream (peer-specific removal) + testutils.WaitForAssertion(t, func(tb testing.TB) { + // Add a small delay to allow ListStreams to reflect potential (but unlikely) changes + time.Sleep(100 * time.Millisecond) + + listResp, err := livestreamClient.ListStreams(ctx, &streampb.ListStreamsRequest{}) + test.That(tb, err, test.ShouldBeNil) + + // Log the current stream list for debugging + logger.Debugf("Current stream list after RemoveStream: %v", listResp.Names) + + // Check that test-camera IS STILL in the list because RemoveStream is peer-specific, not global + test.That(tb, listResp.Names, test.ShouldHaveLength, 1) + test.That(tb, listResp.Names[0], test.ShouldEqual, "test-camera") + }) + logger.Info("Verified stream is still listed after peer-specific RemoveStream") + + // Re-add stream + _, err = livestreamClient.AddStream(ctx, &streampb.AddStreamRequest{Name: "test-camera"}) + test.That(t, err, test.ShouldBeNil) + logger.Info("Re-added stream") + + // Wait for video track to be re-added + testutils.WaitForAssertion(t, func(tb testing.TB) { + desc := conn.PeerConn().CurrentLocalDescription() + if desc == nil { + tb.Log("Peer connection local description is nil, likely closed early") + return + } + test.That(tb, desc.SDP, test.ShouldContainSubstring, "m=video") + test.That(tb, desc.SDP, test.ShouldContainSubstring, "H264/90000") + }) + + // Wait for connection to be stable again + testutils.WaitForAssertion(t, func(tb testing.TB) { + state := conn.PeerConn().ConnectionState() + test.That(tb, state, test.ShouldEqual, viamwebrtc.PeerConnectionStateConnected) + iceState := conn.PeerConn().ICEConnectionState() + test.That(tb, iceState, test.ShouldEqual, viamwebrtc.ICEConnectionStateConnected) + }) + + // Verify packets continue flowing after re-addition + timeout = time.After(5 * time.Second) + select { + case pkts := <-pktsChan: + test.That(t, len(pkts), test.ShouldBeGreaterThan, 0) + logger.Infof("Received %d packets after stream re-addition", len(pkts)) + case <-timeout: + connState := conn.PeerConn().ConnectionState() + iceState := conn.PeerConn().ICEConnectionState() + t.Fatalf("timeout waiting for RTP packets after stream re-addition (conn state: %s, ICE state: %s)", connState, iceState) + } + }) +} diff --git a/robot/web/web.go b/robot/web/web.go index 650fcbd5b40..e21ce97ac17 100644 --- a/robot/web/web.go +++ b/robot/web/web.go @@ -110,6 +110,12 @@ type webService struct { modPeerConnTracker *grpc.ModPeerConnTracker } +// GetStreamServer returns the internal stream server instance. Used for testing. +func (svc *webService) GetStreamServer() *webstream.Server { + // This might return nil if CGO is disabled or init failed, handle appropriately in tests. + return svc.streamServer +} + var internalWebServiceName = resource.NewName( resource.APINamespaceRDKInternal.WithServiceType("web"), "builtin", From 9472c2122bb97a5f127ef31144a38f00ebb76ea8 Mon Sep 17 00:00:00 2001 From: hexbabe Date: Fri, 4 Apr 2025 16:32:37 -0400 Subject: [PATCH 2/4] Pass blank logger to web server to prevent log races --- robot/web/stream/stream_test.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/robot/web/stream/stream_test.go b/robot/web/stream/stream_test.go index f7f79c1db8b..bcf6d76ec31 100644 --- a/robot/web/stream/stream_test.go +++ b/robot/web/stream/stream_test.go @@ -582,7 +582,8 @@ func TestStreamMediaBehavior(t *testing.T) { }, }, }} - ctx, robot, addr, webSvc, streamServer := setupRealRobot(t, origCfg, logger) + // Pass a blank logger to setupRealRobot to prevent race conditions with test logger usage in background goroutines. + ctx, robot, addr, webSvc, streamServer := setupRealRobot(t, origCfg, logging.NewBlankLogger("")) if streamServer == nil { t.Fatal("stream server is nil. CGO may be disabled.") } @@ -715,8 +716,6 @@ func TestStreamMediaBehavior(t *testing.T) { webSvc.Reconfigure(ctx, nil, resource.Config{}) logger.Info("Reconfiguration complete.") - time.Sleep(500 * time.Millisecond) - // Verify the packets continue flowing after reconfigure timeout = time.After(5 * time.Second) select { @@ -753,8 +752,6 @@ func TestStreamMediaBehavior(t *testing.T) { test.That(t, mediaProps.Width, test.ShouldEqual, 640) test.That(t, mediaProps.Height, test.ShouldEqual, 360) - // Wait for state transition - time.Sleep(1 * time.Second) // Test 320x240 _, err = livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ Name: "test-camera", @@ -847,9 +844,6 @@ func TestStreamMediaBehavior(t *testing.T) { // Verify stream is still listed after RemoveStream (peer-specific removal) testutils.WaitForAssertion(t, func(tb testing.TB) { - // Add a small delay to allow ListStreams to reflect potential (but unlikely) changes - time.Sleep(100 * time.Millisecond) - listResp, err := livestreamClient.ListStreams(ctx, &streampb.ListStreamsRequest{}) test.That(tb, err, test.ShouldBeNil) From 7ab65fe27420ae50fbc974eacca86cccb2b5f218 Mon Sep 17 00:00:00 2001 From: hexbabe Date: Fri, 4 Apr 2025 17:23:07 -0400 Subject: [PATCH 3/4] Remove misleading test.That --- robot/web/stream/stream_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/robot/web/stream/stream_test.go b/robot/web/stream/stream_test.go index bcf6d76ec31..6432f3e68fe 100644 --- a/robot/web/stream/stream_test.go +++ b/robot/web/stream/stream_test.go @@ -711,8 +711,6 @@ func TestStreamMediaBehavior(t *testing.T) { logger.Info("Performing robot reconfiguration...") newCfg := origCfg robot.Reconfigure(ctx, newCfg) - test.That(t, err, test.ShouldBeNil) - webSvc.Reconfigure(ctx, nil, resource.Config{}) logger.Info("Reconfiguration complete.") From 47624564db9f97b6b9ea6fac9176e1683ebd94cf Mon Sep 17 00:00:00 2001 From: hexbabe Date: Wed, 9 Apr 2025 13:29:49 -0400 Subject: [PATCH 4/4] Remove cgo note (irrelevant); Remove camera intrinsics checks (irrelevant); Add extended comment about orignal ctx usage --- robot/web/stream/stream_test.go | 47 +++++++++++++-------------------- 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/robot/web/stream/stream_test.go b/robot/web/stream/stream_test.go index 6432f3e68fe..cb7db3efdab 100644 --- a/robot/web/stream/stream_test.go +++ b/robot/web/stream/stream_test.go @@ -58,7 +58,7 @@ func setupRealRobot(t *testing.T, robotConfig *config.Config, logger logging.Log err = webSvc.Start(ctx, options) test.That(t, err, test.ShouldBeNil) - // Attempt to get the stream server; this might be nil if CGO is disabled. + // Make type assertion to access the stream server. getter, ok := webSvc.(streamServerGetter) test.That(t, ok, test.ShouldBeTrue) streamServer := getter.GetStreamServer() @@ -94,11 +94,11 @@ func TestAudioTrackIsNotCreatedForVideoStream(t *testing.T) { // Create a robot with a single fake camera. ctx, robot, addr, webSvc, streamServer := setupRealRobot(t, origCfg, logger) - if streamServer == nil { - t.Skip("Skipping test; CGO may be disabled, stream server is nil") - } defer robot.Close(ctx) defer webSvc.Close(ctx) + if streamServer == nil { + t.Fatal("stream server is nil") + } // Create a client connection to the robot. Disable direct GRPC to force a WebRTC // connection. Fail if a WebRTC connection cannot be made. @@ -584,23 +584,17 @@ func TestStreamMediaBehavior(t *testing.T) { }} // Pass a blank logger to setupRealRobot to prevent race conditions with test logger usage in background goroutines. ctx, robot, addr, webSvc, streamServer := setupRealRobot(t, origCfg, logging.NewBlankLogger("")) + defer robot.Close(ctx) + defer webSvc.Close(ctx) + if streamServer == nil { - t.Fatal("stream server is nil. CGO may be disabled.") + t.Fatal("stream server is nil") } conn, err := rgrpc.Dial(context.Background(), addr, logger.Sublogger("TestDial"), rpc.WithDisableDirectGRPC()) test.That(t, err, test.ShouldBeNil) defer conn.Close() - camClient, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named("test-camera"), logger) - test.That(t, err, test.ShouldBeNil) - defer camClient.Close(ctx) - - props, err := camClient.Properties(ctx) - test.That(t, err, test.ShouldBeNil) - test.That(t, props.IntrinsicParams.Width, test.ShouldEqual, 1280) - test.That(t, props.IntrinsicParams.Height, test.ShouldEqual, 720) - camResource, err := robot.ResourceByName(camera.Named("test-camera")) test.That(t, err, test.ShouldBeNil) camServer, ok := camResource.(camera.Camera) @@ -666,7 +660,9 @@ func TestStreamMediaBehavior(t *testing.T) { test.That(t, err, test.ShouldBeNil) defer func() { logger.Debug("Unsubscribing from RTP source") - unsubscribeErr := rtpSource.Unsubscribe(ctx, sub.ID) // Use original ctx for unsubscribe + // Use the original test context for unsubscribe to ensure cleanup proceeds + // even if subCtx was cancelled. + unsubscribeErr := rtpSource.Unsubscribe(ctx, sub.ID) test.That(t, unsubscribeErr, test.ShouldBeNil) logger.Debug("Unsubscribed from RTP source") }() @@ -784,6 +780,10 @@ func TestStreamMediaBehavior(t *testing.T) { // Reset to original resolution - Should switch back to passthrough _, err = livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ Name: "test-camera", + Resolution: &streampb.Resolution{ + Width: 1280, + Height: 720, + }, }) test.That(t, err, test.ShouldBeNil) logger.Info("Reset to original resolution") @@ -794,21 +794,10 @@ func TestStreamMediaBehavior(t *testing.T) { // was observed to return stale data in previous debugging sessions. vs, ok = streamServer.GetVideoSourceForTest("test-camera") // Swapper check is unreliable here test.That(t, ok, test.ShouldBeTrue) - _, err = vs.MediaProperties(ctx) - test.That(t, err, test.ShouldBeNil) - - // Log resources potentially helpful for debugging - logger.Infof("Resources on robot before direct Properties call: %v", robot.ResourceNames()) - camRes, err := robot.ResourceByName(camera.Named("test-camera")) - test.That(t, err, test.ShouldBeNil) - camServer, ok := camRes.(camera.Camera) - test.That(t, ok, test.ShouldBeTrue) - origProps, err := camServer.Properties(ctx) + mediaProps, err = vs.MediaProperties(ctx) test.That(t, err, test.ShouldBeNil) - - test.That(t, origProps.IntrinsicParams, test.ShouldNotBeNil) - test.That(t, origProps.IntrinsicParams.Width, test.ShouldEqual, 1280) - test.That(t, origProps.IntrinsicParams.Height, test.ShouldEqual, 720) + test.That(t, mediaProps.Width, test.ShouldEqual, 1280) + test.That(t, mediaProps.Height, test.ShouldEqual, 720) // Wait for switch back to passthrough and packets to resume on pktsChan resumeTimeout := time.After(5 * time.Second)