
import torch 
from import Dataset, DataLoader
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib import rc
from sklearn.model_selection import train_test_split
from multiprocessing import cpu_count
from torch import nn
import torch.nn.functional as F
from arff2pandas import a2p
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
import glob

sns.set(style = 'whitegrid', palette = 'muted', font_scale = 1.2)
HAPPY_COLORS_PALETTE = ["#01BEFE", "#FFDD00", "#FF7D00", "#FF006D", "#ADFF02", "#8F00FF"]


Global seed set to 42


Importando los Datos

with open('ECG5000/ECG5000_TRAIN.arff') as f:
    train = a2p.load(f)

5 rows × 141 columns

with open('ECG5000/ECG5000_TEST.arff') as f:
    test = a2p.load(f)

5 rows × 141 columns

Vamos a plantear esto como un problema de Detección de Anomalías. La detección de anomalías es un problema no supervisado, por lo tanto no depende de las etiquetas. Por lo tanto para poder tener más datos para que el modelo aprenda vamos a unir los datasets de train y test y mezclarlo.

df = train.append(test)
df = df.sample(frac=1) #this is a shuffle
train.shape, test.shape, df.shape
((500, 141), (4500, 141), (5000, 141))

Haciendo mis averiguaciones (mi hermano está terminando su internado en Medicina), obtuve lo siguiente:

  • El ECG mide la señal electrica del corazón (perdón si no utilizo la terminología apropiada) para detectar afecciones cardiacas.
  • El ECG es uno de los exámenes más complejos de poder interpretar.
  • Se requiere de médicos cardiólogos altamente especializados para poder leerlos de buena manera. E incluso a ellos les puede costar.

Un ECG se ve así (Fuente: Mi hermano)

  • La data utilizada en realidad corresponde a 500 latidos extraídos de alreadedor de 20 horas de ECG. Cada uno de estos latidos fueron clasificados de la siguiente manera:

  • Normal (N)
  • R on T Premature Ventricular Contraction (R-on-T PVC) Extrasistole
  • Premature Ventricular Contraction (PVC)
  • Supra Ventricular Premature or Ectopic Beat (SP or EB)
  • Unclassified Beat (UB)

En caso de que les interese reproducir el ejercicio, pueden obtener los datos desde acá.

Exploración de los Datos

Como se puede ver los nombres de las columnas son un poco extraños. Al parecer esto se debe por el tipo de dato extraño del cual estamos importando. Para facilitar la manipulación de los datos vamos a cambiar el nombre de nuestro vector de Labels:

new_columns = df.columns.tolist()
new_columns[-1] = 'target'
df.columns = new_columns
Index(['att1@NUMERIC', 'att2@NUMERIC', 'att3@NUMERIC', 'att4@NUMERIC',
       'att5@NUMERIC', 'att6@NUMERIC', 'att7@NUMERIC', 'att8@NUMERIC',
       'att9@NUMERIC', 'att10@NUMERIC',
       'att132@NUMERIC', 'att133@NUMERIC', 'att134@NUMERIC', 'att135@NUMERIC',
       'att136@NUMERIC', 'att137@NUMERIC', 'att138@NUMERIC', 'att139@NUMERIC',
       'att140@NUMERIC', 'target'],
      dtype='object', length=141)
class_names = ['Normal','R on T','PVC','SP','UB']

ax = = 'bar')


Lo primero que se puede apreciar es que las anomalías se dan en bastante menor medida que los datos normales (quizás la excepción es el R on T). Lo cual es bueno, ya que de no ser así no serían anomalías.

Adicionalmente poder ver la data de manera apropiada es importante. Por lo tanto para poder entender “en promedio” cómo se ven los distintos diagnósticos de un latido usaremos lo siguiente:

