# Copyright 2019 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import os
import sys
import warnings
import numpy as np
from scipy.signal import resample
from sklearn import preprocessing
import lale.docstrings
import lale.operators
# Make modules in the current working directory importable.
# NOTE(review): import-time side effect on sys.path — confirm it is needed.
sys.path.append(os.getcwd())
# Lightweight record pairing a seizure-type label with its raw EEG data.
seizure_type_data = collections.namedtuple(
    "seizure_type_data", ["seizure_type", "data"]
)
# Some of the classes and modules have been taken from https://github.com/MichaelHills/seizure-detection
class Slice:
    """
    Take a slice of the data on the last axis.

    Note: ``Slice(x, y)`` keeps indices ``x`` through ``y`` *inclusive*
    (the stored stop is ``y + 1``), unlike a plain Python slice, which
    excludes its end point.
    """

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop + 1  # make the requested stop index inclusive

    def get_name(self):
        return f"slice{self.start:d}-{self.stop:d}"

    def apply(self, data):
        # Build a full-rank index that slices only the last axis.
        s = [
            slice(None),
        ] * data.ndim
        s[-1] = slice(self.start, self.stop)
        # Index with a tuple: indexing an ndarray with a *list* of slices
        # was deprecated and is an error in recent NumPy versions.
        return data[tuple(s)]
class Magnitude:
    """Transform that replaces each (possibly complex) sample by its magnitude."""

    def get_name(self):
        return "mag"

    def apply(self, data):
        # Element-wise absolute value; works for real input as well.
        return np.abs(data)
class Log10:
    """
    Apply Log10
    """

    def get_name(self):
        return "log10"

    def apply(self, data):
        # 10.0 * log10(re * re + im * im)
        # Guard against log10 of non-positive entries.
        # NOTE: `data` is modified in place; callers see the mutation.
        indices = np.where(data <= 0)
        # First overwrite the bad entries with the global max so the np.min
        # on the next line is computed over the remaining (positive) values.
        data[indices] = np.max(data)
        # Then replace them with a value below the smallest positive entry.
        data[indices] = np.min(data) * 0.1
        return np.log10(data)
class Pipeline:
    """
    A chain of data transformations producing extracted features.

    pipeline: list of transform objects, each providing ``get_name()`` and
    ``apply(data)``; they are applied to the input in order.
    """

    def __init__(self, pipeline):
        self.transforms = pipeline
        parts = [step.get_name() for step in self.transforms]
        self.name = "_".join(parts) if parts else "empty"

    def get_name(self):
        return self.name

    def apply(self, data):
        result = data
        for step in self.transforms:
            result = step.apply(result)
        return result
class FFT:
    """Real-input Fast Fourier Transform along the last axis."""

    def get_name(self):
        return "fft"

    def apply(self, data):
        last_axis = data.ndim - 1
        return np.fft.rfft(data, axis=last_axis)
class Resample:
    """
    Resample time-series data along the last axis.

    The data is only downsampled: when the last axis already has at most
    ``sample_rate`` points, the input is returned unchanged.
    """

    def __init__(self, sample_rate):
        self.f = sample_rate

    def get_name(self):
        return f"resample{self.f:d}"

    def apply(self, data):
        # Nothing to do when the signal is already short enough.
        if data.shape[-1] <= self.f:
            return data
        return resample(data, self.f, axis=data.ndim - 1)
class StandardizeLast:
    """Zero-mean, unit-variance scaling along the last axis."""

    def get_name(self):
        return "standardize-last"

    def apply(self, data):
        last_axis = data.ndim - 1
        return preprocessing.scale(data, axis=last_axis)
class StandardizeFirst:
    """Zero-mean, unit-variance scaling along the first axis."""

    def get_name(self):
        return "standardize-first"

    def apply(self, data):
        # Axis 0 is the channel axis in this pipeline's usage.
        return preprocessing.scale(data, axis=0)
