Source code for hermes_rheo.transforms.rheo_analysis

from typing import Tuple, Dict, List, Optional, Any
import numpy as np
import math
import copy
from scipy.fftpack import fft, fftshift

from piblin.transform.abc.measurement_transform import MeasurementTransform
import piblin.data.datasets.abc.split_datasets.one_dimensional_composite_dataset as one_d_composite_dataset


class RheoAnalysis(MeasurementTransform):
    """
    The RheoAnalysis class, a subclass of MeasurementTransform, is designed for analyzing rheological data,
    particularly focusing on the transformation and analysis of data collected from rheological experiments
    involving Arbitrary Waves (e.g. OWChirp).

    Methods:
        prepare_owchirp_data: Switches dataset coordinates and extracts time, strain, stress and temperature
            arrays, converting strain from % and stress from MPa where needed.
        average_until_tw: Subtracts the mean of the signal up to the waiting time.
        average_over_chirp: Subtracts the mean of the signal after the waiting time.
        average_over_all: Subtracts both the mean before and the mean after the waiting time.
        filter_signal: Applies one of the averaging corrections selected by name ('tw', 'chirp', 'all', 'none').
        select_best_filter_method: Picks the correction with the smallest sum of absolute values.
        select_best_filter_method_DMA: Picks the correction with the smallest combined start/end offset and
            standard deviation.
        calculate_wave_parameters: Derives waiting time, signal duration, tapering parameter and the
            initial/final frequencies from the wave description.
        moduli_owchirp_strain_controlled: Computes moduli from the FFT of strain and stress.
        moduli_owchirp_stress_controlled: Computes moduli from the FFT of strain rate and stress.
        _apply: Applies the processing that corresponds to the measurement method of the target.
    """

    def __init__(self, owchirp_waiting_time='before_signal', cutoff_points=0, *args, **kwargs):
        """
        Args:
            owchirp_waiting_time: Which part of the time signal `_apply` keeps before the Fourier transform.
                `_apply` recognises 'before_signal' (keep samples with time > waiting time),
                'after_signal' (keep samples between the first time + 0.1 and the signal duration) and
                'all_signal' (keep everything).
            cutoff_points: Value forwarded by `_apply` to `prepare_owchirp_data` as its `cutoff_points`
                argument.
            *args: Forwarded unchanged to `MeasurementTransform.__init__`.
            **kwargs: Forwarded unchanged to `MeasurementTransform.__init__`.
        """
        # Initialise the parent transform first, then store the chirp-specific settings.
        super().__init__(*args, **kwargs)
        self.owchirp_waiting_time = owchirp_waiting_time
        self.cutoff_points = cutoff_points
[docs] @staticmethod def prepare_owchirp_data(dataset: one_d_composite_dataset, time: str, strain: str, stress: str, temperature: str, cutoff_points: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Prepares and processes Arbitrary Wave (e.g. chirp) data by switching coordinates and adjusting the units of stress and strain for Fast Fourier Transform (FFT) analysis. It also discards data before a specified cutoff time. Args: dataset (one_d_composite_dataset): The dataset to process containing time, strain, stress, and temperature data. time (str): The name of the time coordinate in the dataset. strain (str): The name of the strain coordinate in the dataset. stress (str): The name of the stress coordinate in the dataset. temperature (str): The name of the temperature coordinate in the dataset. cutoff_points (float): Time (in seconds) to discard data before the cutoff. Returns: Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: A tuple containing the processed arrays: - time values (np.ndarray): The time values starting from the cutoff point onward. - strain values (np.ndarray): The corresponding strain values, converted to unitless if originally in %. - stress values (np.ndarray): The corresponding stress values, converted to Pa if originally in MPa. - temperature values (np.ndarray): The temperature values starting from the cutoff point. Raises: ValueError: - If `cutoff_points` is greater than the maximum value of the dataset time values. - If `strain_values` are not set after switching coordinates to strain. - If `stress_values` are not set after switching coordinates to stress. - If an unsupported unit is found in the stress values. 
""" # Switch coordinates to time and strain for processing dataset.switch_coordinates(independent_name=time, dependent_name=strain) time_values = dataset.x_values # Validate the cutoff point is less than or equal to the maximum time value if cutoff_points > time_values.max(): raise ValueError( f"Cutoff point {cutoff_points} is greater than the maximum time value {time_values.max()}.") # Find the index corresponding to the cutoff time cutoff_index = (np.abs(time_values - cutoff_points)).argmin() # Adjust strain values: convert to unitless if in percentage if dataset.y_unit == "%": strain_values = copy.deepcopy(dataset.y_values[cutoff_index:]) * 0.01 dataset.y_unit = "unitless" else: strain_values = copy.deepcopy(dataset.y_values[cutoff_index:]) # Check if strain values are set after switching coordinates if strain_values is None: raise ValueError("Strain values not set after switching coordinates to strain.") # Switch coordinates to time and stress dataset.switch_coordinates(independent_name=time, dependent_name=stress) # Adjust stress values: convert MPa to Pa if necessary if dataset.y_unit == "MPa": stress_values = copy.deepcopy(dataset.y_values[cutoff_index:]) * 1e6 dataset.y_unit = "Pa" elif dataset.y_unit == "Pa": stress_values = copy.deepcopy(dataset.y_values[cutoff_index:]) else: raise ValueError(f"Unsupported stress unit '{dataset.y_unit}'. 
Expected 'Pa' or 'MPa'.") # Check if stress values are set after switching coordinates if stress_values is None: raise ValueError("Stress values not set after switching coordinates to stress.") # Switch coordinates to time and temperature dataset.switch_coordinates(independent_name=time, dependent_name=temperature) temperature_values = copy.deepcopy(dataset.y_values[cutoff_index:]) # Ensure temperature_values is an ndarray temperature_values = np.asarray(temperature_values) # Ensure time_values is an ndarray time_values = np.asarray(time_values[cutoff_index:]) # Return processed time, strain, stress, and temperature values return time_values[cutoff_index:], strain_values, stress_values, temperature_values
[docs] @staticmethod def average_until_tw(t_w: float, time: List[float], signal: List[float]) -> np.ndarray: """ Average until waiting time (t_w): This function subtracts the initial value of the strain/stress during the waiting time. It is usually the best option to correct the strain signal. Args: t_w (float): The waiting time before the chirp starts. time (List[float]): A list of time values corresponding to the signal. signal (List[float]): A list of signal values that need to be corrected. Returns: np.ndarray: An array representing the corrected signal after subtracting the initial average value. Raises: IndexError: If the time list does not contain values less than or equal to t_w. """ # Ensure time and signal are converted to np.ndarray for consistent operations time = np.asarray(time) signal = np.asarray(signal) # Find the last index where time is less than or equal to t_w try: index_tw = np.where(time <= t_w)[0][-1] except IndexError: raise IndexError(f"No values in the time array are less than or equal to t_w = {t_w}") # Calculate the average of the signal up to that index initial_value = np.mean(signal[:index_tw + 1]) # Subtract the average from the entire signal corrected_signal = signal - initial_value return corrected_signal
[docs] @staticmethod def average_over_chirp(t_w: float, time: List[float], signal: List[float]) -> np.ndarray: """ Average over chirp only: this subtracts the average of the signal over the length of the signal alone, excluding the initial waiting time. It can be the best option for the stress signal since the stress can be settling to zero during the initial waiting time. Args: t_w (float): The waiting time before the chirp starts. time (List[float]): A list of time values corresponding to the signal. signal (List[float]): A list of signal values that need to be corrected. Returns: np.ndarray: A NumPy array representing the corrected signal after subtracting the average value post t_w. Raises: IndexError: If the time list does not contain values less than or equal to t_w. """ # Ensure time and signal are converted to np.ndarray for consistent operations time = np.asarray(time) signal = np.asarray(signal) # Find the last index where time is less than or equal to t_w try: index_tw = np.where(time <= t_w)[0][-1] except IndexError: raise IndexError(f"No values in the time array are less than or equal to t_w = {t_w}") # Calculate the average of the signal after t_w avg_signal = np.mean(signal[index_tw + 1:]) # Subtract the average from the entire signal corrected_signal = signal - avg_signal return corrected_signal
[docs] @staticmethod def average_over_all(t_w: float, time: List[float], signal: List[float]) -> np.ndarray: """ Average over all: This subtracts the time-averaged value of the signal before and after the waiting time (t_w). Args: t_w (float): The waiting time before the chirp starts. time (List[float]): A list of time values corresponding to the signal. signal (List[float]): A list of signal values that need to be corrected. Returns: np.ndarray: A NumPy array representing the corrected signal after subtracting the average values before and after t_w from the entire signal. Raises: IndexError: If the time list does not contain values less than or equal to t_w. """ # Ensure time and signal are converted to np.ndarray for consistent operations time = np.asarray(time) signal = np.asarray(signal) # Find the last index where time is less than or equal to t_w try: index_tw = np.where(time <= t_w)[0][-1] except IndexError: raise IndexError(f"No values in the time array are less than or equal to t_w = {t_w}") # Calculate and subtract the averages before and after the index_tw from the entire signal corrected_signal = signal - np.mean(signal[:index_tw + 1]) - np.mean(signal[index_tw + 1:]) return corrected_signal
def filter_signal(self, t_w: float, time: List[float], signal: List[float], method: str = 'tw') -> Optional[
        np.ndarray]:
    """
    Apply the baseline correction selected by `method`.

    Args:
        t_w (float): The time threshold handed to the selected correction.
        time (List[float]): Time values corresponding to the signal.
        signal (List[float]): Signal values to correct.
        method (str, optional): 'tw' (average_until_tw), 'chirp' (average_over_chirp),
            'all' (average_over_all) or 'none'/'None' (signal returned as an array, uncorrected).
            Defaults to 'tw'.

    Returns:
        Optional[np.ndarray]: The corrected signal, or None (after printing a message) when `method`
        is not one of the supported names.

    Raises:
        IndexError: Propagated from the averaging methods when no time value is <= `t_w`.
    """
    time_arr = np.asarray(time)
    signal_arr = np.asarray(signal)

    # Guard clauses, checked in the same priority order as the method names are documented.
    if method == 'tw':
        return self.average_until_tw(t_w, time_arr, signal_arr)
    if method == 'chirp':
        return self.average_over_chirp(t_w, time_arr, signal_arr)
    if method == 'all':
        return self.average_over_all(t_w, time_arr, signal_arr)
    if method in ("none", "None"):
        return signal_arr

    print('Invalid method selected. Choose "tw", "chirp", "all", or "none"')
    return None
from typing import List, Tuple, Optional import numpy as np
def select_best_filter_method(self, t_w: float, time: List[float], signal: List[float]) -> Tuple[np.ndarray, str]:
    """
    Try every baseline correction and keep the one whose result is most symmetric around zero.

    Each of 'tw', 'chirp', 'all' and 'none' is applied through `filter_signal`. The score of a
    candidate is the sum of the absolute values of its corrected signal; the lowest score wins and
    ties keep the earlier candidate.

    Args:
        t_w (float): The time threshold used for filtering.
        time (List[float]): Time values corresponding to the signal.
        signal (List[float]): Signal values to be filtered.

    Returns:
        Tuple[np.ndarray, str]: The corrected signal produced by the winning method and the name of
        that method.
    """
    time_arr = np.asarray(time)
    signal_arr = np.asarray(signal)

    winner = None
    lowest_score = float('inf')
    for candidate in ('tw', 'chirp', 'all', 'none'):
        trial = self.filter_signal(t_w, time_arr, signal_arr, candidate)
        if trial is None:
            continue
        score = np.sum(np.abs(trial))
        if score < lowest_score:
            lowest_score, winner = score, candidate

    # The winning correction is recomputed rather than cached, as in the original flow.
    return self.filter_signal(t_w, time_arr, signal_arr, winner), winner
def select_best_filter_method_DMA(self, t_w: float, time: List[float], signal: List[float]) -> Tuple[
        np.ndarray, str]:
    """
    Try every baseline correction and keep the best one according to the DMA bias metric.

    Note:
        For DMA (Dynamic Mechanical Analysis) instruments a slightly different criterion is used to
        correct bias. For each of 'tw', 'chirp', 'all' and 'none' the metric is the sum of:
        1. The average of the absolute values of the corrected signal at its first and last sample.
        2. The standard deviation of the corrected signal over the entire time range.
        The method with the smallest metric is selected; ties keep the earlier candidate.

    Args:
        t_w (float): The time threshold used for filtering.
        time (List[float]): Time values corresponding to the signal.
        signal (List[float]): Signal values to be filtered.

    Returns:
        Tuple[np.ndarray, str]: The corrected signal produced by the winning method and the name of
        that method.
    """
    time_arr = np.asarray(time)
    signal_arr = np.asarray(signal)

    winner = None
    lowest_metric = float('inf')
    for candidate in ('tw', 'chirp', 'all', 'none'):
        trial = self.filter_signal(t_w, time_arr, signal_arr, candidate)
        if trial is not None:
            edge_offset = (abs(trial[0]) + abs(trial[-1])) / 2
            spread = np.std(trial)
            metric = edge_offset + spread
            if metric < lowest_metric:
                lowest_metric, winner = metric, candidate

    # The winning correction is recomputed rather than cached, as in the original flow.
    return self.filter_signal(t_w, time_arr, signal_arr, winner), winner
[docs] @staticmethod def calculate_wave_parameters(wave_data: Dict[str, dict]) -> Tuple[float, float, float, float, float]: """ Calculates wave parameters from the provided wave data. Depending on the number of waves present, it calculates the waiting time (tw), duration of the signal (T), tapering parameter (r), initial frequency (w0), and final frequency (w1) of the wave. Args: wave_data (dict): A dictionary containing wave information. The dictionary is expected to contain keys such as: - 'duration (s)': The duration of each wave in seconds. - 'coef': Coefficients corresponding to the wave parameters. Returns: Tuple[float, float, float, float, float]: A tuple containing: - tw (float): The waiting time before the wave starts. - T (float): The total duration of the signal. - r (float): The tapering parameter - w0 (float): The initial frequency (low frequency) of the wave. - w1 (float): The final frequency (high frequency) of the wave. """ # Extract the wave information duration_values = [wave_data[f'wave {i}']['duration (s)'] for i in range(1, len(wave_data))] coef_values = [wave_data[f'wave {i}']['coef'] for i in range(1, len(wave_data))] if len(duration_values) == 1: tw = 0 T = duration_values[0] r = 0 alpha = coef_values[1][2] w0_alpha = coef_values[1][3] elif len(duration_values) == 2: # Split if tw is in front or back if coef_values[0] == [0.0]: # tw is in front tw = duration_values[0] T = duration_values[1] r = 0 alpha = coef_values[1][2] w0_alpha = coef_values[1][3] else: # tw is in the back tw = duration_values[1] T = duration_values[0] r = 0 alpha = coef_values[1][2] w0_alpha = coef_values[1][3] elif len(duration_values) == 3: tw = 0 T = sum(duration_values) pirT = coef_values[0][1] alpha = coef_values[0][4] w0_alpha = coef_values[0][5] r = round(math.pi / (pirT * T), 4) else: if coef_values[0] == [0.0]: # tw is in front tw = duration_values[0] T = sum(duration_values[1:]) pirT = coef_values[1][1] alpha = coef_values[1][4] w0_alpha = coef_values[1][5] r = 
round(math.pi / (pirT * T), 4) else: tw = duration_values[3] T = sum(duration_values) - tw pirT = coef_values[0][1] alpha = coef_values[0][4] w0_alpha = coef_values[0][5] r = round(math.pi / (pirT * T), 4) # Calculate initial frequency w0 w0 = round(alpha * w0_alpha, 3) # Calculate high frequency w1 w1 = round(w0 * math.exp(alpha * T), 3) return tw, T, r, w0, w1
[docs] @staticmethod def moduli_owchirp_strain_controlled(strain: np.ndarray, stress: np.ndarray, fs: float, om1: float, om2: float) -> \ Tuple[np.ndarray, ...]: """ Computes the elastic modulus (G'), loss modulus (G''), and complex modulus from strain and stress in the time domain. The Fourier Transform is applied to the strain and stress, and the results are filtered between frequencies om1 and om2. Note: This code is adapted from the MITOWCh MATLAB code by Alessandro Perego (aperego@mmm.com), with specific Python replacements for MATLAB functions like padarray, pow2, nextpow2, and isrow. Args: strain (np.ndarray): The strain data as a NumPy array. stress (np.ndarray): The stress data as a NumPy array. fs (float): The sampling frequency. om1 (float): The lower angular frequency bound. om2 (float): The upper angular frequency bound. Returns: Tuple[np.ndarray, ...]: A tuple containing: - frequency (np.ndarray): The frequency array. - strain_final (np.ndarray): The corrected strain array. - stress_final (np.ndarray): The corrected stress array. - elastic_modulus_final (np.ndarray): The elastic modulus array (G'). - loss_modulus_final (np.ndarray): The loss modulus array (G''). - complex_modulus (np.ndarray): The complex modulus array. 
""" # Calculate lower and upper frequency bounds f1 = om1 / (2 * np.pi) f2 = om2 / (2 * np.pi) # Find the length of the input signals m = len(strain) # Find the next power of 2 greater than the length of the input signals n = 2 ** (int(np.log2(m)) + 1) # Pad the input data with zeros strain_padded = np.pad(strain, (0, n - m), 'constant') stress_padded = np.pad(stress, (0, n - m), 'constant') # FFT strain_fft = fft(strain_padded, n) stress_ftt = fft(stress_padded, n) # Shifting Signal f = (-n / 2 + np.arange(n)) * (fs / n) # 0-centered frequency range shifted_strain = fftshift(strain_fft) # Rearrange y values shifted_stress = fftshift(stress_ftt) # Rearrange y values # Computing Moduli moduli = shifted_stress[int(n / 2):] / shifted_strain[int(n / 2):] frequency_range = f[int(n / 2):] upper_frequency = frequency_range[frequency_range <= f2] lower_frequency = upper_frequency[upper_frequency >= f1] frequency = 2 * np.pi * lower_frequency # Transpose if W is 1D array if np.size(frequency) == 1: frequency = frequency[np.newaxis] l1 = len(upper_frequency) l2 = len(lower_frequency) elastic_modulus = np.real(moduli) loss_modulus = np.imag(moduli) complex_modulus = moduli[l1 - l2:l1] elastic_modulus_final = elastic_modulus[l1 - l2:l1] loss_modulus_final = loss_modulus[l1 - l2:l1] # Transpose if Ge or Gv is 1D array if np.size(complex_modulus) == 1: complex_modulus = complex_modulus[np.newaxis] if np.size(elastic_modulus_final) == 1: elastic_modulus_final = elastic_modulus_final[np.newaxis] if np.size(loss_modulus_final) == 1: loss_modulus_final = loss_modulus_final[np.newaxis] # Correct for right absolute value stress_corrected = shifted_stress[int(n / 2):] / fs strain_corrected = shifted_strain[int(n / 2):] / fs stress_final = stress_corrected[l1 - l2:l1] strain_final = strain_corrected[l1 - l2:l1] # Transpose if Stress2 or Strain2 is 1D array if np.size(stress_final) == 1: stress_final = stress_final[np.newaxis] if np.size(strain_final) == 1: strain_final = 
strain_final[np.newaxis] return frequency, strain_final, stress_final, elastic_modulus_final, loss_modulus_final, complex_modulus,
[docs] @staticmethod def moduli_owchirp_stress_controlled(strain_rate: np.ndarray, stress: np.ndarray, fs: float, om1: float, om2: float) -> \ Tuple[np.ndarray, ...]: """ Computes the elastic modulus (G'), loss modulus (G''), and complex modulus from strain and stress in the time domain. The Fourier Transform is applied to the strain rate and stress, and the results are filtered between frequencies om1 and om2. Note: This code is adapted from the MITOWCh MATLAB code by Alessandro Perego (aperego@mmm.com), with specific Python replacements for MATLAB functions like padarray, pow2, nextpow2, and isrow. Important: For stress-controlled instruments, it's preferable to use strain rate instead of strain for calculating the moduli due to the inertia of the instrument. However, the current implementation does not perform inertia correction, and users should be aware of potential bias in the results. Args: strain_rate (np.ndarray): The strain rate data as a NumPy array. stress (np.ndarray): The stress data as a NumPy array. fs (float): The sampling frequency. om1 (float): The lower angular frequency bound. om2 (float): The upper angular frequency bound. Returns: Tuple[np.ndarray, ...]: A tuple containing: - frequency (np.ndarray): The frequency array. - strain_final (np.ndarray): The corrected strain array. - stress_final (np.ndarray): The corrected stress array. - elastic_modulus_final (np.ndarray): The elastic modulus array (G'). - loss_modulus_final (np.ndarray): The loss modulus array (G''). - complex_modulus (np.ndarray): The complex modulus array. 
""" # Calculate lower and upper frequency bounds f1 = om1 / (2 * np.pi) f2 = om2 / (2 * np.pi) # Find the length of the input signals m = len(strain_rate) # Find the next power of 2 greater than the length of the input signals n = 2 ** (int(np.log2(m)) + 1) # Pad the input data with zeros strain_rate_padded = np.pad(strain_rate, (0, n - m), 'constant') stress_padded = np.pad(stress, (0, n - m), 'constant') # FFT strain_rate_fft = fft(strain_rate_padded, n) stress_ftt = fft(stress_padded, n) # Shifting Signal f = (-n / 2 + np.arange(n)) * (fs / n) # 0-centered frequency range shifted_strain_rate = fftshift(strain_rate_fft) # Rearrange y values shifted_stress = fftshift(stress_ftt) # Rearrange y values # Computing Moduli complex_viscosity = shifted_stress[int(n / 2):] / shifted_strain_rate[int(n / 2):] frequency_range = f[int(n / 2):] upper_frequency = frequency_range[frequency_range <= f2] lower_frequency = upper_frequency[upper_frequency >= f1] frequency = 2 * np.pi * lower_frequency # Transpose if W is 1D array if np.size(frequency) == 1: frequency = frequency[np.newaxis] l1 = len(upper_frequency) l2 = len(lower_frequency) moduli = complex_viscosity * (1j*frequency_range*2*np.pi) elastic_modulus = np.real(moduli) loss_modulus = np.imag(moduli) complex_modulus = moduli[l1 - l2:l1] elastic_modulus_final = elastic_modulus[l1 - l2:l1] loss_modulus_final = loss_modulus[l1 - l2:l1] # Transpose if Ge or Gv is 1D array if np.size(complex_modulus) == 1: complex_modulus = complex_modulus[np.newaxis] if np.size(elastic_modulus_final) == 1: elastic_modulus_final = elastic_modulus_final[np.newaxis] if np.size(loss_modulus_final) == 1: loss_modulus_final = loss_modulus_final[np.newaxis] # Correct for right absolute value stress_corrected = shifted_stress[int(n / 2):] / fs strain_corrected = shifted_strain_rate[int(n / 2):] / fs stress_final = stress_corrected[l1 - l2:l1] strain_final = strain_corrected[l1 - l2:l1] # Transpose if Stress2 or Strain2 is 1D array if 
np.size(stress_final) == 1: stress_final = stress_final[np.newaxis] if np.size(strain_final) == 1: strain_final = strain_final[np.newaxis] return frequency, strain_final, stress_final, elastic_modulus_final, loss_modulus_final, complex_modulus,
def _apply(self, target, **kwargs):
    """
    Applies the appropriate data processing pipeline to the given target based on its measurement
    method (`target.conditions['method']`).

    For 'Arbitrary Wave' methods the chirp is analysed: wave parameters are read from
    `target.details`, strain and stress are baseline-corrected, the part of the signal selected by
    `self.owchirp_waiting_time` is Fourier transformed and `target.datasets` is replaced by
    [frequency-space dataset, original dataset merged with the corrected time-space signals].
    Several processing details are added to the target as well.

    For every other supported method only the coordinates of `target.datasets[0]` are switched to
    the pair that is relevant for that method.

    Args:
        target: The measurement to process; provides `conditions`, `details`, `datasets` and
            `add_detail`.
        kwargs: Additional keyword arguments (unused).

    Raises:
        ValueError: If the method is not supported in the current RheoAnalysis pipeline.
        ValueError: If an unsupported TA instrument serial number is encountered.
        ValueError: If `self.owchirp_waiting_time` is not 'before_signal', 'after_signal' or
            'all_signal'.

    Returns:
        target: The processed target.
    """
    method = target.conditions['method']

    if 'Arbitrary Wave' in method:
        method_key = method.split('\t')[-1]  # Extract method key, e.g., 'Arbitrary Wave - 2'

        # Wave steps beyond the first one reuse the wave definition stored for 'Arbitrary Wave - 1'.
        if 'Arbitrary Wave' in method_key:
            wave_number = int(method_key.split('-')[-1].strip())
            if wave_number > 1:
                method_key = 'Arbitrary Wave - 1'

        wave_data = target.details[method_key]
        strain_applied = wave_data['wave 2']['coef'][0]
        waiting_time, oscillation_period, taping_parameter, initial_frequency, final_frequency = \
            self.calculate_wave_parameters(wave_data)
        sampling_frequency = wave_data['rate (pts/s)']
        original_dataset = target.datasets[0]

        # The first four characters of the serial number identify the instrument family:
        #   5343 - TA DHR-3 (stress controlled), 5332 - TA DHR-1 (stress controlled),
        #   4020 - TA DMA, 4010 - TA ARES G2 (strain controlled).
        serial_number = target.details["instrument_serial_number"]
        serial_prefix = serial_number[:4]
        if serial_prefix in ("5343", "5332", "4020"):
            select_filter = self.select_best_filter_method_DMA
        elif serial_prefix == "4010":
            select_filter = self.select_best_filter_method
        else:
            raise ValueError(
                f"Current version of RheoAnalysis does not support TA instruments with serial number {serial_number}")

        time, strain, stress, temperature = self.prepare_owchirp_data(
            original_dataset, 'step time', 'strain', 'stress', 'temperature', self.cutoff_points)
        strain_filtered, filter_used_strain = select_filter(waiting_time, time, strain)
        stress_filtered, filter_used_stress = select_filter(waiting_time, time, stress)

        # Select the part of the signal that enters the Fourier transform.
        if self.owchirp_waiting_time == 'before_signal':
            mask = time > waiting_time
        elif self.owchirp_waiting_time == 'after_signal':
            signal_end_time = oscillation_period
            signal_start_time = time[0] + 0.1
            mask = (time > signal_start_time) & (time < signal_end_time)
        elif self.owchirp_waiting_time == 'all_signal':
            mask = slice(None)
        else:
            # Fail with a clear message instead of a NameError further down.
            raise ValueError(
                f"Unsupported owchirp_waiting_time '{self.owchirp_waiting_time}'. "
                "Expected 'before_signal', 'after_signal' or 'all_signal'.")
        time_tw = time[mask]
        strain_filtered_tw = strain_filtered[mask]
        stress_filtered_tw = stress_filtered[mask]

        # NOTE(review): np.diff yields one sample fewer than its input, so strain_rate is shorter
        # than the stress passed next to it and than the other arrays of the time-space dataset
        # below - confirm this is acceptable downstream.
        strain_rate = np.diff(strain_filtered_tw) / np.diff(time_tw)

        # Stress-controlled instruments use the strain rate; compare the same 4-character prefix
        # that selected the instrument family above.
        if serial_prefix in ("5343", "5332"):
            frequency_radians, fourier_transform_strain, fourier_transform_stress, storage_modulus, loss_modulus, complex_modulus = \
                self.moduli_owchirp_stress_controlled(
                    strain_rate, stress_filtered_tw, sampling_frequency, initial_frequency, final_frequency)
        else:
            frequency_radians, fourier_transform_strain, fourier_transform_stress, storage_modulus, loss_modulus, complex_modulus = \
                self.moduli_owchirp_strain_controlled(
                    strain_filtered_tw, stress_filtered_tw, sampling_frequency, initial_frequency, final_frequency)

        # Quantities derived from the frequency-space moduli.
        frequency_hertz = frequency_radians / (2 * np.pi)
        tan_delta = loss_modulus / storage_modulus
        complex_viscosity = np.sqrt(storage_modulus ** 2 + loss_modulus ** 2) / frequency_radians
        complex_modulus_abs = np.abs(complex_modulus)

        data_arrays = [
            frequency_radians, fourier_transform_strain, fourier_transform_stress,
            storage_modulus, loss_modulus, complex_modulus_abs,
            frequency_hertz, tan_delta, complex_viscosity]
        variable_names = ['angular frequency', 'fourier_transform_strain', 'fourier_transform_stress',
                          'storage modulus', 'loss modulus', 'complex modulus',
                          'frequency_hz', 'tan(delta)', 'complex viscosity']
        variable_units = ['rad/s', 'a.u.', 'a.u ', 'Pa', 'Pa', 'Pa', 'Hz', 'rad', 'Pa s']  # a.u. = arbitrary units

        dataset_fourier_space = one_d_composite_dataset.OneDimensionalCompositeDataset(
            data_arrays=data_arrays,
            data_array_names=variable_names,
            data_array_units=variable_units,
            default_independent_name='angular frequency',
            default_dependent_name='storage modulus',
            source='Dataset in frequency space')

        # Time-space dataset holding the corrected signals, merged into the original dataset.
        time_filtered = time
        dataset_time_space = one_d_composite_dataset.OneDimensionalCompositeDataset(
            data_arrays=[time_filtered, strain_filtered, stress_filtered, strain_rate],
            data_array_names=['adjusted time', 'adjusted strain', 'adjusted stress', 'adjusted strain rate'],
            data_array_units=['s', 'a.u', 'Pa', '1/s'])
        original_dataset = one_d_composite_dataset.OneDimensionalCompositeDataset.from_datasets(
            [original_dataset, dataset_time_space, ])

        datasets = []
        datasets.append(dataset_fourier_space)
        datasets.append(original_dataset)
        target.datasets = datasets

        # Add details about the processed data
        target.add_detail('strain_applied', strain_applied)
        target.add_detail('waiting_time', waiting_time)
        target.add_detail('taping_parameter', taping_parameter)
        target.add_detail('filter_used_strain', filter_used_strain)
        target.add_detail('filter_used_stress', filter_used_stress)
        target.add_detail('initial_frequency', initial_frequency)
        target.add_detail('final_frequency', final_frequency)
        target.add_detail('oscillation_period', oscillation_period)
        target.add_detail('sampling_frequency', sampling_frequency)
    else:
        # (keyword, independent coordinate, dependent coordinate); the first keyword contained in
        # the method wins, so more specific keywords are listed before the generic ones they extend
        # ('Frequency DMA' before 'Frequency', 'Temperature ramp ISO force' before
        # 'Temperature ramp').
        coordinate_rules = (
            ('Creep', 'time', 'strain'),
            ('Amplitude sweep', 'oscillation strain (cmd)', 'storage modulus'),
            ('Flow ramp', 'shear rate', 'viscosity'),
            ('Frequency DMA', 'angular frequency', 'storage modulus'),
            ('Frequency', 'angular frequency', 'storage modulus'),
            ('Multiwave', 'angular frequency', 'storage modulus'),
            ('Stress Relaxation', 'step time', 'modulus'),
            ('Temperature ramp ISO force', 'temperature', 'modulus'),
            ('Temperature ramp', 'temperature', 'storage modulus'),
            ('Temperature sweep', 'temperature', 'storage modulus'),
            ('Master Curve - shift factors', 'temperature', 'aT'),
            ('Master Curve - master curve', 'angular frequency', 'storage modulus'),
            ('Recovery', 'step time', 'modulus'),
        )
        for keyword, independent_name, dependent_name in coordinate_rules:
            if keyword in method:
                original_dataset = target.datasets[0]
                original_dataset.switch_coordinates(independent_name=independent_name,
                                                    dependent_name=dependent_name)
                break
        else:
            # Raise an error if the method is not supported in the RheoAnalysis pipeline
            raise ValueError(f"The method '{method}' is currently not supported in the RheoAnalysis pipeline.")

    return target