def plot_time_series_class(data, class_name, ax, n_steps=10):
    time_series_df = pd.DataFrame(data)

    smooth_path = time_series_df.rolling(n_steps).mean()
    path_deviation = 2 * time_series_df.rolling(n_steps).std()

    under_line = (smooth_path - path_deviation)[0]
    over_line = (smooth_path + path_deviation)[0]

    ax.plot(smooth_path, linewidth=2)
classes =
fig, axs = plt.subplots(nrows = len(classes)// 3 + 1,
                        ncols = 3, 
                        sharey = True,
                        figsize = (12,8)

for i, cls in enumerate(classes):
    ax = axs.flat[i]
    data = df.query(f'target == "{cls}"').drop(columns = 'target').mean(axis = 0).to_numpy()
    plot_time_series_class(data, class_names[i], ax)



Observando los datos se puede apreciar lo siguiente:

  • Los latidos normales se caracterizan por un incremento de la señal inicialmente y luego un peak y un valle al final del latido.

  • Las anomalías se caracterizan porque no tienen ese peak al final, y rapidamente la señal decae, probablemente con distintas intensidades dependiendo de la anomalía. Quizás se requiere de un ojo más experto para poder diferenciar de mejor manera las distintas afecciones anomalas, pero para nuestro caso basta con que las anomalias es “no-normal”.

Data Split

Si bien en este caso juntamos toda la data disponible para tener más muestras de entrenamiento, eso no significa que no la dividiremos. Además nuestro modelo no aprenderá utilizando Etiquetas, sino que aprenderá lo “normal” para luego detectar que algo no está dentro de lo normal.

La manera en la que separaremos los datos es la siguiente:

  • normal_df: Contendrá toda la data clasificada como normal. Este dataset será posteriormente** dividido en train_df, val_df y test_df.
  • anomalies_df: Contendrá toda la data que no está clasificada como normal.

Creación del modelo en Pytorch Lightning

Como ya sabemos de tutoriales anteriores, el modelo en Pytorch Lightning parte generando la clase Dataset y el LightningDataModule. Estos procesos transformarán la data en tensores para que puedan ser procesados por Pytorch.

La clase ECGData transformará cada fila de los dataframes en tensores que representan una serie de tiempo de un latido.

La clase ECGDataModule define varios métodos:

  • import_arff es una utility funcion que importa un dataset del formato arff a pandas.
  • setup: Es el el método encargado de organizar todas las fuentes de datos del modelo:
    • Importa los archivos terminados en .arff y los concatena como un sólo pandas DataFrame.
    • Se define el normal_data como el DataFrame que contiene sólo latidos de clase normal.
    • Se utiliza un split para separar normal_data en train_df, val_df y test_df.
    • Se define anomaly_df como todos los latidos que no contienen una clase normal.
    • Se transforman los distintos df creados en tensores utilizando la clase ECGData.
  • Finalmente se crean los DataLoaders correspondientes para cada subset.
    • Se considera el predict_dataloader con la data de anomalías.
class ECGData(Dataset):
    def __init__(self, data): = data
    def __len__(self):
        return len(
    def __getitem__(self, idx):
        # agrega singleton dim al final. dtype debe ser float ya que es el requerido por LSTMs.
        return torch.tensor([idx], dtype = torch.float32).unsqueeze(-1) 
class ECGDataModule(pl.LightningDataModule):
    def __init__(self, folder_path, normal_class, test_splits = [0.15, 0.5], batch_size = 1):
        self.folder_path = folder_path
        self.normal_class = normal_class
        self.batch_size = batch_size
        self.val_split, self.test_split = test_splits
    def import_arff(self, path):
        with open(path) as f:
            data = a2p.load(f)
        return data

    def setup(self, stage = None):
        file_paths = glob.glob(f'{self.folder_path}/*.arff') = pd.concat([self.import_arff(f) for f in file_paths]).rename(columns = {'target@{1,2,3,4,5}': 'target'})
        normal_data ='target == "{self.normal_class}"').drop(columns = 'target')
        self.train_df, self.val_df = train_test_split(normal_data, test_size = self.val_split, random_state=RANDOM_SEED)
        self.val_df, self.test_df = train_test_split(self.val_df, test_size = self.test_split, random_state=RANDOM_SEED)
        self.anomaly_df ='target != "{self.normal_class}"').drop(columns = 'target')
        self.train_df = ECGData(self.train_df)
        self.val_df = ECGData(self.val_df)
        self.test_df = ECGData(self.test_df)
        self.anomaly_df = ECGData(self.anomaly_df)
    def train_dataloader(self):
        return DataLoader(self.train_df, batch_size = self.batch_size, pin_memory = True, num_workers = cpu_count(), shuffle=False)
    def val_dataloader(self):
        return DataLoader(self.val_df, batch_size = self.batch_size, pin_memory = True, num_workers = cpu_count(), shuffle=False)
    def test_dataloader(self):
        return DataLoader(self.test_df, batch_size = self.batch_size, pin_memory = True, num_workers = cpu_count(), shuffle=False)
    def predict_dataloader(self):
        return DataLoader(self.anomaly_df, batch_size = 1, pin_memory = True, num_workers = cpu_count(), shuffle=False)