class CorrelationMatrix:
    """Pearson correlation-coefficient matrix across all EEG channels (rows)."""

    def get_name(self):
        return "correlation-matrix"

    def apply(self, data):
        # np.corrcoef treats each row as one variable (channel).
        return np.corrcoef(data)
class Eigenvalues:
    """
    Eigenvalue magnitudes of a matrix, sorted ascending so that they are
    usable as features despite eigenvalues having no inherent order.
    """

    def get_name(self):
        return "eigenvalues"

    def apply(self, data):
        eigvals = np.linalg.eig(data)[0]
        magnitudes = np.absolute(eigvals)
        return np.sort(magnitudes)
# Take the upper right triangle of a matrix
def upper_right_triangle(matrix):
    """Return the strictly-upper-triangular entries of ``matrix``, row by row."""
    n_rows, n_cols = matrix.shape[0], matrix.shape[1]
    return np.array(
        [matrix[r, c] for r in range(n_rows) for c in range(r + 1, n_cols)]
    )
class FreqCorrelation:
    """
    Correlation in the frequency domain.

    First takes an FFT restricted to the (start, end) band, then computes
    correlation coefficients of the log-magnitudes, followed by the
    eigenvalues of the correlation matrix.  The output feature vector
    concatenates (fft, upper_right_triangle(correlation_coefficients),
    eigenvalues); each part can be selected/omitted via the constructor.
    """

    def __init__(
        self, start, end, scale_option, with_fft=False, with_corr=True, with_eigen=True
    ):
        self.start = start
        self.end = end
        self.scale_option = scale_option
        self.with_fft = with_fft
        self.with_corr = with_corr
        self.with_eigen = with_eigen
        assert scale_option in ("first_axis", "last_axis", "none")
        # At least one of the correlation/eigenvalue families must be kept.
        assert with_corr or with_eigen

    def get_name(self):
        # Encode omitted feature families in the name, if any.
        omitted = []
        if not self.with_corr:
            omitted.append("nocorr")
        if not self.with_eigen:
            omitted.append("noeig")
        selection_str = f"-{'-'.join(omitted)}" if omitted else ""
        fft_tag = "withfft" if self.with_fft else "nofft"
        return (
            f"freq-correlation-{self.start:d}-{self.end:d}"
            f"-{fft_tag}-{self.scale_option}{selection_str}"
        )

    def apply(self, data):
        # Log-magnitude spectrum restricted to the requested band.
        spectrum = FFT().apply(data)
        spectrum = Slice(self.start, self.end).apply(spectrum)
        spectrum = Magnitude().apply(spectrum)
        spectrum = Log10().apply(spectrum)
        scaled = spectrum
        if self.scale_option == "first_axis":
            scaled = StandardizeFirst().apply(scaled)
        elif self.scale_option == "last_axis":
            scaled = StandardizeLast().apply(scaled)
        corr = CorrelationMatrix().apply(scaled)
        features = []
        if self.with_corr:
            features.append(upper_right_triangle(corr))
        if self.with_eigen:
            features.append(Eigenvalues().apply(corr))
        if self.with_fft:
            features.append(spectrum.ravel())
        for part in features:
            assert part.ndim == 1
        return np.concatenate(features, axis=0)
