From 7677757d7f68dcf57579f74733b15890505dc7ca Mon Sep 17 00:00:00 2001 From: Justin Goodrich Date: Mon, 29 Jul 2024 23:13:51 -0400 Subject: [PATCH] Miscallenous updates to tpx3awkward. Various clean-ups, optimized handling of KDTree neighbor list, updated conversion functions, and removed file-finding logic specific to CHX's current ophyd object. --- tpx3awkward/_utils.py | 611 ++++++++--------------------- tpx3awkward/examples/example.ipynb | 351 +++++++++++++++++ 2 files changed, 508 insertions(+), 454 deletions(-) create mode 100644 tpx3awkward/examples/example.ipynb diff --git a/tpx3awkward/_utils.py b/tpx3awkward/_utils.py index 8e656f0..7f6498a 100644 --- a/tpx3awkward/_utils.py +++ b/tpx3awkward/_utils.py @@ -1,19 +1,15 @@ import os -import numpy as np from pathlib import Path +from typing import TypeVar, Union, Dict, List, Tuple +import numpy as np from numpy.typing import NDArray -from typing import TypeVar, Union, Dict, Set, List, Iterable import numba import pandas as pd from scipy.spatial import KDTree import multiprocessing from tqdm import tqdm import warnings -import gc -try: - from pyCHX.chx_packages import db, get_sid_filenames -except ImportError: - warnings.warn("Could not import pyCHX package. Proceeding without it...") + IA = NDArray[np.uint64] UnSigned = TypeVar("UnSigned", IA, np.uint64) @@ -29,28 +25,26 @@ def raw_as_numpy(fpath: Union[str, Path]) -> IA: ---------- """ - return np.fromfile(fpath, dtype=" UnSigned: - return v >> np.uint64(shift) & np.uint64(2**width - 1) -@numba.jit(nopython=True) +@numba.jit(nopython=True, cache=True) def matches_nibble(data, nibble) -> numba.boolean: return (int(data) >> 60) == nibble -@numba.jit(nopython=True) +@numba.jit(nopython=True, cache=True) def is_packet_header(v: UnSigned) -> UnSigned: """Identify packet headers by magic number (TPX3 as ascii on lowest 8 bytes]""" return get_block(v, 32, 0) == 861425748 -@numba.jit(nopython=True) +@numba.jit(nopython=True, cache=True) def classify_array(data: IA) -> NDArray[np.uint8]: """ Create an array the same size as the data classifying 64bit uint by type. @@ -78,7 +72,7 @@ def classify_array(data: IA) -> NDArray[np.uint8]: return output -@numba.jit(nopython=True) +@numba.jit(nopython=True, cache=True) def _shift_xy(chip, row, col): # TODO sort out if this needs to be paremeterized out = np.zeros(2, "u4") @@ -100,7 +94,7 @@ def _shift_xy(chip, row, col): return out -@numba.jit(nopython=True) +@numba.jit(nopython=True, cache=True) def decode_xy(msg, chip): # these names and math are adapted from c++ code l_pix_addr = (msg >> np.uint(44)) & np.uint(0xFFFF) @@ -127,12 +121,12 @@ def decode_xy(msg, chip): return rowcol[1], rowcol[0] -@numba.jit(nopython=True) +@numba.jit(nopython=True, cache=True) def get_spidr(msg): return msg & np.uint(0xFFFF) -@numba.jit(nopython=True) +@numba.jit(nopython=True, cache=True) def decode_message(msg, chip, heartbeat_time: np.uint64 = 0): """Decode TPX3 packages of the second type corresponding to photon events (id'd via 0xB upper nibble) @@ -197,7 +191,8 @@ def decode_message(msg, chip, heartbeat_time: np.uint64 = 0): return x, y, ToT, ts -@numba.jit(nopython=True) + +@numba.jit(nopython=True, cache=True) def _ingest_raw_data(data): chips = np.zeros_like(data, dtype=np.uint8) @@ -281,37 +276,12 @@ def _ingest_raw_data(data): # Sort the timestamps - indx = np.argsort(ts[:photon_count], kind="mergesort") - chips = chips[indx] - x, y, tot, ts = x[indx], y[indx], tot[indx], ts[indx] + indx = np.argsort(ts[:photon_count], kind="mergesort") # is mergesort the best here? wondering if this could be optimized + x, y, tot, ts, chips = x[indx], y[indx], tot[indx], ts[indx], chips[indx] return x, y, tot, ts, chips -def ingest_from_files(fpaths: List[Union[str, Path]]) -> Iterable[Dict[str, NDArray]]: - """Parse values out of a sequence of timepix3 files with rollover of SPIDR counter. - - Parameters - ---------- - fpaths : A sorted sequence of tpx3 filepaths. - - Returns - ------- - An iterable over the parsing results, each encapsulated in a dictionary - """ - - for fpath in fpaths: - data = raw_as_numpy(fpath) - x, y, tot, ts, chips = _ingest_raw_data(data) - yield { - k.strip(): v - for k, v in zip( - "x, y, tot, timestamp, chips".split(","), - (x, y, tot, ts, chips), - ) - } - - def ingest_raw_data(data: IA) -> Dict[str, NDArray]: """ Parse values out of raw timepix3 data stream. @@ -328,17 +298,14 @@ def ingest_raw_data(data: IA) -> Dict[str, NDArray]: """ return { k.strip(): v - for k, v in zip("x, y, ToT, ts, chip_number".split(","), _ingest_raw_data(data)) + for k, v in zip("x, y, ToT, t, chip".split(","), _ingest_raw_data(data)) } -# ^-- tom wrote -# v-- justin wrote + """ Some basic functions that help take the initial output of ingest_raw_data and finish the processing. """ - - -def raw_to_sorted_df(fpath: Union[str, Path]) -> pd.DataFrame: +def tpx_to_raw_df(fpath: Union[str, Path]) -> pd.DataFrame: """ Parses a .tpx3 file and returns the raw data after timesorting. @@ -353,28 +320,7 @@ def raw_to_sorted_df(fpath: Union[str, Path]) -> pd.DataFrame: DataFrame of raw events from the .tpx3 file. """ raw_df = pd.DataFrame(ingest_raw_data(raw_as_numpy(fpath))) - return raw_df.sort_values("timestamp").reset_index(drop=True) - - -def condense_raw_df(df: pd.DataFrame) -> pd.DataFrame: - """ - Condenses the raw dataframe with only key information necesary for the analysis. Returns a dataframe with timestamp (renamed to t), x, y, and ToT. - - Parameters - ---------- - df : pd.DataFrame - DataFrame generated using raw_to_sorted_df(). - - Returns - ------- - pd.DataFrame - Dataframe condensed to only contain pertinent information for analysis. - """ - cdf = df[["timestamp", "x", "y", "ToT"]] - cdf = cdf.rename( - columns={"timestamp": "t"} - ) # obviously not necessary, just easier to type 't' a lot than 'timestamp' - return cdf + return raw_df.sort_values("t").reset_index(drop=True) # should we specify the sorting algorithm? at this point, it should be sorted anyway, but I think dataframes need to be explicitly sorted for use in e.g. merge_asof? def drop_zero_tot(df: pd.DataFrame) -> pd.DataFrame: @@ -391,26 +337,25 @@ def drop_zero_tot(df: pd.DataFrame) -> pd.DataFrame: pd.DataFrame df with only the events with ToT > 0 """ - fdf = df[df["ToT"] > 0] - return fdf + return df[df["ToT"] > 0] """ Functions to help perform clustering and centroiding on raw data. """ -TIMESTAMP_VALUE = ((1e-9) / 4096) * 25 -MICROSECOND = 10 ** (-6) +TIMESTAMP_VALUE = 1.5625*1e-9 # each raw timestamp is 1.5625 seconds +MICROSECOND = 1e-6 # We have had decent success with these values, but do not know for sure if they are optimal. -DEFAULT_CLUSTER_RADIUS = 2 -DEFAULT_CLUSTER_TW_MICROSECONDS = 0.5 +DEFAULT_CLUSTER_RADIUS = 3 +DEFAULT_CLUSTER_TW_MICROSECONDS = 0.3 DEFAULT_CLUSTER_TW = int(DEFAULT_CLUSTER_TW_MICROSECONDS * MICROSECOND / TIMESTAMP_VALUE) -def neighbor_set_from_df( +def cluster_df( df: pd.DataFrame, tw: int = DEFAULT_CLUSTER_TW, radius: int = DEFAULT_CLUSTER_RADIUS -) -> tuple[np.ndarray, Set[tuple[int]]]: +) -> tuple[np.ndarray, np.ndarray]: """ Uses scipy.spatial's KDTree to cluster raw input data. Requires a time window for clustering adjacent pixels and the total search radius. @@ -431,90 +376,41 @@ def neighbor_set_from_df( An set of tuples of the indices of the clustered events. The outer set is each cluster, and the inner tuples are the events in each cluster. """ events = np.array( - df[["t", "x", "y", "ToT", "t"]].values + df[["t", "x", "y", "ToT", "t"]].values # raw data stored in dataframe. duplicate timestamp column as the first instance is windowed ) # first three columns are for search radius of KDTree events[:, 0] = np.floor_divide(events[:, 0], tw) # bin by the time window - tree = KDTree(events[:, :3]) # generate KDTree based off the coordinates + tree = KDTree(events[:, :3]) # generate KDTree based off the coordinates (t/timewindow, x, y) neighbors = tree.query_ball_tree( tree, radius ) # compare tree against itself to find neighbors within the search radius - clusters = set(tuple(n) for n in neighbors) # turn the list of lists into a set of tuples - return events, clusters - - -def cluster_stats( - clusters: Set[tuple[int]] -) -> tuple[int]: - """ - Determines basic information about cluster information, such as the number of clusters and size of the largest cluster. - - Parameters - ---------- - clusters : Set[tuple[int]] - The set of tuples of clusters from neighbor_set_from_df() - - Returns - ------- - int - The total number of clusters - int - The number of events in the largest cluster - """ - num_clusters = len(clusters) - max_cluster = max(map(len, clusters)) - return num_clusters, max_cluster + if len(neighbors) >= 2147483647: # performance is marginally better if can use int32 for the indices, so check for that + dtype = np.int64 + else: + dtype = np.int32 + return pd.DataFrame(neighbors).fillna(-1).astype(dtype).drop_duplicates().values, events[:, 1:] # a bit weird, but faster than using sets and list operators to unpack neighbors -def create_cluster_arr( - clusters: Set[tuple[int]], num_clusters: int, max_cluster: int -) -> np.ndarray: # is there a better way to do this? - """ - Converts the clusters from a set of tuples of indices to an 2D numpy array format which can be efficiently iterated through with Numba. - - Parameters - ---------- - clusters : Set[tuple[int]] - The set of tuples of clusters from neighbor_set_from_df() - num_clusters : int - The total number of clusters - max_cluster : int - The number of events in the largest cluster - - Returns - ------- - np.ndarray - The cluster data now in a 2D numpy array. - """ - cluster_arr = np.full( - (num_clusters, max_cluster), -1, dtype=np.int64 - ) # fill with -1; these will be passed later - for cluster_num, cluster in enumerate(clusters): - for event_num, event in enumerate(cluster): - cluster_arr[cluster_num, event_num] = event - return cluster_arr - - -@numba.jit(nopython=True) -def cluster_arr_to_cent( - cluster_arr: np.ndarray, events: np.ndarray, num_clusters: int, max_cluster: int -) -> tuple[np.ndarray]: +@numba.jit(nopython=True, cache=True) +def centroid_clusters( + cluster_arr: np.ndarray, events: np.ndarray +) -> tuple[np.ndarray]: """ Performs the centroiding of a group of clusters using Numba. Note I originally attempted to unpack the clusters using list comprehensions, but this approach is significantly faster. Parameters ---------- - clusters : Set[tuple[int]] - The set of tuples of clusters from neighbor_set_from_df() - num_clusters : int - The total number of clusters - max_cluster : int - The number of events in the largest clust + clusters : nd.array + The numpy representation of the clusters' event indices. + events : nd.array + The numpy represetation of the event data. Returns ------- tuple[np.ndarray] t, xc, yc, ToT_max, ToT_sum, and n (number of events) in each cluster. """ + num_clusters = cluster_arr.shape[0] + max_cluster = cluster_arr.shape[1] t = np.zeros(num_clusters, dtype="uint64") xc = np.zeros(num_clusters, dtype="float32") yc = np.zeros(num_clusters, dtype="float32") @@ -527,13 +423,13 @@ def cluster_arr_to_cent( for event_num in range(max_cluster): event = cluster_arr[cluster_id, event_num] if event > -1: # if we have an event here - if events[event, 3] > _ToT_max: # find the max ToT, assign, use that time - _ToT_max = events[event, 3] - t[cluster_id] = events[event, 4] + if events[event, 2] > _ToT_max: # find the max ToT, assign, use that time + _ToT_max = events[event, 2] + t[cluster_id] = events[event, 3] ToT_max[cluster_id] = _ToT_max - xc[cluster_id] += events[event, 1] * events[event, 3] # x and y centroids by time over threshold - yc[cluster_id] += events[event, 2] * events[event, 3] - ToT_sum[cluster_id] += events[event, 3] # calcuate sum + xc[cluster_id] += events[event, 0] * events[event, 2] # x and y centroids by time over threshold + yc[cluster_id] += events[event, 1] * events[event, 2] + ToT_sum[cluster_id] += events[event, 2] # calcuate sum n[cluster_id] += np.ubyte(1) # number of events in cluster else: break @@ -568,60 +464,15 @@ def ingest_cent_data( } -def cent_to_numpy( - cluster_arr: np.ndarray, events: int, num_clusters: int, max_cluster: int -) -> Dict[str, np.ndarray]: - """ - Wrapper function to perform ingest_cent_data(cluster_arr_to_cent()) - - Parameters - ---------- - cluster_arr : np.ndarray - The array of cluster events from create_cluster_arr() - events : int - Number of photon events - num_clusters : int - The total number of clusters - max_cluster : int - The number of events in the largest clust - - Returns - ------- - Dict[str, np.ndarray] - Keys of t, xc, yc, ToT_max, ToT_sum, and n (number of events) in each cluster. - """ - return ingest_cent_data(cluster_arr_to_cent(cluster_arr, events, num_clusters, max_cluster)) - - -def cent_to_df( - cd_np: Dict[str, np.ndarray] -) -> pd.DataFrame: - """ - Returns the centroided dataframe from the zipped inputs. - - Parameters - ---------- - cd_np : Dict[str, np.ndarray] - Dictionary of the clustered data. - - Returns - ------- - pd.DataFrame - Time sorted dataframe of the centroids. - """ - cent_df = pd.DataFrame(cd_np) - return cent_df.sort_values("t").reset_index(drop=True) - - -def raw_df_to_cluster_df( - raw_df: pd.DataFrame, tw: int = DEFAULT_CLUSTER_TW, radius: int = DEFAULT_CLUSTER_RADIUS +def raw_df_to_cent_df( + df: pd.DataFrame, tw: int = DEFAULT_CLUSTER_TW, radius: int = DEFAULT_CLUSTER_RADIUS ) -> pd.DataFrame: """ Uses functions defined herein to take Dataframe of raw data and return dataframe of clustered data. Parameters ---------- - raw_df : pd.DataFrame + df : pd.DataFrame Pandas DataFrame of the raw data tw : int The time window to be considered "coincident" for clustering purposes @@ -633,11 +484,10 @@ def raw_df_to_cluster_df( pd.DataFrame Pandas DataFrame of the centroided data. """ - filt_cond_raw_df = drop_zero_tot(condense_raw_df(raw_df)) - events, clusters = neighbor_set_from_df(filt_cond_raw_df, tw, radius) - num_clusters, max_cluster = cluster_stats(clusters) - cluster_arr = create_cluster_arr(clusters, num_clusters, max_cluster) - return cent_to_df(cent_to_numpy(cluster_arr, events, num_clusters, max_cluster)) + fdf = drop_zero_tot(df) + cluster_arr, events = cluster_df(fdf, tw, radius) + data = centroid_clusters(cluster_arr, events) + return pd.DataFrame(ingest_cent_data(data)).sort_values("t").reset_index(drop=True) # should we specify the sort type here? this should be *almost* completely sorted already def add_centroid_cols( @@ -659,107 +509,31 @@ def add_centroid_cols( Originally dataframe with new columns x, y, and t_ns added. """ if gap: - df.loc[df['xc'] >= 255.5, 'xc'] += 2 - df.loc[df['yc'] >= 255.5, 'yc'] += 2 - df["x"] = np.round(df["xc"]).astype(np.uint16) + df.loc[df["xc"] >= 255.5, "xc"] += 2 + df.loc[df["yc"] >= 255.5, "yc"] += 2 + df["x"] = np.round(df["xc"]).astype(np.uint16) # sometimes you just want to know the closest pixel df["y"] = np.round(df["yc"]).astype(np.uint16) - df["t_ns"] = df["t"] / 4096 * 25 + df["t_ns"] = (df["t"].astype(np.float64) * 1.5625) # better way to convert to ns while maintaining precision? return df """ -A bunch of functions to help process multiple related .tpx3 files into Pandas dataframes stored in .h5 files. +Functions to help process multiple related .tpx3 files into Pandas dataframes stored in .h5 files. """ RAW_H5_SUFFIX = "" CENT_H5_SUFFIX = "_cent" CONCAT_H5_SUFFIX = "_cent" -def extract_fpaths_from_sid( - sid: int -) -> List[str]: - """ - Extract file paths from a given sid. - - Parameters - ---------- - sid : int - Short ID of a BlueSky scan - - Returns - ------- - List[str] - Filepaths of the written .tpx3, as recorded in Tiled - """ - return list(db[sid].table()["tpx3_files_raw_filepaths"].to_numpy()[0]) - - -def extract_uid_from_fpaths( - fpaths: List[str] -) -> str: - """ - Extract scan unique ID from file paths. - - Parameters - ---------- - fpaths : List[str] - List of the filepaths. - - Returns - ------- - str - String of the first file's unique ID. - - """ - return os.path.basename(fpaths[0])[:23] - - -def extract_dir_from_fpaths( - fpaths: List[str] -) -> str: - """ - Extract directory from file paths. - - Parameters - ---------- - fpaths : List[str] - List of the filepaths. - - Returns - ------- - str - String of the first file's directory. - - """ - return os.path.dirname(fpaths[0]) - - -def extract_uid_from_sid( - sid: int -) -> str: - """ - Extract user ID from a given sid. - - Parameters - ---------- - sid : int - - Returns - ------- - str - String of the short ID's corresponding unique ID. - - """ - return extract_uid_from_fpaths(extract_fpaths_from_sid(sid)) - - -def convert_file( - fpath: Union[str, Path], time_window_microsecond: float = DEFAULT_CLUSTER_TW_MICROSECONDS, radius: int = DEFAULT_CLUSTER_RADIUS, print_details: bool = False +def convert_tpx_file( + fpath: Union[str, Path], time_window_microsecond: float = DEFAULT_CLUSTER_TW_MICROSECONDS, radius: int = DEFAULT_CLUSTER_RADIUS, print_details: bool = False, overwrite: bool = True ): """ Convert a .tpx3 file into raw and centroided Pandas dataframes, which are stored in .h5 files. + TO DO: Args to specify output directory (default will be same directory as .tpx3 file as is now). + Parameters ---------- fpath : Union[str, Path] @@ -770,193 +544,122 @@ def convert_file( The radius, in pixels, to perform centroiding print_details : bool = False Boolean toggle about whether to print detailed data. + overwrite : bool = True + Boolean toggle about whether to overwrite pre-existing data. + """ fname, ext = os.path.splitext(fpath) dfname = "{}{}.h5".format(fname, RAW_H5_SUFFIX) dfcname = "{}{}.h5".format(fname, CONCAT_H5_SUFFIX) if ext == ".tpx3" and os.path.exists(fpath): - file_size = os.path.getsize(fpath) - have_df = os.path.exists(dfname) - have_dfc = os.path.exists(dfcname) - - if have_df and have_dfc: - print("-> {} exists, skipping.".format(dfname)) - else: - print("-> Processing {}, size: {:.1f} MB".format(fpath, file_size/1000000)) - time_window = time_window_microsecond * 1e-6 - time_stamp_conversion = 6.1e-12 - timedif = int(time_window / time_stamp_conversion) - - if print_details: - print("Loading {} data into dataframe...".format(fpath)) - df = raw_to_sorted_df(fpath) - num_events = df.shape[0] - - if print_details: - print("Loading {} complete. {} events found. Saving to: {}".format(fpath, num_events, dfname)) - df.to_hdf(dfname, key='df', mode='w') - - if print_details: - print("Saving {} complete. Beginning clustering...".format(dfname)) - df_c = raw_df_to_cluster_df(df, timedif, radius) - num_clusters = df_c.shape[0] + + try: + file_size = os.path.getsize(fpath) + have_df = os.path.exists(dfname) + have_dfc = os.path.exists(dfcname) + + if have_df and have_dfc and not overwrite: + + print("-> {} exists, skipping.".format(dfname)) + + else: + + if print_details: + print("-> Processing {}, size: {:.1f} MB".format(fpath, file_size/1000000)) + + time_window = time_window_microsecond * 1e-6 + time_stamp_conversion = 6.1e-12 + timedif = int(time_window / time_stamp_conversion) + + if print_details: + print("Loading {} data into dataframe...".format(fpath)) + + df = tpx_to_raw_df(fpath) + num_events = df.shape[0] + + if print_details: + print("Loading {} complete. {} events found. Saving to: {}".format(fpath, num_events, dfname)) + + df.to_hdf(dfname, key="df", format="table", mode="w") + + if print_details: + print("Saving {} complete. Beginning clustering...".format(dfname)) + + df_c = raw_df_to_cent_df(df, timedif, radius) + + num_clusters = df_c.shape[0] + + if print_details: + print("Clustering {} complete. {} clusters found. Saving to {}".format(fpath, num_clusters, dfcname)) + + df_c.to_hdf(dfcname, key="df", format="table", mode="w") + + if print_details: + print("Saving {} complete. Moving onto next file.".format(dfcname)) + + except Exception as e: + if print_details: - print("Clustering {} complete. {} clusters found. Saving to {}".format(fpath, num_clusters, dfcname)) - df_c.to_hdf(dfcname, key='df', mode='w') - print("Saving {} complete. Moving onto next file.".format(dfcname)) + print("Conversion of {} failed due to {}, moving on.".format(fpath,e)) + else: - print("File not found. Moving onto next file.") - - -def convert_tpx3_parallel( - fpaths: Union[str, Path], num_workers: int = None -): + if print_details: + print("File not found. Moving onto next file.") + + +def convert_tpx3_files_parallel(fpaths: Union[List[str], List[Path]], num_workers: int = None): """ - Convert a list of .tpx3 files in a parallel processing pool. + Convert a list .tpx3 files in parallel using multiprocessing and convert_tpx_file(). + + TO DO: Accept more arguments for convert_tpx_file. Parameters ---------- - fpaths : Union[str, Path] - .tpx3 file paths to convert in a parallel processing pool. - num_workers : int = None - Number of parallel workers to employ. + fpath : Union[str, Path] + .tpx3 file path + time_window_microsecond : float = DEFAULT_CLUSTER_TW_MICROSECONDS + The time window, in microseconds, to perform centroiding + radius : int = DEFAULT_CLUSTER_RADIUS + The radius, in pixels, to perform centroiding + print_details : bool = False + Boolean toggle about whether to print detailed data. + overwrite : bool = True + Boolean toggle about whether to overwrite pre-existing data. + """ - if num_workers == None: - num_cores = multiprocessing.cpu_count() - max_workers = num_cores-1 + if num_workers is None: + max_workers = multiprocessing.cpu_count() else: max_workers = num_workers - - with multiprocessing.Pool(processes=max_workers) as pool: - pool.map(convert_file, fpaths) - - print("Parallel conversion complete") - - -def convert_tpx3_st(fpaths: Union[str, Path]): - """ - Convert a list of .tpx3 files in a single thread. - - Parameters - ---------- - fpaths : Union[str, Path] - .tpx3 file paths to convert in a single thread. - """ - for file in fpaths: - convert_file(file) - -def get_cent_files( - uid: str, dir_name: Union[str, Path] -) -> List[str]: - """ - Gets a list of the centroided .h5 files from a given uid, sorted by sequence number. - - Parameters - ---------- - uid : str - The unique ID of the scan of which we want to get the files. + with multiprocessing.Pool(processes=max_workers) as pool: + for _ in tqdm(pool.imap_unordered(convert_tpx_file, fpaths), total=len(fpaths)): + pass - dir_name : Union[str, path] - Directory to look in for the files. - Returns - ------- - List[str] - List of the centroided file paths. +def convert_tpx3_files(fpaths: Union[List[str], List[Path]], print_details = True): """ - cent_files = [ - os.path.join(dir_name, file) - for file in os.listdir(dir_name) - if file.endswith("{}.h5".format(CENT_H5_SUFFIX)) and str(uid) in file and len(os.path.basename(file)) == 44 - ] - - cent_files.sort(key=lambda f: int(os.path.splitext(os.path.basename(f))[0].split("_")[-2])) - return cent_files - - -def concat_cent_files( - cfpaths: List[Union[str, Path]] -): - """ - Concatenates several centroided files together. + Convert a list .tpx3 files in a single process using convert_tpx_file(). - Parameters - ---------- - cfpaths : List[str, Path] - List of the centroided .h5 files to concatenate together. - """ - dir_name = os.path.dirname(cfpaths[0]) - uid = extract_uid_from_fpaths(cfpaths) - - dfs = [pd.read_hdf(fpath, key='df') for fpath in tqdm(cfpaths)] - combined_df = pd.concat(dfs).reset_index(drop=True) - - save_path = os.path.join(dir_name, "{}{}.h5".format(uid, CONCAT_H5_SUFFIX)) - combined_df.to_hdf(save_path, key='df', mode='w') - - print("-> Saving complete.") - - -def get_con_cent_file( - sid: int -) -> str: - """ - Gets the location of the concatenated centroid files of a given sid. + TO DO: Accept more arguments for convert_tpx_file. Parameters ---------- - sid : int - Short ID of whichto get the centroided file path + fpath : Union[str, Path] + .tpx3 file path + time_window_microsecond : float = DEFAULT_CLUSTER_TW_MICROSECONDS + The time window, in microseconds, to perform centroiding + radius : int = DEFAULT_CLUSTER_RADIUS + The radius, in pixels, to perform centroiding + print_details : bool = False + Boolean toggle about whether to print detailed data. + overwrite : bool = True + Boolean toggle about whether to overwrite pre-existing data. - Returns - ------- - str - Path of the centroided file. """ - fpaths = extract_fpaths_from_sid(sid) - uid = extract_uid_from_fpaths(fpaths) - dir_name = extract_dir_from_fpaths(fpaths) - cfpath = os.path.join(dir_name, "{}{}.h5".format(uid, CONCAT_H5_SUFFIX)) - - if os.path.exists(cfpath): - return cfpath - else: - print("-> Warning: {} does not exist".format(cfpath)) - return None - - -def convert_sids( - sids: List[int] -): - """ - Convert given sids by converting each .tpx3 file and then concatenating results together into a master dataframe. - - Parameters - ---------- - sids : List[int] - List of BlueSky scans' short IDs to convert. - """ - - for sid in sids: - print("\n\n---> Beginning sid: {} <---\n".format(sid)) + for file in fpaths: + convert_tpx_file(file, print_details = print_details) - tpx3fpaths = extract_fpaths_from_sid(sid) - dir_name = extract_dir_from_fpaths(tpx3fpaths) - num_tpx = len(tpx3fpaths) - uid = extract_uid_from_fpaths(tpx3fpaths) - - convert_tpx3_parallel(tpx3fpaths, num_workers=16) - centfpaths = get_cent_files(uid, dir_name) - num_cent = len(centfpaths) - - if num_tpx == num_cent: - print("---> Conversion numbers match") - concat_cent_files(centfpaths) - else: - print("---> Warning: conversion mismatch: tpx3={}, cent={}".format(num_tpx, num_cent)) - - print("---> Done with {}!".format(sid)) - gc.collect() + \ No newline at end of file diff --git a/tpx3awkward/examples/example.ipynb b/tpx3awkward/examples/example.ipynb new file mode 100644 index 0000000..6eeabb1 --- /dev/null +++ b/tpx3awkward/examples/example.ipynb @@ -0,0 +1,351 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "76010898-2082-49cb-9bff-8173fd79ad6e", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import _utils as tpx\n", + "import pandas as pd\n", + "from tiled.client import from_uri\n", + "db = from_uri('https://tiled.nsls2.bnl.gov', 'dask')['chx']['raw']" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "59bb0e40-1686-413c-8595-c38655e2d0d6", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "sid = 143200 # example SID (1800 partitions at 1 sec/partition from NSLS-II quantum microscope project)" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "1b629cf7-1d46-49b9-a41b-43f3b9972f4e", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Raw dataframe: \n", + " x y ToT t chip\n", + "0 502 452 475 11301032 1\n", + "1 405 272 275 11302904 1\n", + "2 406 272 350 11302904 1\n", + "3 295 135 525 11303039 0\n", + "4 358 509 300 11303780 1\n", + "... ... ... ... ... ...\n", + "971254 200 46 550 651295354 3\n", + "971255 489 104 450 651296021 0\n", + "971256 389 21 300 651296305 0\n", + "971257 390 21 300 651296306 0\n", + "971258 37 325 75 651299518 2\n", + "\n", + "[971259 rows x 5 columns]\n", + "Centroided dataframe: \n", + " t xc yc ToT_max ToT_sum n\n", + "0 11301032 502.000000 452.0 475 475 1\n", + "1 11302904 405.559998 272.0 350 625 2\n", + "2 11303039 295.000000 135.0 525 525 1\n", + "3 11303780 358.478271 509.0 300 575 2\n", + "4 11303973 412.409088 60.0 325 550 2\n", + "... ... ... ... ... ... ..\n", + "659582 651294809 326.000000 379.0 475 475 1\n", + "659583 651295354 200.000000 46.0 550 550 1\n", + "659584 651296021 489.000000 104.0 450 450 1\n", + "659585 651296305 389.500000 21.0 300 600 2\n", + "659586 651299518 37.000000 325.0 75 75 1\n", + "\n", + "[659587 rows x 6 columns]\n", + "Centroided dataframe: \n", + " t xc yc ToT_max ToT_sum n x y \\\n", + "0 11301032 504.000000 454.0 475 475 1 504 454 \n", + "1 11302904 407.559998 274.0 350 625 2 408 274 \n", + "2 11303039 297.000000 135.0 525 525 1 297 135 \n", + "3 11303780 360.478271 511.0 300 575 2 360 511 \n", + "4 11303973 414.409088 60.0 325 550 2 414 60 \n", + "... ... ... ... ... ... .. ... ... \n", + "659582 651294809 328.000000 381.0 475 475 1 328 381 \n", + "659583 651295354 200.000000 46.0 550 550 1 200 46 \n", + "659584 651296021 491.000000 104.0 450 450 1 491 104 \n", + "659585 651296305 391.500000 21.0 300 600 2 392 21 \n", + "659586 651299518 37.000000 327.0 75 75 1 37 327 \n", + "\n", + " t_ns \n", + "0 1.765786e+07 \n", + "1 1.766079e+07 \n", + "2 1.766100e+07 \n", + "3 1.766216e+07 \n", + "4 1.766246e+07 \n", + "... ... \n", + "659582 1.017648e+09 \n", + "659583 1.017649e+09 \n", + "659584 1.017650e+09 \n", + "659585 1.017650e+09 \n", + "659586 1.017655e+09 \n", + "\n", + "[659587 rows x 9 columns]\n", + "Partition duration: 0.999997634375 sec.\n" + ] + } + ], + "source": [ + "files = db[sid]['primary']['data']['tpx3_files_raw_filepaths'][0].compute()\n", + "\n", + "file = files[0][5:] # cut off 'file:' from beginning\n", + "\n", + "df = tpx.tpx_to_raw_df(file)\n", + "cdf = tpx.raw_df_to_cent_df(df)\n", + "\n", + "print('Raw dataframe: ')\n", + "print(df)\n", + "\n", + "print('Centroided dataframe: ')\n", + "print(cdf)\n", + "\n", + "cdf = tpx.add_centroid_cols(cdf)\n", + "print('Centroided dataframe: ')\n", + "print(cdf)\n", + "\n", + "duration = (cdf.iloc[-1]['t_ns'] - cdf.iloc[0]['t_ns'])/1e9\n", + "print(f'Partition duration: {duration} sec.')" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "48254ae7-8d0c-4766-b1d1-2b7f4732934c", + "metadata": {}, + "outputs": [], + "source": [ + "sid = 143201 # new example SID (1800 partitions at 1 sec/partition from NSLS-II quantum microscope project)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "2adae433-a68c-436e-9640-d4a9745af969", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "-> Processing /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000.tpx3, size: 10.5 MB\n", + "Loading /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000.tpx3 data into dataframe...\n", + "Loading /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000.tpx3 complete. 975938 events found. Saving to: /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000.h5\n", + "Saving /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000.h5 complete. Beginning clustering...\n", + "Clustering /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000.tpx3 complete. 659537 clusters found. Saving to /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000_cent.h5\n", + "Saving /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000_cent.h5 complete. Moving onto next file.\n", + "-> Processing /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000001.tpx3, size: 10.5 MB\n", + "Loading /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000001.tpx3 data into dataframe...\n", + "Loading /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000001.tpx3 complete. 972867 events found. Saving to: /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000001.h5\n", + "Saving /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000001.h5 complete. Beginning clustering...\n", + "Clustering /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000001.tpx3 complete. 657491 clusters found. Saving to /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000001_cent.h5\n", + "Saving /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000001_cent.h5 complete. Moving onto next file.\n" + ] + } + ], + "source": [ + "files = db[sid]['primary']['data']['tpx3_files_raw_filepaths'][0].compute()\n", + "files = [f.replace('file:', '') for f in files]\n", + "tpx.convert_tpx3_files(files[0:2])" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "05332e5b-17eb-4074-ad92-51ef1273e9f4", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Raw dataframe: \n", + " x y ToT t chip\n", + "0 210 132 575 11294477 3\n", + "1 106 227 375 11294540 3\n", + "2 107 227 150 11294544 3\n", + "3 381 346 475 11294882 1\n", + "4 268 444 450 11295743 1\n", + "... ... ... ... ... ...\n", + "975933 103 441 600 651291179 2\n", + "975934 193 383 450 651294115 2\n", + "975935 121 280 400 651294141 2\n", + "975936 256 373 400 651294145 1\n", + "975937 255 373 50 651294166 2\n", + "\n", + "[975938 rows x 5 columns]\n", + "Centroided dataframe: \n", + " t xc yc ToT_max ToT_sum n\n", + "0 11294477 210.000000 132.0 575 575 1\n", + "1 11294540 106.285713 227.0 375 525 2\n", + "2 11294882 381.000000 346.0 475 475 1\n", + "3 11295743 268.000000 444.0 450 450 1\n", + "4 11298466 420.000000 52.0 450 450 1\n", + "... ... ... ... ... ... ..\n", + "659532 651290601 101.000000 436.0 575 575 1\n", + "659533 651291179 103.000000 441.0 600 600 1\n", + "659534 651294115 193.000000 383.0 450 450 1\n", + "659535 651294141 121.000000 280.0 400 400 1\n", + "659536 651294145 255.888885 373.0 400 450 2\n", + "\n", + "[659537 rows x 6 columns]\n", + "Centroided dataframe: \n", + " t xc yc ToT_max ToT_sum n x y \\\n", + "0 11294477 210.000000 132.0 575 575 1 210 132 \n", + "1 11294540 106.285713 227.0 375 525 2 106 227 \n", + "2 11294882 383.000000 348.0 475 475 1 383 348 \n", + "3 11295743 270.000000 446.0 450 450 1 270 446 \n", + "4 11298466 422.000000 52.0 450 450 1 422 52 \n", + "... ... ... ... ... ... .. ... ... \n", + "659532 651290601 101.000000 438.0 575 575 1 101 438 \n", + "659533 651291179 103.000000 443.0 600 600 1 103 443 \n", + "659534 651294115 193.000000 385.0 450 450 1 193 385 \n", + "659535 651294141 121.000000 282.0 400 400 1 121 282 \n", + "659536 651294145 257.888885 375.0 400 450 2 258 375 \n", + "\n", + " t_ns \n", + "0 1.764762e+07 \n", + "1 1.764772e+07 \n", + "2 1.764825e+07 \n", + "3 1.764960e+07 \n", + "4 1.765385e+07 \n", + "... ... \n", + "659532 1.017642e+09 \n", + "659533 1.017642e+09 \n", + "659534 1.017647e+09 \n", + "659535 1.017647e+09 \n", + "659536 1.017647e+09 \n", + "\n", + "[659537 rows x 9 columns]\n", + "Partition duration: 0.99999948125 sec.\n" + ] + } + ], + "source": [ + "df = pd.read_hdf('/nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000.h5')\n", + "cdf = pd.read_hdf('/nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000_cent.h5')\n", + "\n", + "print('Raw dataframe: ')\n", + "print(df)\n", + "\n", + "print('Centroided dataframe: ')\n", + "print(cdf)\n", + "\n", + "cdf = tpx.add_centroid_cols(cdf)\n", + "print('Centroided dataframe: ')\n", + "print(cdf)\n", + "\n", + "duration = (cdf.iloc[-1]['t_ns'] - cdf.iloc[0]['t_ns'])/1e9\n", + "print(f'Partition duration: {duration} sec.')" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "c981be40-e631-4f73-bbab-4fadda8b9285", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "100%|██████████| 1800/1800 [07:51<00:00, 3.82it/s]\n" + ] + } + ], + "source": [ + "try:\n", + " files = db[sid]['primary']['data']['tpx3_files_raw_filepaths'][0].compute()\n", + " files = [f.replace('file:', '') for f in files]\n", + " tpx.convert_tpx3_files_parallel(files)\n", + " \n", + "except Exception as e:\n", + " print(f'Could not finish {sid} due to {e}') " + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "946fa74f-8c8c-4f36-bc0c-9f203e023f78", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "All raw .h5 files exist!\n", + "All centroided raw .h5 files exist!\n" + ] + } + ], + "source": [ + "import os\n", + "\n", + "raw_h5_files = [os.path.exists(f.replace('.tpx3', '.h5')) for f in files]\n", + "cent_h5_files = [os.path.exists(f.replace('.tpx3', '_cent.h5')) for f in files]\n", + "\n", + "if all(raw_h5_files):\n", + " print('All raw .h5 files exist!')\n", + "else:\n", + " print('Missing .h5 files!')\n", + " \n", + "if all(cent_h5_files):\n", + " print('All centroided raw .h5 files exist!')\n", + "else:\n", + " print('Missing centroided .h5 files!')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "705c639d-dac9-4517-b5ff-ce44365ed4df", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}