Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate eicm.preprocessing.cellvoyager with hcs.cellvoyager #202

Open
imagejan opened this issue Dec 19, 2024 · 0 comments
Open

Consolidate eicm.preprocessing.cellvoyager with hcs.cellvoyager #202

imagejan opened this issue Dec 19, 2024 · 0 comments

Comments

@imagejan
Copy link
Member

Currently, we have:

For metadata

hcs

def _parse_metadata(self) -> pd.DataFrame:
    """Collect per-channel metadata of the acquisition into one table.

    Reads the ``MeasurementChannel`` elements of ``MeasurementDetail.mrf``
    and the ``Channel`` elements below ``ChannelList`` in the settings file
    named by the ``MeasurementSettingFileName`` attribute of the mrf root,
    then merges both tables on their ``Ch`` column.

    Returns:
        The merged table; columns are the XML attribute names with the
        ``BTS_NS`` prefix removed.

    Raises:
        ValueError: If ``MeasurementDetail.mrf`` or the settings file does
            not exist, or if the settings file has no ``ChannelList``
            element.
    """
    mrf_file = join(self._acquisition_dir, "MeasurementDetail.mrf")
    if not exists(mrf_file):
        msg = f"MeasurementDetail.mrf not found in: {self._acquisition_dir}"
        raise ValueError(msg)
    mrf_root = parse(mrf_file).getroot()
    channels = [
        {key.replace(BTS_NS, ""): value for key, value in channel.attrib.items()}
        for channel in mrf_root.findall(BTS_NS + "MeasurementChannel")
    ]

    mes_file = join(
        self._acquisition_dir,
        mrf_root.attrib[BTS_NS + "MeasurementSettingFileName"],
    )
    if not exists(mes_file):
        msg = f"Settings file not found: {mes_file}"
        raise ValueError(msg)
    mes_root = parse(mes_file).getroot()

    # ``find`` returns None when the element is absent; fail with a clear
    # message instead of an AttributeError on the chained ``findall``.
    channel_list = mes_root.find(BTS_NS + "ChannelList")
    if channel_list is None:
        msg = f"No ChannelList element found in: {mes_file}"
        raise ValueError(msg)
    channel_settings = [
        {key.replace(BTS_NS, ""): value for key, value in channel.attrib.items()}
        for channel in channel_list.findall(BTS_NS + "Channel")
    ]

    return pd.merge(
        pd.DataFrame(channels),
        pd.DataFrame(channel_settings),
        left_on="Ch",
        right_on="Ch",
    )

and

def get_channel_metadata(self) -> dict[int, ChannelMetadata]:
    """Build one ``ChannelMetadata`` entry per row of the parsed metadata.

    Returns:
        Mapping from zero-based channel index (the ``Ch`` value minus one)
        to the ``ChannelMetadata`` built from that row.

    Raises:
        AssertionError: If the smallest resulting channel index is not 0.
    """
    channel_table = self._parse_metadata()
    per_channel = {}
    for _, channel_row in channel_table.iterrows():
        # "Ch" is one-based in the parsed metadata; the mapping is zero-based.
        zero_based = int(channel_row["Ch"]) - 1
        per_channel[zero_based] = ChannelMetadata(
            channel_index=zero_based,
            channel_name=channel_row["Ch"],
            display_color=channel_row["Color"],
            spatial_calibration_x=channel_row["HorizontalPixelDimension"],
            spatial_calibration_y=channel_row["VerticalPixelDimension"],
            spatial_calibration_units="um",
            z_spacing=self.get_z_spacing(),
            wavelength=self.__parse_filter_wavelength(channel_row["Acquisition"]),
            exposure_time=channel_row["ExposureTime"],
            exposure_time_unit="ms",
            objective=channel_row["Objective"],
        )
    assert min(per_channel) == 0, "Channel indices must start at 0."
    return per_channel

eicm