dm = ECGDataModule('ECG5000', normal_class = 1, batch_size  = 100)
torch.Size([140, 1])

Como podemos ver para poder ingresar la data con las dimensiones correctas se agregó un singleton al final. De esta manera se reconoce que el tensor es de largo 140 y contiene sólo una dimensión asociada a features (es univariado). Esto para cumplir los requerimientos de dimensiones de las Redes LSTM que son las que vamos a utilizar.

for batch in dm.train_dataloader():
torch.Size([100, 140, 1])

Además si ejecutamos una instancia del DataLoader podemos ver las dimensiones del tensor resultante: 100 muestras, de tamaño 140 (largo de la secuencia) por 1 (una variable).

El modelo propiamente tal

El modelo que utilizaremos en un LSTM AutoEncoder. La implementación está basada en la utilizada acá por sequitur.

Partamos definiendo qué es un autoencoder. Un autoencoder es una arquitectura de Redes Neuronales que permite recrear una data de entrada. La data se intenta pasar por un embedding que actúa como cuello de botella con la intención de la red pueda extraer sólo las características esenciales. El embedding viene a ser la representación de la data en un espacio alternativo, normalmente reducido (por eso el cuello de botella).Replicar la secuencia original a partir de este espacio reducido forzará a la red a replicar sólo lo esencial de la secuencia.

Normalmente se considera que el output de un Autoencoder es equivalente a una reducción de dimensionalidad no supervisada o un denoiser. La premisa en este tipo de modelos es que la red aprenderá a reconstruir ECG “normales”. La reconstrucción de un ECG normal debiera ser muy similar al real, es decir, el error será pequeño. Pero si la red intenta reconstruir un ECG que no es normal, entonces el error será mayor. Ajustando la red a un threshold de error podemos detectar cuales son los outliers de nuestra distribución de errores, los que serán catalogados como anomalías.

Con respecto a la arquitectura a utilizar utilizaremos un LSTMAutoEncoder. Éste, tendrá el objetivo de tomar un conjunto de secuencias a las que reducirá su dimensionalidad hasta llegar al cuello de botella z(n). Este será el resultado del hidden state de la útlima LSTM usada en la parte Encoder del Modelo.

Este cuello de botella z(n) se repetirá tantas veces como el largo de secuencia inicial y se someterá a un Decoder también formado por LSTMs. A diferencia del Encoder acá el output total de las LSTMs usadas, la cual pasará por una capa fully connected para reconstruir la dimensión inicial de la secuencia.

