# Source code for sarcasm.structure

# -*- coding: utf-8 -*-
# Copyright (c) 2025 University Medical Center Göttingen, Germany.
# All rights reserved.
#
# Patent Pending: DE 10 2024 112 939.5
# SPDX-License-Identifier: LicenseRef-Proprietary-See-LICENSE
#
# This software is licensed under a custom license. See the LICENSE file
# in the root directory for full details.
#
# **Commercial use is prohibited without a separate license.**
# Contact MBM ScienceBridge GmbH (https://sciencebridge.de/en/) for licensing.


import glob
import os
import random
import shutil
import warnings
from collections import deque
from multiprocessing import Pool
from typing import Optional, Tuple, Union, List, Literal, Any, Dict
os.environ["KMP_WARNINGS"] = "False"
warnings.filterwarnings("ignore", message=".*omp_set_nested.*")


import igraph as ig
import networkx as nx
import numpy as np
import pandas as pd
import tifffile
import torch
from bio_image_unet import multi_output_unet3d as unet3d
from bio_image_unet.multi_output_unet.multi_output_nested_unet import MultiOutputNestedUNet_3Levels
from bio_image_unet.multi_output_unet.predict import Predict as Predict_UNet
from bio_image_unet.progress import ProgressNotifier
from joblib import Parallel, delayed
from scipy import ndimage, stats, sparse
from scipy.optimize import curve_fit, linear_sum_assignment
from scipy.spatial import cKDTree
from scipy.spatial.distance import directed_hausdorff, squareform, pdist
from skimage import segmentation, morphology, measure
from skimage.draw import line
from skimage.measure import label, regionprops_table
from skimage.morphology import skeletonize, disk, binary_dilation
from sklearn.cluster import AgglomerativeClustering
from sklearn.neighbors import NearestNeighbors

from sarcasm.core import SarcAsM
from sarcasm.ioutils import IOUtils
from sarcasm.utils import Utils


class Structure(SarcAsM):
    """
    Class to analyze sarcomere morphology.

    Attributes
    ----------
    data : dict
        A dictionary with structure data.
    """

    def __init__(self,
                 filepath: Union[str, os.PathLike],
                 restart: bool = False,
                 channel: Union[int, None, Literal['RGB']] = None,
                 auto_save: bool = True,
                 use_gui: bool = False,
                 device: Union[torch.device, Literal['auto']] = 'auto',
                 **info: Dict[str, Any]
                 ) -> None:
        """
        Initialize the structural analysis of a tiff file.

        Parameters
        ----------
        filepath : str | os.PathLike
            Path to the TIFF file for analysis.
        restart : bool, optional
            If True, deletes existing analysis and starts fresh (default: False).
        channel : int, None or Literal['RGB'], optional
            Specifies the channel with sarcomeres in multicolor stacks (default: None).
        auto_save : bool, optional
            Automatically saves analysis results when True (default: True).
        use_gui : bool, optional
            Indicates GUI mode operation (default: False).
        device : Union[torch.device, Literal['auto']], optional
            Device for PyTorch computations. 'auto' selects CUDA/MPS if available (default: 'auto').
        **info : Any
            Additional metadata as keyword arguments (e.g. cell_line='wt').
        """
        # delegate file handling / metadata setup to the SarcAsM base class
        super().__init__(filepath=filepath, restart=restart, channel=channel,
                         auto_save=auto_save, use_gui=use_gui, device=device, **info)

        # resume from previously stored structure data when available,
        # otherwise start with an empty data dictionary
        if os.path.exists(self.__get_structure_data_file()):
            self._load_structure_data()
        else:
            self.data = {}

    def __get_structure_data_file(self, is_temp_file: bool = False) -> str:
        """
        Return the path to the structure data file.

        Parameters
        ----------
        is_temp_file : bool, optional
            If True, return the path of the temporary file. The temporary file
            prevents corrupted data files from aborted operations (e.g.,
            exceptions or user intervention); it is committed to the final
            file by renaming. Default is False.

        Returns
        -------
        str
            The path to the structure data file, either temporary or final.
        """
        if is_temp_file:
            return os.path.join(self.data_dir, "structure.temp.json")
        return os.path.join(self.data_dir, "structure.json")
