From 06c4fc9cd453671a503eafd069e699647f89a248 Mon Sep 17 00:00:00 2001 From: nathan contino Date: Mon, 14 Jul 2025 15:39:05 -0400 Subject: [PATCH 01/15] DOCS-4087: autonomous decision-making --- docs/data-ai/_index.md | 2 +- docs/data-ai/ai/act.md | 157 ----- .../data-ai/ai/make-decisions-autonomously.md | 616 ++++++++++++++++++ docs/data-ai/reference/apis/_index.md | 8 + .../reference/{ => apis}/data-client.md | 2 +- .../{ => apis}/data-management-client.md | 2 +- .../reference/{ => apis}/ml-model-client.md | 2 +- .../{ => apis}/ml-training-client.md | 2 +- .../reference/{ => apis}/vision-client.md | 2 +- docs/data-ai/reference/architecture.md | 2 +- docs/data-ai/reference/mlmodel-design.md | 2 +- .../reference/triggers-configuration.md | 2 +- docs/data-ai/train/train-tflite.md | 5 +- docs/data-ai/train/upload-external-data.md | 2 +- docs/operate/mobility/use-input-to-act.md | 2 +- .../services/webcam-line-follower-robot.md | 82 +-- 16 files changed, 679 insertions(+), 211 deletions(-) delete mode 100644 docs/data-ai/ai/act.md create mode 100644 docs/data-ai/ai/make-decisions-autonomously.md create mode 100644 docs/data-ai/reference/apis/_index.md rename docs/data-ai/reference/{ => apis}/data-client.md (84%) rename docs/data-ai/reference/{ => apis}/data-management-client.md (79%) rename docs/data-ai/reference/{ => apis}/ml-model-client.md (81%) rename docs/data-ai/reference/{ => apis}/ml-training-client.md (81%) rename docs/data-ai/reference/{ => apis}/vision-client.md (79%) diff --git a/docs/data-ai/_index.md b/docs/data-ai/_index.md index c002e81b2e..e966f75431 100644 --- a/docs/data-ai/_index.md +++ b/docs/data-ai/_index.md @@ -58,7 +58,7 @@ You can also monitor your machines through teleop, power your application logic, {{% card link="/data-ai/ai/deploy/" noimage="true" %}} {{% card link="/data-ai/ai/run-inference/" noimage="true" %}} {{% card link="/data-ai/ai/alert/" noimage="true" %}} -{{% card link="/data-ai/ai/act/" noimage="true" %}} +{{% card link="/data-ai/ai/make-decisions-autonomously/" noimage="true" %}} {{< /cards >}} {{< /how-to-expand >}} diff --git a/docs/data-ai/ai/act.md b/docs/data-ai/ai/act.md deleted file mode 100644 index b66b36a3a7..0000000000 --- a/docs/data-ai/ai/act.md +++ /dev/null @@ -1,157 +0,0 @@ ---- -linkTitle: "Act based on inferences" -title: "Act based on inferences" -weight: 70 -layout: "docs" -type: "docs" -description: "Use the vision service API to act based on inferences." -next: "/data-ai/train/upload-external-data/" ---- - -You can use the [vision service API](/dev/reference/apis/services/vision/) to get information about your machine's inferences and program behavior based on that. - -The following are examples of what you can do using a vision service alongside hardware: - -- [Line following robot](#program-a-line-following-robot): Using computer vision to follow objects or a pre-determined path -- [Accident prevention and quality assurance](#act-in-industrial-applications) - -## Program a line following robot - -For example, you can [program a line following robot](/tutorials/services/color-detection-scuttle/) that uses a vision service to follow a colored object. 
- -You can use the following code to detect and follow the location of a colored object: - -{{% expand "Click to view code" %}} - -```python {class="line-numbers linkable-line-numbers"} -async def connect(): - opts = RobotClient.Options.with_api_key( - # Replace "" (including brackets) with your machine's API key - api_key='', - # Replace "" (including brackets) with your machine's - # API key ID - api_key_id='' - ) - return await RobotClient.at_address("MACHINE ADDRESS", opts) - - -# Get largest detection box and see if it's center is in the left, center, or -# right third -def leftOrRight(detections, midpoint): - largest_area = 0 - largest = {"x_max": 0, "x_min": 0, "y_max": 0, "y_min": 0} - if not detections: - print("nothing detected :(") - return -1 - for d in detections: - a = (d.x_max - d.x_min) * (d.y_max-d.y_min) - if a > largest_area: - a = largest_area - largest = d - centerX = largest.x_min + largest.x_max/2 - if centerX < midpoint-midpoint/6: - return 0 # on the left - if centerX > midpoint+midpoint/6: - return 2 # on the right - else: - return 1 # basically centered - - -async def main(): - spinNum = 10 # when turning, spin the motor this much - straightNum = 300 # when going straight, spin motor this much - numCycles = 200 # run the loop X times - vel = 500 # go this fast when moving motor - - # Connect to robot client and set up components - machine = await connect() - base = Base.from_robot(machine, "my_base") - camera_name = "" - camera = Camera.from_robot(machine, camera_name) - frame = await camera.get_image(mime_type="image/jpeg") - - # Convert to PIL Image - pil_frame = viam_to_pil_image(frame) - - # Grab the vision service for the detector - my_detector = VisionClient.from_robot(machine, "my_color_detector") - - # Main loop. Detect the ball, determine if it's on the left or right, and - # head that way. Repeat this for numCycles - for i in range(numCycles): - detections = await my_detector.get_detections_from_camera(camera_name) - - answer = leftOrRight(detections, pil_frame.size[0]/2) - if answer == 0: - print("left") - await base.spin(spinNum, vel) # CCW is positive - await base.move_straight(straightNum, vel) - if answer == 1: - print("center") - await base.move_straight(straightNum, vel) - if answer == 2: - print("right") - await base.spin(-spinNum, vel) - # If nothing is detected, nothing moves - - await robot.close() - -if __name__ == "__main__": - print("Starting up... ") - asyncio.run(main()) - print("Done.") -``` - -{{% /expand%}} - -If you configured the color detector to detect red, your rover should detect and navigate towards any red objects that come into view of its camera. -Use something like a red sports ball or book cover as a target to follow to test your rover: - -
-{{
- -## Act in industrial applications - -You can also act based on inferences in an industrial context. -For example, you can program a robot arm to halt operations when workers enter dangerous zones, preventing potential accidents. - -The code for this would look like: - -```python {class="line-numbers linkable-line-numbers"} -detections = await detector.get_detections_from_camera(camera_name) -for d in detections: - if d.confidence > 0.6 and d.class_name == "PERSON": - arm.stop() -``` - -You can also use inferences of computer vision for quality assurance purposes. -For example, you can program a robot arm doing automated harvesting to use vision to identify ripe produce and pick crops selectively. - -The code for this would look like: - -```python {class="line-numbers linkable-line-numbers"} -classifications = await detector.get_classifications_from_camera( - camera_name, - 4) -for c in classifications: - if d.confidence > 0.6 and d.class_name == "RIPE": - arm.pick() -``` - -To get inferences programmatically, you will want to use the vision service API: - -{{< cards >}} -{{% card link="/dev/reference/apis/services/vision/" customTitle="Vision service API" noimage="True" %}} -{{< /cards >}} - -To implement industrial solutions in code, you can also explore the following component APIs: - -{{< cards >}} -{{< card link="/dev/reference/apis/components/arm/" customTitle="Arm API" noimage="True" >}} -{{< card link="/dev/reference/apis/components/base/" customTitle="Base API" noimage="True" >}} -{{< card link="/dev/reference/apis/components/camera/" customTitle="Camera API" noimage="True" >}} -{{< card link="/dev/reference/apis/components/gripper/" customTitle="Gripper API" noimage="True" >}} -{{< card link="/dev/reference/apis/components/motor/" customTitle="Motor API" noimage="True" >}} -{{< card link="/dev/reference/apis/components/sensor/" customTitle="Sensor API" noimage="True" >}} -{{< /cards >}} diff --git a/docs/data-ai/ai/make-decisions-autonomously.md b/docs/data-ai/ai/make-decisions-autonomously.md new file mode 100644 index 0000000000..9c4ae9e46d --- /dev/null +++ b/docs/data-ai/ai/make-decisions-autonomously.md @@ -0,0 +1,616 @@ +--- +linkTitle: "Make decisions autonomously" +title: "Make decisions autonomously" +weight: 70 +layout: "docs" +type: "docs" +description: "Use the vision service API to act based on inferences." +next: "/data-ai/train/upload-external-data/" +aliases: + - /data-ai/ai/act/ +--- + +Combine the [vision service API](/dev/reference/apis/services/vision/) and [component APIs](/dev/reference/apis/#component-apis) to interpret, then react to inferences. + +## Follow a line + +You can use a vision service and a motor to program a machine to follow a line. + +### Prerequisites + +- An SBC, for example a Raspberry Pi 4 +- A wheeled base component such as a [SCUTTLE robot](https://www.scuttlerobot.org/shop/) +- A webcam +- Colored tape, to create a path for your robot + +### Configure your machine + +Follow the [setup guide](/operate/get-started/setup/) to create a new machine. + +Connect your SCUTTLE base to your SBC. 
Add the following `components` configuration to create board, base, and motor components in Viam so you can control your SCUTTLE base, adjusting the pin numbers to match your wiring:

```json
{
  "name": "my-board",
  "model": "pi",
  "api": "rdk:component:board",
  "attributes": {},
  "depends_on": []
},
{
  "name": "leftm",
  "model": "gpio",
  "api": "rdk:component:motor",
  "attributes": {
    "pins": {
      "a": "15",
      "b": "16"
    },
    "board": "my-board",
    "max_rpm": 200
  },
  "depends_on": ["my-board"]
},
{
  "name": "rightm",
  "model": "gpio",
  "api": "rdk:component:motor",
  "attributes": {
    "pins": {
      "b": "11",
      "dir": "",
      "pwm": "",
      "a": "12"
    },
    "board": "my-board",
    "max_rpm": 200
  },
  "depends_on": ["my-board"]
},
{
  "name": "scuttlebase",
  "model": "wheeled",
  "api": "rdk:component:base",
  "attributes": {
    "width_mm": 400,
    "wheel_circumference_mm": 258,
    "left": ["leftm"],
    "right": ["rightm"]
  },
  "depends_on": ["leftm", "rightm"]
}
```

Connect your webcam to your SBC.
Add the following `components` configuration for your webcam:

```json
{
  "name": "my_camera",
  "model": "webcam",
  "api": "rdk:component:camera",
  "attributes": {
    "video_path": ""
  }
}
```

Finally, add the following `services` configuration for your vision service, replacing the `detect_color` value with the color of your line.
The name `my_line_detector` matches the vision service name that the code below expects:

```json
{
  "name": "my_line_detector",
  "api": "rdk:service:vision",
  "model": "color_detector",
  "attributes": {
    "segment_size_px": 100,
    "detect_color": "#19FFD9",
    "hue_tolerance_pct": 0.06
  }
}
```

### Code

```python {class="line-numbers linkable-line-numbers"}
import asyncio
from typing import Literal

from viam.media.video import CameraMimeType
from viam.robot.client import RobotClient
from viam.components.base import Base, Vector3
from viam.components.camera import Camera
from viam.services.vision import VisionClient
from viam.media.utils.pil import pil_to_viam_image, viam_to_pil_image
from viam.module.module import Module
from viam.resource.types import Model, Subtype
from viam.resource.base import ResourceBase
from viam.resource.registry import Registry, ResourceCreatorRegistration
from viam.proto.app.v1 import ComponentConfig

class LineFollowerAPI(ResourceBase):
    """
    LineFollowerAPI represents a custom API for controlling a base based on vision.
    """
    SUBTYPE = Subtype("example-namespace", "example-module", "line_follower")

    async def start_line_following(self):
        raise NotImplementedError

    async def stop_line_following(self):
        raise NotImplementedError

async def is_color_in_front(camera: Camera, detector: VisionClient):
    """
    Returns whether the appropriate path color is detected in front of the center of the robot.
    """
    frame = viam_to_pil_image(await camera.get_image(mime_type=CameraMimeType.JPEG))

    x, y = frame.size[0], frame.size[1]

    # Crop the image to a band between 40% and 80% of the frame width, within the top third
    cropped_frame = frame.crop((x / 2.5, 0, x / 1.25, y / 3))

    detections = await detector.get_detections(
        pil_to_viam_image(cropped_frame, CameraMimeType.JPEG)
    )

    if detections: # Check if the list is not empty
        return True
    return False


async def is_color_there(
    camera: Camera, detector: VisionClient, location: Literal["left", "right"]
):
    """
    Returns whether the appropriate path color is detected to the left/right of the robot's front.
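    The frame is first cropped to the requested side of the image, so only color
    detected in that region counts; any other location returns no detections.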
+ """ + frame = viam_to_pil_image(await camera.get_image(mime_type=CameraMimeType.JPEG)) + x, y = frame.size[0], frame.size[1] + + if location == "left": + # Crop image to get only the left two fifths of the original image + cropped_frame = frame.crop((0, 0, x / 2.5, y)) + + detections = await detector.get_detections( + pil_to_viam_image(cropped_frame, CameraMimeType.JPEG) + ) + + elif location == "right": + # Crop image to get only the right two fifths of the original image + cropped_frame = frame.crop((x / 1.25, 0, x, y)) + + detections = await detector.get_detections( + pil_to_viam_image(cropped_frame, CameraMimeType.JPEG) + ) + else: + detections = [] # Ensure detections is defined if location is neither 'left' nor 'right' + + if detections: # Check if the list is not empty + return True + return False + + +async def stop_robot(base: Base): + """ + Stop the robot's motion. + """ + await base.stop() + +# Implement your custom control logic module +class LineFollowerModule(Module, LineFollowerAPI): + MODEL = Model("acme", "control", "line_follower_module") + + def __init__(self, name: str): + super().__init__(name) + self.camera: Camera = None + self.base: Base = None + self.detector: VisionClient = None + self._running_loop = False + self._loop_task = None + + # Speed parameters (can be configured via module config if desired) + self.linear_power = 0.35 + self.angular_power = 0.3 + + @classmethod + def new_resource(cls, config: ComponentConfig): + return cls(config.name) + + async def start(self): + """ + Called when the module starts. Get references to components. + """ + print(f"LineFollowerModule '{self.name}' starting...") + # Access components directly from the robot object provided by the module framework + self.camera = await Camera.from_robot(self.robot, "my_camera") + self.base = await Base.from_robot(self.robot, "scuttlebase") + # Replace "green_detector" with your actual vision service name + self.detector = await VisionClient.from_robot(self.robot, "my_line_detector") + print(f"LineFollowerModule '{self.name}' started.") + + async def close(self): + """ + Called when the module is shutting down. Clean up tasks. + """ + print(f"LineFollowerModule '{self.name}' closing...") + await self.stop_line_following() + print(f"LineFollowerModule '{self.name}' closed.") + + async def _line_follower_loop(self): + """ + The core line following control logic loop. 
+ """ + print("Line follower control loop started.") + counter = 0 # counter to increase robustness + + while self._running_loop and counter <= 3: + try: + if await is_color_in_front(self.camera, self.detector): + print("going straight") + # Moves the base slowly forward in a straight line + await self.base.set_power(Vector3(y=self.linear_power), Vector3()) + counter = 0 + # If there is green to the left, turns the base left at a continuous, slow speed + elif await is_color_there(self.camera, self.detector, "left"): + print("going left") + await self.base.set_power(Vector3(), Vector3(z=self.angular_power)) + counter = 0 + # If there is green to the right, turns the base right at a continuous, slow speed + elif await is_color_there(self.camera, self.detector, "right"): + print("going right") + await self.base.set_power(Vector3(), Vector3(z=-self.angular_power)) + counter = 0 + else: + print(f"No color detected, counter: {counter}") + counter += 1 + # Optionally, stop or slow down if no color is detected + await self.base.stop() + + except Exception as e: + print(f"Error in line follower loop: {e}") + + await asyncio.sleep(0.05) # Adjust sleep time for desired loop frequency + + print("The path is behind us and forward is only open wasteland.") + await stop_robot(self.base) # Stop the robot when the loop finishes + self._running_loop = False # Ensure loop state is reset + + async def start_line_following(self): + """ + Starts the background loop for line following. + """ + if not self._running_loop: + self._running_loop = True + self._loop_task = asyncio.create_task(self._line_follower_loop()) + print("Requested to start line following loop.") + else: + print("Line following loop is already running.") + + async def stop_line_following(self): + """ + Stops the background loop for line following. + """ + if self._running_loop: + self._running_loop = False + if self._loop_task: + await self._loop_task # Wait for the task to complete its current iteration and exit + self._loop_task = None + print("Requested to stop line following loop.") + else: + print("Line following loop is not running.") + +# Register your module +Registry.register_resource_creator( + LineFollowerAPI.SUBTYPE, + LineFollowerModule.MODEL, + ResourceCreatorRegistration(LineFollowerModule.new_resource, LineFollowerModule.validate_config) +) + +async def main(): + """ + Main entry point for the Viam module. + """ + await Module.serve() + +if __name__ == "__main__": + asyncio.run(main()) + print("Done.") +``` + +## Follow a colored object + +You can use a vision service and a motor to program a machine to follow an object. + +### Prerequisites + +- An SBC, for example a Raspberry Pi 4 +- A wheeled base component such as a [SCUTTLE robot](https://www.scuttlerobot.org/shop/) +- A webcam +- Colored tape, to create a path for your robot + +### Configure your machine + +Follow the [setup guide](/operate/get-started/setup/) to create a new machine. + +Connect your SCUTTLE base to your SBC. 
Add the following `components` configuration to create board, base, and motor components in Viam so you can control your SCUTTLE base, adjusting the pin numbers to match your wiring:

```json
{
  "name": "my-board",
  "model": "pi",
  "api": "rdk:component:board",
  "attributes": {},
  "depends_on": []
},
{
  "name": "leftm",
  "model": "gpio",
  "api": "rdk:component:motor",
  "attributes": {
    "pins": {
      "a": "15",
      "b": "16"
    },
    "board": "my-board",
    "max_rpm": 200
  },
  "depends_on": ["my-board"]
},
{
  "name": "rightm",
  "model": "gpio",
  "api": "rdk:component:motor",
  "attributes": {
    "pins": {
      "b": "11",
      "dir": "",
      "pwm": "",
      "a": "12"
    },
    "board": "my-board",
    "max_rpm": 200
  },
  "depends_on": ["my-board"]
},
{
  "name": "my_base",
  "model": "wheeled",
  "api": "rdk:component:base",
  "attributes": {
    "width_mm": 400,
    "wheel_circumference_mm": 258,
    "left": ["leftm"],
    "right": ["rightm"]
  },
  "depends_on": ["leftm", "rightm"]
}
```

Connect your webcam to your SBC.
Add the following `components` configuration for your webcam:

```json
{
  "name": "my_camera",
  "model": "webcam",
  "api": "rdk:component:camera",
  "attributes": {
    "video_path": ""
  }
}
```

Add the following `services` configuration, replacing the `detect_color` value with the color of your object.
The name `my_object_detector` matches the vision service name that the code below expects:

```json
{
  "name": "my_object_detector",
  "api": "rdk:service:vision",
  "model": "color_detector",
  "attributes": {
    "segment_size_px": 100,
    "detect_color": "#a13b4c",
    "hue_tolerance_pct": 0.06
  }
}
```

### Code

```python {class="line-numbers linkable-line-numbers"}
import asyncio
from typing import List, Literal

from viam.robot.client import RobotClient
from viam.components.base import Base
from viam.components.camera import Camera
from viam.services.vision import VisionClient
from viam.media.utils.pil import pil_to_viam_image, viam_to_pil_image
from viam.module.module import Module
from viam.resource.types import Model, Subtype
from viam.resource.base import ResourceBase
from viam.resource.registry import Registry, ResourceCreatorRegistration
from viam.proto.app.v1 import ComponentConfig
from viam.services.vision import Detection

class ObjectTrackingBaseAPI(ResourceBase):
    """
    ObjectTrackingBaseAPI represents a custom API for controlling a base based on object tracking.
    """
    SUBTYPE = Subtype("example-namespace", "example-module", "object_tracking_base")

    async def start_object_tracking(self):
        raise NotImplementedError

    async def stop_object_tracking(self):
        raise NotImplementedError

def leftOrRight(detections: List[Detection], midpoint: float) -> Literal[0, 1, 2, -1]:
    """
    Get largest detection box and see if its center is in the left, center, or right third.
    Returns 0 for left, 1 for center, 2 for right, -1 if nothing detected.
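    A tolerance band of one-sixth of the midpoint on either side of center
    counts as centered.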
+ """ + largest_area = 0 + largest_detection: Detection = None # Initialize with None or a default Detection object + + if not detections: + print("nothing detected :(") + return -1 + + for d in detections: + # Calculate area using x_max, x_min, y_max, y_min + area = (d.x_max - d.x_min) * (d.y_max - d.y_min) + if area > largest_area: + largest_area = area + largest_detection = d + + if largest_detection is None: # Should not happen if detections is not empty, but for safety + return -1 + + # Calculate center X of the largest detection + centerX = largest_detection.x_min + (largest_detection.x_max - largest_detection.x_min) / 2 + + # Determine if center is left, center, or right + if centerX < midpoint - midpoint / 6: + return 0 # on the left + elif centerX > midpoint + midpoint / 6: + return 2 # on the right + else: + return 1 # basically centered + +# Implement your custom control logic module +class ObjectTrackingBaseModule(Module, ObjectTrackingBaseAPI): + MODEL = Model("acme", "control", "object_tracking_base_module") + + def __init__(self, name: str): + super().__init__(name) + self.base: Base = None + self.camera: Camera = None + self.detector: VisionClient = None + self.camera_name: str = "my_camera" # Default camera name, adjust in config if needed + + self._running_loop = False + self._loop_task = None + + # Control parameters (can be configured via module config if desired) + self.spin_num = 10 # when turning, spin the motor this much + self.straight_num = 300 # when going straight, spin motor this much + self.vel = 500 # go this fast when moving motor + self.num_cycles = 200 # run the loop X times (module will run indefinitely if _running_loop is True) + + @classmethod + def new_resource(cls, config: ComponentConfig): + # You can parse attributes from the config here if you want to make + # camera_name, spin_num, etc. configurable from the Viam app. + # For simplicity, using defaults/hardcoded names for now. + return cls(config.name) + + async def start(self): + """ + Called when the module starts. Get references to components. + """ + print(f"ObjectTrackingBaseModule '{self.name}' starting...") + # Access components directly from the robot object provided by the module framework + self.base = await Base.from_robot(self.robot, "my_base") + self.camera = await Camera.from_robot(self.robot, self.camera_name) + # Replace "my_color_detector" with your actual vision service name + self.detector = await VisionClient.from_robot(self.robot, "my_object_detector") + print(f"ObjectTrackingBaseModule '{self.name}' started.") + + async def close(self): + """ + Called when the module is shutting down. Clean up tasks. + """ + print(f"ObjectTrackingBaseModule '{self.name}' closing...") + await self.stop_object_tracking() + print(f"ObjectTrackingBaseModule '{self.name}' closed.") + + async def _object_tracking_loop(self): + """ + The core object tracking and base control logic loop. 
+ """ + print("Object tracking control loop started.") + + # Get initial frame to determine midpoint for detection logic + # This assumes the camera resolution doesn't change during operation + initial_frame = await self.camera.get_image(mime_type="image/jpeg") + pil_initial_frame = viam_to_pil_image(initial_frame) + midpoint = pil_initial_frame.size[0] / 2 + + cycle_count = 0 + while self._running_loop and (self.num_cycles == 0 or cycle_count < self.num_cycles): + try: + detections = await self.detector.get_detections_from_camera(self.camera_name) + + answer = leftOrRight(detections, midpoint) + + if answer == 0: + print("Detected object on left, spinning left.") + await self.base.spin(self.spin_num, self.vel) # CCW is positive + await self.base.move_straight(self.straight_num, self.vel) + elif answer == 1: + print("Detected object in center, moving straight.") + await self.base.move_straight(self.straight_num, self.vel) + elif answer == 2: + print("Detected object on right, spinning right.") + await self.base.spin(-self.spin_num, self.vel) # CW is negative + await self.base.move_straight(self.straight_num, self.vel) + else: # answer == -1 (nothing detected) + print("No object detected, stopping base.") + await self.base.stop() # Stop if nothing is detected + + except Exception as e: + print(f"Error in object tracking loop: {e}") + + cycle_count += 1 + await asyncio.sleep(0.1) # Small delay to prevent busy-waiting + + print("Object tracking loop finished or stopped.") + await self.base.stop() # Ensure base stops when loop ends + self._running_loop = False # Reset state + + async def start_object_tracking(self): + """ + Starts the background loop for object tracking and base control. + """ + if not self._running_loop: + self._running_loop = True + self._loop_task = asyncio.create_task(self._object_tracking_loop()) + print("Requested to start object tracking loop.") + else: + print("Object tracking loop is already running.") + + async def stop_object_tracking(self): + """ + Stops the background loop for object tracking and base control. + """ + if self._running_loop: + self._running_loop = False + if self._loop_task: + await self._loop_task # Wait for the task to complete its current iteration and exit + self._loop_task = None + print("Requested to stop object tracking loop.") + else: + print("Object tracking loop is not running.") + +# Register your module +Registry.register_resource_creator( + ObjectTrackingBaseAPI.SUBTYPE, + ObjectTrackingBaseModule.MODEL, + ResourceCreatorRegistration(ObjectTrackingBaseModule.new_resource, ObjectTrackingBaseModule.validate_config) +) + +async def main(): + """ + Main entry point for the Viam module. 
+ """ + await Module.serve() + +if __name__ == "__main__": + asyncio.run(main()) + print("Done.") +``` diff --git a/docs/data-ai/reference/apis/_index.md b/docs/data-ai/reference/apis/_index.md new file mode 100644 index 0000000000..4e279ce2f8 --- /dev/null +++ b/docs/data-ai/reference/apis/_index.md @@ -0,0 +1,8 @@ +--- +linkTitle: "APIs" +title: "APIs" +weight: 30 +layout: "empty" +type: "docs" +empty_node: true +--- diff --git a/docs/data-ai/reference/data-client.md b/docs/data-ai/reference/apis/data-client.md similarity index 84% rename from docs/data-ai/reference/data-client.md rename to docs/data-ai/reference/apis/data-client.md index e283551be4..fd21d77239 100644 --- a/docs/data-ai/reference/data-client.md +++ b/docs/data-ai/reference/apis/data-client.md @@ -1,6 +1,6 @@ --- title: "Upload and retrieve data with Viam's data client API" -linkTitle: "Data client API" +linkTitle: "Data client" weight: 30 type: "docs" layout: "empty" diff --git a/docs/data-ai/reference/data-management-client.md b/docs/data-ai/reference/apis/data-management-client.md similarity index 79% rename from docs/data-ai/reference/data-management-client.md rename to docs/data-ai/reference/apis/data-management-client.md index 9b25725d0b..154075a0e3 100644 --- a/docs/data-ai/reference/data-management-client.md +++ b/docs/data-ai/reference/apis/data-management-client.md @@ -1,6 +1,6 @@ --- title: "Data management API" -linkTitle: "Data management API" +linkTitle: "Data management" weight: 30 type: "docs" layout: "empty" diff --git a/docs/data-ai/reference/ml-model-client.md b/docs/data-ai/reference/apis/ml-model-client.md similarity index 81% rename from docs/data-ai/reference/ml-model-client.md rename to docs/data-ai/reference/apis/ml-model-client.md index beeb82c808..bc23270295 100644 --- a/docs/data-ai/reference/ml-model-client.md +++ b/docs/data-ai/reference/apis/ml-model-client.md @@ -1,6 +1,6 @@ --- title: "ML model API" -linkTitle: "ML model API" +linkTitle: "ML model" weight: 30 type: "docs" layout: "empty" diff --git a/docs/data-ai/reference/ml-training-client.md b/docs/data-ai/reference/apis/ml-training-client.md similarity index 81% rename from docs/data-ai/reference/ml-training-client.md rename to docs/data-ai/reference/apis/ml-training-client.md index 60053e550e..abb7edd607 100644 --- a/docs/data-ai/reference/ml-training-client.md +++ b/docs/data-ai/reference/apis/ml-training-client.md @@ -1,6 +1,6 @@ --- title: "Work with ML training jobs with Viam's ML training API" -linkTitle: "ML training client API" +linkTitle: "ML training client" weight: 40 type: "docs" layout: "empty" diff --git a/docs/data-ai/reference/vision-client.md b/docs/data-ai/reference/apis/vision-client.md similarity index 79% rename from docs/data-ai/reference/vision-client.md rename to docs/data-ai/reference/apis/vision-client.md index d28e356b3e..fdac1ddd3c 100644 --- a/docs/data-ai/reference/vision-client.md +++ b/docs/data-ai/reference/apis/vision-client.md @@ -1,6 +1,6 @@ --- title: "Vision service API" -linkTitle: "Vision service API" +linkTitle: "Vision service" weight: 30 type: "docs" layout: "empty" diff --git a/docs/data-ai/reference/architecture.md b/docs/data-ai/reference/architecture.md index 17dfcd3acc..a91182e328 100644 --- a/docs/data-ai/reference/architecture.md +++ b/docs/data-ai/reference/architecture.md @@ -1,7 +1,7 @@ --- linkTitle: "Machine-cloud architecture" title: "Viam architecture" -weight: 1000 +weight: 20 layout: "docs" type: "docs" layout: "empty" diff --git a/docs/data-ai/reference/mlmodel-design.md 
b/docs/data-ai/reference/mlmodel-design.md
index 44c8a54384..2b5598c304 100644
--- a/docs/data-ai/reference/mlmodel-design.md
+++ b/docs/data-ai/reference/mlmodel-design.md
@@ -1,7 +1,7 @@
---
title: "Design your ML models for vision"
linkTitle: "ML model service design"
-weight: 60
+weight: 10
type: "docs"
tags: ["data management", "ml", "model training", "vision"]
description: "Design your ML Model service to work with Viam's vision services."
diff --git a/docs/data-ai/reference/triggers-configuration.md b/docs/data-ai/reference/triggers-configuration.md
index aa094b7d25..aa37fb6220 100644
--- a/docs/data-ai/reference/triggers-configuration.md
+++ b/docs/data-ai/reference/triggers-configuration.md
@@ -1,7 +1,7 @@
---
title: "Trigger configuration"
linkTitle: "Trigger configuration"
-weight: 60
+weight: 20
type: "docs"
tags: ["data management", "trigger", "webhook"]
description: "Detailed information about how to configure triggers and webhooks."
diff --git a/docs/data-ai/train/train-tflite.md b/docs/data-ai/train/train-tflite.md
index 30045ec4d6..4b5de1ab4c 100644
--- a/docs/data-ai/train/train-tflite.md
+++ b/docs/data-ai/train/train-tflite.md
@@ -167,8 +167,9 @@ To capture images of edge cases and re-train your model using those images, comp

## Next steps

-Now your machine can make inferences about its environment.
-The next step is to [deploy](/data-ai/ai/deploy/) the ML model and then [act](/data-ai/ai/act/) or [alert](/data-ai/ai/alert/) based on these inferences.
+Now you can [deploy](/data-ai/ai/deploy/) your ML model.
+Once deployed, you can use your ML model to make inferences on your machine.
+Then, you can [alert](/data-ai/ai/alert/) or even [make decisions](/data-ai/ai/make-decisions-autonomously/) based on these inferences.

See the following tutorials for examples of using machine learning models to make your machine do things based on its inferences about its environment:

diff --git a/docs/data-ai/train/upload-external-data.md b/docs/data-ai/train/upload-external-data.md
index 457ef41077..658602b1d1 100644
--- a/docs/data-ai/train/upload-external-data.md
+++ b/docs/data-ai/train/upload-external-data.md
@@ -15,7 +15,7 @@ aliases:
  - /data-ai/ai/advanced/
date: "2024-12-04"
description: "Upload data to Viam from your local computer or mobile device using the data client API, Viam CLI, or Viam mobile app."
-prev: "/data-ai/ai/act/"
+prev: "/data-ai/ai/make-decisions-autonomously/"
---

When you configure the data management service, Viam automatically uploads data from the default directory `~/.viam/capture` and any directory you configured.
diff --git a/docs/operate/mobility/use-input-to-act.md b/docs/operate/mobility/use-input-to-act.md
index c99ca5416f..ec02227cbb 100644
--- a/docs/operate/mobility/use-input-to-act.md
+++ b/docs/operate/mobility/use-input-to-act.md
@@ -51,7 +51,7 @@ readings = await my_sensor.get_readings()
Other common inputs include the methods of a [board](/dev/reference/apis/components/board/) (`GetGPIO`, `GetPWM`, `PWMFrequency`, `GetDigitalInterruptValue`, and `ReadAnalogReader`), or a [power sensor](/dev/reference/apis/components/power-sensor/) (`GetVoltage`, `GetCurrent`, `GetPower`, and `GetReadings`).

You can also use camera input, for example to detect objects and pick them up with an arm.
-See [Act based on inferences](/data-ai/ai/act/) for relevant examples.
+See [Make decisions autonomously](/data-ai/ai/make-decisions-autonomously/) for relevant examples. 
If you want to send alerts based on computer vision or captured data, see [Alert on inferences](/data-ai/ai/alert/) or [Alert on data](/data-ai/data/advanced/alert-data/). diff --git a/docs/tutorials/services/webcam-line-follower-robot.md b/docs/tutorials/services/webcam-line-follower-robot.md index dd73fdc79e..be3416e4a2 100644 --- a/docs/tutorials/services/webcam-line-follower-robot.md +++ b/docs/tutorials/services/webcam-line-follower-robot.md @@ -219,46 +219,46 @@ Next, navigate to the **CONFIGURE** tab of your machine's page. 1. **Add a vision service.** -Next, add a vision service [detector](/dev/reference/apis/services/vision/#detections): + Next, add a vision service [detector](/dev/reference/apis/services/vision/#detections): -Click the **+** (Create) icon next to your machine part in the left-hand menu and select **Component or service**. -Select type `vision` and model `color detector`. -Enter `green_detector` for the name, then click **Create**. - -In your vision service’s panel, select the color your vision service will be detecting, as well as a hue tolerance and a segment size (in pixels). -Use a color picker like [colorpicker.me](https://colorpicker.me/) to approximate the color of your line and get the corresponding rgb or hex value. -We used `rgb(25,255,217)` or `#19FFD9` to match the color of our green electrical tape, and specified a segment size of 100 pixels with a tolerance of 0.06, but you can tweak these later to fine tune your line follower. - -2. Click **Save** in the top right corner of the screen. + Click the **+** (Create) icon next to your machine part in the left-hand menu and select **Component or service**. + Select type `vision` and model `color detector`. + Enter `green_detector` for the name, then click **Create**. -3. (optional) **Add a `transform` camera as a visualizer** + In your vision service’s panel, select the color your vision service will be detecting, as well as a hue tolerance and a segment size (in pixels). + Use a color picker like [colorpicker.me](https://colorpicker.me/) to approximate the color of your line and get the corresponding rgb or hex value. + We used `rgb(25,255,217)` or `#19FFD9` to match the color of our green electrical tape, and specified a segment size of 100 pixels with a tolerance of 0.06, but you can tweak these later to fine tune your line follower. -If you'd like to see the bounding boxes that the color detector identifies in a live stream, you'll need to configure a [transform camera](/operate/reference/components/camera/transform/). -This isn't another piece of hardware, but rather a virtual "camera" that takes in the stream from the webcam we just configured and outputs a stream overlaid with bounding boxes representing the color detections. +1. Click **Save** in the top right corner of the screen. -Click the **+** (Create) icon next to your machine part in the left-hand menu and select **Component or service**. -Add a [transform camera](/operate/reference/components/camera/transform/) with type `camera` and model `transform`. -Name it `transform_cam` and click **Create**. +1. (optional) **Add a `transform` camera as a visualizer** -Click **{}** (Switch to advanced) in the top right of the camera's configuration panel to switch to advanced mode. 
-Replace the attributes JSON object (`{}`) with the following object which specifies the camera source that the `transform` camera will be using and defines a pipeline that adds the defined `detector`: + If you'd like to see the bounding boxes that the color detector identifies in a live stream, you'll need to configure a [transform camera](/operate/reference/components/camera/transform/). + This isn't another piece of hardware, but rather a virtual "camera" that takes in the stream from the webcam we just configured and outputs a stream overlaid with bounding boxes representing the color detections. -```json -{ - "source": "my_camera", - "pipeline": [ - { - "type": "detections", - "attributes": { - "detector_name": "green_detector", - "confidence_threshold": 0.6 - } - } - ] -} -``` + Click the **+** (Create) icon next to your machine part in the left-hand menu and select **Component or service**. + Add a [transform camera](/operate/reference/components/camera/transform/) with type `camera` and model `transform`. + Name it `transform_cam` and click **Create**. + + Click **{}** (Switch to advanced) in the top right of the camera's configuration panel to switch to advanced mode. + Replace the attributes JSON object (`{}`) with the following object which specifies the camera source that the `transform` camera will be using and defines a pipeline that adds the defined `detector`: + + ```json + { + "source": "my_camera", + "pipeline": [ + { + "type": "detections", + "attributes": { + "detector_name": "green_detector", + "confidence_threshold": 0.6 + } + } + ] + } + ``` -4. Click **Save** in the top right corner of the screen. +1. Click **Save** in the top right corner of the screen. {{% /tab %}} {{% tab name="JSON" %}} @@ -393,7 +393,7 @@ To make your rover follow your line, you need to install Python and the Viam Pyt python3 --version ``` -2. Install the [Viam Python SDK](https://python.viam.dev/) by running +1. Install the [Viam Python SDK](https://python.viam.dev/) by running ```sh {class="command-line" data-prompt="$"} pip install viam-sdk @@ -429,7 +429,7 @@ To make your rover follow your line, you need to install Python and the Viam Pyt 1. In your Pi terminal, navigate to the directory where you’d like to save your code. Run, nano rgb_follower.py (or replace rgb_follower with the your desired filename). - 2. Paste all your code into this file. + 1. Paste all your code into this file. Press **CTRL + X** to close the file. Type **Y** to confirm file modification, then press enter to finish. @@ -520,11 +520,11 @@ The code you are using has several functions: The `main` function connects to the robot and initializes each component, then performs the following tasks: 1. If the color of the line is detected in the top center of the camera frame, the rover drives forward. -2. If the color is not detected in the top center, it checks the left side of the camera frame for the color. +1. If the color is not detected in the top center, it checks the left side of the camera frame for the color. If it detects the color on the left, the robot turns left. If it doesn’t detect the color on the left, it checks the right side of the camera frame, and turns right if it detects the color. -3. Once the line is back in the center front of the camera frame, the rover continues forward. -4. When the rover no longer sees any of the line color anywhere in the front portion of the camera frame, it stops and the program ends. +1. 
Once the line is back in the center front of the camera frame, the rover continues forward. +1. When the rover no longer sees any of the line color anywhere in the front portion of the camera frame, it stops and the program ends. ```python {class="line-numbers linkable-line-numbers"} async def main(): @@ -577,7 +577,7 @@ async def main(): To run the program: 1. Position the rover so that its camera can see the colored line. -2. If you have saved the code on your Pi, SSH into it by running: +1. If you have saved the code on your Pi, SSH into it by running: ```sh {class="command-line" data-prompt="$"} ssh @.local @@ -604,8 +604,8 @@ Along the way, you have learned how to configure a wheeled base, camera, and col If you are wondering what to do next, why not try one of the following ideas: 1. Automatically detect what color line the robot is on and follow that. -2. Use two differently colored lines that intersect and make the robot switch from one line to the other. -3. Put two rovers on intersecting lines and write code to keep them from crashing into each other. +1. Use two differently colored lines that intersect and make the robot switch from one line to the other. +1. Put two rovers on intersecting lines and write code to keep them from crashing into each other. ## Troubleshooting From 14820426e831d994ade6013179e02ad49fd97614 Mon Sep 17 00:00:00 2001 From: nathan contino Date: Wed, 16 Jul 2025 16:27:42 -0400 Subject: [PATCH 02/15] WIP: third guide --- .../data-ai/ai/make-decisions-autonomously.md | 249 +++++++++++++++++- 1 file changed, 244 insertions(+), 5 deletions(-) diff --git a/docs/data-ai/ai/make-decisions-autonomously.md b/docs/data-ai/ai/make-decisions-autonomously.md index 9c4ae9e46d..46d8ddcb7b 100644 --- a/docs/data-ai/ai/make-decisions-autonomously.md +++ b/docs/data-ai/ai/make-decisions-autonomously.md @@ -10,11 +10,11 @@ aliases: - /data-ai/ai/act/ --- -Combine the [vision service API](/dev/reference/apis/services/vision/) and [component APIs](/dev/reference/apis/#component-apis) to interpret, then react to inferences. +Use the [vision service API](/dev/reference/apis/services/vision/) to make inferences, then use [component APIs](/dev/reference/apis/#component-apis) to react to inferences with a machine. ## Follow a line -You can use a vision service and a motor to program a machine to follow a line. +This module uses a vision service and a motor to program a machine to follow a line. ### Prerequisites @@ -201,7 +201,7 @@ async def stop_robot(base: Base): # Implement your custom control logic module class LineFollowerModule(Module, LineFollowerAPI): - MODEL = Model("acme", "control", "line_follower_module") + MODEL = Model("example-namespace", "example-module", "line_follower_module") def __init__(self, name: str): super().__init__(name) @@ -322,7 +322,7 @@ if __name__ == "__main__": ## Follow a colored object -You can use a vision service and a motor to program a machine to follow an object. +This module uses a vision service and a motor to program a machine to follow an object. 
### Prerequisites

@@ -484,7 +484,7 @@

# Implement your custom control logic module
class ObjectTrackingBaseModule(Module, ObjectTrackingBaseAPI):
-    MODEL = Model("acme", "control", "object_tracking_base_module")
+    MODEL = Model("example-namespace", "example-module", "object_tracking_base_module")

    def __init__(self, name: str):
        super().__init__(name)
@@ -614,3 +614,242 @@ if __name__ == "__main__":
    asyncio.run(main())
    print("Done.")
```

## Notify when a certain object appears in a video feed

This module uses a vision service to program a machine to send an email notification when a certain object appears in a video feed.

### Prerequisites

- An SBC, for example a Raspberry Pi 4
- A webcam

### Configure your machine

Follow the [setup guide](/operate/get-started/setup/) to create a new machine.

Connect your webcam to your SBC.
Add the following `components` configuration for your webcam:

```json
{
  "name": "my_camera",
  "model": "webcam",
  "api": "rdk:component:camera",
  "attributes": {
    "video_path": ""
  }
}
```

Finally, add a vision service named `my_object_detector` that can detect your target object, such as an ML model detector paired with a suitable detection model.

### Code

```python
import asyncio
import os
from typing import Mapping, Any

from viam.components.camera import Camera
from viam.services.vision import VisionClient
from viam.module.module import Module
from viam.resource.types import Model
from viam.resource.registry import Registry, ResourceCreatorRegistration
from viam.proto.app.v1 import ComponentConfig
from viam.services.generic import Generic
import smtplib
from email.mime.text import MIMEText

class EmailNotifierModule(Module, Generic):
    MODEL = Model("example-namespace", "example-module", "email_notifier_generic")

    def __init__(self, name: str):
        super().__init__(name)
        self.camera: Camera = None
        self.detector: VisionClient = None
        self.camera_name: str = "my_camera" # Default camera name, adjust in config if needed
        self.detector_name: str = "my_object_detector" # Default vision service name
        self.target_object_name: str = "person" # The object to detect for notification

        # Email configuration (sensitive info should ideally be managed securely, e.g., environment variables)
        self.sender_email: str = os.getenv("SENDER_EMAIL", "your_email@example.com")
        self.sender_password: str = os.getenv("SENDER_PASSWORD", "your_email_password")
        self.receiver_email: str = os.getenv("RECEIVER_EMAIL", "recipient_email@example.com")
        self.smtp_server: str = os.getenv("SMTP_SERVER", "smtp.example.com")
        self.smtp_port: int = int(os.getenv("SMTP_PORT", 587)) # Typically 587 for TLS

        self._running_loop = False
        self._loop_task = None
        self._notification_sent = False

    @classmethod
    def new_resource(cls, config: ComponentConfig):
        # Parse attributes from the config here to make them configurable
        module = cls(config.name)
        if "camera_name" in config.attributes.fields:
            module.camera_name = config.attributes.fields["camera_name"].string_value
        if "detector_name" in config.attributes.fields:
            module.detector_name = config.attributes.fields["detector_name"].string_value
        if "target_object_name" in config.attributes.fields:
            module.target_object_name = \
config.attributes.fields["target_object_name"].string_value + + # Email configuration can also be set via config, but environment variables are often preferred for secrets + if "sender_email" in config.attributes.fields: + module.sender_email = config.attributes.fields["sender_email"].string_value + if "sender_password" in config.attributes.fields: + module.sender_password = config.attributes.fields["sender_password"].string_value + if "receiver_email" in config.attributes.fields: + module.receiver_email = config.attributes.fields["receiver_email"].string_value + if "smtp_server" in config.attributes.fields: + module.smtp_server = config.attributes.fields["smtp_server"].string_value + if "smtp_port" in config.attributes.fields: + module.smtp_port = int(config.attributes.fields["smtp_port"].number_value) + + return module + + async def start(self): + """ + Called when the module starts. Get references to components. + """ + print(f"EmailNotifierModule '{self.name}' starting...") + self.camera = await Camera.from_robot(self.robot, self.camera_name) + self.detector = await VisionClient.from_robot(self.robot, self.detector_name) + print(f"EmailNotifierModule '{self.name}' started. Monitoring for '{self.target_object_name}'.") + + async def close(self): + """ + Called when the module is shutting down. Clean up tasks. + """ + print(f"EmailNotifierModule '{self.name}' closing...") + await self._stop_detection_monitoring_internal() # Call internal stop method + print(f"EmailNotifierModule '{self.name}' closed.") + + def _send_email(self, subject: str, body: str): + """ + Helper function to send an email. + """ + try: + msg = MIMEText(body) + msg['Subject'] = subject + msg['From'] = self.sender_email + msg['To'] = self.receiver_email + + with smtplib.SMTP(self.smtp_server, self.smtp_port) as server: + server.starttls() # Secure the connection + server.login(self.sender_email, self.sender_password) + server.send_message(msg) + print(f"Email sent successfully to {self.receiver_email}: '{subject}'") + self._notification_sent = True # Mark that notification has been sent + except Exception as e: + print(f"Failed to send email: {e}") + self._notification_sent = False # Reset if sending failed + + async def _detection_monitoring_loop(self): + """ + The core object detection monitoring and email notification logic loop. + """ + print("Detection monitoring loop started.") + + while self._running_loop: + try: + detections = await self.detector.get_detections_from_camera(self.camera_name) + + object_detected = False + for d in detections: + if d.class_name == self.target_object_name: + object_detected = True + break + + if object_detected and not self._notification_sent: + subject = f"Viam Module Alert: {self.target_object_name} Detected!" + body = f"A {self.target_object_name} was detected by the vision service '{self.detector_name}' on camera '{self.camera_name}'." + print(f"Detected '{self.target_object_name}'. Sending email notification...") + self._send_email(subject, body) + elif not object_detected and self._notification_sent: + # Reset notification status if the object is no longer detected, + # allowing another notification if it reappears. + print(f"'{self.target_object_name}' no longer detected. 
Resetting notification status.") + self._notification_sent = False + elif object_detected and self._notification_sent: + print(f"'{self.target_object_name}' still detected, but notification already sent.") + else: # not object_detected and not self._notification_sent + print(f"'{self.target_object_name}' not detected.") + + except Exception as e: + print(f"Error in detection monitoring loop: {e}") + + await asyncio.sleep(5) # Check every 5 seconds + + print("Detection monitoring loop finished or stopped.") + self._notification_sent = False # Reset state when loop stops + + async def _start_detection_monitoring_internal(self): + """ + Internal method to start the background loop. + """ + if not self._running_loop: + self._running_loop = True + self._loop_task = asyncio.create_task(self._detection_monitoring_loop()) + print("Requested to start detection monitoring loop.") + return {"status": "started"} + else: + print("Detection monitoring loop is already running.") + return {"status": "already_running"} + + async def _stop_detection_monitoring_internal(self): + """ + Internal method to stop the background loop. + """ + if self._running_loop: + self._running_loop = False + if self._loop_task: + await self._loop_task # Wait for the task to complete its current iteration and exit + self._loop_task = None + print("Requested to stop detection monitoring loop.") + return {"status": "stopped"} + else: + print("Detection monitoring loop is not running.") + return {"status": "not_running"} + + async def do_command(self, command: Mapping[str, Any], *, timeout: float | None = None, **kwargs) -> Mapping[str, Any]: + """ + Implement the do_command method to expose custom functionality. + """ + if "start_monitoring" in command: + print("Received 'start_monitoring' command via do_command.") + return await self._start_detection_monitoring_internal() + elif "stop_monitoring" in command: + print("Received 'stop_monitoring' command via do_command.") + return await self._stop_detection_monitoring_internal() + else: + raise NotImplementedError(f"Command '{command}' not recognized.") + +# Register your module +Registry.register_resource_creator( + Generic.SUBTYPE, # Register as a Generic service + EmailNotifierModule.MODEL, + ResourceCreatorRegistration(EmailNotifierModule.new_resource, EmailNotifierModule.validate_config) +) + +async def main(): + """ + Main entry point for the Viam module. 
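    Serves the module's resources so viam-server can start and manage them.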
+ """ + await Module.serve() + +if __name__ == "__main__": + asyncio.run(main()) + print("Done.") +``` \ No newline at end of file From d67b0ce6511d5dc09e8d5a4468cc9a8c63e83592 Mon Sep 17 00:00:00 2001 From: nathan contino Date: Thu, 17 Jul 2025 16:31:50 -0400 Subject: [PATCH 03/15] Refactor code samples to use generic service api --- .../data-ai/ai/make-decisions-autonomously.md | 394 ++++++++---------- 1 file changed, 170 insertions(+), 224 deletions(-) diff --git a/docs/data-ai/ai/make-decisions-autonomously.md b/docs/data-ai/ai/make-decisions-autonomously.md index 46d8ddcb7b..a9e2ad02c5 100644 --- a/docs/data-ai/ai/make-decisions-autonomously.md +++ b/docs/data-ai/ai/make-decisions-autonomously.md @@ -115,93 +115,23 @@ Finally, add the following `services` configuration for your vision service, rep ```python {class="line-numbers linkable-line-numbers"} import asyncio -from typing import Literal +from typing import Any, Mapping, Sequence, Tuple +from typing_extensions import Self -from viam.media.video import CameraMimeType -from viam.robot.client import RobotClient -from viam.components.base import Base, Vector3 from viam.components.camera import Camera -from viam.services.vision import VisionClient -from viam.media.utils.pil import pil_to_viam_image, viam_to_pil_image +from viam.logging import getLogger from viam.module.module import Module -from viam.resource.types import Model, Subtype +from viam.resource.types import Model, ModelFamily from viam.resource.base import ResourceBase from viam.resource.registry import Registry, ResourceCreatorRegistration -from viam.proto.app.v1 import ComponentConfig - -class LineFollowerAPI(ResourceBase): - """ - LineFollowerAPI represents a custom API for controlling a base based on vision. - """ - SUBTYPE = Subtype("example-namespace", "example-module", "line_follower") - - async def start_line_following(self): - raise NotImplementedError - - async def stop_line_following(self): - raise NotImplementedError - -async def is_color_in_front(camera: Camera, detector: VisionClient): - """ - Returns whether the appropriate path color is detected in front of the center of the robot. - """ - frame = viam_to_pil_image(await camera.get_image(mime_type=CameraMimeType.JPEG)) - - x, y = frame.size[0], frame.size[1] - - # Crop the image to get only the middle fifth of the top third of the original image - cropped_frame = frame.crop((x / 2.5, 0, x / 1.25, y / 3)) - - detections = await detector.get_detections( - pil_to_viam_image(cropped_frame, CameraMimeType.JPEG) - ) - - if detections: # Check if the list is not empty - return True - return False - - -async def is_color_there( - camera: Camera, detector: VisionClient, location: Literal["left", "right"] -): - """ - Returns whether the appropriate path color is detected to the left/right of the robot's front. 
- """ - frame = viam_to_pil_image(await camera.get_image(mime_type=CameraMimeType.JPEG)) - x, y = frame.size[0], frame.size[1] - - if location == "left": - # Crop image to get only the left two fifths of the original image - cropped_frame = frame.crop((0, 0, x / 2.5, y)) - - detections = await detector.get_detections( - pil_to_viam_image(cropped_frame, CameraMimeType.JPEG) - ) - - elif location == "right": - # Crop image to get only the right two fifths of the original image - cropped_frame = frame.crop((x / 1.25, 0, x, y)) - - detections = await detector.get_detections( - pil_to_viam_image(cropped_frame, CameraMimeType.JPEG) - ) - else: - detections = [] # Ensure detections is defined if location is neither 'left' nor 'right' - - if detections: # Check if the list is not empty - return True - return False - - -async def stop_robot(base: Base): - """ - Stop the robot's motion. - """ - await base.stop() +from viam.proto.app.robot import ComponentConfig +from viam.proto.common import ResourceName +from viam.services.vision import VisionClient +from viam.components.base import Base, Vector3 -# Implement your custom control logic module -class LineFollowerModule(Module, LineFollowerAPI): - MODEL = Model("example-namespace", "example-module", "line_follower_module") +class ColorFollowerModule(Module, ResourceBase): + MODEL = Model(ModelFamily("example", "color-follower"), "color-follower-module") + LOGGER = getLogger(__name__) def __init__(self, name: str): super().__init__(name) @@ -210,103 +140,121 @@ class LineFollowerModule(Module, LineFollowerAPI): self.detector: VisionClient = None self._running_loop = False self._loop_task = None - - # Speed parameters (can be configured via module config if desired) self.linear_power = 0.35 self.angular_power = 0.3 @classmethod - def new_resource(cls, config: ComponentConfig): - return cls(config.name) + def new(cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]) -> Self: + instance = cls(config.name) + instance.reconfigure(config, dependencies) + return instance + + @classmethod + def validate(cls, config: ComponentConfig) -> Tuple[Sequence[str], Sequence[str]]: + required_attributes = [] + optional_attributes = [ + "camera_name", + "detector_name", + ] + + camera_name = config.attributes.fields["camera_name"].string_value if "camera_name" in config.attributes.fields else "my_camera" + detector_name = config.attributes.fields["detector_name"].string_value if "detector_name" in config.attributes.fields else "my_detector" + + dependencies = [camera_name, detector_name] + return dependencies, [] + + def reconfigure(self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]): + self.camera_name = config.attributes.fields["camera_name"].string_value if "camera_name" in config.attributes.fields else "my_camera" + self.detector_name = config.attributes.fields["detector_name"].string_value if "detector_name" in config.attributes.fields else "my_detector" + + for dependency_name, dependency in dependencies.items(): + if dependency_name.subtype == "camera" and dependency_name.name == self.camera_name: + self.camera = dependency + elif dependency_name.subtype == "vision" and dependency_name.name == self.detector_name: + self.detector = dependency + + if not self.camera: + raise ValueError(f"Camera '{self.camera_name}' dependency not found.") + if not self.detector: + raise ValueError(f"Vision service '{self.detector_name}' dependency not found.") + + ColorFollowerModule.LOGGER.info("Reconfigured.") async def 
start(self): - """ - Called when the module starts. Get references to components. - """ - print(f"LineFollowerModule '{self.name}' starting...") - # Access components directly from the robot object provided by the module framework - self.camera = await Camera.from_robot(self.robot, "my_camera") - self.base = await Base.from_robot(self.robot, "scuttlebase") - # Replace "green_detector" with your actual vision service name - self.detector = await VisionClient.from_robot(self.robot, "my_line_detector") - print(f"LineFollowerModule '{self.name}' started.") + ColorFollowerModule.LOGGER.info("Starting color following...") + await self._start_color_following_internal() async def close(self): - """ - Called when the module is shutting down. Clean up tasks. - """ - print(f"LineFollowerModule '{self.name}' closing...") - await self.stop_line_following() - print(f"LineFollowerModule '{self.name}' closed.") + ColorFollowerModule.LOGGER.info("Stopping color following...") + await self._stop_color_following_internal() + ColorFollowerModule.LOGGER.info("Stopped.") - async def _line_follower_loop(self): - """ - The core line following control logic loop. - """ - print("Line follower control loop started.") - counter = 0 # counter to increase robustness + async def _color_following_loop(self): + ColorFollowerModule.LOGGER.info("Color following loop started.") - while self._running_loop and counter <= 3: + while self._running_loop: try: - if await is_color_in_front(self.camera, self.detector): - print("going straight") - # Moves the base slowly forward in a straight line + # Check for color in front + if await self._is_color_in_front(): + ColorFollowerModule.LOGGER.info("Moving forward.") await self.base.set_power(Vector3(y=self.linear_power), Vector3()) - counter = 0 - # If there is green to the left, turns the base left at a continuous, slow speed - elif await is_color_there(self.camera, self.detector, "left"): - print("going left") + # Check for color to the left + elif await self._is_color_there("left"): + ColorFollowerModule.LOGGER.info("Turning left.") await self.base.set_power(Vector3(), Vector3(z=self.angular_power)) - counter = 0 - # If there is green to the right, turns the base right at a continuous, slow speed - elif await is_color_there(self.camera, self.detector, "right"): - print("going right") + # Check for color to the right + elif await self._is_color_there("right"): + ColorFollowerModule.LOGGER.info("Turning right.") await self.base.set_power(Vector3(), Vector3(z=-self.angular_power)) - counter = 0 else: - print(f"No color detected, counter: {counter}") - counter += 1 - # Optionally, stop or slow down if no color is detected + ColorFollowerModule.LOGGER.info("No color detected. Stopping.") await self.base.stop() except Exception as e: - print(f"Error in line follower loop: {e}") + ColorFollowerModule.LOGGER.error(f"Error in color following loop: {e}") - await asyncio.sleep(0.05) # Adjust sleep time for desired loop frequency + await asyncio.sleep(0.05) - print("The path is behind us and forward is only open wasteland.") - await stop_robot(self.base) # Stop the robot when the loop finishes - self._running_loop = False # Ensure loop state is reset + ColorFollowerModule.LOGGER.info("Color following loop finished.") + await self.base.stop() - async def start_line_following(self): - """ - Starts the background loop for line following. 
- """ + async def _start_color_following_internal(self): if not self._running_loop: self._running_loop = True - self._loop_task = asyncio.create_task(self._line_follower_loop()) - print("Requested to start line following loop.") + self._loop_task = asyncio.create_task(self._color_following_loop()) + ColorFollowerModule.LOGGER.info("Requested to start color following loop.") else: - print("Line following loop is already running.") + ColorFollowerModule.LOGGER.info("Color following loop is already running.") - async def stop_line_following(self): - """ - Stops the background loop for line following. - """ + async def _stop_color_following_internal(self): if self._running_loop: self._running_loop = False if self._loop_task: - await self._loop_task # Wait for the task to complete its current iteration and exit + await self._loop_task self._loop_task = None - print("Requested to stop line following loop.") - else: - print("Line following loop is not running.") + ColorFollowerModule.LOGGER.info("Requested to stop color following loop.") + + async def _is_color_in_front(self) -> bool: + frame = await self.camera.get_image() + detections = await self.detector.get_detections(frame) + return any(detection.class_name == "target_color" for detection in detections) + + async def _is_color_there(self, location: str) -> bool: + frame = await self.camera.get_image() + if location == "left": + # Crop logic for left side + pass + elif location == "right": + # Crop logic for right side + pass + # Implement detection logic here + detections = await self.detector.get_detections(frame) + return any(detection.class_name == "target_color" for detection in detections) # Register your module Registry.register_resource_creator( - LineFollowerAPI.SUBTYPE, - LineFollowerModule.MODEL, - ResourceCreatorRegistration(LineFollowerModule.new_resource, LineFollowerModule.validate_config) + ColorFollowerModule.MODEL, + ResourceCreatorRegistration(ColorFollowerModule.new, ColorFollowerModule.validate) ) async def main(): @@ -317,7 +265,7 @@ async def main(): if __name__ == "__main__": asyncio.run(main()) - print("Done.") + ColorFollowerModule.LOGGER.info("Done.") ``` ## Follow a colored object @@ -423,9 +371,9 @@ Add the following `services` configuration, replacing the `detect_color` value w ```python {class="line-numbers linkable-line-numbers"} import asyncio -from typing import List, Literal +from typing import Any, Mapping, List, Literal, Sequence, Tuple +from typing_extensions import Self -from viam.robot.client import RobotClient from viam.components.base import Base from viam.components.camera import Camera from viam.services.vision import VisionClient @@ -437,53 +385,7 @@ from viam.resource.registry import Registry, ResourceCreatorRegistration from viam.proto.app.v1 import ComponentConfig from viam.services.vision import Detection -class ObjectTrackingBaseAPI(ResourceBase): - """ - ObjectTrackingBaseAPI represents a custom API for controlling a base based on object tracking. - """ - SUBTYPE = Subtype("example-namespace", "example-module", "object_tracking_base") - - async def start_object_tracking(self): - raise NotImplementedError - - async def stop_object_tracking(self): - raise NotImplementedError - -def leftOrRight(detections: List[Detection], midpoint: float) -> Literal[0, 1, 2, -1]: - """ - Get largest detection box and see if its center is in the left, center, or right third. - Returns 0 for left, 1 for center, 2 for right, -1 if nothing detected. 
- """ - largest_area = 0 - largest_detection: Detection = None # Initialize with None or a default Detection object - - if not detections: - print("nothing detected :(") - return -1 - - for d in detections: - # Calculate area using x_max, x_min, y_max, y_min - area = (d.x_max - d.x_min) * (d.y_max - d.y_min) - if area > largest_area: - largest_area = area - largest_detection = d - - if largest_detection is None: # Should not happen if detections is not empty, but for safety - return -1 - - # Calculate center X of the largest detection - centerX = largest_detection.x_min + (largest_detection.x_max - largest_detection.x_min) / 2 - - # Determine if center is left, center, or right - if centerX < midpoint - midpoint / 6: - return 0 # on the left - elif centerX > midpoint + midpoint / 6: - return 2 # on the right - else: - return 1 # basically centered - -# Implement your custom control logic module -class ObjectTrackingBaseModule(Module, ObjectTrackingBaseAPI): +class ObjectTrackingBaseModule(Module): MODEL = Model("example-namespace", "example-module", "object_tracking_base_module") def __init__(self, name: str): @@ -491,33 +393,51 @@ class ObjectTrackingBaseModule(Module, ObjectTrackingBaseAPI): self.base: Base = None self.camera: Camera = None self.detector: VisionClient = None - self.camera_name: str = "my_camera" # Default camera name, adjust in config if needed + self.camera_name: str = "my_camera" self._running_loop = False self._loop_task = None - # Control parameters (can be configured via module config if desired) - self.spin_num = 10 # when turning, spin the motor this much - self.straight_num = 300 # when going straight, spin motor this much - self.vel = 500 # go this fast when moving motor - self.num_cycles = 200 # run the loop X times (module will run indefinitely if _running_loop is True) + self.spin_num = 10 + self.straight_num = 300 + self.vel = 500 + self.num_cycles = 200 @classmethod - def new_resource(cls, config: ComponentConfig): - # You can parse attributes from the config here if you want to make - # camera_name, spin_num, etc. configurable from the Viam app. - # For simplicity, using defaults/hardcoded names for now. - return cls(config.name) + def new(cls, config: ComponentConfig, dependencies: Mapping[str, ResourceBase]) -> Self: + instance = cls(config.name) + instance.reconfigure(config, dependencies) + return instance + + @classmethod + def validate(cls, config: ComponentConfig) -> Tuple[Sequence[str], Sequence[str]]: + required_attributes = [] + optional_attributes = ["camera_name"] + + camera_name = config.attributes.fields["camera_name"].string_value if "camera_name" in config.attributes.fields else "my_camera" + + dependencies = [camera_name] + return dependencies, [] + + def reconfigure(self, config: ComponentConfig, dependencies: Mapping[str, ResourceBase]): + self.camera_name = config.attributes.fields["camera_name"].string_value if "camera_name" in config.attributes.fields else self.camera_name + + for dependency_name, dependency in dependencies.items(): + if dependency_name.subtype == "camera" and dependency_name.name == self.camera_name: + self.camera = dependency + elif dependency_name.subtype == "base": + self.base = dependency + + if not self.camera: + raise ValueError(f"Camera '{self.camera_name}' dependency not found.") + if not self.base: + raise ValueError("Base dependency not found.") async def start(self): """ Called when the module starts. Get references to components. 
""" print(f"ObjectTrackingBaseModule '{self.name}' starting...") - # Access components directly from the robot object provided by the module framework - self.base = await Base.from_robot(self.robot, "my_base") - self.camera = await Camera.from_robot(self.robot, self.camera_name) - # Replace "my_color_detector" with your actual vision service name self.detector = await VisionClient.from_robot(self.robot, "my_object_detector") print(f"ObjectTrackingBaseModule '{self.name}' started.") @@ -529,14 +449,41 @@ class ObjectTrackingBaseModule(Module, ObjectTrackingBaseAPI): await self.stop_object_tracking() print(f"ObjectTrackingBaseModule '{self.name}' closed.") + def left_or_right(self, detections: List[Detection], midpoint: float) -> Literal[0, 1, 2, -1]: + """ + Get largest detection box and see if its center is in the left, center, or right third. + Returns 0 for left, 1 for center, 2 for right, -1 if nothing detected. + """ + largest_area = 0 + largest_detection: Detection = None + + if not detections: + return -1 + + for d in detections: + area = (d.x_max - d.x_min) * (d.y_max - d.y_min) + if area > largest_area: + largest_area = area + largest_detection = d + + if largest_detection is None: + return -1 + + centerX = largest_detection.x_min + (largest_detection.x_max - largest_detection.x_min) / 2 + + if centerX < midpoint - midpoint / 6: + return 0 # on the left + elif centerX > midpoint + midpoint / 6: + return 2 # on the right + else: + return 1 # basically centered + async def _object_tracking_loop(self): """ The core object tracking and base control logic loop. """ print("Object tracking control loop started.") - # Get initial frame to determine midpoint for detection logic - # This assumes the camera resolution doesn't change during operation initial_frame = await self.camera.get_image(mime_type="image/jpeg") pil_initial_frame = viam_to_pil_image(initial_frame) midpoint = pil_initial_frame.size[0] / 2 @@ -546,32 +493,32 @@ class ObjectTrackingBaseModule(Module, ObjectTrackingBaseAPI): try: detections = await self.detector.get_detections_from_camera(self.camera_name) - answer = leftOrRight(detections, midpoint) + answer = self.left_or_right(detections, midpoint) if answer == 0: print("Detected object on left, spinning left.") - await self.base.spin(self.spin_num, self.vel) # CCW is positive + await self.base.spin(self.spin_num, self.vel) await self.base.move_straight(self.straight_num, self.vel) elif answer == 1: print("Detected object in center, moving straight.") await self.base.move_straight(self.straight_num, self.vel) elif answer == 2: print("Detected object on right, spinning right.") - await self.base.spin(-self.spin_num, self.vel) # CW is negative + await self.base.spin(-self.spin_num, self.vel) await self.base.move_straight(self.straight_num, self.vel) - else: # answer == -1 (nothing detected) + else: print("No object detected, stopping base.") - await self.base.stop() # Stop if nothing is detected + await self.base.stop() except Exception as e: print(f"Error in object tracking loop: {e}") cycle_count += 1 - await asyncio.sleep(0.1) # Small delay to prevent busy-waiting + await asyncio.sleep(0.1) print("Object tracking loop finished or stopped.") - await self.base.stop() # Ensure base stops when loop ends - self._running_loop = False # Reset state + await self.base.stop() + self._running_loop = False async def start_object_tracking(self): """ @@ -591,7 +538,7 @@ class ObjectTrackingBaseModule(Module, ObjectTrackingBaseAPI): if self._running_loop: self._running_loop = False if 
self._loop_task: - await self._loop_task # Wait for the task to complete its current iteration and exit + await self._loop_task # Wait for the task to complete its current iteration and exit self._loop_task = None print("Requested to stop object tracking loop.") else: @@ -599,9 +546,8 @@ class ObjectTrackingBaseModule(Module, ObjectTrackingBaseAPI): # Register your module Registry.register_resource_creator( - ObjectTrackingBaseAPI.SUBTYPE, ObjectTrackingBaseModule.MODEL, - ResourceCreatorRegistration(ObjectTrackingBaseModule.new_resource, ObjectTrackingBaseModule.validate_config) + ResourceCreatorRegistration(ObjectTrackingBaseModule.new, ObjectTrackingBaseModule.validate) ) async def main(): From 1c8701df1fe7fc9f15d3b85d8f3300abf295db02 Mon Sep 17 00:00:00 2001 From: nathan contino Date: Fri, 18 Jul 2025 16:06:49 -0400 Subject: [PATCH 04/15] Clean up configuration, add instructions to create, deploy, and test your module from scratch --- .../data-ai/ai/make-decisions-autonomously.md | 520 ++++++++++++------ 1 file changed, 361 insertions(+), 159 deletions(-) diff --git a/docs/data-ai/ai/make-decisions-autonomously.md b/docs/data-ai/ai/make-decisions-autonomously.md index a9e2ad02c5..b6bc9a1f17 100644 --- a/docs/data-ai/ai/make-decisions-autonomously.md +++ b/docs/data-ai/ai/make-decisions-autonomously.md @@ -14,7 +14,7 @@ Use the [vision service API](/dev/reference/apis/services/vision/) to make infer ## Follow a line -This module uses a vision service and a motor to program a machine to follow a line. +This module uses a vision service and a motor to program a machine to follow a line of a configurable color. ### Prerequisites @@ -35,8 +35,7 @@ Add the following `components` configuration to create board, base, and motor co "name": "my-board", "model": "pi", "api": "rdk:component:board", - "attributes": {}, - "depends_on": [] + "attributes": {} }, { "name": "leftm", @@ -49,8 +48,7 @@ Add the following `components` configuration to create board, base, and motor co }, "board": "my-board", "max_rpm": 200 - }, - "depends_on": ["my-board"] + } }, { "name": "rightm", @@ -65,8 +63,7 @@ Add the following `components` configuration to create board, base, and motor co }, "board": "my-board", "max_rpm": 200 - }, - "depends_on": ["my-board"] + } }, { "name": "scuttlebase", @@ -77,8 +74,7 @@ Add the following `components` configuration to create board, base, and motor co "wheel_circumference_mm": 258, "left": ["leftm"], "right": ["rightm"] - }, - "depends_on": ["leftm", "rightm"] + } } ``` @@ -111,8 +107,78 @@ Finally, add the following `services` configuration for your vision service, rep } ``` +### Create your module + +In a terminal, run the following command: + +```sh {id="terminal-prompt" class="command-line" data-prompt="$"} +viam module generate +``` + +Enter the following configuration for your new module: + +- **module name**: "autonomous_example_module" +- **language**: Python +- **visibility**: private +- **organization ID**: your organization ID, found on the Viam organization settings page +- **resource to be added to the module**: Generic Service +- **model name**: "line_follower" +- **Enable cloud build**: yes +- **Register module**: yes + +Create a file called reload.sh in the root directory of your newly-generated module. +Copy and paste the following code into reload.sh: + +```bash +#!/usr/bin/env bash + +# bash safe mode. 
look at `set --help` to see what these are doing +set -euxo pipefail + +cd $(dirname $0) +MODULE_DIR=$(dirname $0) +VIRTUAL_ENV=$MODULE_DIR/venv +PYTHON=$VIRTUAL_ENV/bin/python +./setup.sh + +# Be sure to use `exec` so that termination signals reach the python process, +# or handle forwarding termination signals manually +exec $PYTHON src/main.py $@ +``` + +In a terminal, run the following command to make reload.sh executable: + +```sh {id="terminal-prompt" class="command-line" data-prompt="$"} +chmod +x reload.sh +``` + +Create a virtual Python environment with the necessary packages by running the module setup script from within the module directory: + +```sh {id="terminal-prompt" class="command-line" data-prompt="$"} +sh setup.sh +``` + +Edit your meta.json, replacing the `"entrypoint"`, `"build"`, and `"path"` fields as follows: + +```json {class="line-numbers linkable-line-numbers" data-start="13" data-line="1, 4, 6" } + "entrypoint": "reload.sh", + "first_run": "", + "build": { + "build": "rm -f module.tar.gz && tar czf module.tar.gz requirements.txt src/*.py src/models/*.py meta.json setup.sh reload.sh", + "setup": "./setup.sh", + "path": "module.tar.gz", + "arch": [ + "linux/amd64", + "linux/arm64" + ] + } +``` + ### Code +Replace the contents of src/models/line_follower.py with the following code. +Replace the `` placeholder with your organization namespace. + ```python {class="line-numbers linkable-line-numbers"} import asyncio from typing import Any, Mapping, Sequence, Tuple @@ -129,8 +195,8 @@ from viam.proto.common import ResourceName from viam.services.vision import VisionClient from viam.components.base import Base, Vector3 -class ColorFollowerModule(Module, ResourceBase): - MODEL = Model(ModelFamily("example", "color-follower"), "color-follower-module") +class LineFollower(Module, ResourceBase): + MODEL = Model(ModelFamily("", "autonomous_example_module"), "line-follower") LOGGER = getLogger(__name__) def __init__(self, name: str): @@ -144,7 +210,7 @@ class ColorFollowerModule(Module, ResourceBase): self.angular_power = 0.3 @classmethod - def new(cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]) -> Self: + def new_resource(cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]) -> Self: instance = cls(config.name) instance.reconfigure(config, dependencies) return instance @@ -178,53 +244,53 @@ class ColorFollowerModule(Module, ResourceBase): if not self.detector: raise ValueError(f"Vision service '{self.detector_name}' dependency not found.") - ColorFollowerModule.LOGGER.info("Reconfigured.") + LineFollower.LOGGER.info("Reconfigured.") async def start(self): - ColorFollowerModule.LOGGER.info("Starting color following...") + LineFollower.LOGGER.info("Starting color following...") await self._start_color_following_internal() async def close(self): - ColorFollowerModule.LOGGER.info("Stopping color following...") + LineFollower.LOGGER.info("Stopping color following...") await self._stop_color_following_internal() - ColorFollowerModule.LOGGER.info("Stopped.") + LineFollower.LOGGER.info("Stopped.") async def _color_following_loop(self): - ColorFollowerModule.LOGGER.info("Color following loop started.") + LineFollower.LOGGER.info("Color following loop started.") while self._running_loop: try: # Check for color in front if await self._is_color_in_front(): - ColorFollowerModule.LOGGER.info("Moving forward.") + LineFollower.LOGGER.info("Moving forward.") await self.base.set_power(Vector3(y=self.linear_power), Vector3()) # Check 
for color to the left elif await self._is_color_there("left"): - ColorFollowerModule.LOGGER.info("Turning left.") + LineFollower.LOGGER.info("Turning left.") await self.base.set_power(Vector3(), Vector3(z=self.angular_power)) # Check for color to the right elif await self._is_color_there("right"): - ColorFollowerModule.LOGGER.info("Turning right.") + LineFollower.LOGGER.info("Turning right.") await self.base.set_power(Vector3(), Vector3(z=-self.angular_power)) else: - ColorFollowerModule.LOGGER.info("No color detected. Stopping.") + LineFollower.LOGGER.info("No color detected. Stopping.") await self.base.stop() except Exception as e: - ColorFollowerModule.LOGGER.error(f"Error in color following loop: {e}") + LineFollower.LOGGER.error(f"Error in color following loop: {e}") await asyncio.sleep(0.05) - ColorFollowerModule.LOGGER.info("Color following loop finished.") + LineFollower.LOGGER.info("Color following loop finished.") await self.base.stop() async def _start_color_following_internal(self): if not self._running_loop: self._running_loop = True self._loop_task = asyncio.create_task(self._color_following_loop()) - ColorFollowerModule.LOGGER.info("Requested to start color following loop.") + LineFollower.LOGGER.info("Requested to start color following loop.") else: - ColorFollowerModule.LOGGER.info("Color following loop is already running.") + LineFollower.LOGGER.info("Color following loop is already running.") async def _stop_color_following_internal(self): if self._running_loop: @@ -232,7 +298,7 @@ class ColorFollowerModule(Module, ResourceBase): if self._loop_task: await self._loop_task self._loop_task = None - ColorFollowerModule.LOGGER.info("Requested to stop color following loop.") + LineFollower.LOGGER.info("Requested to stop color following loop.") async def _is_color_in_front(self) -> bool: frame = await self.camera.get_image() @@ -253,8 +319,8 @@ class ColorFollowerModule(Module, ResourceBase): # Register your module Registry.register_resource_creator( - ColorFollowerModule.MODEL, - ResourceCreatorRegistration(ColorFollowerModule.new, ColorFollowerModule.validate) + LineFollower.MODEL, + ResourceCreatorRegistration(LineFollower.new_resource, LineFollower.validate) ) async def main(): @@ -265,12 +331,21 @@ async def main(): if __name__ == "__main__": asyncio.run(main()) - ColorFollowerModule.LOGGER.info("Done.") + LineFollower.LOGGER.info("Done.") +``` + +### Run your module + +Find the [Part ID](/dev/reference/apis/fleet/#find-part-id) for your machine. +To deploy your module on your machine, run the following command, replacing `` with your Part ID: + +```sh {id="terminal-prompt" class="command-line" data-prompt="$"} +viam module reload --part-id ``` ## Follow a colored object -This module uses a vision service and a motor to program a machine to follow an object. +This module uses a vision service and a motor to program a machine to follow an object of a configurable color. 
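+The control logic reduces to one decision per frame: find the largest detection, then steer toward whichever third of the image contains its center.
+The sketch below is a simplified, standalone version of the `left_or_right` helper that appears in the module code later in this section (the function name `pick_direction` and the string return values are illustrative only; the one-sixth dead band around center matches the module code):
+
+```python
+def pick_direction(detections, frame_width: float) -> str:
+    """Return which way to steer toward the largest detection."""
+    if not detections:
+        return "stop"  # nothing detected: hold still
+    # Pick the detection with the largest bounding-box area
+    largest = max(
+        detections,
+        key=lambda d: (d.x_max - d.x_min) * (d.y_max - d.y_min),
+    )
+    center_x = largest.x_min + (largest.x_max - largest.x_min) / 2
+    midpoint = frame_width / 2
+    if center_x < midpoint - midpoint / 6:
+        return "left"
+    if center_x > midpoint + midpoint / 6:
+        return "right"
+    return "straight"
+```
+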
### Prerequisites @@ -291,8 +366,7 @@ Add the following `components` configuration to create board, base, and motor co "name": "my-board", "model": "pi", "api": "rdk:component:board", - "attributes": {}, - "depends_on": [] + "attributes": {} }, { "name": "leftm", @@ -305,8 +379,7 @@ Add the following `components` configuration to create board, base, and motor co }, "board": "my-board", "max_rpm": 200 - }, - "depends_on": ["my-board"] + } }, { "name": "rightm", @@ -321,8 +394,7 @@ Add the following `components` configuration to create board, base, and motor co }, "board": "my-board", "max_rpm": 200 - }, - "depends_on": ["my-board"] + } }, { "name": "my_base", @@ -333,8 +405,7 @@ Add the following `components` configuration to create board, base, and motor co "wheel_circumference_mm": 258, "left": ["leftm"], "right": ["rightm"] - }, - "depends_on": ["leftm", "rightm"] + } } ``` @@ -364,11 +435,81 @@ Add the following `services` configuration, replacing the `detect_color` value w "detect_color": "#a13b4c", // replace with the color of your object "hue_tolerance_pct": 0.06 } -}, +} +``` + +### Create your module + +In a terminal, run the following command: + +```sh {id="terminal-prompt" class="command-line" data-prompt="$"} +viam module generate +``` + +Enter the following configuration for your new module: + +- **module name**: "autonomous_example_module" +- **language**: Python +- **visibility**: private +- **organization ID**: your organization ID, found on the Viam organization settings page +- **resource to be added to the module**: Generic Service +- **model name**: "object_follower" +- **Enable cloud build**: yes +- **Register module**: yes + +Create a file called reload.sh in the root directory of your newly-generated module. +Copy and paste the following code into reload.sh: + +```bash +#!/usr/bin/env bash + +# bash safe mode. look at `set --help` to see what these are doing +set -euxo pipefail + +cd $(dirname $0) +MODULE_DIR=$(dirname $0) +VIRTUAL_ENV=$MODULE_DIR/venv +PYTHON=$VIRTUAL_ENV/bin/python +./setup.sh + +# Be sure to use `exec` so that termination signals reach the python process, +# or handle forwarding termination signals manually +exec $PYTHON src/main.py $@ +``` + +In a terminal, run the following command to make reload.sh executable: + +```sh {id="terminal-prompt" class="command-line" data-prompt="$"} +chmod +x reload.sh +``` + +Create a virtual Python environment with the necessary packages by running the module setup script from within the module directory: + +```sh {id="terminal-prompt" class="command-line" data-prompt="$"} +sh setup.sh +``` + +Edit your meta.json, replacing the `"entrypoint"`, `"build"`, and `"path"` fields as follows: + +```json {class="line-numbers linkable-line-numbers" data-start="13" data-line="1, 4, 6" } + "entrypoint": "reload.sh", + "first_run": "", + "build": { + "build": "rm -f module.tar.gz && tar czf module.tar.gz requirements.txt src/*.py src/models/*.py meta.json setup.sh reload.sh", + "setup": "./setup.sh", + "path": "module.tar.gz", + "arch": [ + "linux/amd64", + "linux/arm64" + ] + } ``` ### Code +Replace the contents of src/models/object_follower.py with the following code. +Replace the `` placeholder with your organization namespace. 
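+Note that each cycle of the loop drives a discrete motion (`spin`, then `move_straight`) toward the largest detection rather than setting continuous power, and stops the base whenever nothing is detected.
+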
+ ```python {class="line-numbers linkable-line-numbers"} import asyncio from typing import Any, Mapping, List, Literal, Sequence, Tuple @@ -385,8 +526,8 @@ from viam.resource.registry import Registry, ResourceCreatorRegistration from viam.proto.app.v1 import ComponentConfig from viam.services.vision import Detection -class ObjectTrackingBaseModule(Module): - MODEL = Model("example-namespace", "example-module", "object_tracking_base_module") +class ObjectFollower(Module): + MODEL = Model("", "autonomous_example_module", "object_follower") def __init__(self, name: str): super().__init__(name) @@ -404,7 +545,7 @@ class ObjectTrackingBaseModule(Module): self.num_cycles = 200 @classmethod - def new(cls, config: ComponentConfig, dependencies: Mapping[str, ResourceBase]) -> Self: + def new_resource(cls, config: ComponentConfig, dependencies: Mapping[str, ResourceBase]) -> Self: instance = cls(config.name) instance.reconfigure(config, dependencies) return instance @@ -437,17 +578,17 @@ class ObjectTrackingBaseModule(Module): """ Called when the module starts. Get references to components. """ - print(f"ObjectTrackingBaseModule '{self.name}' starting...") + ObjectFollower.LOGGER.info(f"ObjectFollower '{self.name}' starting...") self.detector = await VisionClient.from_robot(self.robot, "my_object_detector") - print(f"ObjectTrackingBaseModule '{self.name}' started.") + ObjectFollower.LOGGER.info(f"ObjectFollower '{self.name}' started.") async def close(self): """ Called when the module is shutting down. Clean up tasks. """ - print(f"ObjectTrackingBaseModule '{self.name}' closing...") + ObjectFollower.LOGGER.info(f"ObjectFollower '{self.name}' closing...") await self.stop_object_tracking() - print(f"ObjectTrackingBaseModule '{self.name}' closed.") + ObjectFollower.LOGGER.info(f"ObjectFollower '{self.name}' closed.") def left_or_right(self, detections: List[Detection], midpoint: float) -> Literal[0, 1, 2, -1]: """ @@ -482,7 +623,7 @@ class ObjectTrackingBaseModule(Module): """ The core object tracking and base control logic loop. 
""" - print("Object tracking control loop started.") + ObjectFollower.LOGGER.info("Object tracking control loop started.") initial_frame = await self.camera.get_image(mime_type="image/jpeg") pil_initial_frame = viam_to_pil_image(initial_frame) @@ -496,27 +637,27 @@ class ObjectTrackingBaseModule(Module): answer = self.left_or_right(detections, midpoint) if answer == 0: - print("Detected object on left, spinning left.") + ObjectFollower.LOGGER.info("Detected object on left, spinning left.") await self.base.spin(self.spin_num, self.vel) await self.base.move_straight(self.straight_num, self.vel) elif answer == 1: - print("Detected object in center, moving straight.") + ObjectFollower.LOGGER.info("Detected object in center, moving straight.") await self.base.move_straight(self.straight_num, self.vel) elif answer == 2: - print("Detected object on right, spinning right.") + ObjectFollower.LOGGER.info("Detected object on right, spinning right.") await self.base.spin(-self.spin_num, self.vel) await self.base.move_straight(self.straight_num, self.vel) else: - print("No object detected, stopping base.") + ObjectFollower.LOGGER.info("No object detected, stopping base.") await self.base.stop() except Exception as e: - print(f"Error in object tracking loop: {e}") + ObjectFollower.LOGGER.info(f"Error in object tracking loop: {e}") cycle_count += 1 await asyncio.sleep(0.1) - print("Object tracking loop finished or stopped.") + ObjectFollower.LOGGER.info("Object tracking loop finished or stopped.") await self.base.stop() self._running_loop = False @@ -527,9 +668,9 @@ class ObjectTrackingBaseModule(Module): if not self._running_loop: self._running_loop = True self._loop_task = asyncio.create_task(self._object_tracking_loop()) - print("Requested to start object tracking loop.") + ObjectFollower.LOGGER.info("Requested to start object tracking loop.") else: - print("Object tracking loop is already running.") + ObjectFollower.LOGGER.info("Object tracking loop is already running.") async def stop_object_tracking(self): """ @@ -540,14 +681,14 @@ class ObjectTrackingBaseModule(Module): if self._loop_task: await self._loop_task # Wait for the task to complete its current iteration and exit self._loop_task = None - print("Requested to stop object tracking loop.") + ObjectFollower.LOGGER.info("Requested to stop object tracking loop.") else: - print("Object tracking loop is not running.") + ObjectFollower.LOGGER.info("Object tracking loop is not running.") # Register your module Registry.register_resource_creator( - ObjectTrackingBaseModule.MODEL, - ResourceCreatorRegistration(ObjectTrackingBaseModule.new, ObjectTrackingBaseModule.validate) + ObjectFollower.MODEL, + ResourceCreatorRegistration(ObjectFollower.new_resource, ObjectFollower.validate) ) async def main(): @@ -558,100 +699,184 @@ async def main(): if __name__ == "__main__": asyncio.run(main()) - print("Done.") + ObjectFollower.LOGGER.info("Done.") +``` + +### Run your module + +Find the [Part ID](/dev/reference/apis/fleet/#find-part-id) for your machine. +To deploy your module on your machine, run the following command, replacing `` with your Part ID: + +```sh {id="terminal-prompt" class="command-line" data-prompt="$"} +viam module reload --part-id ``` -### Notify when a certain object appears in a video feed +## Notify when a certain object appears in a video feed This module uses a vision service to program a machine to send a notification when a certain object appears in a video feed. 
+This example detects people with an IR camera, but you can use a different camera, ML model, or vision service to detect any object with the same logic. ### Prerequisites - An SBC, for example a Raspberry Pi 4 -- A webcam +- An IR camera, for example a [Raspberry Pi Camera Module 2 NoIR](https://www.raspberrypi.com/products/pi-noir-camera-v2/) ### Configure your machine Follow the [setup guide](/operate/get-started/setup/) to create a new machine. -Connect your SCUTTLE base to your SBC. -Add the following `components` configuration: +Connect your camera to your SBC. +Add the following `components` configuration for your camera: ```json - +{ + "name": "my_camera", + "model": "viam:camera:csi", + "attributes": { + "width_px": 1920, + "height_px": 1080, + "frame_rate": 30 + }, + "depends_on": [], + "namespace": "rdk", + "type": "camera" +} ``` -Connect your webcam to your SBC. -Add the following `components` configuration for your webcam: +Add the following `services` configuration: ```json { - "name": "my_camera", - "model": "webcam", - "api": "rdk:component:camera", + "name": "ir-person-mlmodel", + "type": "mlmodel", + "namespace": "rdk", + "model": "viam-labs:mlmodel:near-ir-person", + "attributes": {} +}, +{ + "name": "my-object-detector", + "type": "vision", + "model": "mlmodel", "attributes": { - "video_path": "" + "mlmodel_name": "ir-person-mlmodel" } } ``` +### Create your module + +In a terminal, run the following command: + +```sh {id="terminal-prompt" class="command-line" data-prompt="$"} +viam module generate +``` + +Enter the following configuration for your new module: + +- **module name**: "autonomous_example_module" +- **language**: Python +- **visibility**: private +- **organization ID**: your organization ID, found on the Viam organization settings page +- **resource to be added to the module**: Generic Service +- **model name**: "email_notifier" +- **Enable cloud build**: yes +- **Register module**: yes + +Create a file called reload.sh in the root directory of your newly-generated module. +Copy and paste the following code into reload.sh: + +```bash +#!/usr/bin/env bash + +# bash safe mode. look at `set --help` to see what these are doing +set -euxo pipefail + +cd $(dirname $0) +MODULE_DIR=$(dirname $0) +VIRTUAL_ENV=$MODULE_DIR/venv +PYTHON=$VIRTUAL_ENV/bin/python +./setup.sh + +# Be sure to use `exec` so that termination signals reach the python process, +# or handle forwarding termination signals manually +exec $PYTHON src/main.py $@ +``` + +In a terminal, run the following command to make reload.sh executable: + +```sh {id="terminal-prompt" class="command-line" data-prompt="$"} +chmod +x reload.sh +``` + +Create a virtual Python environment with the necessary packages by running the module setup script from within the module directory: + +```sh {id="terminal-prompt" class="command-line" data-prompt="$"} +sh setup.sh +``` + +Edit your meta.json, replacing the `"entrypoint"`, `"build"`, and `"path"` fields as follows: + +```json {class="line-numbers linkable-line-numbers" data-start="13" data-line="1, 4, 6" } + "entrypoint": "reload.sh", + "first_run": "", + "build": { + "build": "rm -f module.tar.gz && tar czf module.tar.gz requirements.txt src/*.py src/models/*.py meta.json setup.sh reload.sh", + "setup": "./setup.sh", + "path": "module.tar.gz", + "arch": [ + "linux/amd64", + "linux/arm64" + ] + } +``` ### Code +Replace the contents of src/models/email_notifier.py with the following code. +Replace the `` placeholder with your organization namespace. 
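+The monitoring loop below polls the detector every five seconds and sends at most one email per event: `notification_sent` flips after a successful send and resets once the frame is clear of detections.
+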
```python
 import asyncio
 import os
-from typing import List, Literal, Mapping, Any
+from typing import List, Mapping, Any
 
 from viam.robot.client import RobotClient
 from viam.components.camera import Camera
 from viam.services.vision import VisionClient
-from viam.media.utils.pil import pil_to_viam_image, viam_to_pil_image
+from viam.logging import getLogger
 from viam.module.module import Module
-from viam.resource.types import Model, Subtype
-from viam.resource.base import ResourceBase
+from viam.resource.types import Model
 from viam.resource.registry import Registry, ResourceCreatorRegistration
 from viam.proto.app.v1 import ComponentConfig
-from viam.services.vision import Detection
 from viam.services.generic import Generic
 import smtplib
 from email.mime.text import MIMEText
 
-class EmailNotifierModule(Module, Generic):
-    MODEL = Model("example-namespace", "example-module", "email_notifier_generic")
+class EmailNotifier(Module, Generic):
+    MODEL = Model("", "autonomous_example_module", "email_notifier")
+    LOGGER = getLogger(__name__)
 
     def __init__(self, name: str):
         super().__init__(name)
         self.camera: Camera = None
         self.detector: VisionClient = None
-        self.camera_name: str = "my_camera"  # Default camera name, adjust in config if needed
-        self.detector_name: str = "my_object_detector"  # Default vision service name
-        self.target_object_name: str = "person"  # The object to detect for notification
+        self.notification_sent: bool = False
 
-        # Email configuration (sensitive info should ideally be managed securely, e.g., environment variables)
+        # Email configuration
         self.sender_email: str = os.getenv("SENDER_EMAIL", "your_email@example.com")
         self.sender_password: str = os.getenv("SENDER_PASSWORD", "your_email_password")
         self.receiver_email: str = os.getenv("RECEIVER_EMAIL", "recipient_email@example.com")
         self.smtp_server: str = os.getenv("SMTP_SERVER", "smtp.example.com")
-        self.smtp_port: int = int(os.getenv("SMTP_PORT", 587))  # Typically 587 for TLS
+        self.smtp_port: int = int(os.getenv("SMTP_PORT", 587))
 
         self._running_loop = False
         self._loop_task = None
-        self._notification_sent = False
 
     @classmethod
     def new_resource(cls, config: ComponentConfig):
-        # Parse attributes from the config here to make them configurable
         module = cls(config.name)
         if "camera_name" in config.attributes.fields:
             module.camera_name = config.attributes.fields["camera_name"].string_value
         if "detector_name" in config.attributes.fields:
-            module.detector_name = config.attributes.fields["detector_name"].string_value
-        if "target_object_name" in config.attributes.fields:
-            module.target_object_name = config.attributes.fields["target_object_name"].string_value
-
-        # Email configuration can also be set via config, but environment variables are often preferred for secrets
+            module.detector_name = config.attributes.fields["detector_name"].string_value
         if "sender_email" in config.attributes.fields:
             module.sender_email = config.attributes.fields["sender_email"].string_value
         if "sender_password" in config.attributes.fields:
@@ -666,26 +891,17 @@ class EmailNotifierModule(Module, Generic):
         return module
 
     async def start(self):
-        """
-        Called when the module starts. Get references to components.
-        """
-        print(f"EmailNotifierModule '{self.name}' starting...")
+        EmailNotifier.LOGGER.info(f"'{self.name}' starting...")
        self.camera = await Camera.from_robot(self.robot, self.camera_name)
        self.detector = await VisionClient.from_robot(self.robot, self.detector_name)
-        print(f"EmailNotifierModule '{self.name}' started. Monitoring for '{self.target_object_name}'.")
+        EmailNotifier.LOGGER.info(f"'{self.name}' started. 
Monitoring for detections.") async def close(self): - """ - Called when the module is shutting down. Clean up tasks. - """ - print(f"EmailNotifierModule '{self.name}' closing...") - await self._stop_detection_monitoring_internal() # Call internal stop method - print(f"EmailNotifierModule '{self.name}' closed.") + EmailNotifier.LOGGER.info(f"'{self.name}' closing...") + await self._stop_detection_monitoring_internal() + EmailNotifier.LOGGER.info(f"'{self.name}' closed.") def _send_email(self, subject: str, body: str): - """ - Helper function to send an email. - """ try: msg = MIMEText(body) msg['Subject'] = subject @@ -693,109 +909,95 @@ class EmailNotifierModule(Module, Generic): msg['To'] = self.receiver_email with smtplib.SMTP(self.smtp_server, self.smtp_port) as server: - server.starttls() # Secure the connection + server.starttls() server.login(self.sender_email, self.sender_password) server.send_message(msg) - print(f"Email sent successfully to {self.receiver_email}: '{subject}'") - self._notification_sent = True # Mark that notification has been sent + EmailNotifier.LOGGER.info(f"Email sent successfully to {self.receiver_email}: '{subject}'") + self.notification_sent = True except Exception as e: - print(f"Failed to send email: {e}") - self._notification_sent = False # Reset if sending failed + EmailNotifier.LOGGER.info(f"Failed to send email: {e}") + self.notification_sent = False async def _detection_monitoring_loop(self): - """ - The core object detection monitoring and email notification logic loop. - """ - print("Detection monitoring loop started.") + EmailNotifier.LOGGER.info("Detection monitoring loop started.") while self._running_loop: try: detections = await self.detector.get_detections_from_camera(self.camera_name) - object_detected = False - for d in detections: - if d.class_name == self.target_object_name: - object_detected = True - break - - if object_detected and not self._notification_sent: - subject = f"Viam Module Alert: {self.target_object_name} Detected!" - body = f"A {self.target_object_name} was detected by the vision service '{self.detector_name}' on camera '{self.camera_name}'." - print(f"Detected '{self.target_object_name}'. Sending email notification...") + if detections and not self.notification_sent: + subject = "Viam Module Alert: Detection Found!" + body = "A detection was found by the vision service." + EmailNotifier.LOGGER.info("Detection found. Sending email notification...") self._send_email(subject, body) - elif not object_detected and self._notification_sent: - # Reset notification status if the object is no longer detected, - # allowing another notification if it reappears. - print(f"'{self.target_object_name}' no longer detected. Resetting notification status.") - self._notification_sent = False - elif object_detected and self._notification_sent: - print(f"'{self.target_object_name}' still detected, but notification already sent.") - else: # not object_detected and not self._notification_sent - print(f"'{self.target_object_name}' not detected.") + elif not detections and self.notification_sent: + EmailNotifier.LOGGER.info("No detections found. 
Resetting notification status.") + self.notification_sent = False + elif detections and self.notification_sent: + EmailNotifier.LOGGER.info("Detection still present, but notification already sent.") + else: + EmailNotifier.LOGGER.info("No detections.") except Exception as e: - print(f"Error in detection monitoring loop: {e}") + EmailNotifier.LOGGER.info(f"Error in detection monitoring loop: {e}") - await asyncio.sleep(5) # Check every 5 seconds + await asyncio.sleep(5) - print("Detection monitoring loop finished or stopped.") - self._notification_sent = False # Reset state when loop stops + EmailNotifier.LOGGER.info("Detection monitoring loop finished or stopped.") + self.notification_sent = False async def _start_detection_monitoring_internal(self): - """ - Internal method to start the background loop. - """ if not self._running_loop: self._running_loop = True self._loop_task = asyncio.create_task(self._detection_monitoring_loop()) - print("Requested to start detection monitoring loop.") + EmailNotifier.LOGGER.info("Requested to start detection monitoring loop.") return {"status": "started"} else: - print("Detection monitoring loop is already running.") + EmailNotifier.LOGGER.info("Detection monitoring loop is already running.") return {"status": "already_running"} async def _stop_detection_monitoring_internal(self): - """ - Internal method to stop the background loop. - """ if self._running_loop: self._running_loop = False if self._loop_task: - await self._loop_task # Wait for the task to complete its current iteration and exit + await self._loop_task self._loop_task = None - print("Requested to stop detection monitoring loop.") + EmailNotifier.LOGGER.info("Requested to stop detection monitoring loop.") return {"status": "stopped"} else: - print("Detection monitoring loop is not running.") + EmailNotifier.LOGGER.info("Detection monitoring loop is not running.") return {"status": "not_running"} async def do_command(self, command: Mapping[str, Any], *, timeout: float | None = None, **kwargs) -> Mapping[str, Any]: - """ - Implement the do_command method to expose custom functionality. - """ if "start_monitoring" in command: - print("Received 'start_monitoring' command via do_command.") + EmailNotifier.LOGGER.info("Received 'start_monitoring' command via do_command.") return await self._start_detection_monitoring_internal() elif "stop_monitoring" in command: - print("Received 'stop_monitoring' command via do_command.") + EmailNotifier.LOGGER.info("Received 'stop_monitoring' command via do_command.") return await self._stop_detection_monitoring_internal() else: raise NotImplementedError(f"Command '{command}' not recognized.") # Register your module Registry.register_resource_creator( - Generic.SUBTYPE, # Register as a Generic service - EmailNotifierModule.MODEL, - ResourceCreatorRegistration(EmailNotifierModule.new_resource, EmailNotifierModule.validate_config) + Generic.SUBTYPE, + EmailNotifier.MODEL, + ResourceCreatorRegistration(EmailNotifier.new_resource, EmailNotifier.validate_config) ) async def main(): - """ - Main entry point for the Viam module. - """ await Module.serve() if __name__ == "__main__": asyncio.run(main()) - print("Done.") + EmailNotifier.LOGGER.info("Done.") +``` + +### Run your module + +Find the [Part ID](/dev/reference/apis/fleet/#find-part-id) for your machine. 
+To deploy your module on your machine, run the following command, replacing `` with your Part ID:
+
+```sh {id="terminal-prompt" class="command-line" data-prompt="$"}
+viam module reload --part-id 
+```
\ No newline at end of file

From 44c27531b79b73b71dc8b7d8d1c385a9f48172cb Mon Sep 17 00:00:00 2001
From: nathan contino
Date: Fri, 18 Jul 2025 16:28:27 -0400
Subject: [PATCH 05/15] More cleanup

---
 .../data-ai/ai/make-decisions-autonomously.md | 112 +++++++++++++-----
 1 file changed, 83 insertions(+), 29 deletions(-)

diff --git a/docs/data-ai/ai/make-decisions-autonomously.md b/docs/data-ai/ai/make-decisions-autonomously.md
index b6bc9a1f17..75754702ca 100644
--- a/docs/data-ai/ai/make-decisions-autonomously.md
+++ b/docs/data-ai/ai/make-decisions-autonomously.md
@@ -96,9 +96,9 @@ Finally, add the following `services` configuration for your vision service, rep
 
 ```json
 {
-  "name": "green_detector",
+  "name": "my_line_detector",
   "api": "rdk:service:vision",
-  "model": "my_line_detector",
+  "model": "color_detector",
   "attributes": {
     "segment_size_px": 100,
     "detect_color": "#19FFD9", // replace with the color of your line
@@ -184,6 +184,7 @@ import asyncio
 from typing import Any, Mapping, Sequence, Tuple
 from typing_extensions import Self
 
+from viam.components.base import Base
 from viam.components.camera import Camera
 from viam.logging import getLogger
 from viam.module.module import Module
@@ -217,32 +218,32 @@ class LineFollower(Module, ResourceBase):
 
     @classmethod
     def validate(cls, config: ComponentConfig) -> Tuple[Sequence[str], Sequence[str]]:
-        required_attributes = []
-        optional_attributes = [
-            "camera_name",
-            "detector_name",
-        ]
-
-        camera_name = config.attributes.fields["camera_name"].string_value if "camera_name" in config.attributes.fields else "my_camera"
-        detector_name = config.attributes.fields["detector_name"].string_value if "detector_name" in config.attributes.fields else "my_detector"
+        camera_name = config.attributes.fields["camera_name"].string_value
+        detector_name = config.attributes.fields["detector_name"].string_value
+        base_name = config.attributes.fields["base_name"].string_value
 
-        dependencies = [camera_name, detector_name]
+        dependencies = [camera_name, detector_name, base_name]
         return dependencies, []
 
     def reconfigure(self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]):
-        self.camera_name = config.attributes.fields["camera_name"].string_value if "camera_name" in config.attributes.fields else "my_camera"
-        self.detector_name = config.attributes.fields["detector_name"].string_value if "detector_name" in config.attributes.fields else "my_detector"
+        self.camera_name = config.attributes.fields["camera_name"].string_value
+        self.detector_name = config.attributes.fields["detector_name"].string_value
+        self.base_name = config.attributes.fields["base_name"].string_value
 
         for dependency_name, dependency in dependencies.items():
             if dependency_name.subtype == "camera" and dependency_name.name == self.camera_name:
                 self.camera = dependency
             elif dependency_name.subtype == "vision" and dependency_name.name == self.detector_name:
                 self.detector = dependency
+            elif dependency_name.subtype == "base" and dependency_name.name == self.base_name:
+                self.base = dependency
 
         if not self.camera:
             raise ValueError(f"Camera '{self.camera_name}' dependency not found.")
         if not self.detector:
             raise ValueError(f"Vision service '{self.detector_name}' dependency not found.")
+        if not self.base:
+            raise ValueError(f"Base '{self.base_name}' dependency not found.")
 
         LineFollower.LOGGER.info("Reconfigured.")
 
@@ -343,6 +344,22 @@ To deploy your module on your machine, run the following command, replacing `
 viam module reload --part-id 
 ```
 
+Add the following `services` configuration for your new module:
+
+```json
+{
+  "name": "generic-1",
+  "api": "rdk:service:generic",
+  "model": ":autonomous_example_module:line_follower",
+  "attributes": {
+    "detector_name": "my_object_detector",
+    "camera_name": "my_camera"
+  }
+}
+```
+
+Give your machine a few moments to load the new configuration, and you can begin testing your module.
+
 ## Follow a colored object
 
 This module uses a vision service and a motor to program a machine to follow an object of a configurable color.
@@ -534,7 +551,6 @@ class ObjectFollower(Module):
         self.base: Base = None
         self.camera: Camera = None
         self.detector: VisionClient = None
-        self.camera_name: str = "my_camera"
         self._running_loop = False
         self._loop_task = None
 
     @classmethod
     def validate(cls, config: ComponentConfig) -> Tuple[Sequence[str], Sequence[str]]:
-        required_attributes = []
-        optional_attributes = ["camera_name"]
+        camera_name = config.attributes.fields["camera_name"].string_value
+        detector_name = config.attributes.fields["detector_name"].string_value
+        base_name = config.attributes.fields["base_name"].string_value
 
-        camera_name = config.attributes.fields["camera_name"].string_value if "camera_name" in config.attributes.fields else "my_camera"
-
-        dependencies = [camera_name]
+        dependencies = [camera_name, detector_name, base_name]
         return dependencies, []
 
-    def reconfigure(self, config: ComponentConfig, dependencies: Mapping[str, ResourceBase]):
-        self.camera_name = config.attributes.fields["camera_name"].string_value if "camera_name" in config.attributes.fields else self.camera_name
+    def reconfigure(self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]):
+        self.camera_name = config.attributes.fields["camera_name"].string_value
+        self.detector_name = config.attributes.fields["detector_name"].string_value
+        self.base_name = config.attributes.fields["base_name"].string_value
 
         for dependency_name, dependency in dependencies.items():
             if dependency_name.subtype == "camera" and dependency_name.name == self.camera_name:
                 self.camera = dependency
-            elif dependency_name.subtype == "base":
+            elif dependency_name.subtype == "vision" and dependency_name.name == self.detector_name:
+                self.detector = dependency
+            elif dependency_name.subtype == "base" and dependency_name.name == self.base_name:
                 self.base = dependency
 
         if not self.camera:
             raise ValueError(f"Camera '{self.camera_name}' dependency not found.")
+        if not self.detector:
+            raise ValueError(f"Vision service '{self.detector_name}' dependency not found.")
         if not self.base:
-            raise ValueError("Base dependency not found.")
+            raise ValueError(f"Base '{self.base_name}' dependency not found.")
+
+        ObjectFollower.LOGGER.info("Reconfigured.")
 
     async def start(self):
         """
         Called when the module starts. Get references to components.
         """
-        ObjectFollower.LOGGER.info(f"ObjectFollower '{self.name}' starting...")
-        self.detector = await VisionClient.from_robot(self.robot, "my_object_detector")
+        ObjectFollower.LOGGER.info(f"'{self.name}' starting...")
+        await self.start_object_tracking()
+        ObjectFollower.LOGGER.info(f"'{self.name}' started.")
 
     async def close(self):
         """
         Called when the module is shutting down. Clean up tasks. 
""" - ObjectFollower.LOGGER.info(f"ObjectFollower '{self.name}' closing...") + ObjectFollower.LOGGER.info(f"'{self.name}' closing...") await self.stop_object_tracking() - ObjectFollower.LOGGER.info(f"ObjectFollower '{self.name}' closed.") + ObjectFollower.LOGGER.info(f"'{self.name}' closed.") def left_or_right(self, detections: List[Detection], midpoint: float) -> Literal[0, 1, 2, -1]: """ @@ -711,6 +734,21 @@ To deploy your module on your machine, run the following command, replacing ` ``` +Add the following `services` configuration for your new model: + +```json +{ + "name": "generic-1", + "api": "rdk:service:generic", + "model": ":autonomous_example_module:line_follower", + "attributes": { + "camera_name": "my_camera" + } +} +``` + +Give your machine a few moments to load the new configuration, and you can begin testing your module. + ## Notify when a certain object appears in a video feed This module uses a vision service to program a machine to send a notification when a certain object appears in a video feed. @@ -1000,4 +1038,20 @@ To deploy your module on your machine, run the following command, replacing ` -``` \ No newline at end of file +``` + +Add the following `services` configuration for your new model: + +```json +{ + "name": "generic-1", + "api": "rdk:service:generic", + "model": ":autonomous_example_module:email_notifier", + "attributes": { + "detector_name": "my-object-detector", + "camera_name": "my_camera" + } +} +``` + +Give your machine a few moments to load the new configuration, and you can begin testing your module. \ No newline at end of file From 44566bb400381b9f919c82fc06676f59dd54af8d Mon Sep 17 00:00:00 2001 From: nathan contino Date: Fri, 18 Jul 2025 16:30:25 -0400 Subject: [PATCH 06/15] Lint --- .../data-ai/ai/make-decisions-autonomously.md | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/docs/data-ai/ai/make-decisions-autonomously.md b/docs/data-ai/ai/make-decisions-autonomously.md index 75754702ca..f4cf9b6c99 100644 --- a/docs/data-ai/ai/make-decisions-autonomously.md +++ b/docs/data-ai/ai/make-decisions-autonomously.md @@ -338,7 +338,7 @@ if __name__ == "__main__": ### Run your module Find the [Part ID](/dev/reference/apis/fleet/#find-part-id) for your machine. -To deploy your module on your machine, run the following command, replacing `` with your Part ID: +To deploy your module on your machine, run the following command, replacing `` with your Part ID: ```sh {id="terminal-prompt" class="command-line" data-prompt="$"} viam module reload --part-id @@ -444,14 +444,14 @@ Add the following `services` configuration, replacing the `detect_color` value w ```json { - "name": "my_color_detector", - "api": "rdk:service:vision", - "model": "my_object_detector", - "attributes": { - "segment_size_px": 100, - "detect_color": "#a13b4c", // replace with the color of your object - "hue_tolerance_pct": 0.06 - } + "name": "my_color_detector", + "api": "rdk:service:vision", + "model": "my_object_detector", + "attributes": { + "segment_size_px": 100, + "detect_color": "#a13b4c", // replace with the color of your object + "hue_tolerance_pct": 0.06 + } } ``` @@ -728,7 +728,7 @@ if __name__ == "__main__": ### Run your module Find the [Part ID](/dev/reference/apis/fleet/#find-part-id) for your machine. 
-To deploy your module on your machine, run the following command, replacing `` with your Part ID: +To deploy your module on your machine, run the following command, replacing `` with your Part ID: ```sh {id="terminal-prompt" class="command-line" data-prompt="$"} viam module reload --part-id @@ -1034,7 +1034,7 @@ if __name__ == "__main__": ### Run your module Find the [Part ID](/dev/reference/apis/fleet/#find-part-id) for your machine. -To deploy your module on your machine, run the following command, replacing `` with your Part ID: +To deploy your module on your machine, run the following command, replacing `` with your Part ID: ```sh {id="terminal-prompt" class="command-line" data-prompt="$"} viam module reload --part-id @@ -1054,4 +1054,4 @@ Add the following `services` configuration for your new model: } ``` -Give your machine a few moments to load the new configuration, and you can begin testing your module. \ No newline at end of file +Give your machine a few moments to load the new configuration, and you can begin testing your module. From 3110bc49a32f8ed8a4f17d8d9ca60b75fbec8fb0 Mon Sep 17 00:00:00 2001 From: nathan contino Date: Mon, 21 Jul 2025 16:48:45 -0400 Subject: [PATCH 07/15] Switch to hard hat detection --- .../data-ai/ai/make-decisions-autonomously.md | 60 +++++-------------- 1 file changed, 16 insertions(+), 44 deletions(-) diff --git a/docs/data-ai/ai/make-decisions-autonomously.md b/docs/data-ai/ai/make-decisions-autonomously.md index f4cf9b6c99..45e594e90f 100644 --- a/docs/data-ai/ai/make-decisions-autonomously.md +++ b/docs/data-ai/ai/make-decisions-autonomously.md @@ -152,12 +152,6 @@ In a terminal, run the following command to make reload.sh executab chmod +x reload.sh ``` -Create a virtual Python environment with the necessary packages by running the module setup script from within the module directory: - -```sh {id="terminal-prompt" class="command-line" data-prompt="$"} -sh setup.sh -``` - Edit your meta.json, replacing the `"entrypoint"`, `"build"`, and `"path"` fields as follows: ```json {class="line-numbers linkable-line-numbers" data-start="13" data-line="1, 4, 6" } @@ -352,7 +346,7 @@ Add the following `services` configuration for your new module: "api": "rdk:service:generic", "model": ":autonomous_example_module:line_follower", "attributes": { - "detector_name": "my_object_detector", + "detector_name": "my_line_detector", "camera_name": "my_camera" } } @@ -444,7 +438,7 @@ Add the following `services` configuration, replacing the `detect_color` value w ```json { - "name": "my_color_detector", + "name": "my_object_detector", "api": "rdk:service:vision", "model": "my_object_detector", "attributes": { @@ -500,12 +494,6 @@ In a terminal, run the following command to make reload.sh executab chmod +x reload.sh ``` -Create a virtual Python environment with the necessary packages by running the module setup script from within the module directory: - -```sh {id="terminal-prompt" class="command-line" data-prompt="$"} -sh setup.sh -``` - Edit your meta.json, replacing the `"entrypoint"`, `"build"`, and `"path"` fields as follows: ```json {class="line-numbers linkable-line-numbers" data-start="13" data-line="1, 4, 6" } @@ -742,7 +730,8 @@ Add the following `services` configuration for your new model: "api": "rdk:service:generic", "model": ":autonomous_example_module:line_follower", "attributes": { - "camera_name": "my_camera" + "camera_name": "my_camera", + "detector_name": "my_object_detector" } } ``` @@ -751,13 +740,13 @@ Give your machine a few moments to load the 
new configuration, and you can begin ## Notify when a certain object appears in a video feed -This module uses a vision service to program a machine to send a notification when a certain object appears in a video feed. -This example detects people with an IR camera, but you can use a different camera, ML model, or vision service to detect any object with the same logic. +This module uses a vision service to program a machine to send a notification when a vision service detects an object. +This example detects people wearing hard hats, but you can use a different ML model or vision service to detect any object with the same logic. ### Prerequisites - An SBC, for example a Raspberry Pi 4 -- An IR camera, for example a [Raspberry Pi Camera Module 2 NoIR](https://www.raspberrypi.com/products/pi-noir-camera-v2/) +- An webcam ### Configure your machine @@ -769,15 +758,11 @@ Add the following `components` configuration for your camera: ```json { "name": "my_camera", - "model": "viam:camera:csi", + "model": "webcam", + "api": "rdk:component:camera", "attributes": { - "width_px": 1920, - "height_px": 1080, - "frame_rate": 30 - }, - "depends_on": [], - "namespace": "rdk", - "type": "camera" + "video_path": "" + } } ``` @@ -785,18 +770,11 @@ Add the following `services` configuration: ```json { - "name": "ir-person-mlmodel", - "type": "mlmodel", - "namespace": "rdk", - "model": "viam-labs:mlmodel:near-ir-person", - "attributes": {} -}, -{ - "name": "my-object-detector", - "type": "vision", - "model": "mlmodel", + "name": "hard_hat_detector_vision_service", + "api": "rdk:service:vision", + "model": "viam-labs:vision:yolov8", "attributes": { - "mlmodel_name": "ir-person-mlmodel" + "model_location": "keremberke/yolov8n-hard-hat-detection" } } ``` @@ -846,12 +824,6 @@ In a terminal, run the following command to make reload.sh executab chmod +x reload.sh ``` -Create a virtual Python environment with the necessary packages by running the module setup script from within the module directory: - -```sh {id="terminal-prompt" class="command-line" data-prompt="$"} -sh setup.sh -``` - Edit your meta.json, replacing the `"entrypoint"`, `"build"`, and `"path"` fields as follows: ```json {class="line-numbers linkable-line-numbers" data-start="13" data-line="1, 4, 6" } @@ -1048,7 +1020,7 @@ Add the following `services` configuration for your new model: "api": "rdk:service:generic", "model": ":autonomous_example_module:email_notifier", "attributes": { - "detector_name": "my-object-detector", + "detector_name": "hard_hat_detector_vision_service", "camera_name": "my_camera" } } From 4358880d929dcb400d6b75275e90dabdc68ae68f Mon Sep 17 00:00:00 2001 From: nathan contino Date: Mon, 21 Jul 2025 16:55:29 -0400 Subject: [PATCH 08/15] Add aliases --- docs/data-ai/reference/apis/data-client.md | 4 +++- docs/data-ai/reference/apis/data-management-client.md | 2 ++ docs/data-ai/reference/apis/ml-model-client.md | 2 ++ docs/data-ai/reference/apis/ml-training-client.md | 2 ++ docs/data-ai/reference/apis/vision-client.md | 2 ++ 5 files changed, 11 insertions(+), 1 deletion(-) diff --git a/docs/data-ai/reference/apis/data-client.md b/docs/data-ai/reference/apis/data-client.md index fd21d77239..fd0b8d9ff7 100644 --- a/docs/data-ai/reference/apis/data-client.md +++ b/docs/data-ai/reference/apis/data-client.md @@ -5,4 +5,6 @@ weight: 30 type: "docs" layout: "empty" canonical: "/dev/reference/apis/data-client/" ---- +aliases: + - /data-ai/reference/data-client/ +--- \ No newline at end of file diff --git 
a/docs/data-ai/reference/apis/data-management-client.md b/docs/data-ai/reference/apis/data-management-client.md index 154075a0e3..47e51a7588 100644 --- a/docs/data-ai/reference/apis/data-management-client.md +++ b/docs/data-ai/reference/apis/data-management-client.md @@ -5,4 +5,6 @@ weight: 30 type: "docs" layout: "empty" canonical: "/dev/reference/apis/services/data/" +aliases: + - /data-ai/reference/data-management-client/ --- diff --git a/docs/data-ai/reference/apis/ml-model-client.md b/docs/data-ai/reference/apis/ml-model-client.md index bc23270295..9fb5ae2e0b 100644 --- a/docs/data-ai/reference/apis/ml-model-client.md +++ b/docs/data-ai/reference/apis/ml-model-client.md @@ -5,4 +5,6 @@ weight: 30 type: "docs" layout: "empty" canonical: "/dev/reference/apis/services/ml/" +aliases: + - /data-ai/reference/ml-model-client/ --- diff --git a/docs/data-ai/reference/apis/ml-training-client.md b/docs/data-ai/reference/apis/ml-training-client.md index abb7edd607..fa05383b2d 100644 --- a/docs/data-ai/reference/apis/ml-training-client.md +++ b/docs/data-ai/reference/apis/ml-training-client.md @@ -5,4 +5,6 @@ weight: 40 type: "docs" layout: "empty" canonical: "/dev/reference/apis/services/ml/" +aliases: + - /data-ai/reference/ml-training-client/ --- diff --git a/docs/data-ai/reference/apis/vision-client.md b/docs/data-ai/reference/apis/vision-client.md index fdac1ddd3c..13d2fce799 100644 --- a/docs/data-ai/reference/apis/vision-client.md +++ b/docs/data-ai/reference/apis/vision-client.md @@ -5,4 +5,6 @@ weight: 30 type: "docs" layout: "empty" canonical: "/dev/reference/apis/services/vision/" +aliases: + - /data-ai/reference/vision-client/ --- From 171e3cbaff22b6c340b0b97d17f5104875293526 Mon Sep 17 00:00:00 2001 From: nathan contino Date: Mon, 21 Jul 2025 16:59:05 -0400 Subject: [PATCH 09/15] Polish code --- docs/data-ai/ai/make-decisions-autonomously.md | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/docs/data-ai/ai/make-decisions-autonomously.md b/docs/data-ai/ai/make-decisions-autonomously.md index 45e594e90f..dfd77e4eb0 100644 --- a/docs/data-ai/ai/make-decisions-autonomously.md +++ b/docs/data-ai/ai/make-decisions-autonomously.md @@ -178,22 +178,21 @@ import asyncio from typing import Any, Mapping, Sequence, Tuple from typing_extensions import Self -from viam.components.base import Base +from viam.components.base import Base, ResourceBase, Vector3 from viam.components.camera import Camera from viam.logging import getLogger from viam.module.module import Module from viam.resource.types import Model, ModelFamily -from viam.resource.base import ResourceBase from viam.resource.registry import Registry, ResourceCreatorRegistration from viam.proto.app.robot import ComponentConfig from viam.proto.common import ResourceName from viam.services.vision import VisionClient -from viam.components.base import Base, Vector3 class LineFollower(Module, ResourceBase): MODEL = Model(ModelFamily("", "autonomous_example_module"), "line-follower") LOGGER = getLogger(__name__) + def __init__(self, name: str): super().__init__(name) self.camera: Camera = None @@ -520,13 +519,12 @@ import asyncio from typing import Any, Mapping, List, Literal, Sequence, Tuple from typing_extensions import Self -from viam.components.base import Base +from viam.components.base import Base, ResourceBase from viam.components.camera import Camera from viam.services.vision import VisionClient from viam.media.utils.pil import pil_to_viam_image, viam_to_pil_image from viam.module.module import Module from 
viam.resource.types import Model, Subtype -from viam.resource.base import ResourceBase from viam.resource.registry import Registry, ResourceCreatorRegistration from viam.proto.app.v1 import ComponentConfig from viam.services.vision import Detection @@ -534,6 +532,7 @@ from viam.services.vision import Detection class ObjectFollower(Module): MODEL = Model("", "autonomous_example_module", "object_follower") + def __init__(self, name: str): super().__init__(name) self.base: Base = None @@ -731,7 +730,8 @@ Add the following `services` configuration for your new model: "model": ":autonomous_example_module:line_follower", "attributes": { "camera_name": "my_camera", - "detector_name": "my_object_detector" + "detector_name": "my_object_detector", + "base_name": "my_base" } } ``` @@ -864,6 +864,7 @@ from email.mime.text import MIMEText class EmailNotifier(Module, Generic): MODEL = Model("", "autonomous_example_module", "email_notifier") + def __init__(self, name: str): super().__init__(name) self.camera: Camera = None From 2bcc01b6194adcdb54bca053862e53ae13677d59 Mon Sep 17 00:00:00 2001 From: nathan contino Date: Mon, 21 Jul 2025 17:02:49 -0400 Subject: [PATCH 10/15] Limit Python line lengths --- .../data-ai/ai/make-decisions-autonomously.md | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/docs/data-ai/ai/make-decisions-autonomously.md b/docs/data-ai/ai/make-decisions-autonomously.md index dfd77e4eb0..b08cb57295 100644 --- a/docs/data-ai/ai/make-decisions-autonomously.md +++ b/docs/data-ai/ai/make-decisions-autonomously.md @@ -189,7 +189,8 @@ from viam.proto.common import ResourceName from viam.services.vision import VisionClient class LineFollower(Module, ResourceBase): - MODEL = Model(ModelFamily("", "autonomous_example_module"), "line-follower") + MODEL = Model( + ModelFamily("", "autonomous_example_module"), "line-follower") LOGGER = getLogger(__name__) @@ -204,7 +205,9 @@ class LineFollower(Module, ResourceBase): self.angular_power = 0.3 @classmethod - def new_resource(cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]) -> Self: + def new_resource(cls, + config: ComponentConfig, + dependencies: Mapping[ResourceName, ResourceBase]) -> Self: instance = cls(config.name) instance.reconfigure(config, dependencies) return instance @@ -218,17 +221,22 @@ class LineFollower(Module, ResourceBase): dependencies = [camera_name, detector_name, base_name] return dependencies, [] - def reconfigure(self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]): + def reconfigure(self, + config: ComponentConfig, + dependencies: Mapping[ResourceName, ResourceBase]): self.camera_name = config.attributes.fields["camera_name"].string_value self.detector_name = config.attributes.fields["detector_name"].string_value self.detector_name = config.attributes.fields["base_name"].string_value for dependency_name, dependency in dependencies.items(): - if dependency_name.subtype == "camera" and dependency_name.name == self.camera_name: + if dependency_name.subtype == "camera" and + dependency_name.name == self.camera_name: self.camera = dependency - elif dependency_name.subtype == "vision" and dependency_name.name == self.detector_name: + elif dependency_name.subtype == "vision" and + dependency_name.name == self.detector_name: self.detector = dependency - elif dependency_name.subtype == "base" and dependency_name.name == self.base_name: + elif dependency_name.subtype == "base" and + dependency_name.name == self.base_name: self.base = 
dependency if not self.camera: @@ -257,15 +265,18 @@ class LineFollower(Module, ResourceBase): # Check for color in front if await self._is_color_in_front(): LineFollower.LOGGER.info("Moving forward.") - await self.base.set_power(Vector3(y=self.linear_power), Vector3()) + await self.base.set_power( + Vector3(y=self.linear_power), Vector3()) # Check for color to the left elif await self._is_color_there("left"): LineFollower.LOGGER.info("Turning left.") - await self.base.set_power(Vector3(), Vector3(z=self.angular_power)) + await self.base.set_power( + Vector3(), Vector3(z=self.angular_power)) # Check for color to the right elif await self._is_color_there("right"): LineFollower.LOGGER.info("Turning right.") - await self.base.set_power(Vector3(), Vector3(z=-self.angular_power)) + await self.base.set_power( + Vector3(), Vector3(z=-self.angular_power)) else: LineFollower.LOGGER.info("No color detected. Stopping.") await self.base.stop() @@ -281,7 +292,8 @@ class LineFollower(Module, ResourceBase): async def _start_color_following_internal(self): if not self._running_loop: self._running_loop = True - self._loop_task = asyncio.create_task(self._color_following_loop()) + self._loop_task = + asyncio.create_task(self._color_following_loop()) LineFollower.LOGGER.info("Requested to start color following loop.") else: LineFollower.LOGGER.info("Color following loop is already running.") @@ -1027,4 +1039,6 @@ Add the following `services` configuration for your new model: } ``` +Define the `sender_email`, `sender_password`, `receiver_email`, `smtp_server`, and `smtp_port` variables in the model attributes or using environment variables on your machine. + Give your machine a few moments to load the new configuration, and you can begin testing your module. From 8f00ad19ca38e50431ed9ad28778d069825ae5c2 Mon Sep 17 00:00:00 2001 From: nathan contino Date: Mon, 21 Jul 2025 17:05:56 -0400 Subject: [PATCH 11/15] prettier fix --- docs/data-ai/reference/apis/data-client.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/data-ai/reference/apis/data-client.md b/docs/data-ai/reference/apis/data-client.md index fd0b8d9ff7..aa18f91fd1 100644 --- a/docs/data-ai/reference/apis/data-client.md +++ b/docs/data-ai/reference/apis/data-client.md @@ -7,4 +7,4 @@ layout: "empty" canonical: "/dev/reference/apis/data-client/" aliases: - /data-ai/reference/data-client/ ---- \ No newline at end of file +--- From 72d6d25bdae22e97edbdfef54297003b6b3e98d3 Mon Sep 17 00:00:00 2001 From: nathan contino Date: Mon, 21 Jul 2025 17:12:49 -0400 Subject: [PATCH 12/15] break up more lines --- .../data-ai/ai/make-decisions-autonomously.md | 67 ++++++++++++------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/docs/data-ai/ai/make-decisions-autonomously.md b/docs/data-ai/ai/make-decisions-autonomously.md index b08cb57295..5b577a9d9b 100644 --- a/docs/data-ai/ai/make-decisions-autonomously.md +++ b/docs/data-ai/ai/make-decisions-autonomously.md @@ -229,14 +229,14 @@ class LineFollower(Module, ResourceBase): self.detector_name = config.attributes.fields["base_name"].string_value for dependency_name, dependency in dependencies.items(): - if dependency_name.subtype == "camera" and - dependency_name.name == self.camera_name: + if (dependency_name.subtype == "camera" + and dependency_name.name == self.camera_name): self.camera = dependency - elif dependency_name.subtype == "vision" and - dependency_name.name == self.detector_name: + elif (dependency_name.subtype == "vision" + and dependency_name.name == 
self.detector_name): self.detector = dependency - elif dependency_name.subtype == "base" and - dependency_name.name == self.base_name: + elif (dependency_name.subtype == "base" + and dependency_name.name == self.base_name): self.base = dependency if not self.camera: @@ -542,7 +542,8 @@ from viam.proto.app.v1 import ComponentConfig from viam.services.vision import Detection class ObjectFollower(Module): - MODEL = Model("", "autonomous_example_module", "object_follower") + MODEL = Model( + ModelFamily("", "autonomous_example_module"), "object_follower") def __init__(self, name: str): @@ -874,7 +875,8 @@ import smtplib from email.mime.text import MIMEText class EmailNotifier(Module, Generic): - MODEL = Model("", "autonomous_example_module", "email_notifier") + MODEL = Model( + ModelFamily("", "autonomous_example_module"), "email_notifier") def __init__(self, name: str): @@ -884,11 +886,16 @@ class EmailNotifier(Module, Generic): self.notification_sent: bool = False # Email configuration - self.sender_email: str = os.getenv("SENDER_EMAIL", "your_email@example.com") - self.sender_password: str = os.getenv("SENDER_PASSWORD", "your_email_password") - self.receiver_email: str = os.getenv("RECEIVER_EMAIL", "recipient_email@example.com") - self.smtp_server: str = os.getenv("SMTP_SERVER", "smtp.example.com") - self.smtp_port: int = int(os.getenv("SMTP_PORT", 587)) + self.sender_email: str = + os.getenv("SENDER_EMAIL", "your_email@example.com") + self.sender_password: str = + os.getenv("SENDER_PASSWORD", "your_email_password") + self.receiver_email: str = + os.getenv("RECEIVER_EMAIL", "recipient_email@example.com") + self.smtp_server: str = + os.getenv("SMTP_SERVER", "smtp.example.com") + self.smtp_port: int = + int(os.getenv("SMTP_PORT", 587)) self._running_loop = False self._loop_task = None @@ -897,26 +904,35 @@ class EmailNotifier(Module, Generic): def new_resource(cls, config: ComponentConfig): module = cls(config.name) if "camera_name" in config.attributes.fields: - module.camera_name = config.attributes.fields["camera_name"].string_value + module.camera_name = + config.attributes.fields["camera_name"].string_value if "detector_name" in config.attributes.fields: - module.camera_name = config.attributes.fields["detector_name"].string_value + module.camera_name = + config.attributes.fields["detector_name"].string_value if "sender_email" in config.attributes.fields: - module.sender_email = config.attributes.fields["sender_email"].string_value + module.sender_email = + config.attributes.fields["sender_email"].string_value if "sender_password" in config.attributes.fields: - module.sender_password = config.attributes.fields["sender_password"].string_value + module.sender_password = + config.attributes.fields["sender_password"].string_value if "receiver_email" in config.attributes.fields: - module.receiver_email = config.attributes.fields["receiver_email"].string_value + module.receiver_email = + config.attributes.fields["receiver_email"].string_value if "smtp_server" in config.attributes.fields: - module.smtp_server = config.attributes.fields["smtp_server"].string_value + module.smtp_server = + config.attributes.fields["smtp_server"].string_value if "smtp_port" in config.attributes.fields: - module.smtp_port = int(config.attributes.fields["smtp_port"].number_value) + module.smtp_port = + int(config.attributes.fields["smtp_port"].number_value) return module async def start(self): EmailNotifier.LOGGER.info(f"'{self.name}' starting...") - self.camera = await Camera.from_robot(self.robot, 
self.camera_name) - self.detector = await VisionClient.from_robot(self.robot, self.detector_name) + self.camera = + await Camera.from_robot(self.robot, self.camera_name) + self.detector = + await VisionClient.from_robot(self.robot, self.detector_name) EmailNotifier.LOGGER.info(f"'{self.name}' started. Monitoring for detections.") async def close(self): @@ -946,7 +962,8 @@ class EmailNotifier(Module, Generic): while self._running_loop: try: - detections = await self.detector.get_detections_from_camera(self.camera_name) + detections = + await self.detector.get_detections_from_camera(self.camera_name) if detections and not self.notification_sent: subject = "Viam Module Alert: Detection Found!" @@ -991,7 +1008,9 @@ class EmailNotifier(Module, Generic): EmailNotifier.LOGGER.info("Detection monitoring loop is not running.") return {"status": "not_running"} - async def do_command(self, command: Mapping[str, Any], *, timeout: float | None = None, **kwargs) -> Mapping[str, Any]: + async def do_command(self, + command: Mapping[str, Any], *, + timeout: float | None = None, **kwargs) -> Mapping[str, Any]: if "start_monitoring" in command: EmailNotifier.LOGGER.info("Received 'start_monitoring' command via do_command.") return await self._start_detection_monitoring_internal() From 2c58f2d897b263ef31d7af8ee4394058d3188931 Mon Sep 17 00:00:00 2001 From: nathan contino Date: Mon, 21 Jul 2025 17:19:07 -0400 Subject: [PATCH 13/15] Reduce python line lenghts --- .../data-ai/ai/make-decisions-autonomously.md | 144 ++++++++++++------ 1 file changed, 97 insertions(+), 47 deletions(-) diff --git a/docs/data-ai/ai/make-decisions-autonomously.md b/docs/data-ai/ai/make-decisions-autonomously.md index 5b577a9d9b..2c526f22ce 100644 --- a/docs/data-ai/ai/make-decisions-autonomously.md +++ b/docs/data-ai/ai/make-decisions-autonomously.md @@ -561,31 +561,45 @@ class ObjectFollower(Module): self.num_cycles = 200 @classmethod - def new_resource(cls, config: ComponentConfig, dependencies: Mapping[str, ResourceBase]) -> Self: + def new_resource(cls, + config: ComponentConfig, + dependencies: Mapping[str, ResourceBase]) -> Self: instance = cls(config.name) instance.reconfigure(config, dependencies) return instance @classmethod - def validate(cls, config: ComponentConfig) -> Tuple[Sequence[str], Sequence[str]]: - camera_name = config.attributes.fields["camera_name"].string_value - detector_name = config.attributes.fields["detector_name"].string_value - base_name = config.attributes.fields["base_name"].string_value + def validate(cls, + config: ComponentConfig) -> Tuple[Sequence[str], Sequence[str]]: + camera_name = + config.attributes.fields["camera_name"].string_value + detector_name = + config.attributes.fields["detector_name"].string_value + base_name = + config.attributes.fields["base_name"].string_value dependencies = [camera_name, detector_name, base_name] return dependencies, [] - def reconfigure(self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]): - self.camera_name = config.attributes.fields["camera_name"].string_value - self.detector_name = config.attributes.fields["detector_name"].string_value - self.detector_name = config.attributes.fields["base_name"].string_value + def reconfigure(self, + config: ComponentConfig, + dependencies: Mapping[ResourceName, ResourceBase]): + self.camera_name = + config.attributes.fields["camera_name"].string_value + self.detector_name = + config.attributes.fields["detector_name"].string_value + self.detector_name = + 
config.attributes.fields["base_name"].string_value for dependency_name, dependency in dependencies.items(): - if dependency_name.subtype == "camera" and dependency_name.name == self.camera_name: + if (dependency_name.subtype == "camera" + and dependency_name.name == self.camera_name): self.camera = dependency - elif dependency_name.subtype == "vision" and dependency_name.name == self.detector_name: + elif (dependency_name.subtype == "vision" + and dependency_name.name == self.detector_name): self.detector = dependency - elif dependency_name.subtype == "base" and dependency_name.name == self.base_name: + elif (dependency_name.subtype == "base" + and dependency_name.name == self.base_name): self.base = dependency if not self.camera: @@ -613,7 +627,9 @@ class ObjectFollower(Module): await self.stop_object_tracking() ObjectFollower.LOGGER.info(f"'{self.name}' closed.") - def left_or_right(self, detections: List[Detection], midpoint: float) -> Literal[0, 1, 2, -1]: + def left_or_right(self, + detections: List[Detection], + midpoint: float) -> Literal[0, 1, 2, -1]: """ Get largest detection box and see if its center is in the left, center, or right third. Returns 0 for left, 1 for center, 2 for right, -1 if nothing detected. @@ -646,41 +662,55 @@ class ObjectFollower(Module): """ The core object tracking and base control logic loop. """ - ObjectFollower.LOGGER.info("Object tracking control loop started.") + ObjectFollower.LOGGER.info( + "Object tracking control loop started.") - initial_frame = await self.camera.get_image(mime_type="image/jpeg") + initial_frame = + await self.camera.get_image(mime_type="image/jpeg") pil_initial_frame = viam_to_pil_image(initial_frame) midpoint = pil_initial_frame.size[0] / 2 cycle_count = 0 - while self._running_loop and (self.num_cycles == 0 or cycle_count < self.num_cycles): + while (self._running_loop + and (self.num_cycles == 0 or cycle_count < self.num_cycles)): try: - detections = await self.detector.get_detections_from_camera(self.camera_name) + detections = + await self.detector.get_detections_from_camera(self.camera_name) answer = self.left_or_right(detections, midpoint) if answer == 0: - ObjectFollower.LOGGER.info("Detected object on left, spinning left.") - await self.base.spin(self.spin_num, self.vel) - await self.base.move_straight(self.straight_num, self.vel) + ObjectFollower.LOGGER.info( + "Detected object on left, spinning left.") + await self.base.spin( + self.spin_num, self.vel) + await self.base.move_straight( + self.straight_num, self.vel) elif answer == 1: - ObjectFollower.LOGGER.info("Detected object in center, moving straight.") - await self.base.move_straight(self.straight_num, self.vel) + ObjectFollower.LOGGER.info( + "Detected object in center, moving straight.") + await self.base.move_straight( + self.straight_num, self.vel) elif answer == 2: - ObjectFollower.LOGGER.info("Detected object on right, spinning right.") + ObjectFollower.LOGGER.info( + "Detected object on right, spinning right.") await self.base.spin(-self.spin_num, self.vel) - await self.base.move_straight(self.straight_num, self.vel) + await self.base.move_straight( + self.straight_num, self.vel) else: - ObjectFollower.LOGGER.info("No object detected, stopping base.") + ObjectFollower.LOGGER.info( + "No object detected, stopping base.") await self.base.stop() except Exception as e: - ObjectFollower.LOGGER.info(f"Error in object tracking loop: {e}") + ObjectFollower.LOGGER.info( + f"Error in object tracking loop: {e}") cycle_count += 1 await asyncio.sleep(0.1) - 
ObjectFollower.LOGGER.info("Object tracking loop finished or stopped.") + ObjectFollower.LOGGER.info( + "Object tracking loop finished or stopped.") await self.base.stop() self._running_loop = False @@ -690,10 +720,13 @@ class ObjectFollower(Module): """ if not self._running_loop: self._running_loop = True - self._loop_task = asyncio.create_task(self._object_tracking_loop()) - ObjectFollower.LOGGER.info("Requested to start object tracking loop.") + self._loop_task = + asyncio.create_task(self._object_tracking_loop()) + ObjectFollower.LOGGER.info( + "Requested to start object tracking loop.") else: - ObjectFollower.LOGGER.info("Object tracking loop is already running.") + ObjectFollower.LOGGER.info( + "Object tracking loop is already running.") async def stop_object_tracking(self): """ @@ -702,18 +735,22 @@ class ObjectFollower(Module): if self._running_loop: self._running_loop = False if self._loop_task: - await self._loop_task # Wait for the task to complete its current iteration and exit + await self._loop_task # complete current iteration, exit self._loop_task = None - ObjectFollower.LOGGER.info("Requested to stop object tracking loop.") + ObjectFollower.LOGGER.info( + "Requested to stop object tracking loop.") else: - ObjectFollower.LOGGER.info("Object tracking loop is not running.") + ObjectFollower.LOGGER.info( + "Object tracking loop is not running.") # Register your module Registry.register_resource_creator( ObjectFollower.MODEL, - ResourceCreatorRegistration(ObjectFollower.new_resource, ObjectFollower.validate) + ResourceCreatorRegistration( + ObjectFollower.new_resource, ObjectFollower.validate) ) + async def main(): """ Main entry point for the Viam module. @@ -968,32 +1005,41 @@ class EmailNotifier(Module, Generic): if detections and not self.notification_sent: subject = "Viam Module Alert: Detection Found!" body = "A detection was found by the vision service." - EmailNotifier.LOGGER.info("Detection found. Sending email notification...") + EmailNotifier.LOGGER.info( + "Detection found. Sending email notification...") self._send_email(subject, body) elif not detections and self.notification_sent: - EmailNotifier.LOGGER.info("No detections found. Resetting notification status.") + EmailNotifier.LOGGER.info( + "No detections found. 
Resetting notification status.") self.notification_sent = False elif detections and self.notification_sent: - EmailNotifier.LOGGER.info("Detection still present, but notification already sent.") + EmailNotifier.LOGGER.info( + "Detection still present, but notification already sent.") else: - EmailNotifier.LOGGER.info("No detections.") + EmailNotifier.LOGGER.info( + "No detections.") except Exception as e: - EmailNotifier.LOGGER.info(f"Error in detection monitoring loop: {e}") + EmailNotifier.LOGGER.info( + f"Error in detection monitoring loop: {e}") await asyncio.sleep(5) - EmailNotifier.LOGGER.info("Detection monitoring loop finished or stopped.") + EmailNotifier.LOGGER.info( + "Detection monitoring loop finished or stopped.") self.notification_sent = False async def _start_detection_monitoring_internal(self): if not self._running_loop: self._running_loop = True - self._loop_task = asyncio.create_task(self._detection_monitoring_loop()) - EmailNotifier.LOGGER.info("Requested to start detection monitoring loop.") + self._loop_task = + asyncio.create_task(self._detection_monitoring_loop()) + EmailNotifier.LOGGER.info( + "Requested to start detection monitoring loop.") return {"status": "started"} else: - EmailNotifier.LOGGER.info("Detection monitoring loop is already running.") + EmailNotifier.LOGGER.info( + "Detection monitoring loop is already running.") return {"status": "already_running"} async def _stop_detection_monitoring_internal(self): @@ -1002,20 +1048,24 @@ class EmailNotifier(Module, Generic): if self._loop_task: await self._loop_task self._loop_task = None - EmailNotifier.LOGGER.info("Requested to stop detection monitoring loop.") + EmailNotifier.LOGGER.info( + "Requested to stop detection monitoring loop.") return {"status": "stopped"} else: - EmailNotifier.LOGGER.info("Detection monitoring loop is not running.") + EmailNotifier.LOGGER.info( + "Detection monitoring loop is not running.") return {"status": "not_running"} async def do_command(self, command: Mapping[str, Any], *, timeout: float | None = None, **kwargs) -> Mapping[str, Any]: if "start_monitoring" in command: - EmailNotifier.LOGGER.info("Received 'start_monitoring' command via do_command.") + EmailNotifier.LOGGER.info( + "Received 'start_monitoring' command via do_command.") return await self._start_detection_monitoring_internal() elif "stop_monitoring" in command: - EmailNotifier.LOGGER.info("Received 'stop_monitoring' command via do_command.") + EmailNotifier.LOGGER.info( + "Received 'stop_monitoring' command via do_command.") return await self._stop_detection_monitoring_internal() else: raise NotImplementedError(f"Command '{command}' not recognized.") From 505c13e2c0d9ba05b26bd1b930b7f0f7854313e5 Mon Sep 17 00:00:00 2001 From: nathan contino Date: Mon, 21 Jul 2025 17:33:06 -0400 Subject: [PATCH 14/15] Fix syntax errors caused by breaking up lines --- .../data-ai/ai/make-decisions-autonomously.md | 144 ++++++------------ 1 file changed, 48 insertions(+), 96 deletions(-) diff --git a/docs/data-ai/ai/make-decisions-autonomously.md b/docs/data-ai/ai/make-decisions-autonomously.md index 2c526f22ce..7cf609df88 100644 --- a/docs/data-ai/ai/make-decisions-autonomously.md +++ b/docs/data-ai/ai/make-decisions-autonomously.md @@ -265,18 +265,15 @@ class LineFollower(Module, ResourceBase): # Check for color in front if await self._is_color_in_front(): LineFollower.LOGGER.info("Moving forward.") - await self.base.set_power( - Vector3(y=self.linear_power), Vector3()) + await self.base.set_power(Vector3(y=self.linear_power), 
Vector3()) # Check for color to the left elif await self._is_color_there("left"): LineFollower.LOGGER.info("Turning left.") - await self.base.set_power( - Vector3(), Vector3(z=self.angular_power)) + await self.base.set_power(Vector3(), Vector3(z=self.angular_power)) # Check for color to the right elif await self._is_color_there("right"): LineFollower.LOGGER.info("Turning right.") - await self.base.set_power( - Vector3(), Vector3(z=-self.angular_power)) + await self.base.set_power(Vector3(), Vector3(z=-self.angular_power)) else: LineFollower.LOGGER.info("No color detected. Stopping.") await self.base.stop() @@ -292,8 +289,7 @@ class LineFollower(Module, ResourceBase): async def _start_color_following_internal(self): if not self._running_loop: self._running_loop = True - self._loop_task = - asyncio.create_task(self._color_following_loop()) + self._loop_task = asyncio.create_task(self._color_following_loop()) LineFollower.LOGGER.info("Requested to start color following loop.") else: LineFollower.LOGGER.info("Color following loop is already running.") @@ -571,12 +567,9 @@ class ObjectFollower(Module): @classmethod def validate(cls, config: ComponentConfig) -> Tuple[Sequence[str], Sequence[str]]: - camera_name = - config.attributes.fields["camera_name"].string_value - detector_name = - config.attributes.fields["detector_name"].string_value - base_name = - config.attributes.fields["base_name"].string_value + camera_name = config.attributes.fields["camera_name"].string_value + detector_name = config.attributes.fields["detector_name"].string_value + base_name = config.attributes.fields["base_name"].string_value dependencies = [camera_name, detector_name, base_name] return dependencies, [] @@ -584,12 +577,9 @@ class ObjectFollower(Module): def reconfigure(self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]): - self.camera_name = - config.attributes.fields["camera_name"].string_value - self.detector_name = - config.attributes.fields["detector_name"].string_value - self.detector_name = - config.attributes.fields["base_name"].string_value + self.camera_name = config.attributes.fields["camera_name"].string_value + self.detector_name = config.attributes.fields["detector_name"].string_value + self.detector_name = config.attributes.fields["base_name"].string_value for dependency_name, dependency in dependencies.items(): if (dependency_name.subtype == "camera" @@ -662,8 +652,7 @@ class ObjectFollower(Module): """ The core object tracking and base control logic loop. 
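         Each iteration polls the detector, compares the largest detection
         box against the frame midpoint, and steers the base left, right,
         or straight; a num_cycles of 0 keeps the loop running until stopped.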
""" - ObjectFollower.LOGGER.info( - "Object tracking control loop started.") + ObjectFollower.LOGGER.info("Object tracking control loop started.") initial_frame = await self.camera.get_image(mime_type="image/jpeg") @@ -680,31 +669,22 @@ class ObjectFollower(Module): answer = self.left_or_right(detections, midpoint) if answer == 0: - ObjectFollower.LOGGER.info( - "Detected object on left, spinning left.") - await self.base.spin( - self.spin_num, self.vel) - await self.base.move_straight( - self.straight_num, self.vel) + ObjectFollower.LOGGER.info("Detected object on left, spinning left.") + await self.base.spin(self.spin_num, self.vel) + await self.base.move_straight(self.straight_num, self.vel) elif answer == 1: - ObjectFollower.LOGGER.info( - "Detected object in center, moving straight.") - await self.base.move_straight( - self.straight_num, self.vel) + ObjectFollower.LOGGER.info("Detected object in center, moving straight.") + await self.base.move_straight(self.straight_num, self.vel) elif answer == 2: - ObjectFollower.LOGGER.info( - "Detected object on right, spinning right.") + ObjectFollower.LOGGER.info("Detected object on right, spinning right.") await self.base.spin(-self.spin_num, self.vel) - await self.base.move_straight( - self.straight_num, self.vel) + await self.base.move_straight(self.straight_num, self.vel) else: - ObjectFollower.LOGGER.info( - "No object detected, stopping base.") + ObjectFollower.LOGGER.info("No object detected, stopping base.") await self.base.stop() except Exception as e: - ObjectFollower.LOGGER.info( - f"Error in object tracking loop: {e}") + ObjectFollower.LOGGER.info(f"Error in object tracking loop: {e}") cycle_count += 1 await asyncio.sleep(0.1) @@ -722,11 +702,9 @@ class ObjectFollower(Module): self._running_loop = True self._loop_task = asyncio.create_task(self._object_tracking_loop()) - ObjectFollower.LOGGER.info( - "Requested to start object tracking loop.") + ObjectFollower.LOGGER.info("Requested to start object tracking loop.") else: - ObjectFollower.LOGGER.info( - "Object tracking loop is already running.") + ObjectFollower.LOGGER.info("Object tracking loop is already running.") async def stop_object_tracking(self): """ @@ -737,11 +715,9 @@ class ObjectFollower(Module): if self._loop_task: await self._loop_task # complete current iteration, exit self._loop_task = None - ObjectFollower.LOGGER.info( - "Requested to stop object tracking loop.") + ObjectFollower.LOGGER.info("Requested to stop object tracking loop.") else: - ObjectFollower.LOGGER.info( - "Object tracking loop is not running.") + ObjectFollower.LOGGER.info("Object tracking loop is not running.") # Register your module Registry.register_resource_creator( @@ -923,16 +899,11 @@ class EmailNotifier(Module, Generic): self.notification_sent: bool = False # Email configuration - self.sender_email: str = - os.getenv("SENDER_EMAIL", "your_email@example.com") - self.sender_password: str = - os.getenv("SENDER_PASSWORD", "your_email_password") - self.receiver_email: str = - os.getenv("RECEIVER_EMAIL", "recipient_email@example.com") - self.smtp_server: str = - os.getenv("SMTP_SERVER", "smtp.example.com") - self.smtp_port: int = - int(os.getenv("SMTP_PORT", 587)) + self.sender_email: str = os.getenv("SENDER_EMAIL", "your_email@example.com") + self.sender_password: str = os.getenv("SENDER_PASSWORD", "your_email_password") + self.receiver_email: str = os.getenv("RECEIVER_EMAIL", "recipient_email@example.com") + self.smtp_server: str = os.getenv("SMTP_SERVER", "smtp.example.com") + self.smtp_port: 
int = int(os.getenv("SMTP_PORT", 587)) self._running_loop = False self._loop_task = None @@ -941,26 +912,19 @@ class EmailNotifier(Module, Generic): def new_resource(cls, config: ComponentConfig): module = cls(config.name) if "camera_name" in config.attributes.fields: - module.camera_name = - config.attributes.fields["camera_name"].string_value + module.camera_name = config.attributes.fields["camera_name"].string_value if "detector_name" in config.attributes.fields: - module.camera_name = - config.attributes.fields["detector_name"].string_value + module.camera_name = config.attributes.fields["detector_name"].string_value if "sender_email" in config.attributes.fields: - module.sender_email = - config.attributes.fields["sender_email"].string_value + module.sender_email = config.attributes.fields["sender_email"].string_value if "sender_password" in config.attributes.fields: - module.sender_password = - config.attributes.fields["sender_password"].string_value + module.sender_password = config.attributes.fields["sender_password"].string_value if "receiver_email" in config.attributes.fields: - module.receiver_email = - config.attributes.fields["receiver_email"].string_value + module.receiver_email = config.attributes.fields["receiver_email"].string_value if "smtp_server" in config.attributes.fields: - module.smtp_server = - config.attributes.fields["smtp_server"].string_value + module.smtp_server = config.attributes.fields["smtp_server"].string_value if "smtp_port" in config.attributes.fields: - module.smtp_port = - int(config.attributes.fields["smtp_port"].number_value) + module.smtp_port = int(config.attributes.fields["smtp_port"].number_value) return module @@ -1005,28 +969,22 @@ class EmailNotifier(Module, Generic): if detections and not self.notification_sent: subject = "Viam Module Alert: Detection Found!" body = "A detection was found by the vision service." - EmailNotifier.LOGGER.info( - "Detection found. Sending email notification...") + EmailNotifier.LOGGER.info("Detection found. Sending email notification...") self._send_email(subject, body) elif not detections and self.notification_sent: - EmailNotifier.LOGGER.info( - "No detections found. Resetting notification status.") + EmailNotifier.LOGGER.info("No detections found. 
Resetting notification status.") self.notification_sent = False elif detections and self.notification_sent: - EmailNotifier.LOGGER.info( - "Detection still present, but notification already sent.") + EmailNotifier.LOGGER.info("Detection still present, but notification already sent.") else: - EmailNotifier.LOGGER.info( - "No detections.") + EmailNotifier.LOGGER.info("No detections.") except Exception as e: - EmailNotifier.LOGGER.info( - f"Error in detection monitoring loop: {e}") + EmailNotifier.LOGGER.info(f"Error in detection monitoring loop: {e}") await asyncio.sleep(5) - EmailNotifier.LOGGER.info( - "Detection monitoring loop finished or stopped.") + EmailNotifier.LOGGER.info("Detection monitoring loop finished or stopped.") self.notification_sent = False async def _start_detection_monitoring_internal(self): @@ -1034,12 +992,10 @@ class EmailNotifier(Module, Generic): self._running_loop = True self._loop_task = asyncio.create_task(self._detection_monitoring_loop()) - EmailNotifier.LOGGER.info( - "Requested to start detection monitoring loop.") + EmailNotifier.LOGGER.info("Requested to start detection monitoring loop.") return {"status": "started"} else: - EmailNotifier.LOGGER.info( - "Detection monitoring loop is already running.") + EmailNotifier.LOGGER.info("Detection monitoring loop is already running.") return {"status": "already_running"} async def _stop_detection_monitoring_internal(self): @@ -1048,24 +1004,20 @@ class EmailNotifier(Module, Generic): if self._loop_task: await self._loop_task self._loop_task = None - EmailNotifier.LOGGER.info( - "Requested to stop detection monitoring loop.") + EmailNotifier.LOGGER.info("Requested to stop detection monitoring loop.") return {"status": "stopped"} else: - EmailNotifier.LOGGER.info( - "Detection monitoring loop is not running.") + EmailNotifier.LOGGER.info("Detection monitoring loop is not running.") return {"status": "not_running"} async def do_command(self, command: Mapping[str, Any], *, timeout: float | None = None, **kwargs) -> Mapping[str, Any]: if "start_monitoring" in command: - EmailNotifier.LOGGER.info( - "Received 'start_monitoring' command via do_command.") + EmailNotifier.LOGGER.info("Received 'start_monitoring' command via do_command.") return await self._start_detection_monitoring_internal() elif "stop_monitoring" in command: - EmailNotifier.LOGGER.info( - "Received 'stop_monitoring' command via do_command.") + EmailNotifier.LOGGER.info("Received 'stop_monitoring' command via do_command.") return await self._stop_detection_monitoring_internal() else: raise NotImplementedError(f"Command '{command}' not recognized.") From 0c2e26391796cdb09c0dce181e139b7198244a8b Mon Sep 17 00:00:00 2001 From: nathan contino Date: Mon, 21 Jul 2025 17:38:08 -0400 Subject: [PATCH 15/15] Fix syntax --- .../data-ai/ai/make-decisions-autonomously.md | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/docs/data-ai/ai/make-decisions-autonomously.md b/docs/data-ai/ai/make-decisions-autonomously.md index 7cf609df88..7d901f0fb8 100644 --- a/docs/data-ai/ai/make-decisions-autonomously.md +++ b/docs/data-ai/ai/make-decisions-autonomously.md @@ -586,10 +586,10 @@ class ObjectFollower(Module): and dependency_name.name == self.camera_name): self.camera = dependency elif (dependency_name.subtype == "vision" - and dependency_name.name == self.detector_name): + and dependency_name.name == self.detector_name): self.detector = dependency elif (dependency_name.subtype == "base" - and dependency_name.name == self.base_name): + 
and dependency_name.name == self.base_name): self.base = dependency if not self.camera: @@ -654,8 +654,7 @@ class ObjectFollower(Module): """ ObjectFollower.LOGGER.info("Object tracking control loop started.") - initial_frame = - await self.camera.get_image(mime_type="image/jpeg") + initial_frame = await self.camera.get_image(mime_type="image/jpeg") pil_initial_frame = viam_to_pil_image(initial_frame) midpoint = pil_initial_frame.size[0] / 2 @@ -663,8 +662,7 @@ class ObjectFollower(Module): while (self._running_loop and (self.num_cycles == 0 or cycle_count < self.num_cycles)): try: - detections = - await self.detector.get_detections_from_camera(self.camera_name) + detections = await self.detector.get_detections_from_camera(self.camera_name) answer = self.left_or_right(detections, midpoint) @@ -700,8 +698,7 @@ class ObjectFollower(Module): """ if not self._running_loop: self._running_loop = True - self._loop_task = - asyncio.create_task(self._object_tracking_loop()) + self._loop_task = asyncio.create_task(self._object_tracking_loop()) ObjectFollower.LOGGER.info("Requested to start object tracking loop.") else: ObjectFollower.LOGGER.info("Object tracking loop is already running.") @@ -930,10 +927,8 @@ class EmailNotifier(Module, Generic): async def start(self): EmailNotifier.LOGGER.info(f"'{self.name}' starting...") - self.camera = - await Camera.from_robot(self.robot, self.camera_name) - self.detector = - await VisionClient.from_robot(self.robot, self.detector_name) + self.camera = await Camera.from_robot(self.robot, self.camera_name) + self.detector = await VisionClient.from_robot(self.robot, self.detector_name) EmailNotifier.LOGGER.info(f"'{self.name}' started. Monitoring for detections.") async def close(self): @@ -990,8 +985,7 @@ class EmailNotifier(Module, Generic): async def _start_detection_monitoring_internal(self): if not self._running_loop: self._running_loop = True - self._loop_task = - asyncio.create_task(self._detection_monitoring_loop()) + self._loop_task = asyncio.create_task(self._detection_monitoring_loop()) EmailNotifier.LOGGER.info("Requested to start detection monitoring loop.") return {"status": "started"} else:
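Once the email notifier module is deployed and configured, you can toggle its monitoring loop from any client through the generic service API, using the `start_monitoring` and `stop_monitoring` keys its `do_command` implementation handles.
The following is a minimal sketch, not a definitive implementation: the machine address and API key values are placeholders, and the `generic-1` name is an assumption taken from the service configs above.

```python {class="line-numbers linkable-line-numbers"}
import asyncio

from viam.robot.client import RobotClient
from viam.services.generic import Generic


async def main():
    # Placeholders: substitute your machine's address and API credentials.
    opts = RobotClient.Options.with_api_key(
        api_key='<API-KEY>',
        api_key_id='<API-KEY-ID>')
    machine = await RobotClient.at_address('<MACHINE-ADDRESS>', opts)

    # "generic-1" matches the service name used in the configs above.
    notifier = Generic.from_robot(machine, 'generic-1')

    # Keys handled by the module's do_command implementation.
    result = await notifier.do_command({'start_monitoring': True})
    print(result)  # for example, {"status": "started"}

    await asyncio.sleep(60)  # let the loop watch the feed for a minute

    result = await notifier.do_command({'stop_monitoring': True})
    print(result)  # for example, {"status": "stopped"}

    await machine.close()


if __name__ == '__main__':
    asyncio.run(main())
```

Driving the loop through `do_command` keeps the module's public surface small: clients need only the generic service API, and the module can report its loop state in the returned status map.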