class TimeCorrelation:
    """
    Correlation in the time domain. First downsample the data, then calculate correlation co-efficients
    followed by calculating eigenvalues on the correlation co-efficients matrix.
    The output features are (upper_right_diagonal(correlation_coefficients), eigenvalues)
    Features can be selected/omitted using the constructor arguments.
    """

    def __init__(self, max_hz, scale_option, with_corr=True, with_eigen=True):
        # max_hz: target number of samples per channel after downsampling.
        self.max_hz = max_hz
        # scale_option: which axis (if any) to standardize before correlating.
        self.scale_option = scale_option
        self.with_corr = with_corr
        self.with_eigen = with_eigen
        assert scale_option in ("first_axis", "last_axis", "none")
        # At least one of the two feature families must be enabled.
        assert with_corr or with_eigen

    def get_name(self):
        # Encode omitted feature families in the name so that features
        # produced with different options are distinguishable.
        selections = []
        if not self.with_corr:
            selections.append("nocorr")
        if not self.with_eigen:
            selections.append("noeig")
        if len(selections) > 0:
            selection_str = "-" + "-".join(selections)
        else:
            selection_str = ""
        return f"time-correlation-r{self.max_hz:d}-{self.scale_option}{selection_str}"

    def apply(self, data):
        # so that correlation matrix calculation doesn't crash
        # (a constant-zero channel has zero variance; perturbing its last
        # sample avoids a degenerate corrcoef row).
        # NOTE: this mutates the caller's array in place.
        for ch in data:
            if np.all(ch == 0.0):
                ch[-1] += 0.00001
        data1 = data
        # Downsample only when the channel length exceeds max_hz.
        if data1.shape[1] > self.max_hz:
            data1 = Resample(self.max_hz).apply(data1)
        if self.scale_option == "first_axis":
            data1 = StandardizeFirst().apply(data1)
        elif self.scale_option == "last_axis":
            data1 = StandardizeLast().apply(data1)
        data1 = CorrelationMatrix().apply(data1)
        out = []
        if self.with_corr:
            ur = upper_right_triangle(data1)
            out.append(ur)
        if self.with_eigen:
            w = Eigenvalues().apply(data1)
            out.append(w)
        # All selected feature families are 1-d; concatenate into one vector.
        for d in out:
            assert d.ndim == 1
        return np.concatenate(out, axis=0)
class FFTWithTimeFreqCorrelation:
    """
    Combines FFT with time- and frequency-domain correlation, taking both
    the correlation coefficients and the eigenvalues as features.
    """

    def __init__(self, start, end, max_hz, scale_option):
        self.start = start
        self.end = end
        self.max_hz = max_hz
        self.scale_option = scale_option
        assert scale_option in ("first_axis", "last_axis", "none")

    def get_name(self):
        return (
            f"fft-with-time-freq-corr-{self.start:d}-{self.end:d}"
            f"-r{self.max_hz:d}-{self.scale_option}"
        )

    def apply(self, data):
        # Concatenate time-domain and frequency-domain feature vectors.
        time_features = TimeCorrelation(self.max_hz, self.scale_option).apply(data)
        freq_features = FreqCorrelation(
            self.start, self.end, self.scale_option, with_fft=True
        ).apply(data)
        assert time_features.ndim == freq_features.ndim
        return np.concatenate(
            (time_features, freq_features), axis=time_features.ndim - 1
        )