def get_metadata(input_dir: str):
    """
    Extract acquisition date, pixel-size information, acquisition camera
    index, and channel information (laser, filter and objective) from a
    Yokogawa CV7000 or CV8000 acquisition. The information is extracted from
    mes- and mrf-files written by Yokogawa.

    The returned channel information maps each channel id (the ``Ch``
    attribute, as a string) to a dict with the keys ``pixel_size``,
    ``cam_index``, ``objective``, ``filter`` and ``laser``.

    :param input_dir: location of the Yokogawa acquisition
    :return: acquisition_date, pixel_size, pixel_size_unit, channel_information
    :raises IndexError: if no ``*.mes`` file is found in ``input_dir``
    """
    mrf_root = ET.parse(join(input_dir, "MeasurementDetail.mrf")).getroot()
    # The namespace prefix is whatever precedes the root element's local name.
    mrf_ns = mrf_root.tag.replace("MeasurementDetail", "")

    begin_time = parser.parse(mrf_root.attrib[mrf_ns + "BeginTime"])
    acquisition_date_str = begin_time.strftime("%Y-%m-%d")

    # Only children carrying a "Ch" attribute describe a channel.
    channels = {}
    for child in mrf_root:
        ch = child.get(mrf_ns + "Ch")
        if ch is not None:
            channels[ch] = {
                "pixel_size": child.get(mrf_ns + "HorizontalPixelDimension"),
                "cam_index": child.get(mrf_ns + "CameraNumber"),
            }

    # The global pixel size is read from the second child of the mrf root.
    pixel_size = float(mrf_root[1].attrib.get(mrf_ns + "HorizontalPixelDimension"))
    pixel_size_unit = "micron"

    mes_files = glob(join(input_dir, "*.mes"))
    if not mes_files:
        # Same exception type as the former bare ``[0]`` lookup, but with a
        # message that names the directory that was searched.
        msg = f"No .mes settings file found in: {input_dir}"
        raise IndexError(msg)
    mes_root = ET.parse(mes_files[0]).getroot()
    mes_ns = mes_root.tag.replace("MeasurementSetting", "")

    # The channel entries are taken from the third child of the mes root.
    for child in mes_root[2]:
        ch = child.get(mes_ns + "Ch")
        if ch is not None:
            channel_dict = channels[ch]
            channel_dict["objective"] = child.get(mes_ns + "Objective").replace(
                " ", "-"
            )
            channel_dict["filter"] = child.get(mes_ns + "Acquisition").replace("/", "-")
            # The laser is the text of the channel element's first child.
            channel_dict["laser"] = child[0].text

    return acquisition_date_str, pixel_size, pixel_size_unit, channels

For acquisition files

hcs

def _parse_files(self) -> pd.DataFrame:
    """List the image records of ``MeasurementData.mlf`` as a table.

    Returns:
        One row per ``MeasurementRecord`` whose ``Type`` is ``"IMG"``. The
        columns are the record's remaining attributes (``BTS_NS`` prefix
        removed, without ``Type``, ``Row`` and ``Column``) plus ``path`` and
        ``well``; ``TimePoint`` and ``ZIndex`` are converted to ``int``.

    Raises:
        ValueError: If ``MeasurementData.mlf`` is not in the acquisition
            directory.
    """
    mlf_path = join(self._acquisition_dir, "MeasurementData.mlf")
    if not exists(mlf_path):
        msg = f"MeasurementData.mlf not found in: {self._acquisition_dir}"
        raise ValueError(msg)

    root = parse(mlf_path).getroot()
    image_rows = []
    for record in root.findall(BTS_NS + "MeasurementRecord"):
        attributes = {
            name.replace(BTS_NS, ""): value for name, value in record.attrib.items()
        }
        if attributes.pop("Type") == "IMG":
            # The record text is the image file name within the acquisition.
            image_path = join(self._acquisition_dir, record.text)
            # Row 1 becomes "A", row 2 "B", ...; the column is zero-padded.
            row_letter = chr(ord("@") + int(attributes.pop("Row")))
            column_label = attributes.pop("Column").zfill(2)
            attributes["path"] = image_path
            attributes["well"] = row_letter + column_label
            image_rows.append(attributes)
        # Drop the element's content once it has been read.
        record.clear()

    table = pd.DataFrame(image_rows)
    for column in ("TimePoint", "ZIndex"):
        table[column] = table[column].astype(int)
    return table

eicm

def create_table(files: List[str], plate_name: str) -> pd.DataFrame:
    """
    Build a table with one row per image file, holding the fields parsed
    from its file name next to the file path itself.

    :param files: image file list
    :param plate_name: Name of the plate
    :return: table with the columns plate, well, timepoint, field, line,
        action, z, channel and path
    """
    column_names = (
        "plate",
        "well",
        "timepoint",
        "field",
        "line",
        "action",
        "z",
        "channel",
        "path",
    )
    columns = {name: [] for name in column_names}
    for file in files:
        # Explicit unpacking keeps the check that exactly eight fields
        # come back from parse_filename.
        p, w, t, f, line, a, z_, c = parse_filename(file, plate_name)
        for name, value in zip(column_names, (p, w, t, f, line, a, z_, c, file)):
            columns[name].append(value)
    return pd.DataFrame(columns)


We should try to consolidate those two code bases (and maybe even use the metadata model from https://github.com/tlambert03/ome-types to standardize the way we handle metadata, see #198):

  • factor out into common public utility methods (e.g. in faim_ipa.hcs.cellvoyager.utils)
  • use those public methods from within the StackAcquisition class
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant