Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 32 additions & 10 deletions modules/cluster_estimation/cluster_estimation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
covariance of each landing pad estimation.
"""

# pylint: disable=duplicate-code

import numpy as np
import sklearn
import sklearn.datasets
Expand All @@ -20,6 +22,20 @@ class ClusterEstimation:
works by predicting 'cluster centres' from groups of closely placed landing pad
detections.

ATTRIBUTES
----------
min_activation_threshold: int
Minimum total data points before model runs.

min_new_points_to_run: int
Minimum number of new data points that must be collected before running model.

random_state: int
Seed for randomizer, to get consistent results.

local_logger: Logger
For logging error and debug messages.

METHODS
-------
run()
Expand All @@ -42,6 +58,10 @@ class ClusterEstimation:
Removes any cluster with covariances much higher than the lowest covariance value.
"""

# pylint: disable=too-many-instance-attributes

# pylint: disable=too-many-instance-attributes

__create_key = object()

# VGMM Hyperparameters
Expand Down Expand Up @@ -167,6 +187,10 @@ def run(
List containing ObjectInWorld objects, containing position and covariance value.
None if conditions not met and model not ran or model failed to converge.
"""
# in use, all detections will have the same label, so the
# first element's label was arbitrarily selected
label = detections[0].label

# Store new input data
self.__current_bucket += self.__convert_detections_to_point(detections)

Expand Down Expand Up @@ -199,6 +223,7 @@ def run(

# Filter out all clusters after __WEIGHT_DROP_THRESHOLD weight drop occurs
viable_clusters = [model_output[0]]
print(f"len(model_output) = {len(model_output)}")
for i in range(1, len(model_output)):
if model_output[i][1] / model_output[i - 1][1] < self.__WEIGHT_DROP_THRESHOLD:
break
Expand All @@ -211,21 +236,18 @@ def run(
model_output = self.__filter_by_covariances(model_output)

# Create output list of remaining valid clusters
detections_in_world = []
objects_in_world = []
for cluster in model_output:
result, landing_pad = object_in_world.ObjectInWorld.create(
cluster[0][0],
cluster[0][1],
cluster[2],
cluster[0][0], cluster[0][1], cluster[2], label
)

if result:
detections_in_world.append(landing_pad)
objects_in_world.append(landing_pad)
else:
self.__logger.warning("Failed to create ObjectInWorld object")

self.__logger.info(detections_in_world)
return True, detections_in_world
self.__logger.error("Failed to create ObjectInWorld object")
return False, None
return True, objects_in_world

def __decide_to_run(self, run_override: bool) -> bool:
"""
Expand Down Expand Up @@ -288,7 +310,7 @@ def __sort_by_weights(
@staticmethod
def __convert_detections_to_point(
detections: "list[detection_in_world.DetectionInWorld]",
) -> "list[tuple[float, float]]":
) -> "list[tuple[float, float, int]]":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we change this function signature? You never changed the function so it still returns a (float, float)

"""
Convert DetectionInWorld input object to a list of points- (x,y) positions, to store.

Expand Down
177 changes: 177 additions & 0 deletions modules/cluster_estimation/cluster_estimation_by_label.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
"""
Take in bounding box coordinates from Geolocation and use to estimate landing pad locations.
Returns an array of classes, each containing the x coordinate, y coordinate, and spherical
covariance of each landing pad estimation.
"""

from .. import detection_in_world
from .. import object_in_world
from ..common.modules.logger import logger
from . import cluster_estimation


class ClusterEstimationByLabel:
"""
Estimate landing pad locations based on landing pad ground detection. Estimation
works by predicting 'cluster centres' from groups of closely placed landing pad
detections.
ATTRIBUTES
----------
min_activation_threshold: int
Minimum total data points before model runs. Must be at least max_num_components.
min_new_points_to_run: int
Minimum number of new data points that must be collected before running model.
max_num_components: int
Max number of real landing pads. Must be at least 1.
random_state: int
Seed for randomizer, to get consistent results.
local_logger: Logger
For logging error and debug messages.
METHODS
-------
run()
Take in list of object detections and return dictionary of labels to
to corresponging clusters of estimated object locations if number of
detections is sufficient, or if manually forced to run.
"""

# pylint: disable=too-many-instance-attributes

__create_key = object()

@classmethod
def create(
cls,
min_activation_threshold: int,
min_new_points_to_run: int,
max_num_components: int,
random_state: int,
local_logger: logger.Logger,
) -> "tuple[bool, ClusterEstimationByLabel | None]":
"""
Data requirement conditions for estimation model to run.
"""

# At least 1 point for model to fit
if min_activation_threshold < max_num_components:
return False, None

if min_new_points_to_run < 0:
return False, None

if max_num_components < 1:
return False, None

if random_state < 0:
return False, None

return True, ClusterEstimationByLabel(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check ClusterEstimation's restrictions. Either apply them again here or by invoking creating cluster estimation

cls.__create_key,
min_activation_threshold,
min_new_points_to_run,
max_num_components,
random_state,
local_logger,
)

def __init__(
self,
class_private_create_key: object,
min_activation_threshold: int,
min_new_points_to_run: int,
max_num_components: int,
random_state: int,
local_logger: logger.Logger,
) -> None:
"""
Private constructor, use create() method.
"""
assert (
class_private_create_key is ClusterEstimationByLabel.__create_key
), "Use create() method"

# Requirements to decide to run
self.__min_activation_threshold = min_activation_threshold
self.__min_new_points_to_run = min_new_points_to_run
self.__max_num_components = max_num_components
self.__random_state = random_state
self.__local_logger = local_logger

# Cluster model corresponding to each label
# Each cluster estimation object stores the detections given to in its __all_points bucket across runs
self.__label_to_cluster_estimation_model: dict[
int, cluster_estimation.ClusterEstimation
] = {}

def run(
self,
input_detections: "list[detection_in_world.DetectionInWorld]",
run_override: bool,
) -> "tuple[True, dict[int, list[object_in_world.ObjectInWorld]]] | tuple[False, None]":
"""
Take in list of detections and return list of estimated object locations
if number of detections is sufficient, or if manually forced to run.
PARAMETERS
----------
input_detections: list[DetectionInWorld]
List containing DetectionInWorld objects which holds real-world positioning data to run
clustering on.
run_override: bool
Forces ClusterEstimation to predict if data is available, regardless of any other
requirements.
RETURNS
-------
model_ran: bool
True if ClusterEstimation object successfully ran its estimation model, False otherwise.
labels_to_object_clusters: dict[int, list[object_in_world.ObjectInWorld] or None.
Dictionary where the key is a label and the value is a list of all cluster detections with that label
"""
label_to_detections: dict[int, list[detection_in_world.DetectionInWorld]] = {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment saying sorting detections by label

# Sorting detections by label
for detection in input_detections:
if not detection.label in label_to_detections:
label_to_detections[detection.label] = []
label_to_detections[detection.label].append(detection)

labels_to_object_clusters: dict[int, list[object_in_world.ObjectInWorld]] = {}
for label, detections in label_to_detections.items():
# create cluster estimation for label if it doesn't exist
if not label in self.__label_to_cluster_estimation_model:
result, cluster_model = cluster_estimation.ClusterEstimation.create(
self.__min_activation_threshold,
self.__min_new_points_to_run,
self.__max_num_components,
self.__random_state,
self.__local_logger,
)
if not result:
self.__local_logger.error(
f"Failed to create cluster estimation for label {label}"
)
return False, None
self.__label_to_cluster_estimation_model[label] = cluster_model
# runs cluster estimation for specific label
result, clusters = self.__label_to_cluster_estimation_model[label].run(
detections,
run_override,
)
if not result:
self.__local_logger.error(
f"Failed to run cluster estimation model for label {label}"
)
return False, None
if not label in labels_to_object_clusters:
labels_to_object_clusters[label] = []
labels_to_object_clusters[label] += clusters
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this is redundant? Every single time it will go into the if because you started with an empty dictionary, and it will loop through each label once. You should be able to just set it. The only thing you may need to worry about is reference copy vs shallow copy vs deep copy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the labels_to_object_clusters dictionary starts off empty and is a temporary dictionary that is recreated every run, so it needs to be inititialized every time an unused label is used.


return True, labels_to_object_clusters
31 changes: 21 additions & 10 deletions modules/cluster_estimation/cluster_estimation_worker.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change this worker to use the new ClusterEstimationByLabel class.

Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""
Gets detections in world space and outputs estimations of objects.
Take in bounding box coordinates from Geolocation and use to estimate landing pad locations.
Returns an array of classes, each containing the x coordinate, y coordinate, and spherical
covariance of each landing pad estimation.
"""

import os
Expand Down Expand Up @@ -32,20 +34,29 @@ def cluster_estimation_worker(
min_new_points_to_run: int
Minimum number of new data points that must be collected before running model.

max_num_components: int
Max number of real landing pads.

random_state: int
Seed for randomizer, to get consistent results.

input_queue: queue_proxy_wrapper.QueuePRoxyWrapper
Data queue.
METHODS
-------
run()
Take in list of landing pad detections and return list of estimated landing pad locations
if number of detections is sufficient, or if manually forced to run.

__decide_to_run()
Decide when to run cluster estimation model.

__sort_by_weights()
Sort input model output list by weights in descending order.

__convert_detections_to_point()
Convert DetectionInWorld input object to a [x,y] position to store.

output_queue: queue_proxy_wrapper.QueuePRoxyWrapper
Data queue.
__filter_by_points_ownership()
Removes any clusters that don't have any points belonging to it.

worker_controller: worker_controller.WorkerController
How the main process communicates to this worker process.
__filter_by_covariances()
Removes any cluster with covariances much higher than the lowest covariance value.
"""
worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
Expand Down
10 changes: 7 additions & 3 deletions modules/object_in_world.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class ObjectInWorld:

@classmethod
def create(
cls, location_x: float, location_y: float, spherical_variance: float
cls, location_x: float, location_y: float, spherical_variance: float, label: int
) -> "tuple[bool, ObjectInWorld | None]":
"""
location_x, location_y: Location of the object.
Expand All @@ -21,14 +21,17 @@ def create(
if spherical_variance < 0.0:
return False, None

return True, ObjectInWorld(cls.__create_key, location_x, location_y, spherical_variance)
return True, ObjectInWorld(
cls.__create_key, location_x, location_y, spherical_variance, label
)

def __init__(
self,
class_private_create_key: object,
location_x: float,
location_y: float,
spherical_variance: float,
label: int,
) -> None:
"""
Private constructor, use create() method.
Expand All @@ -38,12 +41,13 @@ def __init__(
self.location_x = location_x
self.location_y = location_y
self.spherical_variance = spherical_variance
self.label = label

def __str__(self) -> str:
"""
To string.
"""
return f"{self.__class__}, location_x: {self.location_x}, location_y: {self.location_y}, spherical_variance: {self.spherical_variance}"
return f"{self.__class__}, location_x: {self.location_x}, location_y: {self.location_y}, spherical_variance: {self.spherical_variance}, label: {self.label}"

def __repr__(self) -> str:
"""
Expand Down
Loading