class _TimeFreqEigenVectorsImpl:
def __init__(
self,
window_length=1,
window_step=0.5,
fft_min_freq=1,
fft_max_freq=24,
sampling_frequency=250,
):
self.window_length = window_length
self.window_step = window_step
self.fft_min_freq = fft_min_freq
self.fft_max_freq = fft_max_freq
self.sampling_frequency = sampling_frequency
def transform(self, X, y=None):
warnings.filterwarnings("ignore")
pipeline = Pipeline(
[
FFTWithTimeFreqCorrelation(
self.fft_min_freq,
self.fft_max_freq,
self.sampling_frequency,
"first_axis",
)
]
)
X_transformed = []
y_transformed = np.empty((0))
self.end_index_list = (
[]
) # This is the list of end indices for samples generated per seizure
# The transformation map is just a list of indices corresponding to the last sample generated by each time-series.
for i in range(len(X)): # pylint:disable=consider-using-enumerate
seizure_data = X[i]
start, step = 0, int(np.floor(self.window_step * self.sampling_frequency))
stop = start + int(np.floor(self.window_length * self.sampling_frequency))
fft_data = []
while stop < seizure_data.shape[1]:
signal_window = seizure_data[:, start:stop]
fft_window = pipeline.apply(signal_window)
fft_data.append(fft_window)
start, stop = start + step, stop + step
X_transformed.extend(fft_data)
if y is not None:
seizure_label = y[i]
labels_for_all_seizure_samples = np.full(len(fft_data), seizure_label)
y_transformed = np.hstack(
(y_transformed, labels_for_all_seizure_samples)
)
previous_element = self.end_index_list[i - 1] if (i - 1) >= 0 else 0
self.end_index_list.append(previous_element + len(fft_data))
X_transformed = np.array(X_transformed)
if y is None:
y_transformed = None
return X_transformed, y_transformed
def get_transform_meta_output(self):
if self.end_index_list is not None:
return {"end_index_list": self.end_index_list}
else:
raise ValueError(
"Must call transform before trying to access its meta output."
)
# JSON schema for the operator's hyperparameters.  The *ForOptimizer keys
# are lale extensions that bound the automated hyperparameter search.
_hyperparams_schema = {
    "description": "TODO",
    "allOf": [
        {
            "type": "object",
            "additionalProperties": False,
            "required": [
                "window_length",
                "window_step",
                "fft_min_freq",
                "fft_max_freq",
                "sampling_frequency",
            ],
            "relevantToOptimizer": ["window_length", "window_step", "fft_max_freq"],
            "properties": {
                # Window size in seconds; converted to samples in transform().
                "window_length": {
                    "type": "number",
                    "default": 1,
                    "description": "TODO",
                    "minimumForOptimizer": 0.25,
                    "maximumForOptimizer": 2,
                    "distribution": "uniform",
                },
                # Stride between consecutive windows, in seconds.
                "window_step": {
                    "type": "number",
                    "default": 0.5,
                    "description": "TODO",
                    "minimumForOptimizer": 0.25,
                    "maximumForOptimizer": 1,  # TODO: This is $data->window_length once $data is implemented
                    "distribution": "uniform",
                },
                "fft_min_freq": {
                    "type": "integer",
                    "default": 1,
                    "description": "TODO",
                },
                "fft_max_freq": {
                    "type": "integer",
                    "default": 24,
                    "description": "TODO",
                    "minimumForOptimizer": 2,
                    "maximumForOptimizer": 30,  # TODO: This is $data->sampling_frequency/2 once $data is implemented
                    "distribution": "uniform",
                },
                "sampling_frequency": {
                    "type": "integer",
                    "default": 250,
                    "description": "TODO",
                },
            },
        }
        # TODO: Any constraints on hyper-parameter combinations?
    ],
}
# Schema for the arguments of transform(): X is a list of 2-d numeric
# arrays (per-seizure EEG time-series), y holds one label per time-series.
_input_transform_schema = {
    "description": "Input format for data passed to the transform method.",
    "type": "object",
    "required": ["X", "y"],
    "properties": {
        "X": {
            "type": "array",
            "items": {
                "type": "array",
                "items": {"type": "array", "items": {"type": "number"}},
            },
            "description": "The input data to complete.",
        },
        "y": {
            "type": "array",
            "items": {"anyOf": [{"type": "integer"}, {"type": "string"}]},
        },
    },
}
# Schema for the value returned by transform().
_output_transform_schema = {
    "description": "The input data to complete.",
    "type": "array",  # This is actually a tuple of X and y
    "items": {"type": "array"},
}
# Top-level schema combining the hyperparameter and data schemas, in the
# form expected by lale.operators.make_operator.
_combined_schemas = {
    "$schema": "http://json-schema.org/draft-04/schema#",
    "description": "Combined schema for expected data and hyperparameters.",
    "type": "object",
    "tags": {"pre": [], "op": ["transformer"], "post": []},
    "properties": {
        "hyperparams": _hyperparams_schema,
        "input_transform": _input_transform_schema,
        "output_transform": _output_transform_schema,
    },
}
# Wrap the implementation class as a lale operator so it can participate in
# lale pipelines, then attach generated docstrings.
TimeFreqEigenVectors = lale.operators.make_operator(
    _TimeFreqEigenVectorsImpl, _combined_schemas
)
lale.docstrings.set_docstrings(TimeFreqEigenVectors)