Remove the old SPA context which we no longer use #453

Open · wants to merge 2 commits into main
26 changes: 4 additions & 22 deletions src/murfey/client/analyser.py
@@ -16,7 +16,7 @@
 
 from murfey.client.context import Context
 from murfey.client.contexts.clem import CLEMContext
-from murfey.client.contexts.spa import SPAContext, SPAModularContext
+from murfey.client.contexts.spa import SPAModularContext
 from murfey.client.contexts.spa_metadata import SPAMetadataContext
 from murfey.client.contexts.tomo import TomographyContext
 from murfey.client.instance_environment import MurfeyInstanceEnvironment
@@ -159,23 +159,7 @@ def _find_context(self, file_path: Path) -> bool:
             if split_file_name[0].startswith("FoilHole"):
                 if not self._context:
                     logger.info("Acquisition software: EPU")
-                    if self._environment:
-                        try:
-                            cfg = get_machine_config_client(
-                                str(self._environment.url.geturl()),
-                                instrument_name=self._environment.instrument_name,
-                                demo=self._environment.demo,
-                            )
-                        except Exception as e:
-                            logger.error(f"Exception encountered: {e}")
-                            cfg = {}
-                    else:
-                        cfg = {}
-                    self._context = (
-                        SPAModularContext("epu", self._basepath)
-                        if cfg.get("modular_spa")
-                        else SPAContext("epu", self._basepath)
-                    )
+                    self._context = SPAModularContext("epu", self._basepath)
                     self.parameters_model = ProcessingParametersSPA
                     return True
 
@@ -325,8 +309,7 @@ def _analyse(self):
                     "form": dc_metadata,
                     "dependencies": (
                         spa_form_dependencies
-                        if isinstance(self._context, SPAContext)
-                        or isinstance(
+                        if isinstance(
                             self._context, SPAModularContext
                         )
                         else {}
@@ -385,8 +368,7 @@ def _analyse(self):
                     "form": dc_metadata,
                     "dependencies": (
                         spa_form_dependencies
-                        if isinstance(self._context, SPAContext)
-                        or isinstance(
+                        if isinstance(
                             self._context, SPAModularContext
                         )
                         else {}
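
Note on the analyser.py change: `_find_context` now constructs `SPAModularContext` unconditionally for EPU data, so the `get_machine_config_client` round-trip (and its error handling) disappears from this path along with the `modular_spa` config flag. A minimal sketch of the simplified selection logic, using a hypothetical `select_spa_context` helper and only the names visible in this diff (the real method handles many more file types):

```python
from pathlib import Path

from murfey.client.contexts.spa import SPAModularContext


def select_spa_context(file_path: Path, basepath: Path):
    # EPU writes single-particle movies as "FoilHole_*"; previously the client
    # fetched the machine config and fell back to the legacy SPAContext when
    # "modular_spa" was unset. SPAModularContext is now the only choice.
    if file_path.name.split("_")[0].startswith("FoilHole"):
        return SPAModularContext("epu", basepath)
    return None
```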
106 changes: 1 addition & 105 deletions src/murfey/client/contexts/spa.py
@@ -285,7 +285,7 @@ def _get_xml_list_index(key: str, xml_list: list) -> int:
     raise ValueError(f"Key not found in XML list: {key}")
 
 
-class _SPAContext(Context):
+class SPAModularContext(Context):
     user_params = [
         ProcessingParameter(
             "dose_per_frame",
@@ -326,7 +326,6 @@ def __init__(self, acquisition_software: str, basepath: Path):
         super().__init__("SPA", acquisition_software)
         self._basepath = basepath
         self._processing_job_stash: dict = {}
-        self._preprocessing_triggers: dict = {}
         self._foil_holes: Dict[int, List[int]] = {}
 
     def gather_metadata(
@@ -558,8 +557,6 @@ def gather_metadata(
         ) or True
         return metadata
 
-
-class SPAModularContext(_SPAContext):
     def _position_analysis(
         self,
         transferred_file: Path,
@@ -856,104 +853,3 @@ def _launch_spa_pipeline(
         url: str = "",
     ):
         return
-
-
-class SPAContext(_SPAContext):
-    def _register_data_collection(
-        self,
-        tag: str,
-        url: str,
-        data: dict,
-        environment: MurfeyInstanceEnvironment,
-    ):
-        logger.info(f"registering data collection with data {data}")
-        environment.id_tag_registry["data_collection"].append(tag)
-        image_directory = str(environment.default_destinations[Path(tag)])
-        json = {
-            "voltage": data["voltage"],
-            "pixel_size_on_image": data["pixel_size_on_image"],
-            "experiment_type": data["experiment_type"],
-            "image_size_x": data["image_size_x"],
-            "image_size_y": data["image_size_y"],
-            "file_extension": data["file_extension"],
-            "acquisition_software": data["acquisition_software"],
-            "image_directory": image_directory,
-            "tag": tag,
-            "source": tag,
-            "magnification": data["magnification"],
-            "total_exposed_dose": data.get("total_exposed_dose"),
-            "c2aperture": data.get("c2aperture"),
-            "exposure_time": data.get("exposure_time"),
-            "slit_width": data.get("slit_width"),
-            "phase_plate": data.get("phase_plate", False),
-        }
-        capture_post(url, json=json)
-
-    def post_transfer(
-        self,
-        transferred_file: Path,
-        environment: MurfeyInstanceEnvironment | None = None,
-        **kwargs,
-    ) -> bool:
-        return True
-
-    def _register_processing_job(
-        self,
-        tag: str,
-        environment: MurfeyInstanceEnvironment,
-        parameters: Dict[str, Any] | None = None,
-    ):
-        logger.info(f"registering processing job with parameters: {parameters}")
-        parameters = parameters or {}
-        environment.id_tag_registry["processing_job"].append(tag)
-        proc_url = f"{str(environment.url.geturl())}/visits/{environment.visit}/{environment.murfey_session}/register_processing_job"
-        machine_config = get_machine_config_client(
-            str(environment.url.geturl()),
-            instrument_name=environment.instrument_name,
-            demo=environment.demo,
-        )
-        image_directory = str(
-            Path(machine_config.get("rsync_basepath", "."))
-            / environment.default_destinations[Path(tag)]
-        )
-        if self._acquisition_software == "epu":
-            import_images = f"{Path(image_directory).resolve()}/GridSquare*/Data/*{parameters['file_extension']}"
-        else:
-            import_images = (
-                f"{Path(image_directory).resolve()}/*{parameters['file_extension']}"
-            )
-        msg: Dict[str, Any] = {
-            "tag": tag,
-            "source": tag,
-            "recipe": "ispyb-relion",
-            "parameters": {
-                "acquisition_software": parameters["acquisition_software"],
-                "voltage": parameters["voltage"],
-                "gain_ref": parameters["gain_ref"],
-                "dose_per_frame": parameters["dose_per_frame"],
-                "eer_grouping": parameters["eer_fractionation"],
-                "import_images": import_images,
-                "angpix": float(parameters["pixel_size_on_image"]) * 1e10,
-                "symmetry": parameters["symmetry"],
-                "boxsize": parameters["boxsize"],
-                "downscale": parameters["downscale"],
-                "small_boxsize": parameters["small_boxsize"],
-                "mask_diameter": parameters["mask_diameter"],
-                "use_cryolo": parameters["use_cryolo"],
-                "estimate_particle_diameter": parameters["estimate_particle_diameter"],
-            },
-        }
-        if parameters["particle_diameter"]:
-            msg["parameters"]["particle_diameter"] = parameters["particle_diameter"]
-        capture_post(proc_url, json=msg)
-
-    def _launch_spa_pipeline(
-        self,
-        tag: str,
-        jobid: int,
-        environment: MurfeyInstanceEnvironment,
-        url: str = "",
-    ):
-        environment.id_tag_registry["auto_proc_program"].append(tag)
-        data = {"job_id": jobid}
-        capture_post(url, json=data)
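
Note on the spa.py change: the intermediate `_SPAContext` base is renamed to `SPAModularContext` (absorbing the old subclass body), and the legacy `SPAContext`, which registered data collections and processing jobs straight from the client via `capture_post`, is deleted. A quick sketch of the resulting hierarchy, assuming only the imports shown in this diff:

```python
import murfey.client.contexts.spa as spa
from murfey.client.context import Context
from murfey.client.contexts.spa import SPAModularContext

# Before: Context -> _SPAContext -> {SPAModularContext, SPAContext}
# After:  Context -> SPAModularContext
assert SPAModularContext.__mro__[1] is Context

# The legacy class no longer exists in the module.
assert not hasattr(spa, "SPAContext")
```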
91 changes: 0 additions & 91 deletions src/murfey/client/contexts/tomo.py
@@ -8,7 +8,6 @@
 
 import requests
 import xmltodict
-from pydantic import BaseModel
 
 import murfey.util.eer
 from murfey.client.context import Context, ProcessingParameter
@@ -17,7 +16,6 @@
     MovieTracker,
     MurfeyID,
     MurfeyInstanceEnvironment,
-    global_env_lock,
 )
 from murfey.util import authorised_requests, capture_post, get_machine_config_client
 from murfey.util.mdoc import get_block, get_global_data, get_num_blocks
@@ -65,15 +63,6 @@ def _construct_tilt_series_name(file_path: Path) -> str:
     return "_".join(split_name[:-5])
 
 
-class ProcessFileIncomplete(BaseModel):
-    dest: Path
-    source: Path
-    image_number: int
-    mc_uuid: int
-    tag: str
-    description: str = ""
-
-
 class TomographyContext(Context):
     user_params = [
         ProcessingParameter(
@@ -101,7 +90,6 @@ def __init__(self, acquisition_software: str, basepath: Path):
         self._aligned_tilt_series: List[str] = []
         self._data_collection_stash: list = []
         self._processing_job_stash: dict = {}
-        self._preprocessing_triggers: dict = {}
         self._lock: RLock = RLock()
 
     def _flush_data_collections(self):
@@ -120,12 +108,6 @@ def _flush_data_collections(self):
                 capture_post(dc_data[0], json=data)
         self._data_collection_stash = []
 
-    def _flush_processing_job(self, tag: str):
-        if proc_data := self._processing_job_stash.get(tag):
-            for pd in proc_data:
-                requests.post(pd[0], json=pd[1])
-            self._processing_job_stash.pop(tag)
-
     def _flush_processing_jobs(self):
         logger.info(
             f"Flushing {len(self._processing_job_stash.keys())} processing job API calls"
@@ -135,75 +117,6 @@ def _flush_processing_jobs(self):
                 requests.post(pd[0], json=pd[1])
         self._processing_job_stash = {}
 
-    def _flush_preprocess(self, tag: str, app_id: int):
-        if tag_tr := self._preprocessing_triggers.get(tag):
-            for tr in tag_tr:
-                process_file = self._complete_process_file(tr[1], tr[2], app_id)
-                if process_file:
-                    capture_post(tr[0], json=process_file)
-            self._preprocessing_triggers.pop(tag)
-
-    def _complete_process_file(
-        self,
-        incomplete_process_file: ProcessFileIncomplete,
-        environment: MurfeyInstanceEnvironment,
-        app_id: int,
-    ) -> dict:
-        try:
-            with global_env_lock:
-                tag = incomplete_process_file.tag
-
-                eer_fractionation_file = None
-                if environment.data_collection_parameters.get("num_eer_frames"):
-                    response = requests.post(
-                        f"{str(environment.url.geturl())}/visits/{environment.visit}/{environment.murfey_session}/eer_fractionation_file",
-                        json={
-                            "num_frames": environment.data_collection_parameters[
-                                "num_eer_frames"
-                            ],
-                            "fractionation": environment.data_collection_parameters[
-                                "eer_fractionation"
-                            ],
-                            "dose_per_frame": environment.data_collection_parameters[
-                                "dose_per_frame"
-                            ],
-                            "fractionation_file_name": "eer_fractionation_tomo.txt",
-                        },
-                    )
-                    eer_fractionation_file = response.json()["eer_fractionation_file"]
-
-                new_dict = {
-                    "path": str(incomplete_process_file.dest),
-                    "description": incomplete_process_file.description,
-                    "size": incomplete_process_file.source.stat().st_size,
-                    "timestamp": incomplete_process_file.source.stat().st_ctime,
-                    "processing_job": environment.processing_job_ids[tag][
-                        "em-tomo-preprocess"
-                    ],
-                    "data_collection_id": environment.data_collection_ids[tag],
-                    "image_number": incomplete_process_file.image_number,
-                    "pixel_size": environment.data_collection_parameters[
-                        "pixel_size_on_image"
-                    ],
-                    "autoproc_program_id": app_id,
-                    "mc_uuid": incomplete_process_file.mc_uuid,
-                    "dose_per_frame": environment.data_collection_parameters.get(
-                        "dose_per_frame"
-                    ),
-                    "mc_binning": environment.data_collection_parameters.get(
-                        "motion_corr_binning", 1
-                    ),
-                    "gain_ref": environment.data_collection_parameters.get("gain_ref"),
-                    "voltage": environment.data_collection_parameters.get(
-                        "voltage", 300
-                    ),
-                    "eer_fractionation_file": eer_fractionation_file,
-                }
-                return new_dict
-        except KeyError:
-            logger.warning("Key error encountered in _complete_process_file")
-            return {}
-
     def _file_transferred_to(
         self, environment: MurfeyInstanceEnvironment, source: Path, file_path: Path
     ):
@@ -441,14 +354,10 @@ def _add_tilt(
                 preproc_data = {
                     "path": str(file_transferred_to),
                     "description": "",
-                    "data_collection_id": environment.data_collection_ids.get(tilt_series),
                     "image_number": environment.movies[file_transferred_to].movie_number,
                     "pixel_size": environment.data_collection_parameters.get(
                         "pixel_size_on_image", 0
                     ),
-                    "autoproc_program_id": environment.autoproc_program_ids.get(
-                        tilt_series, {}
-                    ).get("em-tomo-preprocess"),
                     "dose_per_frame": environment.data_collection_parameters.get(
                         "dose_per_frame", 0
                     ),
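
Note on the tomo.py change: the deferred-preprocessing machinery (`ProcessFileIncomplete`, `_preprocessing_triggers`, `_flush_preprocess`, `_complete_process_file`) is removed, and the per-tilt payload in `_add_tilt` drops `data_collection_id` and `autoproc_program_id`; those IDs were the main reason the client stashed incomplete requests, so presumably the server now resolves them itself (not confirmed by this diff). A sketch of the trimmed payload with placeholder values for the environment lookups:

```python
# Placeholder values stand in for the MurfeyInstanceEnvironment lookups.
preproc_data = {
    "path": "/destination/Position_1_001.tiff",  # hypothetical transferred path
    "description": "",
    "image_number": 1,      # environment.movies[...].movie_number
    "pixel_size": 1.0e-10,  # data_collection_parameters.get("pixel_size_on_image", 0)
    "dose_per_frame": 1.0,  # data_collection_parameters.get("dose_per_frame", 0)
    # "data_collection_id" and "autoproc_program_id" are no longer sent.
}
```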