[docs] def commit(self) -> None: """ Commit data by renaming the temporary file to the final data file. """ temp_file_path = self.__get_structure_data_file(is_temp_file=True) final_file_path = self.__get_structure_data_file() if os.path.exists(temp_file_path): if os.path.exists(final_file_path): os.remove(final_file_path) os.rename(temp_file_path, final_file_path)
[docs] def store_structure_data(self, override: bool = True) -> None: """ Store structure data in a JSON file. Parameters ---------- override : bool, optional If True, override the file. """ if override or not os.path.exists(self.__get_structure_data_file(is_temp_file=False)): IOUtils.json_serialize(self.data, self.__get_structure_data_file(is_temp_file=True)) self.commit()
[docs] def _load_structure_data(self) -> None: """ Load structure data from the final data file; fall back to the temporary file if needed. Raises ------ Exception If no valid structure data could be loaded. """ try: if os.path.exists(self.__get_structure_data_file(is_temp_file=False)): self.data = IOUtils.json_deserialize(self.__get_structure_data_file(is_temp_file=False)) elif os.path.exists(self.__get_structure_data_file(is_temp_file=True)): self.data = IOUtils.json_deserialize(self.__get_structure_data_file(is_temp_file=True)) except: if os.path.exists(self.__get_structure_data_file(is_temp_file=True)): self.data = IOUtils.json_deserialize(self.__get_structure_data_file(is_temp_file=True)) # ensure compatibility with data from early version keys_old = {'points': 'pos_vectors', 'sarcomere_length_points': 'sarcomere_length_vectors', 'midline_length_points': 'midline_length_vectors', 'midline_id_points': 'midline_id_vectors', 'sarcomere_orientation_points': 'sarcomere_orientation_vectors', 'max_score_points': 'max_score_vectors'} for key, val in keys_old.items(): if key in self.data: self.data[val] = self.data[key] keys = [key for key in self.data.keys() if 'timepoints' in key] for key in keys: new_key = key.replace('timepoints', 'frames') self.data[new_key] = self.data[key] if isinstance(self.data[new_key], str) and self.data[new_key] == 'all': self.data[new_key] = list(range(self.metadata['frames'])) if self.data is None: raise Exception('Loading of structure failed')
[docs] def get_list_lois(self): """Returns list of LOIs""" return Utils.get_lois_of_file(self.filepath)
[docs] def detect_sarcomeres(self, frames: Union[str, int, List[int], np.ndarray] = 'all', model_path: str = None, max_patch_size: Tuple[int, int] = (1024, 1024), normalization_mode: str = 'all', clip_thres: Tuple[float, float] = (0., 99.98), progress_notifier: ProgressNotifier = ProgressNotifier.progress_notifier_tqdm()): """ Predict sarcomeres (Z-bands, mbands, distance, orientation) with U-Net. Parameters ---------- frames : Union[str, int, List[int], np.ndarray] Frames for sarcomere detection ('all' for all frames, int for a single frame, list or ndarray for selected frames). Defaults to 'all'. model_path : str, optional Path of trained network weights for U-Net. Default is None. max_patch_size : tuple of int, optional Maximal patch dimensions for convolutional neural network (n_x, n_y). Default is (1024, 1024). normalization_mode : str, optional Mode for intensity normalization for 3D stacks prior to prediction ('single': each image individually, 'all': based on histogram of full stack, 'first': based on histogram of first image in stack). Default is 'all'. clip_thres : tuple of float, optional Clip threshold (lower / upper) for intensity normalization. Default is (0., 99.8). progress_notifier : ProgressNotifier, optional Progress notifier for inclusion in GUI. Default is ProgressNotifier.progress_notifier_tqdm(). 
Returns ------- None """ if isinstance(frames, str) and frames == 'all': images = self.read_imgs() list_frames = list(range(len(images))) elif np.issubdtype(type(frames), np.integer) or isinstance(frames, list) or type(frames) is np.ndarray: images = self.read_imgs(frames=frames) if np.issubdtype(type(frames), np.integer): list_frames = [frames] else: list_frames = list(frames) else: raise ValueError('frames argument not valid') print('\nPredicting sarcomeres ...') if model_path is None or model_path == 'generalist': model_path = os.path.join(self.model_dir, 'model_sarcomeres_generalist.pt') _ = Predict_UNet(images, model_params=model_path, result_path=self.base_dir, max_patch_size=max_patch_size, normalization_mode=normalization_mode, network=MultiOutputNestedUNet_3Levels, clip_threshold=clip_thres, device=self.device, progress_notifier=progress_notifier) del _ if torch.cuda.is_available(): torch.cuda.empty_cache() _dict = {'params.detect_sarcomeres.frames': list_frames, 'params.detect_sarcomeres.model': model_path, 'params.detect_sarcomeres.normalization_mode': normalization_mode, 'params.detect_sarcomeres.clip_threshold': clip_thres} self.data.update(_dict) if self.auto_save: self.store_structure_data()
[docs] def detect_z_bands_fast_movie(self, model_path: Optional[str] = None, max_patch_size: Tuple[int, int, int] = (32, 256, 256), normalization_mode: str = 'all', clip_thres: Tuple[float, float] = (0., 99.8), progress_notifier: ProgressNotifier = ProgressNotifier.progress_notifier_tqdm()) -> None: """ Predict sarcomere z-bands with 3D U-Net for high-speed movies for improved temporal consistency. Parameters ---------- model_path : str, optional Path of trained network weights for 3D U-Net. Default is None. max_patch_size : tuple of int, optional Maximal patch dimensions for convolutional neural network (n_frames, n_x, n_y). Dimensions need to be divisible by 16. Default is (32, 256, 256). normalization_mode : str, optional Mode for intensity normalization for 3D stacks prior to prediction ('single': each image individually, 'all': based on histogram of full stack, 'first': based on histogram of first image in stack). Default is 'all'. clip_thres : tuple of float, optional Clip threshold (lower / upper) for intensity normalization. Default is (0., 99.8). progress_notifier : ProgressNotifier, optional Progress notifier for inclusion in GUI. Default is ProgressNotifier.progress_notifier_tqdm(). 
Returns ------- None """ print('\nPredicting sarcomere z-bands ...') if model_path is None: model_path = os.path.join(self.model_dir, 'model_z_bands_unet3d.pt') if len(max_patch_size) != 3: raise ValueError('patch size for prediction has to be be (frames, x, y)') _ = unet3d.Predict(self.read_imgs(), model_params=model_path, result_path=self.base_dir, max_patch_size=max_patch_size, normalization_mode=normalization_mode, device=self.device, clip_threshold=clip_thres, progress_notifier=progress_notifier) del _ if torch.cuda.is_available(): torch.cuda.empty_cache() _dict = {'params.detect_z_bands_fast_movie.model': model_path, 'params.detect_z_bands_fast_movie.max_patch_size': max_patch_size, 'params.detect_z_bands_fast_movie.normalization_mode': normalization_mode, 'params.predict_z_bands_fast_movie.clip_threshold': clip_thres} self.data.update(_dict) if self.auto_save: self.store_structure_data()
[docs] def analyze_cell_mask(self, frames: Union[str, int, List[int], np.ndarray] = 'all', threshold: float = 0.1) -> None: """ Analyzes the area occupied by cells in the given image(s) and calculates the average cell intensity and cell area ratio. Parameters ---------- threshold : float, optional Threshold value for binarizing the cell mask image. Pixels with intensity above threshold are considered cell. Defaults to 0.1. frames: {'all', int, list, np.ndarray}, optional Frames for z-band analysis ('all' for all frames, int for a single frame, list or ndarray for selected frames). Defaults to 'all'. """ if not os.path.exists(self.file_cell_mask): raise FileNotFoundError("Cell mask not found. Please run detect_sarcomeres first.") if (isinstance(frames, str) and frames == 'all') or (self.metadata['frames'] == 1 and frames == 0): cell_mask = tifffile.imread(self.file_cell_mask) images = self.read_imgs() list_frames = list(range(len(images))) elif np.issubdtype(type(frames), np.integer) or isinstance(frames, list) or type(frames) is np.ndarray: cell_mask = tifffile.imread(self.file_cell_mask, key=frames) images = self.read_imgs(frames=frames) if np.issubdtype(type(frames), np.integer): list_frames = [frames] else: list_frames = list(frames) else: raise ValueError('frames argument not valid') if len(cell_mask.shape) == 2: cell_mask = np.expand_dims(cell_mask, 0) if len(images.shape) == 2: images = np.expand_dims(images, 0) n_imgs = len(images) # create empty arrays cell_area, cell_area_ratio = np.full(n_imgs, fill_value=np.nan), np.full(n_imgs, fill_value=np.nan) cell_mask_intensity = np.full(n_imgs, fill_value=np.nan) for i, (img_i, cell_mask_i) in enumerate(zip(images, cell_mask)): # binarize mask mask_i = cell_mask_i > threshold # average cell intensity cell_mask_intensity[i] = np.mean(img_i[mask_i]) # total cell area and ratio to total image area cell_area[i] = np.sum(mask_i) * self.metadata['pixelsize'] ** 2 cell_area_ratio[i] = cell_area[i] / (img_i.shape[0] * 
img_i.shape[1] * self.metadata['pixelsize'] ** 2) _dict = {'cell_mask_area': cell_area, 'cell_mask_area_ratio': cell_area_ratio, 'cell_mask_intensity': cell_mask_intensity, 'params.analyze_cell_mask.frames': list_frames, 'params.analyze_cell_mask.threshold': threshold} self.data.update(_dict) if self.auto_save: self.store_structure_data()
[docs] def analyze_z_bands(self, frames: Union[str, int, List[int], np.ndarray] = 'all', threshold: float = 0.5, min_length: float = 0.2, median_filter_radius: float = 0.2, theta_phi_min: float = 0.4, a_min: float = 0.3, d_max: float = 3.0, d_min: float = 0.0, progress_notifier: ProgressNotifier = ProgressNotifier.progress_notifier_tqdm()) -> None: """ Segment and analyze sarcomere z-bands. Parameters ---------- frames: {'all', int, list, np.ndarray}, optional Frames for z-band analysis ('all' for all frames, int for a single frame, list or ndarray for selected frames). Defaults to 'all'. threshold : float, optional Threshold for binarizing z-bands prior to labeling (0 - 1). Defaults to 0.1. min_length : float, optional Minimal length of z-bands; smaller z-bands are removed (in µm). Defaults to 0.5. median_filter_radius : float, optional Radius of kernel to smooth sarcomere orientation field. Default is 0.2 µm. theta_phi_min : float, optional Minimal cosine of the angle between the pointed z-band vector and the connecting vector between ends of z-bands. Smaller values are not recognized as connections (for lateral alignment and distance analysis). Defaults to 0.25. a_min: float, optional Minimal lateral alignment between z-band ends to create a lateral connection. Defaults to 0.3. d_max : float, optional Maximal distance between z-band ends (in µm). Z-band end pairs with larger distances are not connected (for lateral alignment and distance analysis). Defaults to 5.0. d_min : float, optional Minimal distance between z-band ends (in µm). Z-band end pairs with smaller distances are not connected. Defaults to 0.25. progress_notifier: ProgressNotifier Wraps progress notification, default is progress notification done with tqdm """ if not os.path.exists(self.file_zbands): raise FileNotFoundError("Z-band mask not found. 
Please run detect_sarcomeres first.") if (isinstance(frames, str) and frames == 'all') or (self.metadata['frames'] == 1 and frames == 0): zbands = tifffile.imread(self.file_zbands) orientation_field = tifffile.imread(self.file_orientation) images = self.read_imgs() list_frames = list(range(len(images))) elif np.issubdtype(type(frames), np.integer) or isinstance(frames, list) or type(frames) is np.ndarray: zbands = tifffile.imread(self.file_zbands, key=frames) orientation_field = tifffile.imread(self.file_orientation)[frames] images = self.read_imgs(frames=frames) if np.issubdtype(type(frames), np.integer): list_frames = [frames] else: list_frames = list(frames) else: raise ValueError('frames argument not valid') if len(zbands.shape) == 2: zbands = np.expand_dims(zbands, 0) if len(images.shape) == 2: images = np.expand_dims(images, 0) if len(orientation_field.shape) == 3: orientation_field = np.expand_dims(orientation_field, 0) n_imgs = len(zbands) # create empty lists none_lists = lambda: [None] * self.metadata['frames'] z_length, z_intensity, z_straightness, z_orientation = (none_lists() for _ in range(4)) z_lat_neighbors, z_lat_alignment, z_lat_dist = (none_lists() for _ in range(3)) z_lat_size_groups, z_lat_length_groups, z_lat_alignment_groups = (none_lists() for _ in range(3)) z_labels, z_ends, z_lat_links, z_lat_groups = (none_lists() for _ in range(4)) # create empty arrays nan_arrays = lambda: np.full(self.metadata['frames'], np.nan) z_length_mean, z_length_std, z_length_max, z_length_sum, z_oop = (nan_arrays() for _ in range(5)) n_zbands, z_intensity_mean, z_intensity_std = (nan_arrays() for _ in range(3)) z_mask_area, z_mask_intensity, z_mask_area_ratio = (nan_arrays() for _ in range(3)) z_straightness_mean, z_straightness_std = (nan_arrays() for _ in range(2)) z_lat_neighbors_mean, z_lat_neighbors_std = (nan_arrays() for _ in range(2)) z_lat_alignment_mean, z_lat_alignment_std = (nan_arrays() for _ in range(2)) z_lat_dist_mean, z_lat_dist_std = 
(nan_arrays() for _ in range(2)) z_lat_size_groups_mean, z_lat_size_groups_std = (nan_arrays() for _ in range(2)) z_lat_length_groups_mean, z_lat_length_groups_std = (nan_arrays(), nan_arrays()) z_lat_alignment_groups_mean, z_lat_alignment_groups_std = (nan_arrays() for _ in range(2)) # iterate images print('\nStarting Z-band analysis...') for i, (frame_i, zbands_i, image_i, orientation_field_i) in enumerate( progress_notifier.iterator(zip(list_frames, zbands, images, orientation_field), total=n_imgs)): # segment z-bands labels_i, labels_skel_i = self.segment_z_bands(zbands_i) # analyze z-band features z_band_features = self._analyze_z_bands(zbands_i, labels_i, labels_skel_i, image_i, orientation_field_i, pixelsize=self.metadata['pixelsize'], threshold=threshold, min_length=min_length, median_filter_radius=median_filter_radius, a_min=a_min, theta_phi_min=theta_phi_min, d_max=d_max, d_min=d_min) ( z_length_i, z_intensity_i, z_straightness_i, z_mask_intensity_i, z_mask_area_i, orientation_i, z_oop_i, labels_list_i, labels_i, z_lat_neighbors_i, z_lat_dist_i, z_lat_alignment_i, z_lat_links_i, z_ends_i, z_lat_groups_i, z_lat_size_groups_i, z_lat_length_groups_i, z_lat_alignment_groups_i, ) = z_band_features # fill lists and arrays z_length[frame_i] = z_length_i z_intensity[frame_i] = z_intensity_i z_straightness[frame_i] = z_straightness_i z_lat_alignment[frame_i] = z_lat_alignment_i z_lat_neighbors[frame_i] = z_lat_neighbors_i z_orientation[frame_i] = orientation_i z_lat_dist[frame_i] = z_lat_dist_i z_lat_size_groups[frame_i] = z_lat_size_groups_i z_lat_length_groups[frame_i] = z_lat_length_groups_i z_lat_alignment_groups[frame_i] = z_lat_alignment_groups_i z_mask_area[frame_i], z_mask_intensity[frame_i], z_oop[ frame_i] = z_mask_area_i, z_mask_intensity_i, z_oop_i if 'cell_mask_area' in self.data: z_mask_area_ratio[frame_i] = z_mask_area_i / self.data['cell_mask_area'][frame_i] else: z_mask_area_ratio[frame_i] = z_mask_area_i / (self.metadata['size'][0] * 
self.metadata['size'][1]) z_labels[frame_i] = sparse.coo_matrix(labels_i) z_lat_links[frame_i] = z_lat_links_i z_ends[frame_i] = z_ends_i z_lat_groups[frame_i] = z_lat_groups_i # calculate mean and std of z-band features if len(z_length_i) > 0: z_length_mean[frame_i], z_length_std[frame_i], z_length_max[frame_i], z_length_sum[frame_i] = np.mean( z_length_i), np.std( z_length_i), np.max(z_length_i), np.sum(z_length_i) n_zbands[frame_i] = len(z_length_i) z_intensity_mean[frame_i], z_intensity_std[frame_i] = np.mean(z_intensity_i), np.std(z_intensity_i) z_straightness_mean[frame_i], z_straightness_std[frame_i] = np.mean(z_straightness_i), np.std( z_straightness_i) z_lat_neighbors_mean[frame_i], z_lat_neighbors_std[frame_i] = np.mean(z_lat_neighbors_i), np.std( z_lat_neighbors_i) z_lat_alignment_mean[frame_i], z_lat_alignment_std[frame_i] = np.nanmean(z_lat_alignment_i), np.nanstd( z_lat_alignment_i) z_lat_dist_mean[frame_i], z_lat_dist_std[frame_i] = np.nanmean(z_lat_dist_i), np.nanstd(z_lat_dist_i) z_lat_size_groups_mean[frame_i], z_lat_size_groups_std[frame_i] = np.nanmean( z_lat_size_groups_i), np.nanstd( z_lat_size_groups_i) z_lat_length_groups_mean[frame_i], z_lat_length_groups_std[frame_i] = np.nanmean( z_lat_length_groups_i), np.nanstd( z_lat_length_groups_i) z_lat_alignment_groups_mean[frame_i], z_lat_alignment_groups_std[frame_i] = np.nanmean( z_lat_alignment_groups_i), np.nanstd(z_lat_alignment_groups_i) # create and save dictionary for cell structure z_band_data = {'n_zbands': n_zbands, 'z_length': z_length, 'z_length_mean': z_length_mean, 'z_length_std': z_length_std, 'z_length_max': z_length_max, 'z_intensity': z_intensity, 'z_intensity_mean': z_intensity_mean, 'z_intensity_std': z_intensity_std, 'z_orientation': z_orientation, 'z_oop': z_oop, 'z_straightness': z_straightness, 'z_mask_intensity': z_mask_intensity, 'z_labels': z_labels, 'z_straightness_mean': z_straightness_mean, 'z_straightness_std': z_straightness_std, 'z_mask_area': z_mask_area, 
'z_mask_area_ratio': z_mask_area_ratio, 'z_lat_neighbors': z_lat_neighbors, 'z_lat_neighbors_mean': z_lat_neighbors_mean, 'z_lat_neighbors_std': z_lat_neighbors_std, 'z_lat_alignment': z_lat_alignment, 'z_lat_alignment_mean': z_lat_alignment_mean, 'z_lat_alignment_std': z_lat_neighbors_std, 'z_lat_dist': z_lat_dist, 'z_ends': z_ends, 'z_lat_dist_mean': z_lat_dist_mean, 'z_lat_dist_std': z_lat_dist_std, 'z_lat_links': z_lat_links, 'z_lat_groups': z_lat_groups, 'z_lat_size_groups': z_lat_size_groups, 'z_lat_size_groups_mean': z_lat_size_groups_mean, 'z_lat_size_groups_std': z_lat_size_groups_std, 'z_lat_length_groups': z_lat_length_groups, 'z_lat_alignment_groups': z_lat_alignment_groups, 'z_lat_length_groups_mean': z_lat_length_groups_mean, 'z_lat_length_groups_std': z_lat_length_groups_std, 'z_lat_alignment_groups_mean': z_lat_alignment_groups_mean, 'z_lat_alignment_groups_std': z_lat_alignment_groups_std, 'params.analyze_z_bands.frames': list_frames, 'params.analyze_z_bands.threshold': threshold, 'params.analyze_z_bands.min_length': min_length, 'params.analyze_z_bands.median_filter_radius': median_filter_radius, 'params.analyze_z_bands.theta_phi_min': theta_phi_min, 'params.analyze_z_bands.d_max': d_max, 'params.analyze_z_bands.d_min': d_min} self.data.update(z_band_data) if self.auto_save: self.store_structure_data()
[docs] def analyze_sarcomere_vectors(self, frames: Union[str, int, List[int], np.ndarray] = 'all', threshold_mbands: float = 0.5, median_filter_radius: float = 0.25, linewidth: float = 0.2, interp_factor: int = 0, slen_lims: Tuple[float, float] = (1, 3), threshold_sarcomere_mask=0.1, backend='loky', progress_notifier: ProgressNotifier = ProgressNotifier.progress_notifier_tqdm()) -> None: """ Extract sarcomere orientation and length vectors. Parameters ---------- frames : {'all', int, list, np.ndarray}, optional frames for sarcomere vector analysis ('all' for all frames, int for a single frame, list or ndarray for selected frames). Defaults to 'all'. threshold_mbands : float, optional Threshold to binarize sarcomere M-bands. Lower values might result in more false-positive sarcomere vectors. Defaults to 0.2. median_filter_radius : float, optional Radius of kernel to smooth orientation field before assessing orientation at M-points, in µm (default 0.25 µm). linewidth : float, optional Line width of profile lines to analyze sarcomere lengths, in µm (default is 0.3 µm). interp_factor: int, optional Interpolation factor for profiles to calculate sarcomere length. Default to 4. slen_lims : tuple of float, optional Sarcomere size limits in µm (default is (1, 3) µm). threshold_sarcomere_mask : float Threshold to binarize sarcomere masks. Defaults to 0.1. backend : str, optional Backend for parallelization of profile processing. Defaults to 'loky'. progress_notifier: ProgressNotifier Wraps progress notification, default is progress notification done with tqdm Returns ------- sarcomere_orientation_points : np.ndarray Sarcomere orientation values at midline points. sarcomere_length_points : np.ndarray Sarcomere length values at midline points. """ if not os.path.exists(self.file_zbands): raise FileNotFoundError("Z-band mask not found. 
Please run detect_sarcomeres first.") _detected_frames = self.data['params.detect_sarcomeres.frames'] if ((isinstance(frames, str) and frames == 'all') or (self.metadata['frames'] == 1 and frames == 0) or (_detected_frames != 'all' and len(_detected_frames) == 1)): list_frames = list(range(self.metadata['frames'])) z_bands = tifffile.imread(self.file_zbands) mbands = tifffile.imread(self.file_mbands) orientation_field = tifffile.imread(self.file_orientation) sarcomere_mask = tifffile.imread(self.file_sarcomere_mask) elif np.issubdtype(type(frames), np.integer) or isinstance(frames, list) or isinstance(frames, np.ndarray): z_bands = tifffile.imread(self.file_zbands, key=frames) mbands = tifffile.imread(self.file_mbands, key=frames) orientation_field = tifffile.imread(self.file_orientation)[frames] sarcomere_mask = tifffile.imread(self.file_sarcomere_mask, key=frames) if np.issubdtype(type(frames), np.integer): list_frames = [frames] else: list_frames = [int(f) for f in frames] else: raise ValueError('frames argument not valid') if len(z_bands.shape) == 2: z_bands = np.expand_dims(z_bands, axis=0) if len(mbands.shape) == 2: mbands = np.expand_dims(mbands, axis=0) if len(sarcomere_mask.shape) == 2: sarcomere_mask = np.expand_dims(sarcomere_mask, axis=0) if len(orientation_field.shape) == 3: orientation_field = np.expand_dims(orientation_field, axis=0) # binarize M-bands mbands = mbands > threshold_mbands n_frames = len(z_bands) pixelsize = self.metadata['pixelsize'] # create empty arrays none_lists = lambda: [None] * self.metadata['frames'] nan_arrays = lambda: np.full(self.metadata['frames'], np.nan) (pos_vectors, pos_vectors_px, sarcomere_length_vectors, sarcomere_orientation_vectors) = (none_lists() for _ in range(4)) midline_id_vectors, midline_length_vectors = (none_lists() for _ in range(2)) sarcomere_masks = np.zeros((self.metadata['frames'], *self.metadata['size']), dtype=bool) (sarcomere_length_mean, sarcomere_length_std) = (nan_arrays() for _ in range(2)) 
sarcomere_orientation_mean, sarcomere_orientation_std = nan_arrays(), nan_arrays() n_vectors, n_mbands, oop, sarcomere_area, sarcomere_area_ratio, score_thresholds = (nan_arrays() for _ in range(6)) # iterate images print('\nStarting sarcomere length and orientation analysis...') for i, (frame_i, zbands_i, mbands_i, orientation_field_i, sarcomere_mask_i) in enumerate( progress_notifier.iterator(zip(list_frames, z_bands, mbands, orientation_field, sarcomere_mask), total=n_frames)): ( pos_vectors_px_i, pos_vectors_i, midline_id_vectors_i, midline_length_vectors_i, sarcomere_length_vectors_i, sarcomere_orientation_vectors_i, n_mbands_i) = self.get_sarcomere_vectors(zbands_i, mbands_i, orientation_field_i, pixelsize=pixelsize, median_filter_radius=median_filter_radius, slen_lims=slen_lims, interp_factor=interp_factor, linewidth=linewidth, backend=backend) # write in list n_vectors[frame_i] = len(sarcomere_length_vectors_i) n_mbands[frame_i] = n_mbands_i pos_vectors_px[frame_i] = pos_vectors_px_i pos_vectors[frame_i] = pos_vectors_i sarcomere_length_vectors[frame_i] = sarcomere_length_vectors_i sarcomere_orientation_vectors[frame_i] = sarcomere_orientation_vectors_i midline_id_vectors[frame_i] = midline_id_vectors_i midline_length_vectors[frame_i] = midline_length_vectors_i # calculate mean and std of sarcomere length and orientation sarcomere_length_mean[frame_i], sarcomere_length_std[frame_i], = np.nanmean( sarcomere_length_vectors_i), np.nanstd(sarcomere_length_vectors_i) if np.count_nonzero(~np.isnan(sarcomere_orientation_vectors_i)) > 1: sarcomere_orientation_mean[frame_i], sarcomere_orientation_std[frame_i] = stats.circmean( sarcomere_orientation_vectors_i[~np.isnan(sarcomere_orientation_vectors_i)]), stats.circstd( sarcomere_orientation_vectors_i[~np.isnan(sarcomere_orientation_vectors_i)]) # orientation order parameter if len(sarcomere_orientation_vectors_i) > 0: oop[frame_i], _ = Utils.analyze_orientations( 
sarcomere_orientation_vectors_i[~np.isnan(sarcomere_orientation_vectors_i)]) # calculate sarcomere mask area sarcomere_masks[frame_i] = sarcomere_mask_i > threshold_sarcomere_mask sarcomere_area[frame_i] = np.sum(sarcomere_mask_i) * self.metadata['pixelsize'] ** 2 if 'cell_mask_area' in self.data: sarcomere_area_ratio[frame_i] = sarcomere_area[frame_i] / self.data['cell_mask_area'][i] vectors_dict = {'params.analyze_sarcomere_vectors.frames': list_frames, 'params.analyze_sarcomere_vectors.threshold_sarcomere_mask': threshold_sarcomere_mask, 'params.analyze_sarcomere_vectors.median_filter_radius': median_filter_radius, 'params.analyze_sarcomere_vectors.slen_lims': slen_lims, 'params.analyze_sarcomere_vectors.interp_factor': interp_factor, 'params.analyze_sarcomere_vectors.linewidth': linewidth, 'n_vectors': n_vectors, 'n_mbands': n_mbands, 'pos_vectors_px': pos_vectors_px, 'pos_vectors': pos_vectors, 'sarcomere_length_vectors': sarcomere_length_vectors, 'sarcomere_orientation_vectors': sarcomere_orientation_vectors, 'sarcomere_area': sarcomere_area, 'sarcomere_area_ratio': sarcomere_area_ratio, 'midline_length_vectors': midline_length_vectors, 'midline_id_vectors': midline_id_vectors, 'sarcomere_length_mean': sarcomere_length_mean, 'sarcomere_length_std': sarcomere_length_std, 'sarcomere_orientation_mean': sarcomere_orientation_mean, 'sarcomere_orientation_std': sarcomere_orientation_std, 'sarcomere_oop': oop} self.data.update(vectors_dict) if self.auto_save: self.store_structure_data()
    def analyze_myofibrils(self,
                           frames: Optional[Union[str, int, List[int], np.ndarray]] = None,
                           ratio_seeds: float = 0.1,
                           persistence: int = 3,
                           threshold_distance: float = 0.5,
                           n_min: int = 4,
                           median_filter_radius: float = 0.5,
                           progress_notifier: ProgressNotifier = ProgressNotifier.progress_notifier_tqdm()) -> None:
        """
        Estimate myofibril lines by line growth algorithm and analyze length and curvature.

        Results are written into ``self.data`` under ``myof_*`` keys, one slot per movie
        frame; frames that were not analyzed remain NaN/None.

        Parameters
        ----------
        frames : {'all', int, list, np.ndarray}, optional
            Frames for myofibril analysis ('all' for all frames, int for a single frame,
            list or ndarray for selected frames). If None, frames from sarcomere vector
            analysis are used. Defaults to None.
        ratio_seeds : float, optional
            Ratio of sarcomere vectors used as seeds for line growth. Defaults to 0.1.
        persistence : int, optional
            Persistence of line (average vector length and orientation for prior estimation),
            needs to be > 0. Defaults to 3.
        threshold_distance : float, optional
            Maximal distance for nearest neighbor estimation (in micrometers). Defaults to 0.5.
        n_min : int, optional
            Minimal number of sarcomere line segments per line. Shorter lines are removed.
            Defaults to 4.
        median_filter_radius : float, optional
            Filter radius for smoothing myofibril length map (in micrometers). Defaults to 0.5.
        progress_notifier : ProgressNotifier
            Wraps progress notification, default is progress notification done with tqdm.
            NOTE(review): this default is evaluated once at import time and shared across
            calls — confirm the notifier is stateless.

        Raises
        ------
        ValueError
            If sarcomere vectors were not analyzed yet, or `frames` is invalid / not a
            subset of the frames used for sarcomere vector analysis.
        """
        # Sarcomere vectors are the required input of the line-growth algorithm.
        if 'pos_vectors_px' not in self.data:
            raise ValueError('Sarcomere length and orientation not yet analyzed. Run analyze_sarcomere_vectors first.')
        # Normalize `frames` and validate it against the frames used for vector analysis.
        if frames is not None:
            if (isinstance(frames, str) and frames == 'all') or (self.metadata['frames'] == 1 and frames == 0):
                frames = list(range(self.metadata['frames']))
            if np.issubdtype(type(frames), np.integer):
                frames = [frames]
            if not set(frames).issubset(self.data['params.analyze_sarcomere_vectors.frames']):
                raise ValueError(f'Run analyze_sarcomere_vectors first for frames {frames}.')
        elif frames is None:  # fall back to the frames of the sarcomere vector analysis
            if 'params.analyze_sarcomere_vectors.frames' in self.data.keys():
                frames = self.data['params.analyze_sarcomere_vectors.frames']
            else:
                raise ValueError("To use frames from sarcomere vector analysis, run 'analyze_sarcomere vectors' first!")
        # Build the explicit list of frame indices to process.
        if frames == 'all':
            n_imgs = self.metadata['frames']
            list_frames = list(range(n_imgs))
        elif isinstance(frames, int):
            list_frames = [frames]
        elif isinstance(frames, list) or type(frames) is np.ndarray:
            list_frames = list(frames)
        else:
            raise ValueError('Selection of frames not valid!')
        # Per-frame sarcomere vector data (positions in px and µm, lengths, orientations).
        pos_vectors_px = [self.data['pos_vectors_px'][frame] for frame in list_frames]
        pos_vectors = [self.data['pos_vectors'][frame] for frame in list_frames]
        sarcomere_length_vectors = [self.data['sarcomere_length_vectors'][frame] for frame in list_frames]
        sarcomere_orientation_vectors = [self.data['sarcomere_orientation_vectors'][frame] for frame in list_frames]
        midline_length_vectors = [self.data['midline_length_vectors'][frame] for frame in list_frames]

        # create empty result containers covering ALL movie frames
        none_lists = lambda: [None] * self.metadata['frames']
        nan_arrays = lambda: np.full(self.metadata['frames'], np.nan)
        length_mean, length_std, length_max = (nan_arrays() for _ in range(3))
        straightness_mean, straightness_std = (nan_arrays() for _ in range(2))
        bending_mean, bending_std = (nan_arrays() for _ in range(2))
        # NOTE(review): `frechet_straightness` is allocated but never filled or stored below.
        myof_lines, lengths, straightness, frechet_straightness, bending = (none_lists() for _ in range(5))

        # iterate frames
        print('\nStarting myofibril line analysis...')
        for i, (frame_i, pos_vectors_px_i, pos_vectors_i, sarcomere_length_vectors_i,
                sarcomere_orientation_vectors_i, midline_length_vectors_i) in enumerate(
            progress_notifier.iterator(
                zip(list_frames, pos_vectors_px, pos_vectors, sarcomere_length_vectors,
                    sarcomere_orientation_vectors, midline_length_vectors),
                total=len(pos_vectors_px))):
            # skip frames without any sarcomere vectors
            if len(np.asarray(pos_vectors_px_i).T) > 0:
                line_data_i = self.line_growth(pos_vectors_px_i, sarcomere_length_vectors_i,
                                               sarcomere_orientation_vectors_i,
                                               midline_length_vectors_t=midline_length_vectors_i,
                                               pixelsize=self.metadata['pixelsize'],
                                               ratio_seeds=ratio_seeds, persistence=persistence,
                                               threshold_distance=threshold_distance, n_min=n_min)
                lines_i = line_data_i['lines']
                if len(lines_i) > 0:
                    # per-line features: length, straightness (end-to-end vs path), bending
                    lengths_i = line_data_i['line_features']['length_lines']
                    straightness_i = line_data_i['line_features']['straightness_lines']
                    bending_i = line_data_i['line_features']['bending_lines']
                    if len(lengths_i) > 0:
                        # create myofibril length map and reduce to per-frame statistics;
                        # weights 1/length compensate for longer myofibrils covering
                        # proportionally more pixels of the map
                        myof_map_i = self.create_myofibril_length_map(myof_lines=lines_i,
                                                                      myof_length=lengths_i,
                                                                      pos_vectors=pos_vectors_i,
                                                                      sarcomere_orientation_vectors=sarcomere_orientation_vectors_i,
                                                                      sarcomere_length_vectors=sarcomere_length_vectors_i,
                                                                      size=self.metadata['size'],
                                                                      pixelsize=self.metadata['pixelsize'],
                                                                      median_filter_radius=median_filter_radius)
                        myof_map_flat_i = myof_map_i.flatten()
                        myof_map_flat_i = myof_map_flat_i[~np.isnan(myof_map_flat_i)]
                        weights_i = 1.0 / myof_map_flat_i
                        weighted_mean_length_i = np.average(myof_map_flat_i, weights=weights_i)
                        weighted_std_length_i = np.sqrt(np.average((myof_map_flat_i - weighted_mean_length_i) ** 2,
                                                                   weights=weights_i))
                        length_mean[frame_i], length_std[frame_i], length_max[frame_i] = (weighted_mean_length_i,
                                                                                          weighted_std_length_i,
                                                                                          np.nanmax(myof_map_flat_i))
                        straightness_mean[frame_i], straightness_std[frame_i] = (np.mean(straightness_i),
                                                                                 np.std(straightness_i))
                        bending_mean[frame_i], bending_std[frame_i] = (np.mean(bending_i), np.std(bending_i))
                        myof_lines[frame_i] = lines_i
                        lengths[frame_i] = lengths_i
                        straightness[frame_i] = straightness_i
                        bending[frame_i] = bending_i

        # update structure dictionary (results + parameters used)
        myofibril_data = {'myof_length_mean': length_mean,
                          'myof_length_std': length_std,
                          'myof_lines': myof_lines,
                          'myof_length_max': length_max,
                          'myof_length': lengths,
                          'myof_straightness': straightness,
                          'myof_straightness_mean': straightness_mean,
                          'myof_straightness_std': straightness_std,
                          'myof_bending': bending,
                          'myof_bending_mean': bending_mean,
                          'myof_bending_std': bending_std,
                          'params.analyze_myofibrils.persistence': persistence,
                          'params.analyze_myofibrils.threshold_distance': threshold_distance,
                          'params.analyze_myofibrils.frames': list_frames,
                          'params.analyze_myofibrils.n_min': n_min,
                          'params.analyze_myofibrils.ratio_seeds': ratio_seeds,
                          'params.analyze_myofibrils.median_filter_radius': median_filter_radius
                          }
        self.data.update(myofibril_data)
        if self.auto_save:
            self.store_structure_data()
    def analyze_sarcomere_domains(self,
                                  frames: Optional[Union[str, int, List[int], np.ndarray]] = None,
                                  d_max: float = 3,
                                  cosine_min: float = 0.65,
                                  leiden_resolution: float = 0.06,
                                  random_seed: int = 42,
                                  area_min: float = 20.0,
                                  dilation_radius: float = 0.3,
                                  progress_notifier: ProgressNotifier = ProgressNotifier.progress_notifier_tqdm()) -> None:
        """
        Cluster sarcomeres into domains based on their spatial and orientational properties
        using the Leiden algorithm for community detection.

        Results are written into ``self.data`` under ``domain_*`` keys, one slot per
        movie frame.

        Parameters
        ----------
        frames : {'all', int, list, np.ndarray}, optional
            Frames for domain analysis ('all' for all frames, int for a single frame,
            list or ndarray for selected frames). If None, frames from sarcomere vector
            analysis are used. Defaults to None.
        d_max : float
            Max. distance threshold for creating a network edge between vector ends.
        cosine_min : float
            Minimal absolute cosine between vector angles for creating a network edge
            between vector ends.
        leiden_resolution : float, optional
            Control parameter for domain size. If resolution is small, the algorithm
            favors larger domains. Greater resolution favors smaller domains.
            Defaults to 0.06.
        random_seed : int, optional
            Random seed for Leiden algorithm, to ensure reproducibility. Defaults to 42.
        area_min : float, optional
            Minimal area of domains/clusters (in µm^2). Defaults to 20.0.
        dilation_radius : float, optional
            Dilation radius for refining domain area masks, in µm. Defaults to 0.3.
        progress_notifier : ProgressNotifier
            Wraps progress notification, default is progress notification done with tqdm.

        Raises
        ------
        ValueError
            If sarcomere vectors were not analyzed yet, or `frames` is invalid / not a
            subset of the frames used for sarcomere vector analysis.
        """
        # Sarcomere vectors are the required input of the clustering.
        if 'pos_vectors' not in self.data:
            raise ValueError('Sarcomere length and orientation not yet analyzed. Run analyze_sarcomere_vectors first.')
        # Normalize `frames` and validate it against the frames used for vector analysis.
        if frames is not None:
            if (isinstance(frames, str) and frames == 'all') or (self.metadata['frames'] == 1 and frames == 0):
                frames = list(range(self.metadata['frames']))
            if np.issubdtype(type(frames), np.integer):
                frames = [frames]
            if not set(frames).issubset(self.data['params.analyze_sarcomere_vectors.frames']):
                raise ValueError(f'Run analyze_sarcomere_vectors first for frames {frames}.')
        elif frames is None:  # fall back to the frames of the sarcomere vector analysis
            if 'params.analyze_sarcomere_vectors.frames' in self.data.keys():
                frames = self.data['params.analyze_sarcomere_vectors.frames']
            else:
                raise ValueError("To use frames from sarcomere vector analysis, run 'analyze_sarcomere_vectors' first!")
        # Build the explicit list of frame indices to process.
        if frames == 'all':
            n_imgs = self.metadata['frames']
            list_frames = list(range(n_imgs))
        elif isinstance(frames, int):
            n_imgs = 1
            list_frames = [frames]
        elif isinstance(frames, list) or type(frames) is np.ndarray:
            n_imgs = len(frames)
            list_frames = list(frames)
        else:
            raise ValueError('Selection of frames not valid!')
        # Per-frame sarcomere vector data.
        pos_vectors = [np.asarray(self.data['pos_vectors'][t]) for t in list_frames]
        sarcomere_length_vectors = [np.asarray(self.data['sarcomere_length_vectors'][t]) for t in list_frames]
        sarcomere_orientation_vectors = [np.asarray(self.data['sarcomere_orientation_vectors'][t]) for t in
                                         list_frames]
        midline_id_vectors = [np.asarray(self.data['midline_id_vectors'][t]) for t in list_frames]

        # create empty result containers covering ALL movie frames
        none_lists = lambda: [None] * self.metadata['frames']
        nan_arrays = lambda: np.full(self.metadata['frames'], np.nan)
        n_domains, domain_area_mean, domain_area_std = (nan_arrays() for _ in range(3))
        domain_slen_mean, domain_slen_std = (nan_arrays() for _ in range(2))
        domain_oop_mean, domain_oop_std = (nan_arrays() for _ in range(2))
        # NOTE(review): `domain_slen_std` is rebound here to a None-list (overwriting the
        # NaN array created one line above) and later overwritten again per frame with a
        # scalar np.std — the per-domain slen std returned by cluster_sarcomeres is lost.
        # Confirm whether this is intended before relying on 'domain_slen_std'.
        (domains, domain_area, domain_slen, domain_slen_std,
         domain_oop, domain_orientation) = (none_lists() for _ in range(6))

        # iterate frames
        print('\nStarting sarcomere domain analysis...')
        for i, (frame_i, pos_vectors_i, sarcomere_length_vectors_i, sarcomere_orientation_vectors_i,
                midline_id_vectors_i) in enumerate(
            progress_notifier.iterator(
                zip(list_frames, pos_vectors, sarcomere_length_vectors, sarcomere_orientation_vectors,
                    midline_id_vectors),
                total=len(pos_vectors))):
            cluster_data_t = self.cluster_sarcomeres(pos_vectors_i, sarcomere_length_vectors_i,
                                                     sarcomere_orientation_vectors_i,
                                                     pixelsize=self.metadata['pixelsize'],
                                                     size=self.metadata['size'], d_max=d_max,
                                                     cosine_min=cosine_min,
                                                     leiden_resolution=leiden_resolution,
                                                     random_seed=random_seed, area_min=area_min,
                                                     dilation_radius=dilation_radius)
            # unpack per-frame cluster results (domain_mask_i is not stored)
            (n_domains[frame_i], domains[frame_i], domain_area[frame_i], domain_slen[frame_i],
             domain_slen_std[frame_i], domain_oop[frame_i], domain_orientation[frame_i],
             domain_mask_i) = cluster_data_t

            # calculate mean and std of domains
            domain_area_mean[frame_i], domain_area_std[frame_i] = np.mean(domain_area[frame_i]), np.std(
                domain_area[frame_i])
            domain_slen_mean[frame_i], domain_slen_std[frame_i] = (
                np.mean(domain_slen[frame_i]), np.std(domain_slen[frame_i]))
            domain_oop_mean[frame_i], domain_oop_std[frame_i] = (
                np.mean(domain_oop[frame_i]), np.std(domain_oop[frame_i]))

        # update structure dictionary (results + parameters used)
        domain_data = {'n_domains': n_domains,
                       'domains': domains,
                       'domain_area': domain_area,
                       'domain_area_mean': domain_area_mean,
                       'domain_area_std': domain_area_std,
                       'domain_slen': domain_slen,
                       'domain_slen_mean': domain_slen_mean,
                       'domain_slen_std': domain_slen_std,
                       'domain_oop': domain_oop,
                       'domain_oop_mean': domain_oop_mean,
                       'domain_oop_std': domain_oop_std,
                       'domain_orientation': domain_orientation,
                       'params.analyze_sarcomere_domains.frames': list_frames,
                       'params.analyze_sarcomere_domains.d_max': d_max,
                       'params.analyze_sarcomere_domains.cosine_min': cosine_min,
                       'params.analyze_sarcomere_domains.leiden_resolution': leiden_resolution,
                       'params.analyze_sarcomere_domains.area_min': area_min,
                       'params.analyze_sarcomere_domains.dilation_radius': dilation_radius}
        self.data.update(domain_data)
        if self.auto_save:
            self.store_structure_data()
