Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ b1_ws/build/
b1_ws/install/
b1_ws/log/
b1_ws/src/ros2

# Virtual environments.
*.venv*
317 changes: 294 additions & 23 deletions 01_dsp/dspml_pipeline/dspml_pipeline/data/frame_loader.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
"""
File:
frame_loader.py

Description:
Tools for loading datasets from the GOPHERS pipeline.

Authors:
jLab
Eric Vetha
nubby

Date:
24 Feb 2026

Version:
1.0.9
"""
import logging
logger = logging.getLogger(__name__)

import numpy as np
from pathlib import Path
from ..setup_logging import setup_logging
import json
import os
import pandas as pd
import sys
from scipy import signal


THRESHOLD = 50 # For anomoly removal


class FrameLoader:
"""
FrameLoader class for processing radar data into standardized input (X) and output (y) matrices for regression tasks.
Expand Down Expand Up @@ -38,10 +59,15 @@ class FrameLoader:
y (np.ndarray): Corresponding labels (targets).
"""

def __init__(self, dataset_dirs:list, target_dir:str,
data_log:str = "data-log.csv",
folder_name:str = "Sample #", label_name:str = "Bulk Density (g/cm^3)",
verbose:bool = False):
def __init__(self,
target_dir: str,
data_log: str = "data-log.csv",
dataset: np.ndarray = None,
dataset_dirs: list = [],
folder_name: str = "Sample #",
label_name: str = "Bulk Density (g/cm^3)",
labels: np.ndarray = None,
verbose: bool = False):
"""
Initializes the FrameLoader instance based on the provided directories.

Expand All @@ -57,21 +83,252 @@ def __init__(self, dataset_dirs:list, target_dir:str,
self.dataset_dirs = dataset_dirs
self.target_dir = target_dir
self.data_log = data_log
self.X = None
self.y = None
self.label_name = label_name
self.folder_name = folder_name

# Validate dataset directory
for i in self.dataset_dirs:
if not Path(i).exists():
logger.error(f"Dataset {i} does not exist.")
data_log_i = Path(i) / data_log
if not data_log_i.exists():
logger.error(f"Data log file {data_log_i} does not exist.")
# Import data from the dataset directories if provided.
if dataset_dirs:
# No input datastreams given.
self.X = None
self.y = None

# Validate dataset directory
for i in self.dataset_dirs:
if not Path(i).exists():
logger.error(f"Dataset {i} does not exist.")
if not os.path.isdir(i):
logger.error(f"Path {i} does not point to a dataset directory.")
data_log_i = Path(i) / data_log
if not data_log_i.exists():
logger.warning(f"Data log file {data_log_i} does not exist; "
f"checking for preprocessed dataset...")
if not self._is_dataset_preprocessed(i):
logger.warning(f"Dataset {i} is invalid.")
sys.exit(1)
else:
logger.info(f"Dataset {i} initialized.")
# Directly import tuples of dataset and labels if given.
elif ((len(dataset) > 0 and len(labels) > 0) and (len(labels) == len(dataset))):
self.X = dataset
self.y = labels
else:
print(f"Cannot load dataset.")
sys.exit(1)

def _is_dataset_preprocessed(self, path: str):
"""
Check for the existence of X.npy, y.npy, features.csv, and results.csv files in the path provided.

Args:
path (str)

Returns:
preprocessed? (bool)
"""
required_files = ["X.npy", "y.npy", "features.csv", "results.csv"]
current_files = os.listdir(path)
if not set(required_files) == set(current_files):
return False
# TODO: Extract data here?
return True

def _is_new_dataset_valid(self, dataset_dir: str) -> bool:
"""
Confirm whether a dataset contains the required raw radar frames for processing.

Args:
dataset_dir (str): Relative path to dataset in question.

Returns:
valid (bool): Is the dataset valid for extracting radar frame data?
"""
required_file = "data-log.csv"
capture_files = [] # Keep track of the number of available radar captures.

# First check that all required files are in the base directory.
current_files = set(os.listdir(dataset_dir))
if not required_file in current_files:
return False

# Next look for a number of raw radar scans greater than zero.
dataset_path = Path(dataset_dir)
subdirs = [d for d in dataset_path.iterdir()
if d.is_dir() and not d.name.startswith('.')]
for i, folder in enumerate(subdirs):
capture_files.append(sorted(folder.glob("*.frames")))
if len(capture_files) == 0:
return False

return True

def _is_preprocessed_dataset_valid(self, dataset_dir: str) -> bool:
"""
Confirm whether a dataset contains the required preprocessed radar scans.