class Encoder(nn.Module):
    def __init__(self, seq_len, n_features, embedding_dim = 64):
        self.n_features = n_features
        self.embedding_dim, self.hidden_dim = embedding_dim, 2 * embedding_dim
        self.seq_len = seq_len
        self.rnn1 = nn.LSTM(
            input_size = n_features,
            hidden_size = self.hidden_dim,
            num_layers = 1,
            batch_first = True
        self.rnn2 = nn.LSTM(
            input_size = self.hidden_dim,
            hidden_size = self.embedding_dim,
            num_layers = 1,
            batch_first = True
    def forward(self, x):
        #x = x.unsqueeze(0) no es necesario porque ya viene con las dimensiones correctas desde el DataLoader...
        #x = x.reshape((1, 2481, self.n_features))
        # x, (hidden_n, cell_n) = self.rnn1(x)
        # x, (hidden_n, cell_n) = self.rnn2(x)
        x = x.reshape((-1, self.seq_len, self.n_features))
        x, (hidden_n, cell_n) = self.rnn1(x)
        x, (hidden_n, cell_n) = self.rnn2(x)
        return hidden_n.reshape((-1, self.embedding_dim)) #.squeeze(0)#

El Encoder entonces espera el largo de la secuencia (140), el número de features (1) y una dimensión de embedding de 64, que será la dimensión del cuello de botella. En este caso nuestro decoder tomará las 140 secuencias, las reducirá a 128 y luego a 64. Dado que usamos este orden de dimensiones debemos agregar el parámetro batch_first = True.

class Decoder(nn.Module):
    def __init__(self, seq_len, input_dim = 64, output_dim = 1):
        self.seq_len = seq_len
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.hidden_dim = 2* input_dim
        self.rnn1 = nn.LSTM(
            input_size = input_dim,
            hidden_size = input_dim,
            num_layers = 1, 
            batch_first = True
        self.rnn2 = nn.LSTM(
            input_size = input_dim,
            hidden_size = self.hidden_dim,
            num_layers = 1,
            batch_first = True
        self.dense_layers = nn.Linear(self.hidden_dim, output_dim)
    def forward(self, x):
        #print('dim:', x.shape)
        # x = x.repeat(1,self.seq_len, 1)#.unsqueeze(0)
        # #x = x.reshape((1, self.seq_len, self.input_dim))
        # #print('Repeat: ', x.shape)
        # x, (hidden_n, cell_n) = self.rnn1(x)
        # x, (hidden_n, cell_n) = self.rnn2(x)
        # x = x.squeeze(0)
        #print('Squeeze: ', x.shape)
        #print('Puro', x.shape)
        x = x.repeat(self.seq_len, 1) # repetition layer
        #print('Repeat', x.shape)
        x = x.reshape((-1, self.seq_len, self.input_dim)) # reshaping for batches
        #print('Reshape', x.shape)
        x, (hidden_n, cell_n) = self.rnn1(x)
        #print('RNN1', x.shape)
        x, (hidden_n, cell_n) = self.rnn2(x)
        #print('RNN2', x.shape)
        #x = x.reshape((-1,self.seq_len, self.hidden_dim))
        return self.dense_layers(x) 

En el caso del Decoder tomará un tensor proveniente del Encoder, de largo de secuencia 64, lo repetirá simulando el largo original y aumentará su dimensión hasta llegar las dimensiones originales.

enc = Encoder(seq_len = 140, n_features = 1)
dec = Decoder(seq_len = 140)

for batch in dm.train_dataloader():
    print('Tamaño del Batch Inicial:', batch.shape)
    x = enc(batch)
    print('Tamaño a la Salida del Encoder:', x.shape)
    print('Tamaño a la Salida del Decoder:', dec(x).shape)
Tamaño del Batch Inicial: torch.Size([100, 140, 1])
Tamaño a la Salida del Encoder: torch.Size([100, 64])
Tamaño a la Salida del Decoder: torch.Size([100, 140, 1])
class LSTMAutoEncoder(pl.LightningModule):
    def __init__(self, seq_len = 140, n_features = 1):
        self.encoder = Encoder(seq_len, n_features)
        self.decoder = Decoder(seq_len)
        self.criterion = nn.L1Loss(reduction = 'mean')
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x
    def training_step(self, batch, batch_idx):
        pred_seq = self(batch)
        loss = self.criterion(pred_seq, batch)
        self.log('train_loss', loss, prog_bar = True)
        return loss
    def validation_step(self, batch, batch_idx):
        pred_seq = self(batch)
        loss = self.criterion(pred_seq, batch)
        self.log('val_loss', loss, prog_bar = True)
    def predict_step(self, batch, batch_idx, dataloader_idx = None):
        pred_seq = self(batch)
        loss = self.criterion(pred_seq, batch)
        return pred_seq, loss 
    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr = 1e-3)

Finalmente se combinan Encoder y Decoder para crear la arquitectura final en el LightningModule. Se define como criterio para medir el error el L1Loss y como optimizer Adam con un learning rate de 1e-3.

model = LSTMAutoEncoder()
dm = ECGDataModule('ECG5000', normal_class = 1, batch_size = 10) 
mc = ModelCheckpoint(
    dirpath = 'checkpoints',
    filename = 'best-checkpoint',
    save_top_k = 1,
    verbose = True,
    monitor = 'val_loss', 
    mode = 'min'
# Instancia del Trainer
trainer = pl.Trainer(max_epochs = 150, #entrenar por 150 epochs
                    deterministic = True,
                    gpus = 1,
                    callbacks = [mc],
                    fast_dev_run = False)
GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs, dm)

  | Name      | Type    | Params
0 | encoder   | Encoder | 116 K 
1 | decoder   | Decoder | 132 K 
2 | criterion | L1Loss  | 0     
249 K     Trainable params
0         Non-trainable params
249 K     Total params
0.998     Total estimated model params size (MB)

! cd checkpoints && ls

Choosing a Threshold

trained_model = LSTMAutoEncoder.load_from_checkpoint(
  (encoder): Encoder(
    (rnn1): LSTM(1, 128, batch_first=True)
    (rnn2): LSTM(128, 64, batch_first=True)
  (decoder): Decoder(
    (rnn1): LSTM(64, 64, batch_first=True)
    (rnn2): LSTM(64, 128, batch_first=True)
    (dense_layers): Linear(in_features=128, out_features=1, bias=True)
  (criterion): L1Loss()
def plot_errors(model, data):
    dl = DataLoader(data, batch_size = 1, pin_memory = True, num_workers = cpu_count(), shuffle=False)
    preds = trainer.predict(model = model, dataloaders = dl)
    preds_losses = torch.tensor([item[1] for item in preds]).numpy()
    sns.displot(preds_losses, bins = 50, kde = True, height = 8, aspect = 2)
    return preds_losses

Esta función me ayudará a entender la distribución de los errores medidos como L1Loss. Por lo que generará una predicción en un dataset de interés y me generará la gráfica y almacenará el loss asociado en la predicción.

Recordar que en este caso no me interesa la predicción propiamente tal sino que el error de reconstrucción. La premisa es que altos errores de reconstrucción implican una anomalía.

normal_loss = plot_errors(trained_model, dm.train_df)
test_loss = plot_errors(trained_model, dm.test_df)
anomaly_loss = plot_errors(trained_model, dm.anomaly_df)
Predicting: 249it [00:00, ?it/s]

Predicting: 249it [00:00, ?it/s]

Predicting: 249it [00:00, ?it/s]

def check_correct(loss, treshold):
    correct = sum(l <= THRESHOLD for l in loss)
    print(f'Correct Normal predictions: {correct}/{len(loss)}, {correct/len(loss)*100:.2f}%')
check_correct(normal_loss, THRESHOLD)
check_correct(test_loss, THRESHOLD)
check_correct(anomaly_loss, THRESHOLD)
Correct Normal predictions: 2298/2481, 92.62%
Correct Normal predictions: 205/219, 93.61%
Correct Normal predictions: 153/2081, 7.35%

Se puede apreciar aproximadamente un error del 7%.

