Source code for monai.transforms.signal.array

# Copyright (c) MONAI Consortium
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
A collection of transforms for signal operations.
"""

from __future__ import annotations

import warnings
from collections.abc import Sequence
from typing import Any

import numpy as np
import torch

from monai.config.type_definitions import NdarrayOrTensor
from monai.transforms.transform import RandomizableTransform, Transform
from monai.transforms.utils import check_boundaries, paste, squarepulse
from monai.utils import optional_import
from monai.utils.enums import TransformBackends
from monai.utils.type_conversion import convert_data_type, convert_to_tensor

shift, has_shift = optional_import("scipy.ndimage.interpolation", name="shift")
iirnotch, has_iirnotch = optional_import("scipy.signal", name="iirnotch")
with warnings.catch_warnings():
    warnings.simplefilter("ignore", UserWarning)  # project-monai/monai#5204
    filtfilt, has_filtfilt = optional_import("torchaudio.functional", name="filtfilt")
central_frequency, has_central_frequency = optional_import("pywt", name="central_frequency")
cwt, has_cwt = optional_import("pywt", name="cwt")

__all__ = [
    "SignalRandDrop",
    "SignalRandScale",
    "SignalRandShift",
    "SignalRandAddSine",
    "SignalRandAddSquarePulse",
    "SignalRandAddGaussianNoise",
    "SignalRandAddSinePartial",
    "SignalRandAddSquarePulsePartial",
    "SignalFillEmpty",
    "SignalRemoveFrequency",
    "SignalContinuousWavelet",
]


[docs] class SignalRandShift(RandomizableTransform): """ Apply a random shift on a signal """ backend = [TransformBackends.NUMPY, TransformBackends.TORCH]
[docs] def __init__( self, mode: str | None = "wrap", filling: float | None = 0.0, boundaries: Sequence[float] = (-1.0, 1.0) ) -> None: """ Args: mode: define how the extension of the input array is done beyond its boundaries, see for more details : https://docs.scipy.org/doc/scipy/reference/generated/scipy.ndimage.shift.html. filling: value to fill past edges of input if mode is ‘constant’. Default is 0.0. see for mode details : https://docs.scipy.org/doc/scipy/reference/generated/scipy.ndimage.shift.html. boundaries: list defining lower and upper boundaries for the signal shift, default : ``[-1.0, 1.0]`` """ super().__init__() check_boundaries(boundaries) self.filling = filling self.mode = mode self.boundaries = boundaries
[docs] def __call__(self, signal: NdarrayOrTensor) -> NdarrayOrTensor: """ Args: signal: input 1 dimension signal to be shifted """ self.randomize(None) self.magnitude = self.R.uniform(low=self.boundaries[0], high=self.boundaries[1]) length = signal.shape[1] shift_idx = round(self.magnitude * length) sig = convert_data_type(signal, np.ndarray)[0] signal = convert_to_tensor(shift(input=sig, mode=self.mode, shift=shift_idx, cval=self.filling)) return signal
[docs] class SignalRandScale(RandomizableTransform): """ Apply a random rescaling on a signal """ backend = [TransformBackends.TORCH, TransformBackends.NUMPY]
[docs] def __init__(self, boundaries: Sequence[float] = (-1.0, 1.0)) -> None: """ Args: boundaries: list defining lower and upper boundaries for the signal scaling, default : ``[-1.0, 1.0]`` """ super().__init__() check_boundaries(boundaries) self.boundaries = boundaries
[docs] def __call__(self, signal: NdarrayOrTensor) -> NdarrayOrTensor: """ Args: signal: input 1 dimension signal to be scaled """ self.randomize(None) self.magnitude = self.R.uniform(low=self.boundaries[0], high=self.boundaries[1]) signal = convert_to_tensor(self.magnitude * signal) return signal
[docs] class SignalRandDrop(RandomizableTransform): """ Randomly drop a portion of a signal """ backend = [TransformBackends.TORCH, TransformBackends.NUMPY]
[docs] def __init__(self, boundaries: Sequence[float] = (0.0, 1.0)) -> None: """ Args: boundaries: list defining lower and upper boundaries for the signal drop, lower and upper values need to be positive default : ``[0.0, 1.0]`` """ super().__init__() check_boundaries(boundaries) self.boundaries = boundaries
[docs] def __call__(self, signal: NdarrayOrTensor) -> NdarrayOrTensor: """ Args: signal: input 1 dimension signal to be dropped """ self.randomize(None) self.magnitude = self.R.uniform(low=self.boundaries[0], high=self.boundaries[1]) length = signal.shape[-1] mask = torch.zeros(round(self.magnitude * length)) trange = torch.arange(length) loc = trange[torch.randint(0, trange.size(0), (1,))] signal = convert_to_tensor(paste(signal, mask, (loc,))) return signal
[docs] class SignalRandAddSine(RandomizableTransform): """ Add a random sinusoidal signal to the input signal """ backend = [TransformBackends.TORCH, TransformBackends.NUMPY]
[docs] def __init__(self, boundaries: Sequence[float] = (0.1, 0.3), frequencies: Sequence[float] = (0.001, 0.02)) -> None: """ Args: boundaries: list defining lower and upper boundaries for the sinusoidal magnitude, lower and upper values need to be positive ,default : ``[0.1, 0.3]`` frequencies: list defining lower and upper frequencies for sinusoidal signal generation ,default : ``[0.001, 0.02]`` """ super().__init__() check_boundaries(boundaries) self.boundaries = boundaries self.frequencies = frequencies
[docs] def __call__(self, signal: NdarrayOrTensor) -> NdarrayOrTensor: """ Args: signal: input 1 dimension signal to which sinusoidal signal will be added """ self.randomize(None) self.magnitude = self.R.uniform(low=self.boundaries[0], high=self.boundaries[1]) self.freqs = self.R.uniform(low=self.frequencies[0], high=self.frequencies[1]) length = signal.shape[1] time = np.arange(0, length, 1) data = convert_to_tensor(self.freqs * time) sine = self.magnitude * torch.sin(data) signal = convert_to_tensor(signal) + sine return signal
[docs] class SignalRandAddSquarePulse(RandomizableTransform): """ Add a random square pulse signal to the input signal """ backend = [TransformBackends.TORCH, TransformBackends.NUMPY]
[docs] def __init__(self, boundaries: Sequence[float] = (0.01, 0.2), frequencies: Sequence[float] = (0.001, 0.02)) -> None: """ Args: boundaries: list defining lower and upper boundaries for the square pulse magnitude, lower and upper values need to be positive , default : ``[0.01, 0.2]`` frequencies: list defining lower and upper frequencies for the square pulse signal generation , default : ``[0.001, 0.02]`` """ super().__init__() check_boundaries(boundaries) self.boundaries = boundaries self.frequencies = frequencies
[docs] def __call__(self, signal: NdarrayOrTensor) -> NdarrayOrTensor: """ Args: signal: input 1 dimension signal to which square pulse will be added """ self.randomize(None) self.magnitude = self.R.uniform(low=self.boundaries[0], high=self.boundaries[1]) self.freqs = self.R.uniform(low=self.frequencies[0], high=self.frequencies[1]) length = signal.shape[1] time = np.arange(0, length, 1) squaredpulse = self.magnitude * squarepulse(self.freqs * time) signal = convert_to_tensor(signal) + squaredpulse return signal
[docs] class SignalRandAddSinePartial(RandomizableTransform): """ Add a random partial sinusoidal signal to the input signal """ backend = [TransformBackends.TORCH, TransformBackends.NUMPY]
[docs] def __init__( self, boundaries: Sequence[float] = (0.1, 0.3), frequencies: Sequence[float] = (0.001, 0.02), fraction: Sequence[float] = (0.01, 0.2), ) -> None: """ Args: boundaries: list defining lower and upper boundaries for the sinusoidal magnitude, lower and upper values need to be positive , default : ``[0.1, 0.3]`` frequencies: list defining lower and upper frequencies for sinusoidal signal generation , default : ``[0.001, 0.02]`` fraction: list defining lower and upper boundaries for partial signal generation default : ``[0.01, 0.2]`` """ super().__init__() check_boundaries(boundaries) self.boundaries = boundaries self.frequencies = frequencies self.fraction = fraction
[docs] def __call__(self, signal: NdarrayOrTensor) -> NdarrayOrTensor: """ Args: signal: input 1 dimension signal to which a partial sinusoidal signal will be added """ self.randomize(None) self.magnitude = self.R.uniform(low=self.boundaries[0], high=self.boundaries[1]) self.fracs = self.R.uniform(low=self.fraction[0], high=self.fraction[1]) self.freqs = self.R.uniform(low=self.frequencies[0], high=self.frequencies[1]) length = signal.shape[-1] time_partial = np.arange(0, round(self.fracs * length), 1) data = convert_to_tensor(self.freqs * time_partial) sine_partial = self.magnitude * torch.sin(data) loc = np.random.choice(range(length)) signal = paste(signal, sine_partial, (loc,)) return signal
[docs] class SignalRandAddGaussianNoise(RandomizableTransform): """ Add a random gaussian noise to the input signal """ backend = [TransformBackends.TORCH, TransformBackends.NUMPY]
[docs] def __init__(self, boundaries: Sequence[float] = (0.001, 0.02)) -> None: """ Args: boundaries: list defining lower and upper boundaries for the signal magnitude, default : ``[0.001,0.02]`` """ super().__init__() check_boundaries(boundaries) self.boundaries = boundaries
[docs] def __call__(self, signal: NdarrayOrTensor) -> NdarrayOrTensor: """ Args: signal: input 1 dimension signal to which gaussian noise will be added """ self.randomize(None) self.magnitude = self.R.uniform(low=self.boundaries[0], high=self.boundaries[1]) length = signal.shape[1] gaussiannoise = self.magnitude * torch.randn(length) signal = convert_to_tensor(signal) + gaussiannoise return signal
[docs] class SignalRandAddSquarePulsePartial(RandomizableTransform): """ Add a random partial square pulse to a signal """ backend = [TransformBackends.TORCH, TransformBackends.NUMPY]
[docs] def __init__( self, boundaries: Sequence[float] = (0.01, 0.2), frequencies: Sequence[float] = (0.001, 0.02), fraction: Sequence[float] = (0.01, 0.2), ) -> None: """ Args: boundaries: list defining lower and upper boundaries for the square pulse magnitude, lower and upper values need to be positive , default : ``[0.01, 0.2]`` frequencies: list defining lower and upper frequencies for square pulse signal generation example : ``[0.001, 0.02]`` fraction: list defining lower and upper boundaries for partial square pulse generation default: ``[0.01, 0.2]`` """ super().__init__() check_boundaries(boundaries) self.boundaries = boundaries self.frequencies = frequencies self.fraction = fraction
[docs] def __call__(self, signal: NdarrayOrTensor) -> NdarrayOrTensor: """ Args: signal: input 1 dimension signal to which a partial square pulse will be added """ self.randomize(None) self.magnitude = self.R.uniform(low=self.boundaries[0], high=self.boundaries[1]) self.fracs = self.R.uniform(low=self.fraction[0], high=self.fraction[1]) self.freqs = self.R.uniform(low=self.frequencies[0], high=self.frequencies[1]) length = signal.shape[-1] time_partial = np.arange(0, round(self.fracs * length), 1) squaredpulse_partial = self.magnitude * squarepulse(self.freqs * time_partial) loc = np.random.choice(range(length)) signal = paste(signal, squaredpulse_partial, (loc,)) return signal
[docs] class SignalFillEmpty(Transform): """ replace empty part of a signal (NaN) """ backend = [TransformBackends.TORCH, TransformBackends.NUMPY]
[docs] def __init__(self, replacement: float = 0.0) -> None: """ Args: replacement: value to replace nan items in signal """ super().__init__() self.replacement = replacement
[docs] def __call__(self, signal: NdarrayOrTensor) -> NdarrayOrTensor: """ Args: signal: signal to be filled """ signal = torch.nan_to_num(convert_to_tensor(signal, track_meta=True), nan=self.replacement) return signal
[docs] class SignalRemoveFrequency(Transform): """ Remove a frequency from a signal """ backend = [TransformBackends.TORCH, TransformBackends.NUMPY]
[docs] def __init__( self, frequency: float | None = None, quality_factor: float | None = None, sampling_freq: float | None = None ) -> None: """ Args: frequency: frequency to be removed from the signal quality_factor: quality factor for notch filter see : https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.iirnotch.html sampling_freq: sampling frequency of the input signal """ super().__init__() self.frequency = frequency self.quality_factor = quality_factor self.sampling_freq = sampling_freq
[docs] def __call__(self, signal: np.ndarray) -> Any: """ Args: signal: signal to be frequency removed """ b_notch, a_notch = convert_to_tensor( iirnotch(self.frequency, self.quality_factor, self.sampling_freq), dtype=torch.float ) y_notched = filtfilt(convert_to_tensor(signal), a_notch, b_notch) return y_notched
[docs] class SignalContinuousWavelet(Transform): """ Generate continuous wavelet transform of a signal """ backend = [TransformBackends.NUMPY]
[docs] def __init__(self, type: str = "mexh", length: float = 125.0, frequency: float = 500.0) -> None: """ Args: type: mother wavelet type. Available options are: {``"mexh"``, ``"morl"``, ``"cmorB-C"``, , ``"gausP"``} see : https://pywavelets.readthedocs.io/en/latest/ref/cwt.html length: expected length, default ``125.0`` frequency: signal frequency, default ``500.0`` """ super().__init__() self.frequency = frequency self.length = length self.type = type
[docs] def __call__(self, signal: np.ndarray) -> Any: """ Args: signal: signal for which to generate continuous wavelet transform """ mother_wavelet = self.type spread = np.arange(1, self.length + 1, 1) scales = central_frequency(mother_wavelet) * self.frequency / spread coeffs, _ = cwt(signal, scales, mother_wavelet, 1.0 / self.frequency) coeffs = np.transpose(coeffs, [1, 0, 2]) return coeffs