Args:
dataset_dir (str): Relative path to dataset in question.

Returns:
valid (bool): Is the dataset valid for use of preprocessed radar data?

Todo:
* Check the contents of the numpy files for validity.
"""
required_files = ["X.npy", "y.npy"]
current_files = set(os.listdir(dataset_dir))
if not set(required_files).issubset(current_files):
return False
return True

def extract_single_dataset(self, dataset_dir: Path) -> tuple[np.ndarray, np.ndarray]:
"""
Extracts the features (X) and labels (y) from a given directory.

Args:
dataset_dir (str): String of relative path to dataset source directory.

Returns:
frame_data (np.ndarray): Processed radar data (features) from one dataset source.
labels (np.ndarray): Corresponding labels (targets) from one dataset source.
"""
new_frame_data = []
new_labels = []

logging.info(f"Extracting data from {dataset_dir}.")

dataset_dir = Path(dataset_dir)
subdirs = [d for d in dataset_dir.iterdir()
if d.is_dir() and not d.name.startswith('.')]
data_log = dataset_dir / self.data_log

# Get the labels from the data log.
try:
df = pd.read_csv(data_log)
df[self.folder_name] = df[self.folder_name].astype(str)
df[self.label_name] = df[self.label_name].astype(float)
logger.info(f"Loaded data log with {len(df)} samples")
except Exception as e:
logger.warning(f"Expected CSV format: columns include '{self.folder_name}' and '{self.label_name}'; "
f"attempting to load preprocessed {self.folder_name} dataset...")
return [], []

# In each subdirectory.
for i, folder in enumerate(subdirs):

capture_files = sorted(folder.glob("*.frames"))

logger.info(f"Processing {len(capture_files)} files in {folder.name}")

if not capture_files:
logger.warning(f"No .frames files found in {folder.name}")
continue

# Find the row in df corresponding to this folder name
sample_row = df[df['Sample #'] == folder.name]
if sample_row.empty:
logger.error(f"No matching sample for folder {folder.name} in data log")
sys.exit(1)
else:
bulk_density = sample_row.iloc[0][self.label_name]

# Process each capture file
params = None
for capture_file in capture_files:
try:
frame_data, params = process_frames(folder, capture_file.name)

if frame_data is None:
logger.warning(f"Failed to process: {capture_file.name}")
continue

# Anomoly removal. Replaces values that deviate from the median by more
# than a threshold with the median. This has been done since the beginning
# of the project because of odd spikes in the raw DAC output that causes
# large deviations in the data.
median = np.median(frame_data, axis=1, keepdims=True)
mask = np.abs(frame_data - median) > THRESHOLD
frame_data_clean = frame_data.copy()
frame_data_clean[mask] = np.broadcast_to(median, frame_data.shape)[mask]

# DDC
ddc_frame_data = np.zeros_like(frame_data_clean, dtype=np.complex64)
for i in range(frame_data_clean.shape[1]):
ddc_frame_data[:, i] = novelda_digital_downconvert(frame_data_clean[:, i])

try:
new_frame_data.append(ddc_frame_data)
new_labels.append(bulk_density)
except:
logger.error(f"Failed to stack radar data from {capture_file.name}")
sys.exit(1)

# Outputs warning when problem occurs while processing, but continues processing other radar data.
except Exception as e:
logger.warning(f"Error processing {capture_file.name}: {e}")

# Save radar parameters
if params and len(capture_files) > 0:
params_file = folder / "radar_params.json"
with open(params_file, 'w') as f:
json.dump(params, f)
logger.info(f"Saved parameters: {params_file.name}")

return new_frame_data, new_labels

def load_preprocessed_dataset(self, dataset_dir: str) -> tuple:
"""
Load proprocessed datasets.
"""
print(dataset_dir)
X_path = Path(dataset_dir) / "X.npy"
y_path = Path(dataset_dir) / "y.npy"

def extract_data(self):
# Load dataset if it has already been processed into .npy files.
X = np.load(X_path)
y = np.load(y_path)

logger.info(f"Loaded from existing dataset: X={X.shape}, y={y.shape}")

return X.tolist(), y.tolist()

def load(self, new: bool) -> tuple:
"""
Loads and combines the specified datasets based on both existence of raw data and user specs.