[docs] def _grow_lois(self, frame: int = 0, ratio_seeds: float = 0.1, persistence: int = 2, threshold_distance: float = 0.3, random_seed: Union[None, int] = None) -> None: """ Find LOIs (lines of interest) using a line growth algorithm. The parameters **lims can be used to filter LOIs. Parameters ---------- frame : int, optional Frame to select frame. Selects i-th frame of frames specified in sarcomere vector analysis. Defaults to 0. ratio_seeds : float, optional Ratio of sarcomere vectors to take as seeds for line growth. Default 0.1. persistence : int, optional Persistence of line (average vector length and orientation for prior estimation). Defaults to 2. threshold_distance : float, optional Maximal distance for nearest neighbor estimation. Defaults to 0.5. random_seed : int, optional Random seed for reproducibility. Defaults to None. """ # select midline point data at frame (pos_vectors, sarcomere_length_vectors, sarcomere_orientation_vectors, midline_length_vectors) = self.data['pos_vectors_px'][frame], \ self.data['sarcomere_length_vectors'][frame], \ self.data['sarcomere_orientation_vectors'][frame], \ self.data['midline_length_vectors'][frame] loi_data = self.line_growth(points_t=pos_vectors, sarcomere_length_vectors_t=sarcomere_length_vectors, sarcomere_orientation_vectors_t=sarcomere_orientation_vectors, midline_length_vectors_t=midline_length_vectors, pixelsize=self.metadata['pixelsize'], ratio_seeds=ratio_seeds, persistence=persistence, threshold_distance=threshold_distance, random_seed=random_seed) self.data['loi_data'] = loi_data lois_vectors = [self.data['pos_vectors_px'][frame][loi_i] for loi_i in self.data['loi_data']['lines']] self.data['loi_data']['lines_vectors'] = lois_vectors if self.auto_save: self.store_structure_data()
[docs] def _filter_lois(self, number_lims: Tuple[int, int] = (10, 100), length_lims: Tuple[float, float] = (0, 200), sarcomere_mean_length_lims: Tuple[float, float] = (1, 3), sarcomere_std_length_lims: Tuple[float, float] = (0, 1), midline_mean_length_lims: Tuple[float, float] = (0, 50), midline_std_length_lims: Tuple[float, float] = (0, 50), midline_min_length_lims: Tuple[float, float] = (0, 50), ) -> None: """ Filters Lines of Interest (LOIs) based on various geometric and morphological criteria. Parameters ---------- number_lims : tuple of int, optional Limits of sarcomere numbers in LOI (min, max). Defaults to (10, 100). length_lims : tuple of float, optional Limits for LOI lengths (in µm) (min, max). Defaults to (0, 200). sarcomere_mean_length_lims : tuple of float, optional Limits for mean length of sarcomeres in LOI (min, max). Defaults to (1, 3). sarcomere_std_length_lims : tuple of float, optional Limits for standard deviation of sarcomere lengths in LOI (min, max). Defaults to (0, 1). midline_mean_length_lims : tuple of float, optional Limits for mean length of the midline in LOI (min, max). Defaults to (0, 50). midline_std_length_lims : tuple of float, optional Limits for standard deviation of the midline length in LOI (min, max). Defaults to (0, 50). midline_min_length_lims : tuple of float, optional Limits for minimum length of the midline in LOI (min, max). Defaults to (0, 50). 
""" # Retrieve LOIs and their features from the structure dict lois = self.data['loi_data']['lines'] loi_features = self.data['loi_data']['line_features'] lois_vectors = self.data['loi_data']['lines_vectors'] # Convert feature lists to numpy arrays for boolean operations n_vectors = np.array(loi_features['n_vectors_lines']) length = np.array(loi_features['length_lines']) sarc_mean = np.array(loi_features['sarcomere_mean_length_lines']) sarc_std = np.array(loi_features['sarcomere_std_length_lines']) mid_mean = np.array(loi_features['midline_mean_length_lines']) mid_std = np.array(loi_features['midline_std_length_lines']) mid_min = np.array(loi_features['midline_min_length_lines']) # Apply filters based on the provided limits is_good = ( (n_vectors >= number_lims[0]) & (n_vectors < number_lims[1]) & (length >= length_lims[0]) & (length < length_lims[1]) & (sarc_mean >= sarcomere_mean_length_lims[0]) & (sarc_mean < sarcomere_mean_length_lims[1]) & (sarc_std >= sarcomere_std_length_lims[0]) & (sarc_std < sarcomere_std_length_lims[1]) & (mid_mean >= midline_mean_length_lims[0]) & (mid_mean < midline_mean_length_lims[1]) & (mid_std >= midline_std_length_lims[0]) & (mid_std < midline_std_length_lims[1]) & (mid_min >= midline_min_length_lims[0]) & (mid_min < midline_min_length_lims[1]) ) # Filter the lines and vectors self.data['loi_data']['lines'] = [loi for i, loi in enumerate(lois) if is_good[i]] self.data['loi_data']['lines_vectors'] = [pos_vectors for i, pos_vectors in enumerate(lois_vectors) if is_good[i]] # Filter the features dataframe and convert back to dict df_features = pd.DataFrame(loi_features) filtered_df_features = df_features[is_good].reset_index(drop=True) self.data['loi_data']['line_features'] = filtered_df_features.to_dict(orient='list')
[docs] def _hausdorff_distance_lois(self, symmetry_mode: str = 'max') -> None: """ Compute Hausdorff distances between all good LOIs. Parameters ---------- symmetry_mode : str, optional Choose 'min' or 'max', whether min/max(H(loi_i, loi_j), H(loi_j, loi_i)). Defaults to 'max'. """ # get points of LOI lines lines_vectors = self.data['loi_data']['lines_vectors'] # hausdorff distance between LOIss hausdorff_dist_matrix = np.zeros((len(lines_vectors), len(lines_vectors))) for i, loi_i in enumerate(lines_vectors): for j, loi_j in enumerate(lines_vectors): if symmetry_mode == 'min': hausdorff_dist_matrix[i, j] = min(directed_hausdorff(loi_i, loi_j)[0], directed_hausdorff(loi_j, loi_i)[0]) if symmetry_mode == 'max': hausdorff_dist_matrix[i, j] = max(directed_hausdorff(loi_i, loi_j)[0], directed_hausdorff(loi_j, loi_i)[0]) self.data['loi_data']['hausdorff_dist_matrix'] = hausdorff_dist_matrix if self.auto_save: self.store_structure_data()
[docs] def _cluster_lois(self, distance_threshold_lois: float = 40, linkage: str = 'single') -> None: """ Agglomerative clustering of good LOIs using predefined Hausdorff distance matrix using scikit-learn. Parameters ---------- distance_threshold_lois : float, optional The linkage distance threshold above which clusters will not be merged. Defaults to 40. linkage : {'complete', 'average', 'single'}, optional Which linkage criterion to use. The linkage criterion determines which distance to use between sets of observations. The algorithm will merge the pairs of clusters that minimize this criterion. - 'average' uses the average of the distances of each observation of the two sets. - 'complete' or 'maximum' linkage uses the maximum distances between all observations of the two sets. - 'single' uses the minimum of the distances between all observations of the two sets. Defaults to 'single'. """ if len(self.data['loi_data']['lines_vectors']) == 0: self.data['loi_data']['line_cluster'] = [] self.data['loi_data']['n_lines_clusters'] = 0 elif len(self.data['loi_data']['lines_vectors']) == 1: self.data['loi_data']['line_cluster'] = [[0]] self.data['loi_data']['n_lines_clusters'] = 1 else: clustering = AgglomerativeClustering(n_clusters=None, distance_threshold=distance_threshold_lois, metric='precomputed', linkage=linkage).fit( self.data['loi_data']['hausdorff_dist_matrix']) self.data['loi_data']['line_cluster'] = clustering.labels_ self.data['loi_data']['n_lines_clusters'] = len(np.unique(clustering.labels_)) if self.auto_save: self.store_structure_data()
    def _fit_straight_line(self, add_length=1, n_lois=None):
        """Fit linear lines to cluster points.

        For each LOI cluster, all member line coordinates are pooled and a straight
        line y = a*x + b is fitted; the fitted segment is extended by `add_length` on
        both ends. Results are stored under
        ``self.data['loi_data']['loi_lines']`` / ``['len_loi_lines']``, sorted by
        descending length.

        Parameters
        ----------
        add_length : float
            Elongate line at each end with add_length (in length unit, converted to
            pixels via the pixelsize).
        n_lois : int
            If int, only n longest LOIs are saved. If None, all are saved.
        """

        def linear(x, a, b):
            # straight-line model used by curve_fit
            return a * x + b

        points_clusters = []
        loi_lines = []
        len_loi_lines = []
        # convert the elongation from physical length units to pixels
        add_length = add_length / self.metadata['pixelsize']
        for label_i in range(self.data['loi_data']['n_lines_clusters']):
            # pool the coordinates of all lines belonging to cluster `label_i`
            points_cluster_i = []
            for k in np.where(self.data['loi_data']['line_cluster'] == label_i)[0]:
                points_cluster_i.append(self.data['loi_data']['lines_vectors'][k])
            points_clusters.append(np.concatenate(points_cluster_i).T)
            # fit y (row 0) as a linear function of x (row 1)
            p_i, pcov_i = curve_fit(linear, points_clusters[label_i][1], points_clusters[label_i][0])
            # extend the x-range by add_length along the line direction
            # (division by sqrt(1+a^2) projects the elongation onto the x-axis)
            x_range_i = np.linspace(np.min(points_clusters[label_i][1]) - add_length / np.sqrt(1 + p_i[0] ** 2),
                                    np.max(points_clusters[label_i][1]) + add_length / np.sqrt(1 + p_i[0] ** 2),
                                    num=2)
            y_i = linear(x_range_i, p_i[0], p_i[1])
            len_i = np.sqrt(np.diff(x_range_i) ** 2 + np.diff(y_i) ** 2)
            x_range_i, y_i = np.round(x_range_i, 1), np.round(y_i, 1)
            # each LOI line is stored as two endpoints in (y, x) order
            loi_lines.append(np.asarray((y_i, x_range_i)).T)
            len_loi_lines.append(len_i)
        # all entries have shape (2, 2), so np.asarray yields a regular array
        len_loi_lines = np.asarray(len_loi_lines).flatten()
        loi_lines = np.asarray(loi_lines)

        # sort lines by length (descending) and optionally keep only the n longest
        length_idxs = len_loi_lines.argsort()
        loi_lines = loi_lines[length_idxs[::-1]][:n_lois]
        len_loi_lines = len_loi_lines[length_idxs[::-1]][:n_lois]
        self.data['loi_data']['loi_lines'] = np.asarray(loi_lines)
        self.data['loi_data']['len_loi_lines'] = np.asarray(len_loi_lines)
        if self.auto_save:
            self.store_structure_data()
