Source code for sarcasm.motion

# -*- coding: utf-8 -*-
# Copyright (c) 2025 University Medical Center Göttingen, Germany.
# All rights reserved.
#
# Patent Pending: DE 10 2024 112 939.5
# SPDX-License-Identifier: LicenseRef-Proprietary-See-LICENSE
#
# This software is licensed under a custom license. See the LICENSE file
# in the root directory for full details.
#
# **Commercial use is prohibited without a separate license.**
# Contact MBM ScienceBridge GmbH (https://sciencebridge.de/en/) for licensing.


import contextlib
import io
import os
from typing import List, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pywt import cwt
from scipy.ndimage import binary_closing, binary_opening, label, binary_dilation
from scipy.stats import kstest, geom
from skimage.segmentation import clear_border
from trackpy import link_iter

from contraction_net.prediction import predict_contractions
from sarcasm.core import SarcAsM
from sarcasm.ioutils import IOUtils
from sarcasm.utils import Utils


[docs] class Motion(SarcAsM): """Class for tracking and analysis of sarcomere motion at line of interest LOI""" def __init__(self, filename: str, loi_name: str, restart: bool = False, auto_save: bool = True): """ Initialization of a Motion object for single LOI (Line of Interest) analysis Parameters ---------- filename : str Filename of cardiomyocyte tif-movie loi_name : str Filename of LOI (only basename). All LOI files can be found by loi_files = glob.glob(cell.folder + '*.json') restart : bool If True, analysis is started from beginning, and empty LOI dictionary is initialized auto_save : bool If True, LOI dictionary is saved at end of processing steps. """ super().__init__(filename) # init super SarcAsM object assert self.metadata['frametime'] is not None, "frametime is not defined in metadata" self.loi_data = {} # init empty dictionary self.loi_file = os.path.join(os.path.splitext(filename)[0], loi_name) # folder for loi data self.loi_name = Motion.get_loi_name_from_file_name(loi_name) # loi_name is the file name of the json self file # create folder for LOI (sub-folder in cell folder) for analysis self.loi_folder = os.path.join(self.base_dir, self.loi_name) os.makedirs(self.loi_folder, exist_ok=True) # flag to automatically save dict after processing self.auto_save = auto_save # load data dict or create new dict (of not yet analyzed or restart=True) self.__load_analysed_data_or_create(restart) def __load_analysed_data_or_create(self, restart: bool): # load data if already analyzed if os.path.exists(self.__get_loi_data_file_name(is_temp_file=False)) and not restart: self.load_loi_data() else: print('LOI not yet analyzed.') # return if the file does not exist if not os.path.exists(self.loi_file): return self.__create_loi_data() def __create_loi_data(self): # read file with profiles and get time array x_pos, y_int, y_int_raw, line, time, line_width = self.read_profile_data() # initialize and save dictionary self.loi_data = {'x_pos': x_pos, 'y_int': y_int, 'y_int_raw': y_int_raw, 'time': time, 'line': line, 'line_width': line_width} if self.auto_save: self.store_loi_data()
[docs] @staticmethod def get_loi_name_from_file_name(filename) -> str: return filename.replace(".temp", "").replace("_loi", "").replace(".json", "").replace(".csv", "")
def __get_loi_data_file_name(self, is_temp_file=False) -> str: if is_temp_file: return os.path.join(self.data_dir, self.loi_name + "_loi_data.temp.json") else: return os.path.join(self.data_dir, self.loi_name + "_loi_data.json")
[docs] def load_loi_data(self): if os.path.exists(self.__get_loi_data_file_name(is_temp_file=False)): # persistent file exists, try using it try: self.loi_data = IOUtils.json_deserialize(self.__get_loi_data_file_name()) except: if os.path.exists(self.__get_loi_data_file_name(is_temp_file=True)): self.loi_data = IOUtils.json_deserialize(self.__get_loi_data_file_name(is_temp_file=True)) else: # no persistent file exists, look if a temp-file exists if os.path.exists(self.__get_loi_data_file_name(is_temp_file=True)): self.loi_data = IOUtils.json_deserialize(self.__get_loi_data_file_name(is_temp_file=True)) if self.loi_data is None or not self.loi_data: # self data is empty, reload from self file if os.path.exists(self.__get_loi_data_file_name()): os.remove(self.__get_loi_data_file_name()) if not os.path.exists(self.loi_file): return self.__create_loi_data() if not self.auto_save: self.store_loi_data() self.commit()
[docs] def store_loi_data(self): """Save LOI data""" IOUtils.json_serialize(self.loi_data, self.__get_loi_data_file_name()) print('LOI data saved!')
[docs] def commit(self): if os.path.exists(self.__get_loi_data_file_name(is_temp_file=True)): if os.path.exists(self.__get_loi_data_file_name(is_temp_file=False)): os.remove(self.__get_loi_data_file_name(is_temp_file=False)) os.rename(self.__get_loi_data_file_name(is_temp_file=True), self.__get_loi_data_file_name()) pass
[docs] def read_profile_data(self): """ Read z-band profile data """ # return if the file does not exist if not os.path.exists(self.loi_file): return elif ".json" in self.loi_file: data = IOUtils.json_deserialize(self.loi_file) if 'length' not in data.keys(): data['length'] = np.sqrt((data['line_start_x'] - data['line_end_x']) ** 2 + (data['line_start_y'] - data['line_end_y']) ** 2) * self.metadata['pixelsize'] data['line'] = np.asarray( [[data['line_start_x'], data['line_start_y']], [data['line_end_x'], data['line_end_y']]]) # x_pos is 0 until line length(included) x_pos = np.linspace(0, data['length'], data['profiles'].shape[1]) no_frames = len(data['profiles']) time = np.arange(0, no_frames * self.metadata['frametime'], self.metadata['frametime']) if 'profiles_raw' not in data.keys(): data['profiles_raw'] = None return (x_pos, data['profiles'], data['profiles_raw'], data['line'], time, data['linewidth']) else: raise ValueError('LOI-File is not .json')
[docs] def detekt_peaks(self, thres: float = 0.2, min_dist: float = 1.4, width: int = 0.5): """ Detect peaks of z-band intensity profiles Parameters ---------- thres : float Threshold for peak finder min_dist : float Minimal distance of z-band peaks in µm width : float Width of interval around peak for precise determination of peak center, in µm. """ peaks = [] if not self.loi_data: raise ValueError('loi_data is not initialized, create intensity profiles first') self.loi_data['params.detect_peaks'] = {'thresh': thres, 'min_dist': min_dist, 'width': width} min_dist_pixels = int(round(min_dist / self.metadata['pixelsize'], 0)) width_pixels = int(round(width / self.metadata['pixelsize'], 0)) for i, y in enumerate(self.loi_data['y_int']): peaks_i = Utils.peakdetekt(self.loi_data['x_pos'], y, thres=thres, min_dist=min_dist_pixels, width=width_pixels) peaks.append(peaks_i[~np.isnan(peaks_i)]) # save peaks self.loi_data['peaks'] = peaks if self.auto_save: self.store_loi_data()
[docs] def track_z_bands(self, search_range: float = 2, memory_tracking: int = 10, memory_interpol: int = 3, t_range: Union[Tuple[int, int], None] = None, z_range: Union[Tuple[int, int], None] = None, min_length: float = 1, filter_params: Tuple[int, int] = (13, 7)): """ Track peaks of intensity profile over time with Crocker-Grier algorithm from TrackPy package Parameters ---------- search_range : float Search range for tracking algorithm (see documentation of trackpy package) memory_tracking : int Memory for tracking algorithm, in frames (see documentation of trackpy package) memory_interpol : int Memory (max interval) to interpolate gaps in trajectories, in frames (see documentation of trackpy package) t_range : float[int, int] If not None, select time-interval of data, in frames z_range : float[int, int] If not None, select range of z-bands min_length : float Minimal length of z-band trajectory in seconds. Shorter trajectories will not be deleted but set to np.nan. filter_params : tuple(float, float) Parameters window length and poly order of Savitzky-Golay filter to smooth z position """ params_dict = {'params.track_z_bands.search_range': search_range, 'params.track_z_bands.memory_tracking': memory_tracking, 'params.track_z_bands.memory_interpol': memory_interpol, 'params.track_z_bands.t_range': t_range, 'params.track_z_bands.z_range': z_range} self.loi_data.update(params_dict) peaks = self.loi_data['peaks'].copy() # make x,y array peaks = [np.asarray([p, np.zeros_like(p)]).T for p in peaks] # track positions with Crocker-Grier algorithm with contextlib.redirect_stdout(io.StringIO()): trajs_idx = pd.DataFrame( link_iter(peaks, search_range=search_range, memory=memory_tracking, neighbor_strategy='KDTree', link_strategy='auto', adaptive_stop=1, adaptive_step=0.8) )[1].to_numpy() # sort array into z-band trajectories z_pos = np.zeros((len(trajs_idx[0]), len(self.loi_data['time']))) * np.nan for t, idx in enumerate(trajs_idx): for n, j in enumerate(idx): if j < len(trajs_idx[0]): z_pos[j][t] = self.loi_data['peaks'][t][n] # interpolate gaps in trajectories (interpolate with pandas) z_pos = pd.DataFrame(z_pos) z_pos = z_pos.interpolate(limit=memory_interpol, axis=1, method='cubic', limit_area='inside', limit_direction='both') z_pos = z_pos.to_numpy() # set short trajectories (len<min_length) to np.nan len_z_pos = np.count_nonzero(~np.isnan(z_pos), axis=1) z_pos = z_pos[len_z_pos > int(min_length / self.metadata['frametime'])] # set t range and z range if t_range is not None: z_pos = z_pos[:, t_range[0]:t_range[1]] self.loi_data['time'] = self.loi_data['time'][:t_range[1] - t_range[0]] if z_range is not None: z_pos = z_pos[z_range[0]:z_range[1], :] # filter z positions z_pos_filt = z_pos.copy() z_pos_filt = Utils.nan_sav_golay(z_pos_filt, window_length=filter_params[0], polyorder=filter_params[1]) # calculate sarcomere lengths slen = np.diff(z_pos_filt, axis=0) # save data dict_temp = {'z_pos_raw': z_pos, 'z_pos': z_pos_filt, 'slen': slen, 'params.track_z_bands': search_range, 'params.track_z_bands.memory_tracking': memory_tracking, 'params.track_z_bands.memory_interpol': memory_interpol, 'params.track_z_bands.t_range': t_range, 'params.track_z_bands.z_range': z_range, 'params.track_z_bands.min_length': min_length, 'params.track_z_bands.filter_params': filter_params} self.loi_data.update(dict_temp) if self.auto_save: self.store_loi_data()
[docs] def detect_analyze_contractions(self, model: Union[str, None] = None, threshold: float = 0.3, slen_lims: Tuple[float, float] = (1.2, 3), n_sarcomeres_min: int = 4, buffer_frames: int = 3, contr_time_min: float = 0.2, merge_time_max: float = 0.05): """ Detect contractions from contraction time-series using convolutional neural network and analyze beating 1. Predict contractions / contraction state (0 = quiescent, 1 = contracting) from sarcomere lengths (average or percentile) 2. Optimize state by morphological closing and opening (minimal time of contraction cycle=contr_time_min, merge contractions closer than merge_time_max). Remove cycles at very beginning or end (buffer_frames). 3. Characterize state: obtain start times of contr. cycles (start_contractions_frame in frames, start_contr in s), number of cycles (n_contr), label contraction cycles (1 to n_contr), duration of contr. cycles (time_contractions) Parameters ---------- model : str Neural network parameters (.pt file) threshold : float Binary threshold for contraction state (0, 1) after prediction slen_lims : tuple(float, float) Minimal and maximal sarcomere lengths, sarcomere outside interval are set to NaN n_sarcomeres_min : int Minimal number of sarcomeres, if lower, contraction state is set to 0. buffer_frames : int Remove contraction cycles / contractions within "buffer_frames" frames to start and end of time-series contr_time_min : float Minimal time of contraction in seconds. If smaller, contraction is removed. merge_time_max : float Maximal time between two contractions. If smaller, two contractions are merged to one. """ # select weights for convolutional neural network if model == None or model is 'default': model = self.model_dir + 'model_ContractionNet.pt' # detect contractions with convolutional neural network (0 = quiescence, 1 = contraction) contr = self.predict_contractions(self.loi_data['z_pos'], self.loi_data['slen'], model, threshold=threshold) # edit contractions # filter sarcomeres by sarcomere lengths and set to 0 if less sarcomeres than n_sarcomere_min slen = np.diff(self.loi_data['z_pos'], axis=0) slen[(slen < slen_lims[0]) | (slen > slen_lims[1])] = np.nan n_sarcomeres_time = np.count_nonzero(~np.isnan(slen), axis=0) contr[n_sarcomeres_time < n_sarcomeres_min] = 0 # merge very close contractions and remove short contractions structure_closing = np.ones(max(1, int(merge_time_max / self.metadata['frametime']))) structure_opening = np.ones(max(1, int(contr_time_min / self.metadata['frametime']))) contr = binary_opening(binary_closing(contr, structure=structure_closing), structure=structure_opening) # remove incomplete contractions at the beginning and end of time series contr = clear_border(contr, buffer_size=buffer_frames) # analyze contractions start_contr_frame = np.where(np.diff(contr.astype('float32')) > 0.5)[0] start_contr = start_contr_frame * self.metadata['frametime'] labels_contr, n_contr = label(contr) time_contr = np.asarray( [np.count_nonzero(labels_contr == i) for i in np.unique(labels_contr)[1:]]) * \ self.metadata['frametime'] beating_rate = 1 / np.mean(np.diff(start_contr)) beating_rate_variability = np.std(np.diff(start_contr)) # analyze quiescent period quiet = 1 - contr.copy() # remove incomplete quiescent periods at the beginning and end of time series quiet = clear_border(quiet, buffer_size=buffer_frames) start_quiet_frame = np.where(np.diff(quiet.astype('float32')) > 0.5)[0] start_quiet = start_quiet_frame * self.metadata['frametime'] labels_quiet, n_quiet = label(quiet) time_quiet = np.asarray( [np.count_nonzero(labels_quiet == i) for i in np.unique(labels_quiet)[1:]]) * \ self.metadata['frametime'] time_quiet_avg = np.mean(time_quiet) time_contr_avg = np.mean(time_contr) # time of full contraction cycles (equivalent to 1/beating_rate) time_cycle = time_contr[:-1] + time_quiet # store in LOI dict dict_temp = {'params.detect_analyze_contractions.model': model, 'params.detect_analyze_contractions.slen_lims': slen_lims, 'params.detect_analyze_contractions.n_sarcomeres_min': n_sarcomeres_min, 'params.detect_analyze_contractions.buffer_frames': buffer_frames, 'params.detect_analyze_contractions.contr_time_min': contr_time_min, 'params.detect_analyze_contractions.merge_time_max': merge_time_max, 'contr': contr, 'start_contr_frame': start_contr_frame, 'start_contr': start_contr, 'quiet': quiet, 'start_quiet_frame': start_quiet_frame, 'start_quiet': start_quiet, 'labels_contr': labels_contr, 'labels_quiet': labels_quiet, 'time_contr': time_contr, 'time_quiet': time_quiet, 'time_quiet_avg': time_quiet_avg, 'time_contr_avg': time_contr_avg, 'time_cycle': time_cycle, 'n_contr': n_contr, 'n_quiet': n_quiet, 'beating_rate_variability': beating_rate_variability, 'beating_rate': beating_rate, } self.loi_data.update(dict_temp) if self.auto_save: self.store_loi_data()
[docs] def get_trajectories(self, slen_lims: Tuple[float, float] = (1.2, 3.), filter_params_vel: Tuple[int, int] = (13, 5), dilate_contr: float = 0, equ_lims: Tuple[float, float] = (1.5, 2.3)): """ 1. Calculate sarcomere lengths (single and avg) and filter too large and too small values (slen_lims). 2. Calculate sarcomere velocities (single and avg), prior smoothing of s'lengths with Savitzky-Golay filter (filter_params_vel) 3. Calculate sarcomere equilibrium lengths (equ) and delta_slen Parameters ---------- slen_lims : tuple(float, float) Lower and upper limits of sarcomere lengths, values outside are set to nan filter_params_vel : tuple(int, int) Window length and poly order for Savitky-Golay filter for smoothing of delta_slen prior to differentiation to obtain sarcomere velocities dilate_contr : float Dilation time (in seconds) of contraction time-series to shorten time-interval during diastole at which the sarcomere equilibrium lengths are determined equ_lims : tuple(float, float) Lower and upper limits of sarcomere equilibrium lengths, values outside are set to nan """ # calculate sarcomere lengths slen = np.diff(self.loi_data['z_pos'], axis=0) slen[(slen < slen_lims[0]) | (slen > slen_lims[1])] = np.nan slen_avg = np.nanmean(slen, axis=0) n_sarcomeres = slen.shape[0] n_sarcomeres_time = np.count_nonzero(~np.isnan(slen), axis=0) frametime = self.metadata['frametime'] # smooth slen with sav. golay filter and calculate velocity vel = Utils.custom_diff(Utils.nan_sav_golay(slen, filter_params_vel[0], filter_params_vel[1]), frametime) vel_avg = np.nanmean(vel, axis=0) # calculate sarcomere equ length and delta sarcomere length dilate_contr = int(dilate_contr * 2 / self.metadata['frametime']) if dilate_contr == 0: contr_dilated = self.loi_data['contr'] elif dilate_contr > 0: contr_dilated = binary_dilation(self.loi_data['contr'], structure=np.ones(dilate_contr)) else: raise ValueError(f'Parameter dilate_contr={dilate_contr} not valid!') equ = np.asarray([np.nanmedian(s[contr_dilated == 0]) for s in slen]) equ[(equ < equ_lims[0]) | (equ > equ_lims[1])] = np.nan delta_slen = np.asarray([slen[i] - equ[i] for i in range(len(equ))]) delta_slen_avg = np.nanmean(delta_slen, axis=0) if np.count_nonzero(delta_slen) > 0: ratio_nans = np.count_nonzero(np.isnan(delta_slen)) / np.count_nonzero(delta_slen) else: ratio_nans = np.nan # store data in LOI dictionary dict_temp = { 'params.get_sarcomere_trajectories.slen_lims': slen_lims, 'params.get_sarcomere_trajectories.filter_params_vel': filter_params_vel, 'slen': slen, 'slen_avg': slen_avg, 'vel': vel, 'vel_avg': vel_avg, 'n_sarcomeres': n_sarcomeres, 'n_sarcomeres_time': n_sarcomeres_time, 'equ': equ, 'delta_slen': delta_slen, 'delta_slen_avg': delta_slen_avg, 'ratio_nans': ratio_nans} self.loi_data.update(dict_temp) if self.auto_save: self.store_loi_data()
[docs] def analyze_trajectories(self, custom_perc: Union[List[Tuple[float, float]], None] = None): """ Analyze sarcomere single and average trajectories (extrema of sarcomeres contraction and velocity) and sarcomere popping Parameters ---------- custom_perc : List[Tuple[float, float]] or None, optional A list of tuples where each tuple contains two percentages (p0, p1) representing custom percentage points to analyze contraction, e.g., the time from p0% to p1% contraction of individual and average sarcomere length change. If p0<p1, the shortening is analyzed, if p1<p0, the elongation is analyzed. If not provided, defaults to None. """ # initialize arrays # maximal contraction contr_max = np.zeros((len(self.loi_data['delta_slen']), self.loi_data['n_contr'])) * np.nan contr_max_avg = np.zeros(self.loi_data['n_contr']) * np.nan # maximal elongation elong_max = np.zeros_like(contr_max) * np.nan elong_max_avg = np.zeros_like(contr_max_avg) * np.nan # maximal velocity in both directions vel_contr_max = np.zeros_like(contr_max) * np.nan vel_elong_max = np.zeros_like(contr_max) * np.nan vel_contr_max_avg = np.zeros_like(contr_max_avg) * np.nan vel_elong_max_avg = np.zeros_like(contr_max_avg) * np.nan # time to peak (0% to 100%) time_to_peak = np.zeros_like(contr_max) * np.nan time_to_peak_avg = np.zeros_like(contr_max_avg) * np.nan # relaxation time (100% to 0%) time_to_relax = np.zeros_like(contr_max) * np.nan time_to_relax_avg = np.zeros_like(contr_max_avg) * np.nan # custom percentages (p0% to p1%) if custom_perc and not isinstance(custom_perc, list): custom_perc = [custom_perc] elif not custom_perc: custom_perc = [] custom_perc_time = [np.zeros_like(contr_max) for _ in range(len(custom_perc))] custom_perc_time_avg = [np.zeros_like(contr_max_avg) for _ in range(len(custom_perc))] # iterate individual sarcomeres labels_contr = self.loi_data['labels_contr'] for j, delta_j in enumerate(self.loi_data['delta_slen']): vel_j = self.loi_data['vel'][j] for i in range(self.loi_data['n_contr']): # get time-series of one contraction cycle (start to start) delta_i = delta_j[labels_contr == i + 1] vel_i = vel_j[labels_contr == i + 1] # find extrema contr_max[j][i] = np.nanmin(delta_i) elong_max[j][i] = np.nanmax(delta_i) vel_contr_max[j][i] = np.nanmin(vel_i) vel_elong_max[j][i] = np.nanmax(vel_i) # time to peak if np.count_nonzero(np.isnan(delta_i)) == 0: time_to_peak[j][i] = np.nanargmin(delta_i) * self.metadata['frametime'] time_to_relax[j][i] = (len(delta_i) - np.nanargmin(delta_i)) * self.metadata['frametime'] if custom_perc: for k, (p0, p1) in enumerate(custom_perc): if p0 < p1: # shortening delta_shortening_i = delta_i[: np.nanargmin(delta_i)+1] if len(delta_shortening_i) > 0: if p0 == 0: t0, contr0 = 0, 0 else: t0, contr0 = Utils.find_closest(delta_shortening_i, contr_max[j][i] * p0 * 0.01) t1, contr1 = Utils.find_closest(delta_shortening_i, contr_max[j][i] * p1 * 0.01) else: t0, contr0 = np.nan, np.nan t1, contr1 = np.nan, np.nan elif p0 > p1: # elongation delta_elongation_i = delta_i[np.nanargmin(delta_i):] if len(delta_elongation_i) > 0: if p1 == 0: t1, contr1 = len(delta_elongation_i), 0 else: t1, contr1 = Utils.find_closest(delta_elongation_i, contr_max[j][i] * p1 * 0.01) t0, contr0 = Utils.find_closest(delta_elongation_i, contr_max[j][i] * p0 * 0.01) else: t0, contr0 = np.nan, np.nan t1, contr1 = np.nan, np.nan else: raise ValueError('p0 and p1 must be different.') custom_perc_time[k][j, i] = (t1 - t0) * self.metadata['frametime'] # average contraction for i in range(self.loi_data['n_contr']): # get time-series of one contraction cycle (start to start) delta_i = self.loi_data['delta_slen_avg'][labels_contr == i + 1] vel_i = self.loi_data['vel_avg'][labels_contr == i + 1] # find extrema contr_max_avg[i] = np.nanmin(delta_i) elong_max_avg[i] = np.nanmax(delta_i) vel_contr_max_avg[i] = np.nanmin(vel_i) vel_elong_max_avg[i] = np.nanmax(vel_i) # time to peak if np.count_nonzero(np.isnan(delta_i)) == 0: time_to_peak_avg[i] = np.nanargmin(delta_i) * self.metadata['frametime'] time_to_relax_avg[i] = (len(delta_i) - np.nanargmin(delta_i)) * self.metadata['frametime'] if custom_perc: for k, (p0, p1) in enumerate(custom_perc): if p0 < p1: # shortening delta_shortening_i = delta_i[: np.nanargmin(delta_i)+1] if len(delta_shortening_i) > 0: if p0 == 0: t0_avg, contr0_avg = 0, 0 else: t0_avg, contr0_avg = Utils.find_closest(delta_shortening_i, contr_max_avg[i] * p0 * 0.01) t1_avg, contr1_avg = Utils.find_closest(delta_shortening_i, contr_max_avg[i] * p1 * 0.01) else: t0_avg, contr0_avg = np.nan, np.nan t1_avg, contr1_avg = np.nan, np.nan elif p0 > p1: # elongation delta_elongation_i = delta_i[np.nanargmin(delta_i):] if len(delta_elongation_i) > 0: if p1 == 0: t1_avg, contr1_avg = len(delta_elongation_i), 0 else: t1_avg, contr1_avg = Utils.find_closest(delta_elongation_i, contr_max_avg[i] * p1 * 0.01) t0_avg, contr0_avg = Utils.find_closest(delta_elongation_i, contr_max_avg[i] * p0 * 0.01) else: t0_avg, contr0_avg = np.nan, np.nan t1_avg, contr1_avg = np.nan, np.nan else: raise ValueError('p0 and p1 must be different.') custom_perc_time_avg[k][i] = (t1_avg - t0_avg) * self.metadata['frametime'] # calculate surplus motion index self.calculate_surplus_motion_index() # save data in LOI dict self.loi_data.update({'contr_max': contr_max, 'elong_max': elong_max, 'vel_contr_max': vel_contr_max, 'vel_elong_max': vel_elong_max, 'contr_max_avg': contr_max_avg, 'elong_max_avg': elong_max_avg, 'vel_contr_max_avg': vel_contr_max_avg, 'vel_elong_max_avg': vel_elong_max_avg, 'time_to_peak': time_to_peak, 'time_to_peak_avg': time_to_peak_avg, 'time_to_relax': time_to_relax, 'time_to_relax_avg': time_to_relax_avg, 'custom_perc_time': np.asarray(custom_perc_time), 'custom_perc_time_avg': np.asarray(custom_perc_time_avg)}) if self.auto_save: self.store_loi_data()
[docs] def calculate_surplus_motion_index(self): """ Calculate surplus motion index (SMI) for sarcomere motion: average distance traveled by individual sarcomeres contractions divided by distance traveled by sarcomere average """ vel = self.loi_data['vel'] vel_avg = self.loi_data['vel_avg'] contr = self.loi_data['contr'] # label contractions contraction_labels, n_contr = label(contr) # define arrays abs_motion_single = np.zeros((n_contr, vel.shape[0])) * np.nan abs_motion_avg = np.zeros(n_contr) * np.nan # iterate contractions for i, contraction_i in enumerate(np.arange(1, n_contr + 1)): vel_i = vel[:, contraction_labels == contraction_i] vel_avg_i = vel_avg[contraction_labels == contraction_i] abs_motion_single_i = np.sum(np.abs(vel_i), axis=1) * self.metadata['frametime'] abs_motion_avg_i = np.sum(np.abs(vel_avg_i)) * self.metadata['frametime'] abs_motion_single[i] = abs_motion_single_i abs_motion_avg[i] = abs_motion_avg_i # calculate surplus motion index per contraction cycle and store in dict smi = np.nanmean(abs_motion_single) / np.nanmean(abs_motion_avg) self.loi_data['smi'] = smi if self.auto_save: self.store_loi_data()
[docs] def analyze_popping(self, thres_popping: float = 0.25): """ Analyze sarcomere popping - popping if elongation of individual sarcomere is larger than thres_popping Parameters ---------- thres_popping : float Threshold above which sarcomere is identified as popping, in µm beyond equilibrium length """ # identify popping events elong_max = self.loi_data['elong_max'] popping = np.zeros_like(elong_max, dtype='bool') popping[elong_max > thres_popping] = 1 # calculate popping rates rate_contr = np.mean(popping, axis=0) rate_sarcomeres = np.mean(popping, axis=1) rate = np.mean(popping) # dictionary dict_popping = {'popping_rate_contr': rate_contr, 'popping_rate_sarcomeres': rate_sarcomeres, 'popping_rate': rate, 'popping_events': popping, 'params.analyze_popping.thres_popping': thres_popping} popping_events = dict_popping['popping_events'] idxs_popping_s, idxs_popping_c = np.where(popping_events == 1) # inter sarcomere distance of popping events in each contraction cycle cycles = np.unique(idxs_popping_c) dist = [np.diff(idxs_popping_s[idxs_popping_c == t]) for t in cycles] dist = np.concatenate(dist) if dist else [] # time gap between popping events of the same sarcomere sarcomeres = np.unique(idxs_popping_s) tau = [np.diff(idxs_popping_c[idxs_popping_s == s]) for s in sarcomeres] tau = np.concatenate(tau) if tau else [] dist = np.array(dist) # convert dist to numpy array tau = np.array(tau) # convert tau to numpy array p_dist = 1 / np.mean(dist) if dist.size != 0 else 0 p_tau = 1 / np.mean(tau) if tau.size != 0 else 0 if dist.size != 0: kstest_result_dist = kstest(dist, geom(p_dist).cdf) else: kstest_result_dist = (np.nan, np.nan) if tau.size != 0: kstest_result_tau = kstest(tau, geom(p_tau).cdf) else: kstest_result_tau = (np.nan, np.nan) dict_popping.update({'popping_p_dist': p_dist, 'popping_p_tau': p_tau, 'popping_ks_dist_statistic': kstest_result_dist[0], 'popping_ks_dist_pvalue': kstest_result_dist[1], 'popping_ks_tau_statistic': kstest_result_tau[0], 'popping_ks_tau_pvalue': kstest_result_tau[1], 'popping_tau': tau, 'popping_dist': dist}) self.loi_data.update(dict_popping) if self.auto_save: self.store_loi_data()
[docs] def analyze_correlations(self): """ Computes the Pearson correlation coefficients for sarcomere motion patterns (∆SL and V) across different contraction cycles and between sarcomeres within the same cycle to analyze static and stochastic heterogeneity in sarcomere dynamics. It calculates the average serial (r_s) and mutual (r_m) correlation coefficients, and introduces the ratio R of serial to mutual correlations to distinguish between static and stochastic heterogeneity. The function updates the instance's loi_data with correlation data, including the calculated R values, and stores the data if auto_save is enabled. Returns ------- None Updates `self.loi_data` with the following keys: - 'corr_delta_slen' (ndarray or None): Correlation matrix for sarcomere length changes. - 'corr_vel' (ndarray or None): Correlation matrix for sarcomere velocities. - 'corr_delta_slen_serial' (float or NaN): Average serial correlation for sarcomere length changes. - 'corr_delta_slen_mutual' (float or NaN): Average mutual correlation for sarcomere length changes. - 'corr_vel_serial' (float or NaN): Average serial correlation for sarcomere velocities. - 'corr_vel_mutual' (float or NaN): Average mutual correlation for sarcomere velocities. - 'ratio_delta_slen_mutual_serial' (float or NaN): Ratio of mutual to serial correlation for sarcomere length changes. - 'ratio_vel_mutual_serial' (float or NaN): Ratio of mutual to serial correlation for sarcomere velocities. """ if self.loi_data['n_contr'] > 0: time_contr_median = int(np.median(self.loi_data['time_contr']) / self.metadata['frametime']) corr_delta_slen = np.zeros((self.loi_data['n_sarcomeres'], self.loi_data['n_sarcomeres'], self.loi_data['n_contr'], self.loi_data['n_contr'])) * np.nan corr_vel = np.zeros((self.loi_data['n_sarcomeres'], self.loi_data['n_sarcomeres'], self.loi_data['n_contr'], self.loi_data['n_contr'])) * np.nan for i in range(self.loi_data['n_sarcomeres']): for j in range(self.loi_data['n_sarcomeres']): if i >= j: delta_slen_i = self.loi_data['delta_slen'][i] vel_i = self.loi_data['vel'][i] delta_slen_j = self.loi_data['delta_slen'][j] vel_j = self.loi_data['vel'][j] for k, contr_k in enumerate(self.loi_data['start_contr_frame'][:-1]): for l, contr_l in enumerate(self.loi_data['start_contr_frame'][:-1]): if k >= l: if i != j or k != l: corr_delta_slen[i, j, k, l] = \ np.corrcoef(delta_slen_i[contr_k:contr_k + time_contr_median], delta_slen_j[contr_l:contr_l + time_contr_median])[1, 0] corr_vel[i, j, k, l] = np.corrcoef(vel_i[contr_k:contr_k + time_contr_median], vel_j[ contr_l:contr_l + time_contr_median])[1, 0] # serial correlation corr_delta_slen_serial = np.nanmean(np.diagonal(corr_delta_slen)) corr_vel_serial = np.nanmean(np.diagonal(corr_vel)) # mutual correlation corr_delta_slen_mutual = np.nanmean(np.diagonal(corr_delta_slen, axis1=1, axis2=2)) corr_vel_mutual = np.nanmean(np.diagonal(corr_vel, axis1=1, axis2=2)) # ratio R of mutual and serial correlation ratio_delta_slen_mutual_serial = corr_delta_slen_mutual / corr_delta_slen_serial ratio_vel_mutual_serial = corr_vel_mutual / corr_vel_serial else: corr_delta_slen = None corr_vel = None corr_delta_slen_serial = np.nan corr_vel_serial = np.nan corr_delta_slen_mutual = np.nan corr_vel_mutual = np.nan ratio_delta_slen_mutual_serial = np.nan ratio_vel_mutual_serial = np.nan corr_dict = {'corr_delta_slen': corr_delta_slen, 'corr_vel': corr_vel, 'corr_delta_slen_serial': corr_delta_slen_serial, 'corr_delta_slen_mutual': corr_delta_slen_mutual, 'corr_vel_serial': corr_vel_serial, 'corr_vel_mutual': corr_vel_mutual, 'ratio_delta_slen_mutual_serial': ratio_delta_slen_mutual_serial, 'ratio_vel_mutual_serial': ratio_vel_mutual_serial} self.loi_data.update(corr_dict) if self.auto_save: self.store_loi_data()
[docs] def analyze_oscillations(self, min_scale: float = 6, max_scale: float = 180, num_scales: int = 60, wavelet: str = 'morl', freq_thres: float = 2, plot: bool = False): """ Analyze the oscillation frequencies of average and individual sarcomere length changes. Parameters ---------- min_scale : float, optional Minimum scale to use for the wavelet transform (default is 6). max_scale : float, optional Maximum scale to use for the wavelet transform (default is 150). num_scales : int, optional Number of scales to use for the wavelet transform (default is 100). wavelet : str, optional Type of wavelet to use for the wavelet transform (default is 'morl' = Morlet wavelet). freq_thres : float, optional Frequency threshold in Hz for distinguishing low-freq. oscillations at beating rate, and high-freq. oscillations. plot : bool, optional If True, a plot illustrating the analysis is shown. Returns ------- None """ # Analyze oscillation frequencies of average sarcomere length change cfs_avg, frequencies = self.wavelet_analysis_oscillations(self.loi_data['delta_slen_avg'], self.metadata['frametime'], min_scale=min_scale, max_scale=max_scale, num_scales=num_scales, wavelet=wavelet) mask = self.loi_data['contr'] != 0 mag_avg = np.nanmean(np.abs(cfs_avg[:, mask]), axis=1) # Analyze individual sarcomere oscillation frequencies cfs = [] mags = [] for d_i in self.loi_data['delta_slen']: cfs_i, _ = self.wavelet_analysis_oscillations(d_i, self.metadata['frametime'], min_scale=min_scale, max_scale=max_scale, num_scales=num_scales, wavelet=wavelet) mag_i = np.nanmean(np.abs(cfs_i[:, mask]), axis=1) cfs.append(cfs_i) mags.append(mag_i) mag_all_mean, mag_all_std = np.nanmean(mags, axis=0), np.nanstd(mags, axis=0) freq_thres = max(freq_thres, self.loi_data['beating_rate'] * 2.1) # find first peak corresponding to beating rate peak_avg = frequencies[np.argmax(mag_avg)] amp_avg = np.max(mag_avg) mag_all_mean_1 = mag_all_mean.copy() mag_all_mean_1[frequencies > freq_thres] = np.nan peak_1_single = frequencies[np.nanargmax(mag_all_mean_1)] amp_1_single = np.max(mag_all_mean_1) # find second peak corresponding to high-frequency oscillations of individual sarcomeres mag_all_mean_2 = mag_all_mean.copy() mag_all_mean_2[frequencies < freq_thres] = np.nan min_freq = np.min(frequencies[frequencies >= freq_thres]) peak_2_single = frequencies[np.nanargmax(mag_all_mean_2)] amp_2_single = np.max(mag_all_mean_2) if peak_2_single == min_freq: peak_2_single = np.nan amp_2_single = np.nan dict_oscill = {'params.analyze_oscillations.min_scale': min_scale, 'params.analyze_oscillations.max_scale': max_scale, 'params.analyze_oscillations.num_scales': num_scales, 'params.analyze_oscillations.wavelet': wavelet, 'params.analyze_oscillations.freq_thres': freq_thres, 'oscill_frequencies': frequencies, 'oscill_cfs_avg': cfs_avg, 'oscill_cfs': np.asarray(cfs), 'oscill_magnitudes_avg': mag_avg, 'oscill_magnitudes': np.asarray(mags), 'oscill_peak_avg': peak_avg, 'oscill_peak_1_single': peak_1_single, 'oscill_peak_2_single': peak_2_single, 'oscill_amp_avg': amp_avg, 'oscill_amp_1_single': amp_1_single, 'oscill_amp_2_single': amp_2_single} self.loi_data.update(dict_oscill) if self.auto_save: self.store_loi_data() if plot: fig, ax = plt.subplots(figsize=(6, 2.5)) ax.plot(frequencies, mag_avg, c='r', label='Average') ax.plot(frequencies, np.asarray(mags).T, c='k', alpha=0.1) ax.fill_between(frequencies, mag_all_mean - mag_all_std, mag_all_mean + mag_all_std, color='k', alpha=0.25) ax.plot(frequencies, mag_all_mean, c='k', label='Single') ax.axvline(self.loi_data['beating_rate'], c='k', linestyle='--', label='Beating rate') ax.axvspan(0, freq_thres, zorder=-5, color='silver', alpha=0.5) ax.axvline(peak_avg, c='b', linestyle=':', label='Peak avg 1') ax.axvline(peak_2_single, c='g', linestyle=':', color='gold', label='Peak 2') ax.set_xlabel('Frequency [Hz]') ax.set_ylabel('Average magnitude') ax.legend() plt.tight_layout() plt.show()
[docs] def full_analysis_loi(self): """ Full analysis of LOI with default parameters """ auto_save_ = self.auto_save self.auto_save = False self.detekt_peaks() self.track_z_bands() self.detect_analyze_contractions() self.get_trajectories() self.analyze_trajectories() self.analyze_popping() self.auto_save = auto_save_ self.store_loi_data()
[docs] @staticmethod def predict_contractions(z_pos: np.ndarray, slen: np.ndarray, weights: str, threshold: float = 0.33): """Predict contractions from motion of z-bands and sarcomere lengths, then calculate mean state and threshold to get more accurate estimation of contractions Parameters ---------- z_pos : ndarray Time-series of Z-band positions slen : ndarray Time-series of sarcomere lengths weights : str Neural network parameters (.pt file) threshold : float Binary threshold for contraction state (0, 1) """ data = np.concatenate([z_pos, slen]) contr_all = np.asarray([predict_contractions(d, weights)[0] for d in data]) contr_mean = np.nanmean(contr_all, axis=0) return contr_mean > threshold
[docs] @staticmethod def wavelet_analysis_oscillations(data: np.ndarray, frametime: float, min_scale: float = 6, max_scale: float = 150, num_scales: int = 100, wavelet: str = 'morl'): """ Perform a wavelet transform of the data. Parameters ---------- data : array_like 1-D input signal. frametime : float Sampling period of the signal. min_scale : float, optional Minimum scale to use for the wavelet transform (default is 6). max_scale : float, optional Maximum scale to use for the wavelet transform (default is 150). num_scales : int, optional Number of scales to use for the wavelet transform (default is 200). wavelet : str, optional Type of wavelet to use for the wavelet transform (default is 'morl'). Returns ------- cfs : ndarray Continuous wavelet transform coefficients. frequencies : ndarray Corresponding frequencies for each scale. """ # Generate a range of scales that are logarithmically spaced scales = np.geomspace(min_scale, max_scale, num=num_scales) # Perform the wavelet transform cfs, frequencies = cwt(data, scales, wavelet, sampling_period=frametime) return cfs, frequencies