diff --git a/Modaliteter/FTV/append_exposure_params_to_io_calibration_series.py b/Modaliteter/FTV/append_exposure_params_to_io_calibration_series.py new file mode 100644 index 0000000..b619a2d --- /dev/null +++ b/Modaliteter/FTV/append_exposure_params_to_io_calibration_series.py @@ -0,0 +1,181 @@ +import datetime as dt +import sys +from pathlib import Path +from typing import List + +path_to_repo = Path(__file__).parents[2] +sys.path.append(str(path_to_repo)) + + +import shutil +from pathlib import Path +from tqdm import tqdm +import matplotlib.pyplot as plt +import numpy as np +import pydicom +from analys_private_sprid_ej import create_calibration_plot + +from utils.plot_utils import export_flatfield_image +from utils.intraoral_utils import _fetch_latest_dose_measurement + + +def _append_exposure_params_to_io_calibration_series( + path_clinics_raw: Path, + path_clinics_parsed: Path, + exp_times: List[int], + ma: int, + run_tanqa_offline=False, + export_flatfield_images=True, + append_dose_measurements=True, +) -> None: + dose_dict = dict() + # select clinic from user input + clinics_list = [] + for item in path_clinics_raw.iterdir(): + if item.name in [ + "dosmätningar", + "raw_folder.txt", + "klinik_namn", + ]: + continue + clinics_list.append(item) + + print("\nselect clinic by index:\n") + for i in range(len(clinics_list)): + print(i, clinics_list[i].name) + + selected_clinic_index = input("\n") + selected_clinic = clinics_list[int(selected_clinic_index)] + # select measurement lab from user input + labs_list = [] + for item in selected_clinic.iterdir(): + labs_list.append(item) + + print("\nselect lab by index:\n") + for i in range(len(labs_list)): + print(i, labs_list[i].name) + + selected_lab_index = input("\n") + selected_lab = labs_list[int(selected_lab_index)] + + # get list of dose measurements + if append_dose_measurements: + dose_measurements = [] + for dose_measurement in (path_clinics_raw / "dosmätningar").iterdir(): + if "Date" in dose_measurement.name: + continue + dose_measurements.append(dose_measurement) + + # copy dosemeasurements to parsed folder + for src in (path_clinics_raw / "dosmätningar").iterdir(): + dest = path_clinics_parsed / "dosmätningar" / src.name + shutil.copyfile(src, dest) + + print(f"parsing clinic: {selected_clinic.name}") + print(f"parsing lab: {selected_lab.name}") + + for x_ray_tube in selected_lab.iterdir(): + print(f"parsing x-ray tube: {x_ray_tube.name}") + + calib_path = x_ray_tube / "Kalibrering" + + sensor_id_list = [] + for sensor in calib_path.iterdir(): + if str(sensor.name) in ["sensorid_mall"]: + continue + + sensor_id_list.append(str(sensor.name)) + print(f"parsing sensor: {sensor.name}") + + for kv in sensor.iterdir(): + if str(kv.name) in "readme.txt": + continue + + path_res_folder = path_clinics_parsed / f"{selected_clinic.name}_parsed" + sub_path = Path(*kv.parts[7:]) + + dcm_res_path = path_res_folder / sub_path + dcm_res_path.mkdir( + parents=True, + exist_ok=True, + ) + + if export_flatfield_images: + plots_path = ( + path_res_folder / selected_lab / x_ray_tube.name / "Kalibrering" / sensor.name / kv.name + ) + plots_path.mkdir(exist_ok=True) + + dcm_files = [] # for DICOM files + acq_times = [] # for acquisition times (for sorting) + + for dcm_file in tqdm(kv.iterdir()): + if ".dcm" in str(dcm_file.name): + dcm_files.append(pydicom.dcmread(dcm_file)) + + for i in range(len(dcm_files)): + acq_times.append(int(dcm_files[i].AcquisitionTime)) + + sort_order = np.argsort(acq_times) + + for i in range(len(dcm_files)): + folder_kilovoltage = int(kv.name[:2]) + dcm_files[sort_order[i]].ExposureTime = exp_times[i] + dcm_files[sort_order[i]].KVP = folder_kilovoltage + dcm_files[sort_order[i]].XRayTubeCurrent = ma + + if append_dose_measurements: + dose = _fetch_latest_dose_measurement( + dcm_file=dcm_files[sort_order[i]], dose_measurements=dose_measurements, lab=selected_lab + ) + + dcm_files[sort_order[i]].EntranceDoseInmGy = dose[kv.name.lower()][i] + + parsed_file_name = f"{kv.name}_{ma}ma_{exp_times[i]}ms.dcm" + + dcm_files[sort_order[i]].save_as(dcm_res_path / parsed_file_name) + + if export_flatfield_images: + export_flatfield_image( + file=dcm_files[sort_order[i]], file_name=parsed_file_name, dcm_res_path=dcm_res_path + ) + + if run_tanqa_offline: + create_calibration_plot( + main_folder=path_clinics_parsed, + output_dir=path_res_folder.parent / "plots" / path_res_folder.name, + sensor_ids=sensor_id_list, + ) + + print("parsing completed") + + +path_clinics_raw = Path(r"V:\Enhetsytor\5-1-1-3. Strålningsfysik\Radiologi\FTV\Nya sensorer 2022 raw") + + +path_clinics_parsed = ( + Path("C:/") + / "Users" + / "maxh01" + / "OneDrive - Region Västerbotten" + / "Strålningsfysik - Dokument" + / "Radiologi" + / "Modaliteter" + / "FTV" + / "Nya sensorer 2022" + / "Nya sensorer 2022 parsed" +) + +exp_times = [20, 25, 32, 40, 50, 63, 80, 100, 125, 160, 200, 250] +ma = 7 + + +_append_exposure_params_to_io_calibration_series( + path_clinics_raw=path_clinics_raw, + path_clinics_parsed=path_clinics_parsed, + exp_times=exp_times, + ma=ma, + run_tanqa_offline=False, + export_flatfield_images=True, + append_dose_measurements=True, +) diff --git a/Modaliteter/FTV/parse_and_analyze_new_sensors.py b/Modaliteter/FTV/parse_and_analyze_new_sensors.py deleted file mode 100644 index f4dd29e..0000000 --- a/Modaliteter/FTV/parse_and_analyze_new_sensors.py +++ /dev/null @@ -1,229 +0,0 @@ -import datetime as dt -import sys -from pathlib import Path -from typing import List - -path_to_repo = Path(__file__).parents[2] -sys.path.append(str(path_to_repo)) - - -import shutil -from pathlib import Path - -import matplotlib.pyplot as plt -import numpy as np -import pandas as pd -import pydicom -from analys_private_sprid_ej import create_calibration_plot - -from utils.dit_utils import _calculate_image_lims_exclude_cornerbox -from utils.plot_utils import _create_colorbar - - -def export_flatfield_image(file, file_name, dcm_res_path): - a = 1 - mat = file.pixel_array - cmap = plt.cm.binary - lims = _calculate_image_lims_exclude_cornerbox(mat) - - fig, ax = plt.subplots(figsize=(20, 15)) - im = ax.imshow(mat, cmap=cmap, vmin=lims[0], vmax=lims[1], interpolation=None) - ax.set_xlabel(file.DetectorID) - - _create_colorbar(fig, im, ax, width="1%", bbox_to_anchor=(0.05, 0, 1, 1)) - path_export = dcm_res_path.parent / "flat_field_plots" - path_export.mkdir(exist_ok=True) - plt.savefig(path_export / (file_name.split(".")[0] + ".png"), dpi=500) - plt.close() - pass - - -def _fetch_dose_from_measurements(dcm_file, dose_measurements, lab): - date_raw = dcm_file.AcquisitionDate - date_sensor_exposure = dt.datetime(year=int(date_raw[:4]), month=int(date_raw[4:6]), day=int(date_raw[6:8])) - - print(f"date of sensor exposure: {date_sensor_exposure}") - - dose_dose_measurements_in_lab = [] - date_dose_measurements_in_lab = [] - # fetch list of dose measurements from the same lab - for item in dose_measurements: - if lab.name.replace(" ", "")[3:] in item.name: - dose_dose_measurements_in_lab.append(item) - - # create datetimes for those measurements - for item in dose_dose_measurements_in_lab: - date_raw = item.name.split("_")[2].replace(".xlsx", "") - - date = dt.datetime(year=int(date_raw[:4]), month=int(date_raw[4:6]), day=int(date_raw[6:8])) - date_dose_measurements_in_lab.append(date) - - # time dt in between sensor esposure and measurement date - delta_t = [date_sensor_exposure - date_dose_measurement for date_dose_measurement in date_dose_measurements_in_lab] - delta_t_int = [time.days for time in delta_t] - # closest measurement dt - min_pos_dt = min([i for i in delta_t_int if i >= 0]) - # index to that measurement date - dose_index = delta_t_int.index(min_pos_dt) - - dose_dict = dict() - for kv in ["60kv", "70kv"]: - dose_dict[kv] = pd.read_excel( - dose_measurements[dose_index], - kv, - )["dose_mGy"] - - print(f"appending: {dose_dose_measurements_in_lab[dose_index].name}") - - return dose_dict - - -def parse_onepix_data_for_new_clinic( - path_clinics_raw: Path, - path_clinics_parsed: Path, - exp_times: List[int], - ma: int, -) -> None: - dose_dict = dict() - # select clinic from user input - clinics_list = [] - for item in path_clinics_raw.iterdir(): - if item.name in [ - "dosmätningar", - "raw_folder.txt", - "klinik_namn", - ]: - continue - clinics_list.append(item) - - print("\nselect clinic by index:\n") - for i in range(len(clinics_list)): - print(i, clinics_list[i].name) - - selected_clinic_index = input("\n") - selected_clinic = clinics_list[int(selected_clinic_index)] - # select measurement lab from user input - labs_list = [] - for item in selected_clinic.iterdir(): - labs_list.append(item) - - print("\nselect lab by index:\n") - for i in range(len(labs_list)): - print(i, labs_list[i].name) - - selected_lab_index = input("\n") - selected_lab = labs_list[int(selected_lab_index)] - - # get list of dose measurements - dose_measurements = [] - for dose_measurement in (path_clinics_raw / "dosmätningar").iterdir(): - if "Date" in dose_measurement.name: - continue - dose_measurements.append(dose_measurement) - - # copy dosemeasurements to parsed folder - for src in (path_clinics_raw / "dosmätningar").iterdir(): - dest = path_clinics_parsed / "dosmätningar" / src.name - shutil.copyfile(src, dest) - - print(f"parsing clinic: {selected_clinic.name}") - print(f"parsing lab: {selected_lab.name}") - - for x_ray_tube in selected_lab.iterdir(): - print(f"parsing x-ray tube: {x_ray_tube.name}") - - calib_path = x_ray_tube / "Kalibrering" - - sensor_id_list = [] - for sensor in calib_path.iterdir(): - if str(sensor.name) in ["sensorid_mall"]: - continue - - sensor_id_list.append(str(sensor.name)) - print(f"parsing sensor: {sensor.name}") - - for kv in sensor.iterdir(): - if str(kv.name) in "readme.txt": - continue - - path_res_folder = path_clinics_parsed / f"{selected_clinic.name}_parsed" - sub_path = Path(*kv.parts[7:]) - - dcm_res_path = path_res_folder / sub_path - dcm_res_path.mkdir( - parents=True, - exist_ok=True, - ) - - plots_path = path_res_folder / selected_lab / x_ray_tube.name / 'Kalibrering' / sensor.name / kv.name - plots_path.mkdir(exist_ok=True) - dcm_files = [] # for DICOM files - acq_times = [] # for acquisition times (for sorting) - - for dcm_file in kv.iterdir(): - if ".dcm" in str(dcm_file.name): - dcm_files.append(pydicom.dcmread(dcm_file)) - - for i in range(len(dcm_files)): - acq_times.append(int(dcm_files[i].AcquisitionTime)) - - sort_order = np.argsort(acq_times) - - for i in range(len(dcm_files)): - folder_kilovoltage = int(kv.name[:2]) - dcm_files[sort_order[i]].ExposureTime = exp_times[i] - dcm_files[sort_order[i]].KVP = folder_kilovoltage - dcm_files[sort_order[i]].XRayTubeCurrent = ma - - dose = _fetch_dose_from_measurements( - dcm_file=dcm_files[sort_order[i]], dose_measurements=dose_measurements, lab=selected_lab - ) - - dcm_files[sort_order[i]].EntranceDoseInmGy = dose[kv.name.lower()][i] - - parsed_file_name = f"{kv.name}_{ma}ma_{exp_times[i]}ms.dcm" - - dcm_files[sort_order[i]].save_as(dcm_res_path / parsed_file_name) - a = 1 - export_flatfield_image( - file=dcm_files[sort_order[i]], file_name=parsed_file_name, dcm_res_path=dcm_res_path - ) - - create_calibration_plot( - main_folder=path_clinics_parsed, - output_dir=path_res_folder.parent / 'plots' / path_res_folder.name, - sensor_ids=sensor_id_list, - ) - - print("parsing completed") - - -path_clinics_raw = ( - Path(r'V:\Enhetsytor\5-1-1-3. Strålningsfysik\Radiologi\FTV\Nya sensorer 2022 raw') -) - -path_clinics_parsed = ( - Path("C:/") - / "Users" - / "maxh01" - / "Region Västerbotten" - / "Strålningsfysik - Dokument" - / "Radiologi" - / "Modaliteter" - / "FTV" - / "Nya sensorer 2022" - / "Nya sensorer 2022 parsed" -) - -exp_times = [20, 25, 32, 40, 50, 63, 80, 100, 125, 160, 200, 250] -ma = 7 - - -parse_onepix_data_for_new_clinic( - path_clinics_raw=path_clinics_raw, - path_clinics_parsed=path_clinics_parsed, - exp_times=exp_times, - ma=ma, -) - - diff --git a/requirements.txt b/requirements.txt index 75c15be..2793edf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,4 +11,5 @@ pytest rich hvplot bokeh -openpyxl \ No newline at end of file +openpyxl +tqdm diff --git a/utils/intraoral_utils.py b/utils/intraoral_utils.py new file mode 100644 index 0000000..91ca1f5 --- /dev/null +++ b/utils/intraoral_utils.py @@ -0,0 +1,42 @@ +import datetime as dt +import pandas as pd + + +def _fetch_latest_dose_measurement(dcm_file, dose_measurements, lab): + date_raw = dcm_file.AcquisitionDate + date_sensor_exposure = dt.datetime(year=int(date_raw[:4]), month=int(date_raw[4:6]), day=int(date_raw[6:8])) + + print(f"date of sensor exposure: {date_sensor_exposure}") + + dose_dose_measurements_in_lab = [] + date_dose_measurements_in_lab = [] + # fetch list of dose measurements from the same lab + for item in dose_measurements: + if lab.name.replace(" ", "")[3:] in item.name: + dose_dose_measurements_in_lab.append(item) + + # create datetimes for those measurements + for item in dose_dose_measurements_in_lab: + date_raw = item.name.split("_")[2].replace(".xlsx", "") + + date = dt.datetime(year=int(date_raw[:4]), month=int(date_raw[4:6]), day=int(date_raw[6:8])) + date_dose_measurements_in_lab.append(date) + + # time dt in between sensor esposure and measurement date + delta_t = [date_sensor_exposure - date_dose_measurement for date_dose_measurement in date_dose_measurements_in_lab] + delta_t_int = [time.days for time in delta_t] + # closest measurement dt + min_pos_dt = min([i for i in delta_t_int if i >= 0]) + # index to that measurement date + dose_index = delta_t_int.index(min_pos_dt) + + dose_dict = dict() + for kv in ["60kv", "70kv"]: + dose_dict[kv] = pd.read_excel( + dose_measurements[dose_index], + kv, + )["dose_mGy"] + + print(f"appending: {dose_dose_measurements_in_lab[dose_index].name}") + + return dose_dict diff --git a/utils/plot_utils.py b/utils/plot_utils.py index 42836e8..9663977 100644 --- a/utils/plot_utils.py +++ b/utils/plot_utils.py @@ -2,6 +2,72 @@ import matplotlib.pyplot as plt import numpy as np from mpl_toolkits.axes_grid1.inset_locator import inset_axes +from utils.dit_utils import _calculate_image_lims_exclude_cornerbox +from pathlib import Path +import matplotlib.pyplot as plt + + +def export_flatfield_image( + file, + file_name, + dcm_res_path, + cmap="binary", + figsize=(20, 15), + interpolation=None, + use_title=False, + lims_fn=_calculate_image_lims_exclude_cornerbox, + add_colorbar=True, + colorbar_width="1%", + colorbar_bbox_to_anchor=(0.05, 0, 1, 1), + out_subdir="flat_field_plots", + out_ext=".png", + dpi=500, + bbox_inches="tight", + pad_inches=0.05, + ensure_dir=True, +): + mat = file.pixel_array + lims = lims_fn(mat) + + fig, ax = plt.subplots(figsize=figsize) + im = ax.imshow( + mat, + cmap=cmap, + vmin=lims[0], + vmax=lims[1], + interpolation=interpolation, + ) + + label = getattr(file, "DetectorID", None) + if label is not None: + if use_title: + ax.set_title(label) + else: + ax.set_xlabel(label) + + if add_colorbar: + _create_colorbar( + fig, + im, + ax, + width=colorbar_width, + bbox_to_anchor=colorbar_bbox_to_anchor, + ) + + path_export = Path(dcm_res_path).parent / out_subdir + if ensure_dir: + path_export.mkdir(parents=True, exist_ok=True) + + out_path = path_export / f"{Path(file_name).stem}{out_ext}" + fig.savefig( + out_path, + dpi=dpi, + bbox_inches=bbox_inches, + pad_inches=pad_inches, + ) + plt.close(fig) + + return out_path def _create_colorbar( @@ -115,7 +181,7 @@ def _create_subplot(ax, mat, lims, cmap, xlabel, ylabel, title): def _set_ax_shape(ax, shape, white=True): - return ax.imshow(np.ones(shape), cmap=plt.cm.gray if white else plt.cm.binary, vmin=0, vmax=1) + return ax.imshow(np.ones(shape), cmap="gray" if white else "binary", vmin=0, vmax=1) def _plotdpi(dpi=124):