[docs] def _longest_in_cluster(self, n_lois, frame): lines = self.data['loi_data']['lines'] pos_vectors = self.data['pos_vectors_px'][frame] lines_cluster = np.asarray(self.data['loi_data']['line_cluster']) longest_lines = [] for label_i in range(self.data['loi_data']['n_lines_clusters']): lines_cluster_i = [line_j for j, line_j in enumerate(lines) if lines_cluster[j] == label_i] points_lines_cluster_i = [pos_vectors[line_j] for j, line_j in enumerate(lines) if lines_cluster[j] == label_i] length_lines_cluster_i = [len(line_j) for line_j in lines_cluster_i] longest_line = points_lines_cluster_i[np.argmax(length_lines_cluster_i)] longest_lines.append(longest_line) # get n longest lines sorted_by_length = sorted(longest_lines, key=lambda x: len(x[1]), reverse=True) if len(longest_lines) < n_lois: print(f'Only {len(longest_lines)}<{n_lois} clusters identified.') loi_lines = sorted_by_length[:n_lois] self.data['loi_data']['loi_lines'] = loi_lines self.data['loi_data']['len_loi_lines'] = [len(line_i) for line_i in loi_lines] if self.auto_save: self.store_structure_data()
[docs] def _random_from_cluster(self, n_lois, frame): lines = self.data['loi_data']['lines'] pos_vectors = self.data['pos_vectors_px'][frame] lines_cluster = np.asarray(self.data['loi_data']['line_cluster']) random_lines = [] for label_i in range(self.data['loi_data']['n_lines_clusters']): points_lines_cluster_i = [pos_vectors[line_j] for j, line_j in enumerate(lines) if lines_cluster[j] == label_i] random_line = random.choice(points_lines_cluster_i) random_lines.append(random_line) # select clusters randomly loi_lines = random.sample(random_lines, n_lois) self.data['loi_data']['loi_lines'] = loi_lines self.data['loi_data']['len_loi_lines'] = [len(line_i) for line_i in loi_lines] if self.auto_save: self.store_structure_data()
[docs] def _random_lois(self, n_lois, frame): lines = self.data['loi_data']['lines'] pos_vectors = self.data['pos_vectors_px'][frame] loi_lines = random.sample(lines, n_lois) loi_lines = [pos_vectors[line_i] for line_i in loi_lines] self.data['loi_data']['loi_lines'] = loi_lines self.data['loi_data']['len_loi_lines'] = [len(line_i) for line_i in loi_lines] if self.auto_save: self.store_structure_data()
    def create_loi_data(self, line: np.ndarray, linewidth: float = 0.65, order: int = 0,
                        export_raw: bool = False) -> None:
        """
        Extract intensity kymograph along LOI and create LOI file from line.

        The kymograph is extracted from the fast-movie Z-band stack if present,
        otherwise from the regular Z-band stack, and serialized to a
        ``*_loi.json`` file in ``self.base_dir`` (named from the line's start/end
        coordinates and linewidth).

        Parameters
        ----------
        line : np.ndarray
            Line start and end coordinates ((start_x, start_y), (end_x, end_y)) or list
            of segments [(x0, y0), (x1, y1), (x2, y2), ...]
        linewidth : float, optional
            Width of the scan in µm, perpendicular to the line. Defaults to 0.65.
        order : int, optional
            The order of the spline interpolation, default is 0 if image.dtype is bool
            and 1 otherwise. The order has to be in the range 0-5. See
            `skimage.transform.warp` for details. Defaults to 0.
        export_raw : bool, optional
            If True, intensity kymograph along LOI from raw microscopy image is
            additionally stored. Defaults to False.
        """
        # prefer the high-speed Z-band movie when it exists
        if os.path.exists(self.file_zbands_fast_movie):
            file_z_bands = self.file_zbands_fast_movie
        else:
            file_z_bands = self.file_zbands
        imgs_sarcomeres = tifffile.imread(file_z_bands)
        # linewidth is converted from µm to integer pixels
        profiles = self.kymograph_movie(imgs_sarcomeres, line, order=order,
                                        linewidth=int(linewidth / self.metadata['pixelsize']))
        profiles = np.asarray(profiles)
        if export_raw:
            # additionally extract the kymograph from the raw microscopy stack
            imgs_raw = tifffile.imread(self.filepath)
            profiles_raw = self.kymograph_movie(imgs_raw, line, order=order,
                                                linewidth=int(linewidth / self.metadata['pixelsize']))
        else:
            profiles_raw = None

        # total length of the (possibly segmented) line in pixels
        def __calculate_segmented_line_length(line):
            diffs = np.diff(line, axis=0)
            lengths = np.sqrt(np.sum(diffs ** 2, axis=1))
            return np.sum(lengths)

        # convert to physical units (µm)
        length = __calculate_segmented_line_length(line) * self.metadata['pixelsize']

        loi_data = {'profiles': profiles, 'profiles_raw': profiles_raw, 'line': line,
                    'linewidth': linewidth, 'length': length}
        # cast everything to ndarray for uniform JSON serialization
        for key, value in loi_data.items():
            loi_data[key] = np.asarray(value)
        save_name = os.path.join(self.base_dir,
                                 f'{line[0][0]}_{line[0][1]}_{line[-1][0]}_{line[-1][1]}_{linewidth}_loi.json')
        IOUtils.json_serialize(loi_data, save_name)
    def detect_lois(self, frame: int = 0, n_lois: int = 4, ratio_seeds: float = 0.1,
                    persistence: int = 4, threshold_distance: float = 0.5,
                    mode: str = 'longest_in_cluster',
                    random_seed: Optional[int] = None,
                    number_lims: Tuple[int, int] = (10, 50),
                    length_lims: Tuple[float, float] = (0, 200),
                    sarcomere_mean_length_lims: Tuple[float, float] = (1, 3),
                    sarcomere_std_length_lims: Tuple[float, float] = (0, 1),
                    midline_mean_length_lims: Tuple[float, float] = (0, 50),
                    midline_std_length_lims: Tuple[float, float] = (0, 50),
                    midline_min_length_lims: Tuple[float, float] = (0, 50),
                    distance_threshold_lois: float = 40,
                    linkage: str = 'single',
                    linewidth: float = 0.65, order: int = 0,
                    export_raw: bool = False) -> None:
        """
        Detects Regions of Interest (LOIs) for tracking sarcomere Z-band motion and creates kymographs.

        This method integrates several steps: growing LOIs based on seed vectors, filtering LOIs
        based on specified criteria, clustering LOIs, fitting lines to LOI clusters, and
        extracting intensity profiles to generate kymographs.

        Parameters
        ----------
        frame : int
            The index of the frame to select for analysis.
        n_lois : int
            Number of LOIs.
        ratio_seeds : float
            Ratio of sarcomere vectors to take as seed vectors for initiating LOI growth.
        persistence : int
            Persistence parameter influencing line growth direction and termination.
        threshold_distance : float
            Maximum distance for nearest neighbor estimation during line growth.
        mode : str
            Mode for selecting LOIs from identified clusters.
            - 'fit_straight_line' fits a straight line to all midline points in the cluster.
            - 'longest_in_cluster' selects the longest line of each cluster, also allowing curved LOIs.
            - 'random_from_cluster' selects a random line from each cluster, also allowing curved LOIs.
            - 'random_line' selects a set of random lines that fulfil the filtering criteria.
        random_seed : int, optional
            Random seed for selection of random starting vectors for line growth algorithm,
            for reproducible outcomes. If None, no random seed is set, and outcomes in every
            run will differ.
        number_lims : tuple of int
            Limits for the number of sarcomeres within an LOI (min, max).
        length_lims : tuple of float
            Length limits for LOIs (in µm) (min, max).
        sarcomere_mean_length_lims : tuple of float
            Limits for the mean length of sarcomeres within an LOI (min, max).
        sarcomere_std_length_lims : tuple of float
            Limits for the standard deviation of sarcomere lengths within an LOI (min, max).
        midline_mean_length_lims : tuple of float
            Limits for the mean length of the midline of vectors in LOI (min, max).
        midline_std_length_lims : tuple of float
            Limits for the standard deviation of the midline length of vectors in LOI (min, max).
        midline_min_length_lims : tuple of float
            Limits for the minimum length of the midline of vectors in LOI (min, max).
        distance_threshold_lois : float
            Distance threshold for clustering LOIs. Clusters will not be merged above this threshold.
        linkage : str
            Linkage criterion for clustering ('complete', 'average', 'single').
        linewidth : float
            Width of the scan line (in µm), perpendicular to the LOIs.
        order : int
            Order of spline interpolation for transforming LOIs (range 0-5).
        export_raw : bool
            If True, exports raw intensity kymographs along LOIs.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If sarcomere vectors were not analyzed, the file is a single image, or
            `mode` is not one of the supported modes.
        """
        # sarcomere vectors are required input; LOI tracking needs a movie, not a still
        if 'pos_vectors' not in self.data:
            raise ValueError('Sarcomere length and orientation not yet analyzed. Run analyze_sarcomere_vectors first.')
        if self.metadata['frames'] == 1:
            raise ValueError('LOI detection not possible in single images. '
                             'Sarcomere motion tracking is only possible in high-speed movies; (t, x, y) stacks.')

        # Grow LOIs based on seed vectors and specified parameters
        self._grow_lois(frame=frame, ratio_seeds=ratio_seeds, random_seed=random_seed, persistence=persistence,
                        threshold_distance=threshold_distance)

        # Filter LOIs based on geometric and morphological criteria
        self._filter_lois(number_lims=number_lims, length_lims=length_lims,
                          sarcomere_mean_length_lims=sarcomere_mean_length_lims,
                          sarcomere_std_length_lims=sarcomere_std_length_lims,
                          midline_mean_length_lims=midline_mean_length_lims,
                          midline_std_length_lims=midline_std_length_lims,
                          midline_min_length_lims=midline_min_length_lims)

        if mode == 'fit_straight_line' or mode == 'longest_in_cluster' or mode == 'random_from_cluster':
            # Calculate Hausdorff distance between LOIs and perform clustering
            self._hausdorff_distance_lois()
            self._cluster_lois(distance_threshold_lois=distance_threshold_lois, linkage=linkage)

            # Fit lines to LOIs clusters and select LOIs for analysis
            if mode == 'fit_straight_line':
                self._fit_straight_line(add_length=2, n_lois=n_lois)
            elif mode == 'longest_in_cluster':
                self._longest_in_cluster(n_lois=n_lois, frame=frame)
            elif mode == 'random_from_cluster':
                self._random_from_cluster(n_lois=n_lois, frame=frame)
        elif mode == 'random_line':
            self._random_lois(n_lois=n_lois, frame=frame)
        else:
            raise ValueError(f'mode {mode} not valid.')

        # extract intensity kymographs profiles and save LOI files
        for line_i in self.data['loi_data']['loi_lines']:
            self.create_loi_data(line_i, linewidth=linewidth, order=order, export_raw=export_raw)