Args:
new (bool) Load raw radar frames? If False, load .npy files if they exist.

Returns:
X, y (tuple[np.ndarray, np.ndarray])
"""
X = []
y = []

logger.info("Starting frame processing")

for dataset_dir in self.dataset_dirs:
# Load raw radar scans into new dataset here.
if new and self._is_new_dataset_valid(dataset_dir=dataset_dir):
X_new, y_new = self.extract_single_dataset(dataset_dir=dataset_dir)
# Try to load preprocessed dataset if raw scans unavailable.
elif self._is_preprocessed_dataset_valid(dataset_dir):
X_new, y_new = self.load_preprocessed_dataset(dataset_dir)
else:
logger.error(f"Neither existing radar scans nor valid preprocessed "
f"dataset were found for the following dataset:\r\n"
f"\t+ Target:\t\t{self.target_dir}\r\n"
f"\t+ Dataset dir:\t{dataset_dir}")
sys.exit(1)
# Append the new radar scans and labels to the broader dataset.
X += X_new
y += y_new

self.X = np.stack(X)
self.y = np.stack(y)

return self.X, self.y

def extract_data(self) -> tuple:
"""
Extracts the features (X) and labels (y) from the provided directries.

Expand All @@ -87,6 +344,7 @@ def extract_data(self):

# Iterate through each dataset dir
for i in self.dataset_dirs:
logging.info(f"Extracting data from {i}.")
dataset_dir = Path(i)
subdirs = [d for d in dataset_dir.iterdir()
if d.is_dir() and not d.name.startswith('.')]
Expand All @@ -99,8 +357,9 @@ def extract_data(self):
df[self.label_name] = df[self.label_name].astype(float)
logger.info(f"Loaded data log with {len(df)} samples")
except Exception as e:
logger.error(f"Expected CSV format: columns include '{self.folder_name}' and '{self.label_name}'")
sys.exit(1)
logger.warning(f"Expected CSV format: columns include '{self.folder_name}' and '{self.label_name}'; "
f"attempting to load preprocessed {self.folder_name} dataset...")
return [], []

# In each subdirectory
for i, folder in enumerate(subdirs):
Expand Down Expand Up @@ -185,13 +444,14 @@ def save_dataset(self):
logger.info(f"Raw dataset saved as X.npy and y.npy")
logger.info(f"Saved shapes: X={self.X.shape}, y={self.y.shape}")

def load_dataset(dataset_dir:str):
def load_dataset(dataset_dir: str, fl: FrameLoader):
"""
Loads data that has already been processed. Assumes the features are named X.npy and the
labels are named y.npy.

Args:
dataset_dir: Directory containing the capture file.
fl: FrameLoader object for given dataset.

Returns:
X (np.ndarray): Processed radar data (features).
Expand All @@ -202,11 +462,22 @@ def load_dataset(dataset_dir:str):
y_path = Path(dataset_dir) / "y.npy"

if not X_path.exists() or not y_path.exists():
logger.error("X.npy and/or y.npy not found in the dataset directory")
sys.exit(1)

X = np.load(X_path)
y = np.load(y_path)
logger.warning("X.npy and/or y.npy not found in the dataset directory; generating...")
X, y = fl.extract_data()
if len(X) == 0 or len(y) == 0:
logger.error("X.npy and/or y.npy could not be generated.")
sys.exit(1)
fl.save_dataset()
# If the dataset still does not exists, exit.
if not X_path.exists() or not y_path.exists():
logger.error("X.npy and/or y.npy could not be generated.")
sys.exit(1)
else:
# Load dataset if it has already been processed into .npy files.
print(X_path)
print(y_path)
X = np.load(X_path)
y = np.load(y_path)

logger.info(f"Loaded from existing dataset: X={X.shape}, y={y.shape}")

Expand Down Expand Up @@ -480,4 +751,4 @@ def novelda_digital_downconvert(raw_frame:np.ndarray):
# Baseband signal using convolution (provides downcoverted, filtered analytic signal)
baseband_signal = signal.convolve(mixed, window, mode='same')

return baseband_signal
return baseband_signal
Loading