# Source code for contraction_net.data

import glob
import os

import numpy as np
import torch
from torch.utils.data import Dataset

from .utils import distance_transform


class DataProcess(Dataset):
    """A Dataset class for creating training data objects for ContractionNet training.

    Parameters
    ----------
    source_dir : Tuple[str, str]
        Tuple containing paths to the directories of training data [images, labels].
        Images should be in .tif format.
    input_len : int, optional
        Length of the input sequences (default is 512).
    normalize : bool, optional
        Whether to normalize each time-series (default is False).
    aug_factor : int, optional
        Factor for image augmentation (default is 10).
    val_split : float, optional
        Validation split for training (default is 0.2).
    noise_amp : float, optional
        Amplitude of Gaussian noise for image augmentation (default is 0.2).
    aug_p : float, optional
        Probability of applying augmentation (default is 0.5).
    random_offset : float, optional
        Amplitude of random offset applied to the input sequences (default is 0.25).
    random_outlier : float, optional
        Amplitude of random outliers added to the input sequences (default is 0.5).
    random_drift : Tuple[float, float], optional
        Parameters for random drift: (frequency, amplitude) (default is (0.01, 0.2)).
    random_swap : float, optional
        Probability of randomly swapping the sign of the input sequences (default is 0.5).
    random_subsampling : Tuple[int, int], optional
        Range for random subsampling intervals (default is None).

    Methods
    -------
    __len__()
        Returns the total number of samples.
    __getitem__(idx)
        Generates one sample of data.
    """

    def __init__(self, source_dir, input_len=512, normalize=False, val_split=0.2,
                 aug_factor=10, aug_p=0.5, noise_amp=0.2, random_offset=0.25,
                 random_outlier=0.5, random_drift=(0.01, 0.2), random_swap=0.5,
                 random_subsampling=None):
        self.source_dir = source_dir
        self.data = []
        self.is_real = []
        self.input_len = input_len
        self.val_split = val_split
        self.normalize = normalize
        self.aug_factor = aug_factor
        self.aug_p = aug_p
        self.noise_amp = noise_amp
        self.random_offset = random_offset
        self.random_drift = random_drift
        self.random_outlier = random_outlier
        self.random_subsampling = random_subsampling
        self.random_swap = random_swap
        self.mode = 'train'
        self.__load_and_edit()
        if self.aug_factor is not None:
            self.__augment()

    def __load_and_edit(self):
        """Load input/label .txt pairs from the source directory into
        (input, contraction-mask, distance-transform) triples in ``self.data``."""
        files = glob.glob(self.source_dir + '*.txt')
        print(f'{len(files)} files found')
        # Input traces are every .txt that is not a 'peaks' or '*_contr' label file.
        files_input = [f for f in files if 'peaks' not in f and 'contr' not in os.path.basename(f)]
        for file_i in files_input:
            input_i = np.loadtxt(file_i)[: self.input_len]
            if self.normalize:
                # NOTE(review): np.std may be 0 for a constant trace, yielding
                # inf/NaN here — confirm upstream data always varies.
                input_i -= np.mean(input_i)
                input_i /= np.std(input_i)
            start_end_systole_i = np.loadtxt(file_i[:-4] + '_contr.txt')[: self.input_len]
            if len(start_end_systole_i) == len(input_i):
                # Label file already holds a per-timepoint binary mask.
                contr_i = start_end_systole_i
            else:
                # Label file holds start/end indices; rasterize into a binary mask.
                contr_i = np.zeros_like(input_i)
                start_end_systole_i = np.clip(start_end_systole_i, a_min=0, a_max=self.input_len)
                if start_end_systole_i.size != 0:
                    if len(start_end_systole_i.shape) == 1:
                        # Single (start, end) pair.
                        contr_i[int(start_end_systole_i[0]): int(start_end_systole_i[1])] = 1
                    elif len(start_end_systole_i.shape) == 2:
                        # NOTE(review): the .T here implies a (2, N) layout
                        # (row of starts, row of ends) — confirm label format.
                        for start, end in start_end_systole_i.T:
                            contr_i[int(start): int(end)] = 1
            # Simulated/noise traces are flagged so augmentation can add noise only to them.
            if 'sim' in file_i or 'noise' in file_i:
                is_real_i = False
            else:
                is_real_i = True
            self.data.append([input_i, contr_i, distance_transform(input_i, contr_i)])
            self.is_real.append(is_real_i)
        self.data = np.asarray(self.data)

    def __augment(self):
        """Expand ``self.data`` by ``aug_factor`` randomized augmentations per sample:
        sign swap, optional subsampling, noise, offset, outliers and cosine drift."""
        _data = self.data.copy()
        self.data = []
        for i, d_i in enumerate(_data):
            for j in range(self.aug_factor):
                d_ij = d_i.copy()
                # Normalize the trace amplitude; the original scale is restored below.
                std_ij = np.std(d_ij[0])
                d_ij[0] = d_ij[0] / std_ij
                # Randomly flip the sign of the trace with probability random_swap.
                d_ij[0] = np.random.choice([-1, 1], p=(1 - self.random_swap, self.random_swap)) * d_ij[0]
                if self.random_subsampling is not None and np.random.binomial(1, self.aug_p):
                    d_ij = np.zeros_like(d_ij)
                    # BUGFIX: the deletion grid was hard-coded to 512; use
                    # self.input_len so non-default lengths are subsampled
                    # over the whole trace.
                    step = np.random.randint(self.random_subsampling[0], self.random_subsampling[1])
                    d_ij_short = np.delete(d_i, np.arange(0, self.input_len, step), axis=1)
                    # NOTE(review): subsampling restarts from the raw d_i,
                    # discarding the normalization/sign-swap applied above —
                    # confirm this is intended.
                    d_ij[:, :d_ij_short.shape[1]] = d_ij_short
                if not self.is_real[i] and np.random.binomial(1, self.aug_p):
                    # Additive Gaussian noise, applied only to simulated traces.
                    d_ij[0] += np.random.normal(0, self.noise_amp, size=d_i.shape[1])
                if np.random.binomial(1, self.aug_p):
                    # Random constant baseline offset.
                    d_ij[0] += np.random.normal(0, self.random_offset, size=1)
                if np.random.binomial(1, self.aug_p):
                    # Up to 10 random single-point outliers.
                    n_outliers = np.random.randint(0, 10)
                    t_outliers = np.random.randint(0, self.input_len, n_outliers)
                    d_ij[0, t_outliers] += np.random.normal(0, self.random_outlier, n_outliers)
                if np.random.binomial(1, self.aug_p):
                    # Slow cosine drift; random_drift = (frequency, amplitude).
                    d_ij[0] += self.random_drift[1] * np.cos(self.random_drift[0] * np.arange(self.input_len))
                # Restore the original amplitude scale.
                d_ij[0] = d_ij[0] * std_ij
                self.data.append(d_ij)
        self.data = np.asarray(self.data)

    def __len__(self):
        """Returns the total number of samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Generates one sample of data.

        Parameters
        ----------
        idx : int
            Index of the sample to retrieve.

        Returns
        -------
        sample : dict
            Dictionary containing 'input', 'target', and 'distance' tensors.
        """
        data_torch = torch.from_numpy(self.data[idx]).float()
        sample = {'input': data_torch[0], 'target': data_torch[1], 'distance': data_torch[2]}
        return sample