[docs] def delete_lois(self): """ Delete all LOIs, their associated data files, and their directories. """ self.data.pop('loi_data', None) loi_files = glob.glob(os.path.join(self.base_dir, '*loi.json')) for loi_file in loi_files: try: # Remove the LOI file os.remove(loi_file) # Remove the associated data file data_file = os.path.join(self.data_dir, f"{os.path.splitext(os.path.basename(loi_file))[0]}_data.json") if os.path.exists(data_file): os.remove(data_file) # Remove the directory and its contents directory = loi_file[:-len('_loi.json')] + '/' if os.path.exists(directory): shutil.rmtree(directory) except Exception: pass # Silently continue if an error occurs
[docs] def full_analysis_structure(self, frames='all'): """ Analyze sarcomere structure with default parameters at specified frames Parameters ---------- frames : {'all', int, list, np.ndarray} frames for analysis ('all' for all frames, int for a single frame, list or ndarray for selected frames). """ self.auto_save = False self.analyze_cell_mask() self.analyze_z_bands(frames=frames) self.analyze_sarcomere_vectors(frames=frames) self.analyze_myofibrils(frames=frames) self.analyze_sarcomere_domains(frames=frames) if not self.auto_save: self.store_structure_data() self.auto_save = True
[docs] @staticmethod def segment_z_bands(image: np.ndarray, threshold: float = 0.15) -> Tuple[np.ndarray, np.ndarray]: """ Segment z-bands from U-Net result (threshold, make binary, skeletonize, label regions). Parameters ---------- image : np.ndarray Input image from U-Net. threshold : float, optional Threshold value for binarizing the image. Defaults to 0.15. Returns ------- labels : np.ndarray Labeled regions in the thresholded image. labels_skel : np.ndarray Labeled regions in the skeletonized image. """ mask = image > threshold mask_skel = morphology.skeletonize(mask) labels = label(mask) labels_skel = mask_skel * labels return labels, labels_skel
    @staticmethod
    def _analyze_z_bands(zbands: np.ndarray, labels: np.ndarray, labels_skel: np.ndarray,
                         image_raw: np.ndarray, orientation_field: np.ndarray, pixelsize: float,
                         min_length: float = 1.0, threshold: float = 0.1,
                         median_filter_radius: float = 0.25, a_min: float = 0.3,
                         theta_phi_min: float = 0.2, d_max: float = 4.0, d_min: float = 0.25) -> Tuple:
        """
        Analyzes segmented z-bands in a single frame, extracting metrics such as length, intensity,
        orientation, straightness, lateral distance, alignment, number of lateral neighbors per z-band,
        and characteristics of groups of laterally linked z-bands (length, alignment, size).

        Parameters
        ----------
        zbands : np.ndarray
            The segmented (semantic) map of z-bands.
        labels : np.ndarray
            The labeled image of z-bands.
        labels_skel : np.ndarray
            The skeletonized labels of z-bands.
        image_raw : np.ndarray
            The raw image.
        orientation_field : np.ndarray
            Sarcomere orientation field.
        pixelsize : float
            The size of pixels in the image, in µm.
        min_length : float, optional
            The minimum length threshold for z-bands, in µm. Default is 1.0.
        threshold : float, optional
            The threshold value for the z-band intensity mask. Default is 0.1.
        median_filter_radius : float, optional
            Radius of kernel to smooth the orientation field, in µm. Default is 0.25.
        a_min : float, optional
            The minimum value for lateral alignment. Default is 0.3.
            Links with smaller alignment are set to np.nan.
        theta_phi_min : float, optional
            The minimum dot product/cosine between the direction of a Z-band end and the direction
            of the line from that end to the other Z-band end. Default is 0.2.
        d_max : float, optional
            The maximum distance between z-band ends, in µm. Default is 4.0.
            Larger distances are set to np.nan.
        d_min : float, optional
            The minimum distance between z-band ends, in µm. Default is 0.25.
            Smaller distances are set to np.nan.

        Returns
        -------
        tuple
            (length, intensity, straightness, z_mask_intensity, z_mask_area, orientation, oop,
            labels_list, labels, lat_neighbors, lat_dist, lat_alignment, links, z_ends,
            linked_groups, size_groups, length_groups, alignment_groups)
        """
        # analyze skeletonized labels to determine z-band backbone length
        # (perimeter of a 1-px-wide skeleton approximates twice its length; used as-is here as a
        # relative backbone-length measure — NOTE(review): confirm intended scaling)
        props_skel = regionprops_table(labels_skel, properties=['label', 'perimeter'])
        labels_list = props_skel['label']

        # remove short z-bands: map labels of short bands to 0 (background), then relabel sequentially
        length = props_skel['perimeter'] * pixelsize
        labels_list_ = labels_list.copy()
        labels_list[length < min_length] = 0
        labels_list = np.insert(labels_list, 0, 0)
        labels_list_ = np.insert(labels_list_, 0, 0)
        labels = Utils.map_array(labels, labels_list_, labels_list)
        labels, forward_map, inverse_map = segmentation.relabel_sequential(labels)
        labels_list = labels_list[labels_list != 0]

        # sarcomere orientation map, median-smoothed with a kernel of median_filter_radius (µm)
        smooth_radius_px = int(median_filter_radius / pixelsize)
        sarcomere_orientation = Utils.get_orientation_angle_map(orientation_field,
                                                                use_median_filter=True,
                                                                radius=smooth_radius_px)

        # analyze z-band labels (per-region shape and intensity features)
        props = regionprops_table(labels, intensity_image=image_raw,
                                  properties=['label', 'area', 'convex_area', 'mean_intensity',
                                              'orientation', 'image', 'bbox', 'centroid'])

        # z-band length (keep only bands that survived the min_length filter)
        length = length[length >= min_length]

        # straightness of z-bands (area / convex hull area; 1 = perfectly convex/straight)
        straightness = props['area'] / props['convex_area']

        # fluorescence intensity of each individual z-band, the total area,
        # and the average intensity of the z-band mask
        intensity = props['mean_intensity']
        z_mask = zbands > threshold
        z_mask_area = np.sum(z_mask.astype('uint8')) * pixelsize ** 2
        z_mask_intensity = np.mean(image_raw[z_mask])

        # z-band orientational order parameter (circular mean resultant of doubled angles)
        orientation = props['orientation']
        if len(orientation) > 0:
            oop = 1 / len(orientation) * np.abs(np.sum(np.exp(orientation * 2 * 1j)))
        else:
            oop = np.nan

        # local lateral z-band alignment and distance
        n_z = len(np.unique(labels)) - 1
        if n_z > 0:
            # get two ends of each z-band
            z_ends = np.zeros((n_z, 2, 2)) * np.nan  # (z-band idx, upper/lower end, x/y)
            z_orientation = np.zeros((n_z, 2)) * np.nan  # (z-band idx, upper/lower)
            pad_width = int(round(1 / pixelsize, 0))  # pad ~1 µm so skeleton ends are detectable
            for i, zbands_i in enumerate(props['image']):
                zbands_i = np.pad(zbands_i, (pad_width, pad_width))
                # skeletonize the band to a 1-px line
                skel_i = skeletonize(zbands_i, method='lee')

                # detect line ends: center pixel set with exactly one neighbor in the 3x3 window
                def line_end_filter(d):
                    return (d[4] == 1) and np.sum(d) == 2

                z_ends_i = ndimage.generic_filter(skel_i, line_end_filter, (3, 3))
                z_ends_i = np.asarray(np.where(z_ends_i == 1))
                # shift end coordinates back into full-image frame
                z_ends_i[0] += props['bbox-0'][i] - pad_width
                z_ends_i[1] += props['bbox-1'][i] - pad_width
                centroid_i = (props['centroid-0'][i], props['centroid-1'][i])
                # only bands with exactly two detected ends are kept (others stay NaN)
                if len(z_ends_i.T) == 2:
                    # order ends left-to-right by column
                    if z_ends_i[1, 0] > z_ends_i[1, 1]:
                        z_ends_i = z_ends_i[:, ::-1]
                    # Get orientations from map and add π/2 (band direction ⟂ sarcomere direction)
                    orientation_ends_i = np.asarray(
                        [sarcomere_orientation[z_ends_i[0][0], z_ends_i[1][0]] + np.pi / 2,
                         sarcomere_orientation[z_ends_i[0][1], z_ends_i[1][1]] + np.pi / 2])

                    # Calculate local directions from the band centroid to each endpoint
                    _orient_1 = np.arctan2(z_ends_i[0, 0] - centroid_i[0], z_ends_i[1, 0] - centroid_i[1])
                    _orient_2 = np.arctan2(z_ends_i[0, 1] - centroid_i[0], z_ends_i[1, 1] - centroid_i[1])

                    # Minimum absolute angle difference, result in range [0, π]
                    def angle_diff(a1, a2):
                        return np.abs((a1 - a2 + np.pi) % (2 * np.pi) - np.pi)

                    # Apply π shift if end orientation points against the outward direction
                    if angle_diff(orientation_ends_i[0], _orient_1) > np.pi / 2:
                        orientation_ends_i[0] = orientation_ends_i[0] + np.pi
                    if angle_diff(orientation_ends_i[1], _orient_2) > np.pi / 2:
                        orientation_ends_i[1] = orientation_ends_i[1] + np.pi

                    # convert to the angle convention used below
                    orientation_ends_i = -orientation_ends_i + np.pi / 2
                    # Ensure angles stay in range [-π, π]
                    orientation_ends_i[0] = (orientation_ends_i[0] + np.pi) % (2 * np.pi) - np.pi
                    orientation_ends_i[1] = (orientation_ends_i[1] + np.pi) % (2 * np.pi) - np.pi

                    z_orientation[i] = orientation_ends_i
                    z_ends[i] = z_ends_i.T * pixelsize

            # lateral alignment index a_ji of two z-band ends; NaN if direction criteria not met
            def lateral_alignment(pos_i, pos_j, theta_i, theta_j):
                phi_ij = np.arctan2((pos_j[1] - pos_i[1]), (pos_j[0] - pos_i[0])) % (2 * np.pi)
                phi_ji = (phi_ij + np.pi) % (2 * np.pi)
                a_ji = np.cos(theta_i - theta_j + np.pi) * np.cos(theta_i - phi_ij) * np.cos(theta_j - phi_ji)
                if np.cos(theta_i - theta_j + np.pi) > 0 and np.cos(theta_i - phi_ij) > theta_phi_min and np.cos(
                        theta_j - phi_ji) > theta_phi_min:
                    return a_ji
                else:
                    return np.nan

            # pairwise distance matrix of all z-band ends (ends interleaved, order='F')
            _z_ends = np.reshape(z_ends, (n_z * 2, 2), order='F')
            D = squareform(pdist(_z_ends, 'euclidean'))
            # Set NaNs for specified indices (ends of the same band) and the lower triangle
            indices = np.arange(0, n_z * 2, 2)
            mask = np.ones((n_z * 2, n_z * 2))
            mask[indices, indices] = 0
            mask[indices, indices + 1] = 0
            mask[indices + 1, indices] = 0
            mask[indices + 1, indices + 1] = 0
            mask[np.tril(mask) > 0] = np.nan
            # filter distance matrix by distance window and mask
            D[(D > d_max) | (D < d_min) | (mask == 0)] = np.nan

            # indices of end-end distances within the allowed window
            _z_orientation = np.reshape(z_orientation, (n_z * 2), order='F')
            _idxs = np.asarray(np.where(~np.isnan(D)))

            # matrix with lateral alignments A (NaN where alignment < a_min)
            A = np.zeros_like(D) * np.nan
            for (i, j) in _idxs.T:
                A_ij = lateral_alignment(_z_ends[i], _z_ends[j], _z_orientation[i], _z_orientation[j])
                A[i, j] = A_ij if A_ij >= a_min else np.nan
            D[np.isnan(A)] = np.nan

            # make matrices symmetric for an undirected graph
            D = (D + D.T) / 2
            A = (A + A.T) / 2

            def compute_cost_matrix(D, A, penalty=1e6):
                """
                Compute the cost matrix for linking Z-band ends, using cost 1 - A so that
                optimal alignment is favored. Invalid links (NaN distance) get a large penalty.

                Parameters
                ----------
                D : ndarray
                    Distance matrix between Z-band ends.
                A : ndarray
                    Alignment matrix between Z-band ends.
                penalty : float
                    Cost assigned to invalid links (e.g., NaN or out-of-range values).

                Returns
                -------
                C : ndarray
                    Cost matrix for linking Z-band ends.
                """
                # Ensure alignment values are valid (replace NaNs with 0)
                A = np.nan_to_num(A, nan=0.0)
                # Compute cost matrix
                C = 1 - A
                # Set invalid links (e.g., NaNs in D) to a very high cost
                C[np.isnan(D)] = penalty
                return C

            def solve_linking(C):
                """
                Solve the optimal one-to-one linking problem with the Hungarian algorithm.

                Parameters
                ----------
                C : ndarray
                    Cost matrix for linking Z-band ends.

                Returns
                -------
                row_ind, col_ind : ndarray
                    Row and column indices of the optimal assignment.
                """
                row_ind, col_ind = linear_sum_assignment(C)
                return row_ind, col_ind

            # Step 1: Compute cost matrix
            C = compute_cost_matrix(D, A)
            # Step 2: Solve optimal linking using Hungarian algorithm
            row_ind, col_ind = solve_linking(C)
            # Step 3: Create adjacency matrix keeping only links that satisfy both thresholds
            # (NaN comparisons are False, so NaN entries are dropped here)
            links = np.zeros_like(D)
            for i, j in zip(row_ind, col_ind):
                links[i, j] = 1 if D[i, j] <= d_max and A[i, j] >= a_min else 0

            # reshape arrays to (band, end, band, end)
            links = links.reshape((n_z, 2, n_z, 2), order='F')
            lat_dist = D.reshape((n_z, 2, n_z, 2), order='F')
            lat_alignment = A.reshape((n_z, 2, n_z, 2), order='F')

            # number of lateral neighbors per z-band
            links_z = np.sum(links, axis=(1, 3))
            lat_neighbors = np.count_nonzero(links_z, axis=1)

            # convert links, lat_dist and lat_alignment to flat per-link lists
            links = np.where(links == 1)
            lat_dist = lat_dist[links]
            lat_alignment = lat_alignment[links]
            links = np.asarray(links)

            # analyze laterally linked groups (connected components of the link graph)
            def analyze_linked_groups(connectivity_matrix, distance_matrix, alignment_matrix):
                G = nx.Graph()
                for n in range(n_z):
                    G.add_node(n)
                # add one edge per detected link, annotated with distance and alignment
                for n, (idx_i, end_i, idx_j, end_j) in enumerate(connectivity_matrix.T):
                    G.add_edge(idx_i, idx_j, alignment=alignment_matrix[n], distance=distance_matrix[n])
                # Find connected components in the graph
                _linked_groups = list(nx.connected_components(G))
                _size_groups = np.asarray([len(group) for group in _linked_groups])
                # Calculate summed length and mean alignment of each group
                _length_groups = []
                _alignment_groups = []
                for group in _linked_groups:
                    sum_distance = 0
                    sum_alignment = 0
                    for node in group:
                        edges = G.edges(node, data=True)
                        for _, _, data in edges:
                            if G.has_edge(_, node):  # Check if edge is within the current group
                                sum_distance += data['distance']
                                sum_alignment += data['alignment']
                    sum_distance /= 2  # Each edge is counted twice (undirected graph), so divide by 2
                    _length_groups.append(sum_distance + np.sum(length[list(group)]))
                    _alignment_groups.append(sum_alignment / len(group))
                _linked_groups = [list(s) for s in _linked_groups]
                return (_linked_groups, np.asarray(_size_groups), np.asarray(_length_groups),
                        np.asarray(_alignment_groups))

            linked_groups, size_groups, length_groups, alignment_groups = analyze_linked_groups(
                links, lat_dist, lat_alignment)
        else:
            # no z-bands at all: return empty containers for all lateral metrics
            (lat_neighbors, lat_dist, lat_alignment, links, z_ends, linked_groups, size_groups,
             length_groups, alignment_groups) = [], [], [], [], [], [], [], [], []

        return (length, intensity, straightness, z_mask_intensity, z_mask_area, orientation, oop,
                labels_list, labels, lat_neighbors, lat_dist, lat_alignment, links, z_ends,
                linked_groups, size_groups, length_groups, alignment_groups)
