Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,36 @@
queue_max_size: 10

video_input:
camera_name: 0
worker_period: 1.0 # seconds
save_prefix: "log_image"
camera_option: 0 # 0 is for opencv camera (from camera_factory.py)
width: 1920
height: 1200
# camera_config: # For Picamera
# exposure_time: 250 # microseconds
# analogue_gain: 64.0 # Sets ISO, 1.0 for normal, 64.0 for max, 0.0 for min
# contrast: 1.0 # Contrast, 1.0 for nomral, 32.0 for max, 0.0 for min
# lens_position: null # Focal length, 1/m (0 for infinity, null for auto focus)
camera_config: # For CV camera (regular cameras)
device_index: 0
save_prefix: "log_image" # Leave as empty string to not log any images

detect_target:
worker_count: 1
option: 0 # 0 is for Ultralytics (from detect_target_factory.py)
device: 0
model_path: "tests/model_example/yolov8s_ultralytics_pretrained_default.pt" # TODO: update
model_path: "tests/model_example/yolov8s_ultralytics_pretrained_default.pt" # See autonomy OneDrive for latest model
save_prefix: "log_comp"

flight_interface:
address: "tcp:127.0.0.1:14550"
timeout: 10.0 # seconds
# port 5762 connects directly to the simulated auto pilot, which is more realistic
# than connecting to port 14550, which is the ground station
address: "tcp:localhost:5762"
timeout: 30.0 # seconds
baud_rate: 57600 # symbol rate
worker_period: 0.1 # seconds

data_merge:
timeout: 10.0 # seconds
timeout: 30.0 # seconds

geolocation:
resolution_x: 1920
Expand Down
28 changes: 22 additions & 6 deletions main_2024.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
# Used in type annotation of flight interface output
# pylint: disable-next=unused-import
from modules import odometry_and_time
from modules.common.modules.camera import camera_factory
from modules.common.modules.camera import camera_opencv
from modules.common.modules.camera import camera_picamera2
from modules.communications import communications_worker
from modules.detect_target import detect_target_factory
from modules.detect_target import detect_target_worker
Expand Down Expand Up @@ -81,14 +84,24 @@ def main() -> int:
# pylint: disable=invalid-name
QUEUE_MAX_SIZE = config["queue_max_size"]

VIDEO_INPUT_CAMERA_NAME = config["video_input"]["camera_name"]
VIDEO_INPUT_WORKER_PERIOD = config["video_input"]["worker_period"]
VIDEO_INPUT_SAVE_NAME_PREFIX = config["video_input"]["save_prefix"]
VIDEO_INPUT_SAVE_PREFIX = str(pathlib.Path(logging_path, VIDEO_INPUT_SAVE_NAME_PREFIX))
VIDEO_INPUT_OPTION = camera_factory.CameraOption(config["video_input"]["camera_option"])
VIDEO_INPUT_WIDTH = config["video_input"]["width"]
VIDEO_INPUT_HEIGHT = config["video_input"]["height"]
if VIDEO_INPUT_OPTION == camera_factory.CameraOption.OPENCV:
VIDEO_INPUT_CAMERA_CONFIG = camera_opencv.ConfigOpenCV(
**config["video_input"]["camera_config"]
)
elif VIDEO_INPUT_OPTION == camera_factory.CameraOption.PICAM2:
VIDEO_INPUT_CAMERA_CONFIG = camera_picamera2.ConfigPiCamera2(
**config["video_input"]["camera_config"]
)
VIDEO_INPUT_SAVE_PREFIX = config["video_input"]["save_prefix"]

DETECT_TARGET_WORKER_COUNT = config["detect_target"]["worker_count"]
DETECT_TARGET_OPTION_INT = config["detect_target"]["option"]
DETECT_TARGET_OPTION = detect_target_factory.DetectTargetOption(DETECT_TARGET_OPTION_INT)
DETECT_TARGET_OPTION = detect_target_factory.DetectTargetOption(
config["detect_target"]["option"]
)
DETECT_TARGET_DEVICE = "cpu" if args.cpu else config["detect_target"]["device"]
DETECT_TARGET_MODEL_PATH = config["detect_target"]["model_path"]
DETECT_TARGET_OVERRIDE_FULL_PRECISION = args.full
Expand Down Expand Up @@ -199,8 +212,11 @@ def main() -> int:
count=1,
target=video_input_worker.video_input_worker,
work_arguments=(
VIDEO_INPUT_CAMERA_NAME,
VIDEO_INPUT_WORKER_PERIOD,
VIDEO_INPUT_OPTION,
VIDEO_INPUT_WIDTH,
VIDEO_INPUT_HEIGHT,
VIDEO_INPUT_CAMERA_CONFIG,
VIDEO_INPUT_SAVE_PREFIX,
),
input_queues=[],
Expand Down
59 changes: 54 additions & 5 deletions modules/video_input/video_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,72 @@
"""

from .. import image_and_time
from ..common.modules.camera import camera_device
from ..common.modules.camera import base_camera
from ..common.modules.camera import camera_factory
from ..common.modules.camera import camera_opencv
from ..common.modules.camera import camera_picamera2
from ..common.modules.logger import logger


class VideoInput:
"""
Combines image and timestamp together.
"""

def __init__(self, camera_name: "int | str", save_name: str = "") -> None:
self.device = camera_device.CameraDevice(camera_name, 1, save_name)
__create_key = object()

def run(self) -> "tuple[bool, image_and_time.ImageAndTime | None]":
@classmethod
def create(
cls,
camera_option: camera_factory.CameraOption,
width: int,
height: int,
config: camera_opencv.ConfigOpenCV | camera_picamera2.ConfigPiCamera2,
save_prefix: str,
local_logger: logger.Logger,
) -> "tuple[True, VideoInput] | tuple[False, None]":
"""
camera_option specifies which camera driver to use.
width is the width of the images the camera takes in pixels.
height is the height of the images the camera takes in pixels.
camera_config specifies camera settings.
save_prefix is name of iamge log files. Leave as empty string for not logging the images.
"""
result, camera = camera_factory.create_camera(camera_option, width, height, config)
if not result:
local_logger.error(
f"camera factory failed. Current configs were: {camera_option=}, {width=}, {height=}, {config=}. Please try a different set of configs."
)
return False, None
return True, VideoInput(cls.__create_key, camera, save_prefix, local_logger)

def __init__(
self,
class_private_create_key: object,
camera: base_camera.BaseCameraDevice,
save_prefix: str,
local_logger: logger.Logger,
) -> None:
"""
Private constructor, use create() method.
"""
assert class_private_create_key is VideoInput.__create_key, "Use create() method."

self.__device = camera
self.__save_prefix = save_prefix
self.__logger = local_logger

def run(self) -> "tuple[True, image_and_time.ImageAndTime] | tuple[False, None]":
"""
Returns a possible ImageAndTime with current timestamp.
"""
result, image = self.device.get_image()
result, image = self.__device.run()
if not result:
self.__logger.warning("Failed to take image")
return False, None

# If __save_prefix is not empty string, then save image
if self.__save_prefix:
self.__logger.save_image(image, self.__save_prefix)

return image_and_time.ImageAndTime.create(image)
40 changes: 33 additions & 7 deletions modules/video_input/video_input_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,56 @@
Gets images from the camera.
"""