[docs] @staticmethod def get_sarcomere_vectors( zbands: np.ndarray, mbands: np.ndarray, orientation_field: np.ndarray, pixelsize: float, median_filter_radius: float = 0.25, slen_lims: Tuple[float, float] = (1, 3), interp_factor: int = 4, linewidth: float = 0.3, backend: str = 'loky', ) -> Tuple[Union[np.ndarray, List], Union[np.ndarray, List], Union[np.ndarray, List], Union[np.ndarray, List], Union[np.ndarray, List], Union[np.ndarray, List], Union[np.ndarray, List]]: """ Extract sarcomere orientation and length vectors. Parameters ---------- zbands : np.ndarray 2D array representing the semantic segmentation map of Z-bands. mbands : np.ndarray 2D array representing the semantic segmentation map of mbands. orientation_field : np.ndarray 2D array representing the orientation field. pixelsize : float Size of a pixel in micrometers. median_filter_radius : float, optional Radius of kernel to smooth orientation field before assessing orientation at M-points, in µm (default 0.25 µm). slen_lims : tuple of float, optional Sarcomere size limits in micrometers (default is (1, 3)). interp_factor : int, optional Interpolation factor for profiles to calculate sarcomere length. Defaults to 4. linewidth : float, optional Line width of profiles to calculate sarcomere length. Defaults to 0.3 µm. Returns ------- pos_vectors : np.ndarray Array of position vectors for sarcomeres. sarcomere_orientation_vectors : np.ndarray Sarcomere orientation values at midline points. sarcomere_length_vectors : np.ndarray Sarcomere length values at midline points. sarcomere_mask : np.ndarray Mask indicating the presence of sarcomeres. 
""" radius_pixels = max(int(round(median_filter_radius / pixelsize, 0)), 1) linewidth_pixels = max(int(round(linewidth / pixelsize, 0)), 1) # skeletonize mbands mbands_skel = skeletonize(mbands, method='lee') # calculate and preprocess orientation map orientation = Utils.get_orientation_angle_map(orientation_field, use_median_filter=True, radius=radius_pixels) # label mbands midline_labels, n_mbands = ndimage.label(mbands_skel, ndimage.generate_binary_structure(2, 2)) # iterate mbands and create an additional list with labels and midline length (approx. by max. Feret diameter) props = measure.regionprops_table(midline_labels, properties=['label', 'coords', 'feret_diameter_max']) list_labels, coords_mbands, length_mbands = (props['label'], props['coords'], props['feret_diameter_max'] * pixelsize) pos_vectors_px, pos_vectors, midline_id_vectors, midline_length_vectors = [], [], [], [] if n_mbands > 0: for i, (label_i, coords_i, length_midline_i) in enumerate( zip(list_labels, coords_mbands, length_mbands)): pos_vectors_px.append(coords_i) midline_length_vectors.append(np.ones(coords_i.shape[0]) * length_midline_i) midline_id_vectors.append(np.ones(coords_i.shape[0]) * label_i) pos_vectors_px = np.concatenate(pos_vectors_px, axis=0) midline_id_vectors = np.concatenate(midline_id_vectors) midline_length_vectors = np.concatenate(midline_length_vectors) sarcomere_orientation_vectors = orientation[pos_vectors_px[:, 0], pos_vectors_px[:, 1]] ends1 = pos_vectors_px.T + (slen_lims[1] * 1.3) / 2 / pixelsize * np.array( (np.sin(sarcomere_orientation_vectors), np.cos(sarcomere_orientation_vectors)) ) ends2 = pos_vectors_px.T - (slen_lims[1] * 1.3) / 2 / pixelsize * np.array( (np.sin(sarcomere_orientation_vectors), np.cos(sarcomere_orientation_vectors)) ) # Calculate sarcomere lengths by measuring peak-to-peak distance of Z-bands in intensity profile profiles = Utils.fast_profile_lines(zbands, ends1, ends2, linewidth=linewidth_pixels) # Use parallel processing for faster 
execution results = Parallel(n_jobs=-1, backend=backend)( delayed(Utils.process_profile)( profile, pixelsize, slen_lims=slen_lims, interp_factor=interp_factor ) for profile in profiles ) # Convert results to array sarcomere_length_vectors, center_offsets = np.array(results).T # get vector positions in µm and correct center of vectors pos_vectors = pos_vectors_px * pixelsize offset_vectors = np.stack((np.sin(sarcomere_orientation_vectors) * center_offsets, np.cos(sarcomere_orientation_vectors) * center_offsets), axis=-1) pos_vectors -= offset_vectors # remove NaNs nan_mask = np.isnan(sarcomere_length_vectors) pos_vectors_px = pos_vectors_px[~nan_mask] pos_vectors = pos_vectors[~nan_mask] midline_id_vectors = midline_id_vectors[~nan_mask] sarcomere_orientation_vectors = sarcomere_orientation_vectors[~nan_mask] sarcomere_length_vectors = sarcomere_length_vectors[~nan_mask] else: sarcomere_length_vectors, z_band_thickness_vectors, sarcomere_orientation_vectors = [], [], [] return (pos_vectors_px, pos_vectors, midline_id_vectors, midline_length_vectors, sarcomere_length_vectors, sarcomere_orientation_vectors, n_mbands)
    @staticmethod
    def cluster_sarcomeres(pos_vectors: np.ndarray,
                           sarcomere_length_vectors: np.ndarray,
                           sarcomere_orientation_vectors: np.ndarray,
                           pixelsize: float,
                           size: Tuple[int, int],
                           d_max: float = 3,
                           cosine_min: float = 0.65,
                           leiden_resolution: float = 0.06,
                           random_seed: int = 42,
                           area_min: float = 20,
                           dilation_radius: float = 0.3
                           ) -> Tuple[int, List, List, List, List, List, List, np.ndarray]:
        """
        Clusters sarcomeres into domains based on their spatial and orientational properties
        using the Leiden method for community detection in igraph. It considers sarcomere
        lengths, orientations, and positions along mbands to form networks of connected
        sarcomeres. Domains are then identified as communities within these networks, with
        additional criteria for minimum domain area and connectivity thresholds. Finally,
        quantifies the mean and std of sarcomere lengths, and the orientational order
        parameter and mean orientation of each domain.

        Parameters
        ----------
        pos_vectors : np.ndarray
            Array of sarcomere midline point positions in µm.
        sarcomere_length_vectors : np.ndarray
            List of midline point sarcomere lengths
        sarcomere_orientation_vectors : np.ndarray
            List of midline point sarcomere orientations, in radians
        pixelsize : float
            Pixel size in µm
        size : tuple(int, int)
            Shape of the image in pixels
        d_max : float
            Max. distance threshold for creating a network edge between vector ends
        cosine_min : float
            Minimal absolute cosine between vector angles for creating a network edge
            between vector ends
        leiden_resolution : float
            Resolution parameter for the Leiden algorithm
        random_seed : int
            Random seed for reproducibility
        area_min : float
            Minimal area (in µm²) for a domain to be kept
        dilation_radius : float
            Dilation radius for refining domain area masks (in µm)

        Returns
        -------
        n_domains : int
            Number of domains
        domains : list
            List of domain sets with point indices
        area_domains : list
            List with domain areas
        sarcomere_length_mean_domains : list
            Mean sarcomere length within each domain
        sarcomere_length_std_domains : list
            Standard deviation of sarcomere length within each domain
        sarcomere_oop_domains : list
            Orientational order parameter of sarcomeres in each domain
        sarcomere_orientation_domains : list
            Main orientation of domains
        mask_domains : ndarray
            Masks of domains with value representing domain label
        """
        # too few vectors: no meaningful clustering possible
        if len(pos_vectors) < 10:
            return 0, [], [], [], [], [], [], []

        n_vectors = sarcomere_length_vectors.shape[0]

        # Calculate orientation unit vectors using trigonometry
        orientation_vectors = np.column_stack([np.sin(sarcomere_orientation_vectors),
                                               np.cos(sarcomere_orientation_vectors)])

        # Calculate end points of the vectors
        ends_0 = pos_vectors + orientation_vectors * sarcomere_length_vectors[:, None] / 2
        ends_1 = pos_vectors - orientation_vectors * sarcomere_length_vectors[:, None] / 2

        # Interleave ends_0 and ends_1: vector i occupies rows 2i and 2i+1
        ends = np.empty((2 * n_vectors, 2), dtype=np.float64)
        ends[0::2] = ends_0
        ends[1::2] = ends_1

        # Interleave orientation vectors (opposite signs for the two ends)
        orientation_ends = np.empty((2 * n_vectors, 2), dtype=np.float64)
        orientation_ends[0::2] = orientation_vectors
        orientation_ends[1::2] = -orientation_vectors

        # Use cKDTree to find candidate end pairs within the distance threshold
        tree = cKDTree(ends)
        pairs = tree.query_pairs(d_max, output_type='ndarray')

        # Compute cosine similarity for all pairs at once
        dot_products = np.sum(orientation_ends[pairs[:, 0]] * orientation_ends[pairs[:, 1]], axis=1)
        norms = np.linalg.norm(orientation_ends[pairs[:, 0]], axis=1) * np.linalg.norm(
            orientation_ends[pairs[:, 1]], axis=1)
        cosine_similarities = np.abs(dot_products / norms)

        # Filter pairs based on cosine similarity
        valid_pairs = cosine_similarities > cosine_min
        filtered_pairs = pairs[valid_pairs]

        # Edge weight: distance divided by cosine similarity (penalizes misalignment)
        distances = (np.sqrt(np.sum((ends[filtered_pairs[:, 0]] - ends[filtered_pairs[:, 1]]) ** 2,
                                    axis=1)) / cosine_similarities[valid_pairs])

        # Create edges list
        edges = filtered_pairs.tolist()

        # Add zero-cost connections between the two ends of each vector
        zero_cost_edges = [(2 * i, 2 * i + 1) for i in range(n_vectors)]
        edges.extend(zero_cost_edges)

        # Create weights list
        weights = distances.tolist()
        weights.extend([0] * n_vectors)

        # Create the graph over all vector ends
        graph = ig.Graph(2 * n_vectors)
        graph.add_edges(edges)
        graph.es['weight'] = weights

        # Create a mapping to contract the two ends of each vector into one vertex
        mapping = [i // 2 for i in range(graph.vcount())]
        # Contract the vertices
        graph.contract_vertices(mapping)

        # Set random seed for reproducibility of Leiden and shuffling
        random.seed(random_seed)

        # Run Leiden; community_leiden returns a VertexClustering with memberships
        clusters = graph.community_leiden(weights="weight",
                                          resolution_parameter=leiden_resolution,
                                          n_iterations=-1,
                                          objective_function="modularity")

        # Build domain sets from cluster memberships
        membership = clusters.membership
        domains_dict = {}
        for idx, c_id in enumerate(membership):
            domains_dict.setdefault(c_id, []).append(idx)
        domains = list(domains_dict.values())

        # Shuffle domains for random ordering
        random.shuffle(domains)

        # per-domain masks and statistics (filters out domains smaller than area_min)
        (mask_domains, area_domains, sarcomere_length_mean_domains, sarcomere_length_std_domains,
         sarcomere_oop_domains, sarcomere_orientation_domains) = Structure._analyze_domains(
            domains, pos_vectors, sarcomere_orientation_vectors, sarcomere_length_vectors,
            size=size, pixelsize=pixelsize, dilation_radius=dilation_radius, area_min=area_min)

        area_domains = np.asarray(area_domains)
        sarcomere_length_mean_domains = np.asarray(sarcomere_length_mean_domains)
        sarcomere_length_std_domains = np.asarray(sarcomere_length_std_domains)
        sarcomere_oop_domains = np.asarray(sarcomere_oop_domains)
        sarcomere_orientation_domains = np.asarray(sarcomere_orientation_domains)
        n_domains = len(area_domains)

        return (n_domains, domains, area_domains, sarcomere_length_mean_domains,
                sarcomere_length_std_domains, sarcomere_oop_domains,
                sarcomere_orientation_domains, mask_domains)
    @staticmethod
    def _grow_line(seed, points_t, sarcomere_length_vectors_t, sarcomere_orientation_vectors_t,
                   nbrs, threshold_distance, pixelsize, persistence):
        """
        Grow a single myofibril line from a seed vector, extending in both directions
        perpendicular to the sarcomere orientation until no neighbor is found within
        ``threshold_distance``.

        Parameters
        ----------
        seed : int
            Index of the seed vector in ``points_t``.
        points_t : np.ndarray
            Positions of sarcomere vectors (pixel coordinates; transposed internally).
        sarcomere_length_vectors_t : np.ndarray
            Sarcomere length per vector, in µm.
        sarcomere_orientation_vectors_t : np.ndarray
            Sarcomere orientation per vector, in radians.
        nbrs : sklearn.neighbors.NearestNeighbors
            Fitted 1-NN index over ``points_t`` (pixel units).
        threshold_distance : float
            Max. distance between predicted and actual next point, in µm.
        pixelsize : float
            Pixel size in µm.
        persistence : int
            Number of trailing points averaged for local length/orientation.

        Returns
        -------
        np.ndarray
            Ordered indices of the grown line (left end to right end).
        """
        line_i = deque([seed])
        stop_right = stop_left = False
        # rotate by π/2: growth direction is perpendicular to the sarcomere orientation
        sarcomere_orientation_vectors_t = sarcomere_orientation_vectors_t + np.pi / 2
        threshold_distance_pixels = threshold_distance / pixelsize

        def angle_diff(alpha, beta):
            """Return the signed difference between angles alpha and beta.
            The result is in the range [-pi, pi]."""
            return np.arctan2(np.sin(beta - alpha), np.cos(beta - alpha))

        def calculate_mean_orientation(orientations):
            # Convert orientations to complex numbers on the unit circle
            complex_orientations = np.exp(2j * np.array(orientations))
            # Calculate the mean of the complex numbers
            mean_complex = np.mean(complex_orientations)
            # Convert back to angle and halve it to get the original range
            return np.angle(mean_complex) / 2

        def adjust_orientation(current_orientation, previous_orientation):
            # flip by π when the new orientation points backwards relative to the previous one,
            # so growth keeps moving in a consistent direction
            diff = angle_diff(current_orientation, previous_orientation)
            if diff > np.pi / 2:
                return current_orientation - np.pi
            elif diff < -np.pi / 2:
                return current_orientation + np.pi
            return current_orientation

        # Initialize orientations from the seed
        orientation_left = orientation_right = sarcomere_orientation_vectors_t[seed]

        points_t = points_t.T
        # NOTE(review): there is no visited-set guard; termination relies on the distance
        # threshold eventually failing — confirm this cannot cycle on dense point sets.
        while not stop_left or not stop_right:
            line_i_list = list(line_i)
            if not stop_left:
                end_left = points_t[:, line_i_list[0]]
                # local step length / orientation averaged over the last `persistence` points
                length_left = np.mean(sarcomere_length_vectors_t[line_i_list[:persistence]]) / pixelsize
                new_orientation_left = calculate_mean_orientation(
                    sarcomere_orientation_vectors_t[line_i_list[:persistence]]) if persistence > 1 else \
                    sarcomere_orientation_vectors_t[line_i_list[0]]
                orientation_left = adjust_orientation(new_orientation_left, orientation_left)
            if not stop_right:
                end_right = points_t[:, line_i_list[-1]]
                length_right = np.mean(sarcomere_length_vectors_t[line_i_list[-persistence:]]) / pixelsize
                new_orientation_right = calculate_mean_orientation(
                    sarcomere_orientation_vectors_t[line_i_list[-persistence:]]) if persistence > 1 else \
                    sarcomere_orientation_vectors_t[line_i_list[-1]]
                orientation_right = adjust_orientation(new_orientation_right, orientation_right)

            # grow left: predict the next point one sarcomere length away, then snap to
            # the nearest actual vector if it is close enough
            if not stop_left:
                prior_left = [end_left[0] + np.cos(orientation_left) * length_left,
                              end_left[1] - np.sin(orientation_left) * length_left]
                distance_left, index_left = nbrs.kneighbors([prior_left], return_distance=True)
                if distance_left[0][0] < threshold_distance_pixels:
                    line_i.appendleft(index_left[0][0].astype('int'))
                else:
                    stop_left = True

            # grow right (mirror of the left-growth step)
            if not stop_right:
                prior_right = [end_right[0] - np.cos(orientation_right) * length_right,
                               end_right[1] + np.sin(orientation_right) * length_right]
                distance_right, index_right = nbrs.kneighbors([prior_right], return_distance=True)
                if distance_right[0][0] < threshold_distance_pixels:
                    line_i.append(index_right[0][0].astype('int'))
                else:
                    stop_right = True

        return np.asarray(line_i)
    @staticmethod
    def line_growth(points_t: np.ndarray,
                    sarcomere_length_vectors_t: np.ndarray,
                    sarcomere_orientation_vectors_t: np.ndarray,
                    midline_length_vectors_t: np.ndarray,
                    pixelsize: float,
                    ratio_seeds: float = 0.1,
                    persistence: int = 4,
                    threshold_distance: float = 0.3,
                    n_min: int = 5,
                    random_seed: Union[None, int] = None):
        """
        Line growth algorithm to determine myofibril lines perpendicular to sarcomere z-bands.

        Parameters
        ----------
        points_t : np.ndarray
            List of midline point positions
        sarcomere_length_vectors_t : list
            Sarcomere length at midline points
        sarcomere_orientation_vectors_t : list
            Sarcomere orientation angle at midline points, in radians
        midline_length_vectors_t : list
            Length of sarcomere mbands of midline points
        pixelsize : float
            Pixel size in µm
        ratio_seeds : float
            Ratio of sarcomere vectors to be taken as seeds for line growth
        persistence : int
            Number of points to consider for averaging length and orientation.
        threshold_distance : float
            Max. distance between predicted and actual next point during growth, in µm.
        n_min : int
            Minimal number of vectors for a grown line to be kept.
        random_seed : int, optional
            Random seed for reproducibility. Defaults to None.

        Returns
        -------
        line_data : dict
            Dictionary with LOI data, keys = ('lines', 'line_features')
        """
        # select random origins for line growth
        points_t = np.asarray(points_t)
        if points_t.shape[0] == 2:
            # accept (2, n) input and normalize to (n, 2)
            points_t = points_t.T
        if len(points_t) == 0:
            print('No sarcomeres in image (len(points) = 0), could not grow lines.')
            return {'lines': [], 'line_features': {}}
        if random_seed:
            # NOTE(review): a random_seed of 0 is falsy and would be ignored here
            random.seed(random_seed)
        n_vectors = len(points_t)
        seed_idx = random.sample(range(n_vectors), max(1, int(ratio_seeds * n_vectors)))

        # Precompute Nearest Neighbors index for growth snapping
        nbrs = NearestNeighbors(n_neighbors=1, algorithm='ball_tree').fit(points_t)

        # Prepare one argument tuple per seed
        args = [(seed, points_t, sarcomere_length_vectors_t, sarcomere_orientation_vectors_t,
                 nbrs, threshold_distance, pixelsize, persistence) for seed in seed_idx]

        # grow lines sequentially
        lines = [Structure._grow_line(*arg) for arg in args]

        # remove short lines (< n_min)
        lines = [l for l in lines if len(l) >= n_min]

        # calculate features of lines
        n_vectors_lines = np.asarray([len(l) for l in lines])  # number of sarcomeres in line
        length_line_segments = [sarcomere_length_vectors_t[l] for l in lines]
        length_lines = [np.sum(lengths) for lengths in length_line_segments]

        # sarcomere lengths along each line
        sarcomere_mean_length_lines = [np.mean(sarcomere_length_vectors_t[l]) for l in lines]
        sarcomere_std_length_lines = [np.std(sarcomere_length_vectors_t[l]) for l in lines]

        # midline lengths (NaN-aware: midline length may be undefined for some points)
        midline_mean_length_lines = [np.nanmean(midline_length_vectors_t[l]) for l in lines]
        midline_std_length_lines = [np.nanstd(midline_length_vectors_t[l]) for l in lines]
        midline_min_length_lines = [np.nanmin(midline_length_vectors_t[l]) for l in lines]

        # Straightness
        def frechet_straightness(points):
            """
            Compute a Fréchet-inspired straightness measure:
            1 - (max perpendicular deviation from chord / chord length)

            Parameters
            ----------
            points : np.ndarray
                Array of shape (n_points, 2) representing polyline vertices

            Returns
            -------
            float
                Straightness measure (1 = perfectly straight)
            """
            if len(points) < 2:
                return 1.0  # Single point is trivially straight

            # Chord vector between first and last points
            chord_vector = points[-1] - points[0]
            chord_length = np.linalg.norm(chord_vector)
            if chord_length < 1e-9:  # Handle degenerate (closed) chord
                return 0.0

            # Unit vector along chord direction
            unit_chord = chord_vector / chord_length
            # Vectors from first point to each polyline vertex
            displacement_vectors = points - points[0]
            # Scalar projections onto chord (dot product with unit vector)
            chord_projections = np.sum(displacement_vectors * unit_chord, axis=1)
            # Ideal points along chord line
            projected_points = points[0] + chord_projections[:, np.newaxis] * unit_chord
            # Perpendicular deviations from actual path
            deviation_vectors = points - projected_points
            perpendicular_deviations = np.linalg.norm(deviation_vectors, axis=1)
            max_deviation = np.max(perpendicular_deviations)
            return 1.0 - (max_deviation / chord_length)

        straightness_lines = [frechet_straightness(points_t[line]) for line in lines]

        # Bending: mean squared angular change between consecutive segments
        tangential_vector_line_segments = [np.diff(points_t[l], axis=0) for l in lines]
        tangential_angle_line_segments = [np.asarray([np.arctan2(v[1], v[0]) for v in vectors])
                                          for vectors in tangential_vector_line_segments]
        bending_lines = [
            np.mean(np.arctan2(np.sin(np.diff(angles)), np.cos(np.diff(angles))) ** 2)
            if len(angles) > 1 else 0.0
            for angles in tangential_angle_line_segments]

        # bundle per-line features
        line_features = {'n_vectors_lines': n_vectors_lines,
                         'length_lines': length_lines,
                         'sarcomere_mean_length_lines': sarcomere_mean_length_lines,
                         'sarcomere_std_length_lines': sarcomere_std_length_lines,
                         'bending_lines': bending_lines,
                         'straightness_lines': straightness_lines,
                         'midline_mean_length_lines': midline_mean_length_lines,
                         'midline_std_length_lines': midline_std_length_lines,
                         'midline_min_length_lines': midline_min_length_lines}
        line_features = Utils.convert_lists_to_arrays_in_dict(line_features)
        line_data = {'lines': lines, 'line_features': line_features}
        return line_data
    @staticmethod
    def kymograph_movie(movie: np.ndarray, line: np.ndarray, linewidth: int = 10, order: int = 0):
        """
        Generate a kymograph along a segmented line, processing frames in parallel.

        Parameters
        ----------
        movie : np.ndarray, shape (N, H, W)
            The movie.
        line : np.ndarray, shape (N, 2)
            The coordinates of the segmented line (N > 1)
        linewidth : int, optional
            Width of the scan in pixels, perpendicular to the line
        order : int in {0, 1, 2, 3, 4, 5}, optional
            The order of the spline interpolation, default is 0 if image.dtype is bool
            and 1 otherwise. The order has to be in the range 0-5. See
            `skimage.transform.warp` for detail.

        Returns
        -------
        kymograph : ndarray
            Kymograph along segmented line, one row per frame.

        Notes
        -----
        Adapted from scikit-image
        (https://scikit-image.org/docs/0.22.x/api/skimage.measure.html#skimage.measure.profile_line).
        """
        # prepare sampling coordinates of the (possibly multi-segment) line once,
        # then reuse them for every frame
        perp_lines = Structure.__curved_line_profile_coordinates(points=line, linewidth=linewidth)
        # Prepare arguments for each frame
        args = [(movie[frame], perp_lines, linewidth, order) for frame in range(movie.shape[0])]
        # Create a process pool and map process_frame over all frames
        with Pool() as pool:
            results = pool.map(Structure.process_frame, args)
        # Convert list of per-frame profiles to a numpy array (frames x positions)
        kymograph = np.array(results)
        return kymograph
[docs] @staticmethod def process_frame(args): frame, perp_lines, linewidth, order = args pixels = ndimage.map_coordinates(frame, perp_lines, prefilter=order > 1, order=order, mode='reflect', cval=0.0) pixels = np.flip(pixels, axis=1) intensities = np.mean(pixels, axis=1) return intensities
@staticmethod def __curved_line_profile_coordinates(points: np.ndarray, linewidth: int = 10): """ Calculate the coordinates of a curved line profile composed of multiple segments with specified linewidth. Parameters ---------- points : np.ndarray A list of points (y, x) defining the segments of the curved line. linewidth : int, optional The width of the line in pixels. Returns ------- coords : ndarray The coordinates of the curved line profile. Shape is (2, N, linewidth), where N is the total number of points in the line. """ all_perp_rows = [] all_perp_cols = [] for i in range(len(points) - 1): src, dst = np.asarray(points[i], dtype=float), np.asarray(points[i + 1], dtype=float) d_row, d_col = dst - src theta = np.arctan2(d_row, d_col) length = int(np.ceil(np.hypot(d_row, d_col) + 1)) line_col = np.linspace(src[1], dst[1], length) line_row = np.linspace(src[0], dst[0], length) col_width, row_width = (linewidth - 1) * np.sin(-theta) / 2, (linewidth - 1) * np.cos(theta) / 2 perp_rows = np.stack([np.linspace(row - row_width, row + row_width, linewidth) for row in line_row]) perp_cols = np.stack([np.linspace(col - col_width, col + col_width, linewidth) for col in line_col]) all_perp_rows.append(perp_rows) all_perp_cols.append(perp_cols) # Concatenate all segments final_perp_rows = np.concatenate(all_perp_rows, axis=0) final_perp_cols = np.concatenate(all_perp_cols, axis=0) return np.stack([final_perp_rows, final_perp_cols])
[docs] @staticmethod def sarcomere_mask(points: np.ndarray, sarcomere_orientation_vectors: np.ndarray, sarcomere_length_vectors: np.ndarray, shape: Tuple[int, int], pixelsize: float, dilation_radius: float = 0.3) -> np.ndarray: """ Calculates a binary mask of areas with sarcomeres. Parameters ---------- points : ndarray Positions of sarcomere vectors in µm. (n_vectors, 2) sarcomere_orientation_vectors : ndarray Orientations of sarcomere vectors. sarcomere_length_vectors : ndarray Lengths of sarcomere vectors in µm. shape : tuple Shape of the image, in pixels. pixelsize : float Pixel size in µm. dilation_radius : float, optional Dilation radius to close small holes in mask, in µm (default is 0.3). Returns ------- mask : ndarray Binary mask of sarcomeres. """ # Calculate orientation vectors using trigonometry sarcomere_orientation_vectors += np.pi / 2 orientation_vectors = np.asarray([np.cos(sarcomere_orientation_vectors), -np.sin(sarcomere_orientation_vectors)]) # Calculate the ends of the vectors based on their orientation and length ends_0 = points.T + orientation_vectors * sarcomere_length_vectors / 2 # End point 1 of each vector ends_1 = points.T - orientation_vectors * sarcomere_length_vectors / 2 # End point 2 of each vector ends_0, ends_1 = ends_0 / pixelsize, ends_1 / pixelsize mask = np.zeros(shape, dtype='bool') for e0, e1 in zip(ends_0.T.astype('int'), ends_1.T.astype('int')): rr, cc = line(*e0, *e1) try: mask[rr, cc] = True except: pass dilation_radius_pixels = int(round(dilation_radius / pixelsize, 0)) mask = binary_dilation(mask, disk(dilation_radius_pixels)) return mask
[docs] @staticmethod def _analyze_domains(domains: List, pos_vectors: np.ndarray, sarcomere_orientation_vectors: np.ndarray, sarcomere_length_vectors: np.ndarray, size: Tuple[int, int], pixelsize: float, dilation_radius: float, area_min: float): """ Creates a domain mask, where each domain has a distinct label, and analyzes the individual domains. Parameters __________ domains : list List with domain labels for each vector. Each domain is labeled with a unique integer. pos_vectors : ndarray Position vectors in micrometers. sarcomere_orientation_vectors : ndarray Orientation angles in radians. sarcomere_length_vectors : ndarray Sarcomere lengths in micrometers. size : tuple of int Output map dimensions (height, width) in pixels. pixelsize : float Physical size of one pixel in micrometers. dilation_radius : float, optional Dilation radius for refining domain masks, in µm. area_min : float, optional Minimal area of a domain in µm^2, smaller domains are discarded. """ # calculate domain properties and remove small domains (area_domains, sarcomere_orientation_domains, sarcomere_oop_domains, sarcomere_length_mean_domains, sarcomere_length_std_domains) = [], [], [], [], [] mask_domains = np.zeros(size, dtype='uint8') j = 1 for i, domain_i in enumerate(domains): pos_vectors_i = pos_vectors[domain_i] orientations_i = sarcomere_orientation_vectors[domain_i] lengths_i = sarcomere_length_vectors[domain_i] if pos_vectors_i.shape[0] > 10: # bounding box min_i = ( max(int((pos_vectors_i[:, 0].min() - 3) // pixelsize), 0), max(int((pos_vectors_i[:, 1].min() - 3) // pixelsize), 0)) max_i = (min(int((pos_vectors_i[:, 0].max() + 3) // pixelsize), size[0]), min(int((pos_vectors_i[:, 1].max() + 3) // pixelsize), size[1])) size_i = (max_i[0] - min_i[0], max_i[1] - min_i[1]) _pos_vectors_i = pos_vectors_i.copy() _pos_vectors_i[:, 0] -= min_i[0] * pixelsize _pos_vectors_i[:, 1] -= min_i[1] * pixelsize mask_i = Structure.sarcomere_mask(_pos_vectors_i, orientations_i, lengths_i, size_i, 
pixelsize=pixelsize, dilation_radius=dilation_radius) area_i = np.sum(mask_i) * pixelsize ** 2 if area_i >= area_min: ind_i = np.where(mask_i) ind_i = (ind_i[0] + min_i[0], ind_i[1] + min_i[1]) mask_domains[ind_i] = j area_i = np.sum(mask_i) * pixelsize ** 2 area_domains.append(area_i) sarcomere_length_mean_domains.append(np.mean(lengths_i)) sarcomere_length_std_domains.append(np.std(lengths_i)) oop, angle = Utils.analyze_orientations(orientations_i) sarcomere_oop_domains.append(oop) sarcomere_orientation_domains.append(angle) j += 1 return (mask_domains, area_domains, sarcomere_length_mean_domains, sarcomere_length_std_domains, sarcomere_oop_domains, sarcomere_orientation_domains)
[docs] @staticmethod def create_myofibril_length_map( myof_lines: np.ndarray, myof_length: np.ndarray, pos_vectors: np.ndarray, sarcomere_orientation_vectors: np.ndarray, sarcomere_length_vectors: np.ndarray, size: tuple, pixelsize: float, median_filter_radius: float = 0.6, ) -> np.ndarray: """ The `create_myofibril_length_map` function generates a **2D spatial map** of myofibril lengths represented as pixel values. It achieves this by rasterizing myofibril line segments, assigning their corresponding lengths to the pixels they occupy, and averaging these values at overlapping pixels. The resulting map is optionally smoothed using a median filter to reduce noise and provide a more coherent spatial representation. Parameters ---------- myof_lines : ndarray Line indices for myofibril structures. myof_length : ndarray Length values for each myofibril line. pos_vectors : ndarray Position vectors in micrometers. sarcomere_orientation_vectors : ndarray Orientation angles in radians. sarcomere_length_vectors : ndarray Sarcomere lengths in micrometers. size : tuple of int Output map dimensions (height, width) in pixels. pixelsize : float Physical size of one pixel in micrometers. median_filter_radius : float, optional Filter radius in micrometers, by default 0.6. Returns ------- ndarray 2D array of calculated myofibril lengths with NaN for empty regions. 
""" # Convert median filter radius to pixels median_radius_px = int(round(median_filter_radius / pixelsize)) # Initialize accumulation maps length_sum_map = np.zeros(size, dtype=np.float32) weight_map = np.zeros(size, dtype=np.float32) # Process each myofibril segment for line_idx, line_length in zip(myof_lines, myof_length): # Extract vector data for current line points = pos_vectors[line_idx] orientations = sarcomere_orientation_vectors[line_idx] + np.pi / 2 lengths = sarcomere_length_vectors[line_idx] # Calculate direction vectors dir_x = np.cos(orientations) dir_y = -np.sin(orientations) directions = np.vstack([dir_x, dir_y]) # Calculate endpoints in pixel coordinates end_offset = directions * lengths / 2 end_points = np.stack([ (points.T + end_offset) / pixelsize, (points.T - end_offset) / pixelsize ]).astype(np.int32) # Rasterize lines for (x0, y0), (x1, y1) in zip(end_points[0].T, end_points[1].T): rr, cc = line(x0, y0, x1, y1) # Apply boundary constraints valid = (rr >= 0) & (rr < size[0]) & (cc >= 0) & (cc < size[1]) np.add.at(length_sum_map, (rr[valid], cc[valid]), line_length) np.add.at(weight_map, (rr[valid], cc[valid]), 1) # Calculate weighted average myof_map = np.divide(length_sum_map, weight_map, out=np.full_like(length_sum_map, np.nan), where=weight_map > 0) # Apply median filtering if required if median_radius_px > 0: window_size = 2 * median_radius_px + 1 myof_map = Utils.nanmedian_filter_numba(myof_map, window_size) return myof_map
if __name__ == "__main__":
    # Smoke test: run the full structural analysis pipeline on a sample image.
    print('Testing Structure class')
    test_file = '../test_data/antibody_staining_2D_hiPSC_CM/stained hiPSC d60 a-actinin 488 63x 5.tif'

    # pixelsize (presumably µm/pixel) is forwarded via **info as metadata — see __init__
    sarc = Structure(test_file, pixelsize=0.114)

    # detect sarcomeres
    sarc.detect_sarcomeres()

    # analyze Z-bands
    sarc.analyze_z_bands()

    # analyze sarcomere vectors
    sarc.analyze_sarcomere_vectors()

    # analyze myofibrils
    sarc.analyze_myofibrils()

    # analyze sarcomere domains
    sarc.analyze_sarcomere_domains()