import os
import pathlib
import time

from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from . import video_input
from ..common.modules.camera import camera_factory
from ..common.modules.camera import camera_opencv
from ..common.modules.camera import camera_picamera2
from ..common.modules.logger import logger


def video_input_worker(
camera_name: "int | str",
period: float,
save_name: str,
period: int,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move period with the worker parameters. Also should be float.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It currently is a worker parameter? I changed it to float

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant immediately above output_queue . Then the parameters for the class object and the worker are grouped separately.

camera_option: camera_factory.CameraOption,
width: int,
height: int,
camera_config: camera_opencv.ConfigOpenCV | camera_picamera2.ConfigPiCamera2,
save_prefix: str,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
) -> None:
"""
Worker process.
camera_name is initial setting.
period is minimum period between loops.
save_name is path for logging.
period is the minimum period between image captures.
output_queue is the data queue.
controller is how the main process communicates to this worker process.
"""
input_device = video_input.VideoInput(camera_name, save_name)
worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True)
if not result:
print("ERROR: Worker failed to create logger")
return

assert local_logger is not None

local_logger.info("Logger initialized")

result, input_device = video_input.VideoInput.create(
camera_option, width, height, camera_config, save_prefix, local_logger
)
if not result:
local_logger.error("Worker failed to create class object")
return

# Get Pylance to stop complaining
assert input_device is not None

while not controller.is_exit_requested():
controller.check_pause()
Expand Down
25 changes: 21 additions & 4 deletions tests/integration/test_video_input_hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,38 @@
Simple hardware test, requires camera.
"""

import pathlib

from modules.common.modules.camera import camera_factory
from modules.common.modules.camera import camera_opencv
from modules.common.modules.logger import logger
from modules.video_input import video_input


CAMERA = 0
# Modify as needed
CAMERA = camera_factory.CameraOption.OPENCV
WIDTH = 1920
HEIGHT = 1200
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Integration test is failing for me, I suggest a resolution of 640x480 .

CONFIG = camera_opencv.ConfigOpenCV(0)
SAVE_PREFIX = "" # Not saving any pictures


def main() -> int:
"""
Main function.
"""
# Logger
test_name = pathlib.Path(__file__).stem
result, local_logger = logger.Logger.create(test_name, False)
assert result
assert local_logger is not None

# Setup
# TODO: Common change logging option
camera = video_input.VideoInput(
CAMERA,
result, camera = video_input.VideoInput.create(
CAMERA, WIDTH, HEIGHT, CONFIG, SAVE_PREFIX, local_logger
)
assert result
assert camera is not None

# Run
result, image = camera.run()
Expand Down
20 changes: 18 additions & 2 deletions tests/integration/test_video_input_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@
import queue
import time

from modules.common.modules.camera import camera_factory
from modules.common.modules.camera import camera_opencv
from modules.video_input import video_input_worker
from modules import image_and_time
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller


# Modify these settings as needed
VIDEO_INPUT_WORKER_PERIOD = 1.0
CAMERA = 0
CAMERA = camera_factory.CameraOption.OPENCV
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also failing here, but for some reason there is a conversion which results in this being float in the worker function.

WIDTH = 1920
HEIGHT = 1200
CONFIG = camera_opencv.ConfigOpenCV(0)
SAVE_PREFIX = "" # Not saving any pictures


def main() -> int:
Expand All @@ -29,7 +36,16 @@ def main() -> int:

worker = mp.Process(
target=video_input_worker.video_input_worker,
args=(CAMERA, VIDEO_INPUT_WORKER_PERIOD, "", out_queue, controller),
args=(
VIDEO_INPUT_WORKER_PERIOD,
CAMERA,
WIDTH,
HEIGHT,
CONFIG,
SAVE_PREFIX,
out_queue,
controller,
),
)

# Run
Expand Down
Loading