QRS peak detection

Detection of cardiac cycles from ECG signals

# setup
import sys
import numpy as np
import scipy.signal as sp

from matplotlib import pyplot as plt

import wfdb

Download records

Identify and download records in the MIMIC III Waveform Database

# Select the first record
selected_record = '3000063_0013'
database_name = 'mimic3wdb/1.0/30/3000063/'
print("Selected record: {}".format(selected_record))

# load data from this record
start_seconds = 20
no_seconds_to_load = 10
fs = 125
record_data = wfdb.rdrecord(record_name = selected_record, sampfrom = fs*start_seconds, sampto = fs*(start_seconds + no_seconds_to_load), pn_dir = database_name) 
print("{} seconds of data loaded from: {}".format(no_seconds_to_load, selected_record))

# Plot the data loaded from this record
title_text = "First record from " + database_name + " (record: " + selected_record + ")"
wfdb.plot_wfdb(record=record_data, title=title_text, time_units='seconds') 
Selected record: 3000063_0013
10 seconds of data loaded from: 3000063_0013
../../_images/qrs-detection_3_1.png

Separate records

Separate ECG, PPG and ABP signals

# ECG, PPG and ABP extraction
h = 0
for i in record_data.sig_name:
    if i == 'II':
        print('ECG at position ' + str(h))
        ecg = record_data.p_signal[:,h]
    elif i == 'PLETH':
        print('PPG at position ' + str(h))
        ppg = record_data.p_signal[:,h]
    elif i == 'ABP':
        print('ABP at position ' + str(h))
        abp = record_data.p_signal[:,h]
    else:
        print('Other signal (' + i + ') at position ' + str(h))
    h = h + 1

t = np.arange(start_seconds,(start_seconds + no_seconds_to_load),1.0/fs)

fig, (ax1,ax2,ax3) = plt.subplots(3, 1, sharex = True, sharey = False)
ax1.plot(t, ecg)
ax1.set(xlabel = '', ylabel = 'ECG')
ax2.plot(t, ppg)
ax2.set(xlabel = '', ylabel = 'PPG')
ax3.plot(t, abp)
ax3.set(xlabel = 'Time [s]', ylabel = 'ABP')
ECG at position 0
ABP at position 1
PPG at position 2
[Text(0.5, 0, 'Time [s]'), Text(0, 0.5, 'ABP')]
../../_images/qrs-detection_5_2.png
# Remove NaNs
x, = np.where(np.isnan(ecg))
if len(x) != 0:
    print('NaNs in ECG: ' + str(x))
    for i in x:
        ecg[i] = ecg[i - 1]
x, = np.where(np.isnan(ppg))
if len(x) != 0:
    print('NaNs in PPG: ' + str(x))
    for i in x:
        ppg[i] = ppg[i - 1]
x, = np.where(np.isnan(abp))
if len(x) != 0:
    print('NaNs in ABP: ' + str(x))
    for i in x:
        abp[i] = abp[i - 1]

Filter data

# Filter ECG
sos_ecg = sp.butter(10, [0.5, 40], btype = 'bp', analog = False, output = 'sos', fs = fs)
ecg_ff = sp.sosfiltfilt(sos_ecg, ecg)

# Filter PPG
sos_ppg = sp.butter(10, [0.2, 10], btype = 'bp', analog = False, output = 'sos', fs = fs)
ppg_ff = sp.sosfiltfilt(sos_ppg, ppg)

# Filter ABP
sos_abp = sp.butter(10, 10, btype = 'low', analog = False, output = 'sos', fs = fs)
abp_ff = sp.sosfiltfilt(sos_abp, abp)

fig, (ax1,ax2,ax3) = plt.subplots(3, 1, sharex = False, sharey = False)
fig.suptitle('Filtered signals') 

ax1.plot(t, ecg, color = 'red')
ax1.plot(t, ecg_ff, color = 'orange')
ax1.set_xlabel('Time [s]')
ax1.set_ylabel('ECG [V]')

ax2.plot(t, ppg, color = 'red')
ax2.plot(t, ppg_ff, color = 'orange')
ax2.set_xlabel('Time [s]')
ax2.set_ylabel('PPG [V]')

ax3.plot(t, abp, color = 'red')
ax3.plot(t, abp_ff, color = 'orange')
ax3.set_xlabel('Time [s]')
ax3.set_ylabel('ABP [V]')

plt.subplots_adjust(left = 0.1,
                    bottom = 0.1, 
                    right = 0.9, 
                    top = 0.9, 
                    wspace = 0.4, 
                    hspace = 0.4)

#plt.show()
../../_images/qrs-detection_8_0.png

QRS detection

Define QRS detector functions

# QRS detection function
def qrs_detect(ecg,fs,w):
    """
    
    Description: QRS peak detection and correction
    Inputs:  ecg, array with ECG signal [user defined units]
             fs, sampling rate of signal [Hz]
             w, window length for analysis [s]
    Outputs: qrs, positions of R peaks [number of samples]
             n_int, number of intervals of length w in the signal
    
    Libraries: NumPy (as np), SciPy (Signal, as sp), Matplotlib (PyPlot, as plt)
    
    Version: 1.0 - June 2022
    
    Developed by: Elisa Mejía-Mejía
                   City, University of London
    
    """
    
    # Normalization of ECG
    ecg_n = (ecg - np.min(ecg))/(np.max(ecg) - np.min(ecg))
    ecg_n = ecg_n - np.mean(ecg_n)
    
    # Peak detection in windows of length w
    n_int = np.floor(len(ecg)/(w*fs))
    for i in range(int(n_int)):
        start = i*fs*w
        stop = (i + 1)*fs*w - 1
        #print('Start: ' + str(start) + ', stop: ' + str(stop))
        aux = ecg_n[range(start,stop)]
        locs, amps, nlocs = rpeakdetect(aux,fs,0.2)
        locs = locs + start
        if i == 0:
            qrs = locs
        else:
            qrs = np.append(qrs,locs)
    if n_int*fs*w != len(ecg_n):
        start = stop + 1
        stop = len(ecg_n)
        aux = ecg_n[range(start,stop)]
        if len(aux) > 20:
            locs, amps, nlocs = rpeakdetect(aux,fs,0.2)
            locs = locs + start
            qrs = np.append(qrs,locs)            
    
    # Peak correction
    # (1) Not peaks
    med_rr = np.median(np.diff(qrs))
    med_amp = np.median(ecg[qrs])
    pos_for = 0
    pos_rev = 0
    for i in range(len(qrs)):
        cur_amp = ecg[qrs[i]]
        if i == 1:                 # Peak is at the first position of the signal
            next_amp = ecg[qrs[i] + 1]
            cond = cur_amp >= next_amp
            #print(cur_amp, next_amp, cond)
        elif i == len(ecg):        # Peak is at the last position of the signal
            prev_amp = ecg[qrs[i] - 1]
            cond = cur_amp >= prev_amp
            #print(cur_amp, prev_amp, cond)
        else:   
            next_amp = ecg[qrs[i] + 1]
            prev_amp = ecg[qrs[i] - 1]
            cond = cur_amp >= prev_amp and cur_amp >= next_amp 
            #print(cur_amp, prev_amp, next_amp, cond)    
        
        if not(cond):
            # Forward search
            j = qrs[i] + 1
            if i == len(qrs) - 1:
                stop = len(ecg)
            else:
                stop = qrs[i + 1]
            while j < stop:
                cur_amp = ecg[j]
                next_amp = ecg[j + 1]
                prev_amp = ecg[j - 1]
                if not(cur_amp > next_amp and cur_amp > prev_amp):
                    j = j + 1
                else:
                    if ecg[j] >= 0.5*med_amp:
                        pos_for = j
                        j = stop
                    else:
                        j = j + 1
            # Reverse search
            j = qrs[i] - 1
            if i == 0:
                stop = 1
            else:
                stop = qrs[i - 1]
            while j > stop:
                cur_amp = ecg[j]
                next_amp = ecg[j + 1]
                prev_amp = ecg[j - 1]
                if not(cur_amp > next_amp and cur_amp > prev_amp):
                    j = j - 1
                else:
                    if ecg[j] >= 0.5*med_amp:
                        pos_rev = j
                        j = stop
                    else:
                        j = j - 1
            # Selection of peak
            if pos_for != 0 and pos_rev != 0:
                pos = [pos_for, pos_rev]
                if i - 1 == 0:
                    dif = - pos
                else:
                    dif = qrs[i - 1] - pos
                dif_med = np.abs(dif - med_rr)
                #print(dif_med)
                min_dif = np.min(dif_med)
                min_dif, = np.where(dif_med == min_dif)
                #print(min_dif,pos(min_ind))
                qrs[i] = pos(min_ind)
    
    # (2) Low peaks
    med_amp = np.median(ecg[qrs])
    #print(len(qrs))
    result, = np.where(ecg[qrs] < 0.5*med_amp)
    if len(result) > 0:
        #print(result)
        qrs = np.delete(np.array(qrs),result)
        #print(len(qrs)) 
    # (3) Too close or too far
    rr = np.diff(qrs)
    avg_rr = np.mean(rr)
    med_rr = np.median(rr)
    q3_rr, q1_rr = np.percentile(rr,[75, 25])
    iqr_rr = q3_rr - q1_rr
    #print(rr, avg_rr, med_rr, q3_rr, q1_rr, iqr_rr)
    if (med_rr < 0.5*fs or avg_rr < 0.5*fs) and (iqr_rr > 0.2*fs):
        result, = np.where(rr <= med_rr)
        qrs = np.delete(qrs, result)
        med_rr = np.median(rr)
    #   (a) Too close
    ind, = np.where(rr < 0.5*med_rr)
    if len(ind) != 0:
        i = 1
        while i <= len(ind):
            ind = no.sort(np.append(ind, ind[i] + 1))
            #print(ind)
            i = i + 2
        qrs = np.delete(qrs, ind)
    #   (b) Too far
    rr = np.diff(qrs)
    ind, = np.where(rr > 1.5*med_rr)
    th_amp = 0.5*np.median(ecg[qrs])
    #print(ind, th_amp)   
    if len(ind) != 0:
        i = 0
        while len(ind) != 0:
            #ind = np.round(ind)
            #qrs = np.round(qrs)
            #print(i, ind[i], len(qrs))
            if qrs[ind[i]] == qrs[-1]:
                aux = ecg[qrs[ind[i]]:len(ecg)]
            else:
                aux = ecg[qrs[ind[i]]:qrs[ind[i] + 1]]
            locs, _ = sp.find_peaks(aux)
            ind_del, = np.where(aux[locs] < th_amp)
            locs = np.delete(locs, ind_del)
            if len(locs) == 0:
                new = med_rr + qrs[ind[i]] - 1
            elif len(locs) == 1:
                new = locs + qrs[ind[i]] - 1
            else:
                aux_dif = np.abs(locs - med_rr)
                min_dif = np.min(aux_dif)
                ind_new, = np.where(aux_dif == min_dif)
                new = locs[ind_new] + qrs[ind[i]] - 1
            qrs = np.sort(np.append(qrs, new))
            rr = np.diff(qrs)
            ind, = np.where(rr > 1.5*med_rr)
    rr = np.diff(qrs)
    ind_del, = np.where(rr < 0.3*fs)
    rr = np.delete(rr, ind_del)
    
    #fig = plt.figure()
    #plt.plot(ecg, color = 'black')
    #plt.scatter(qrs,ecg[qrs],marker = 'o',color = 'red')
        
    return qrs, n_int
# R peak detect
def rpeakdetect(ecg, fs, th):
    """
    
    Description: QRS peak detection based on rpeakdetect2.m by G. Clifford
                 A batch QRS detector based upon that of Pan, Hamilton and Tompkins:
                 J. Pan & W. Tompkins. A real-time QRS detection algorithm. IEEE Trans Biomed Eng, vol. BME-32 NO. 3. 1985.
                 P. Hamilton & W. Tompkins. Quantitative Investigation of QRS Detection Rules Using the MIT/BIH Arrythmia 
                 Database. IEEE Trans Biomed Eng, vol. BME-33, NO. 12.1986.
    Inputs:  ecg, array with ECG signal [user defined units]
             fs, sampling rate of signal [Hz]
             th, threshold for peaks in integrated waveform - Default: 0.2
    Outputs: locs, positions of R peaks [number of samples]
             amps, amplitudes of R peaks [user defined units]
             nlocs, number of R peaks found
    
    Libraries: NumPy (as np), SciPy (Signal, as sp), Matplotlib (PyPlot, as plt)
    
    Version: 1.0 - June 2022
    
    Developed by: Elisa Mejía-Mejía
                  City, University of London
    
    """
    
    # Create time array
    t = np.divide(range(0,len(ecg) - 1), fs)
    
    # Preprocessing:
    # (1) Filter data
    sos = sp.butter(6, 40, btype = 'low', analog = False, output = 'sos', fs = fs)
    ecg_f = sp.sosfiltfilt(sos, ecg)
    # (2) Differentiate ECG
    ecg_d = np.diff(ecg_f)
    # (3) Square ECG
    ecg_s = ecg_d*ecg_d
    # (4) Integrate data
    if fs >= 256:
        d = np.ones(int(np.round(7*fs/256)))
    else:
        d = np.ones(21)
    ecg_fir = sp.lfilter(d, 1, ecg_s)
    ecg_med = sp.medfilt(ecg_fir, kernel_size = 9)
    
    # Remove filter delay:
    delay = np.ceil(len(d)/2)
    ecg_med = ecg_med[int(delay):len(ecg_med)]
    
    # Find peaks: 
    # (1) Find highest bumps in data
    start_int = round((len(ecg) - 1)/4)
    stop_int = round(3*(len(ecg) - 1)/4)
    max_h = np.max(ecg_med[start_int:stop_int])
    # (2) Determine left and right boundaries
    ecg_med = np.insert(ecg_med,0,0)
    ecg_med = np.append(ecg_med,0)
    n_left = 0
    n_right = 0
    for i in range(len(ecg_med)):
        if i > 0 and i < len(ecg_med):
            if ecg_med[i] > (th*max_h) and ecg_med[i - 1] < (th*max_h): # left boundary
                if n_left == 0:
                    left_bound = i - 1
                    n_left = 1
                else:
                    left_bound = np.append(left_bound,i - 1)
            if ecg_med[i] > (th*max_h) and ecg_med[i + 1] < (th*max_h): # right boundary
                if n_right == 0:
                    right_bound = i - 1
                    n_right = 1
                else:
                    right_bound = np.append(right_bound,i - 1)        
    #print(left_bound)
    #print(right_bound)
    # (4) Look through all possibilities
    if left_bound[0] > right_bound[0]:
        right_bound = np.delete(right_bound,0)
    if left_bound[-1] > right_bound[-1]:
        left_bound = np.delete(left_bound,0)
    nlocs = 0
    for i in range(len(left_bound)):
        #print(i)
        lb = left_bound[i]
        rb, = np.where(np.array(right_bound) > lb)
        if len(rb) != 0:
            rb = right_bound[rb[0]]
            #print(lb, rb)
            
            amp = np.max(ecg[lb:rb])
            pos, = np.where(np.array(ecg[lb:rb]) == np.amax(ecg[lb:rb]))
            pos = pos[0]
            pos = int(pos + lb)
            #print(pos, amp)
            if nlocs == 0:
                locs = pos
                amps = ecg[pos]
            else:
                locs = np.append(locs, pos)
                amps = np.append(amps, ecg[pos])
            nlocs = nlocs + 1
            #print(locs)
    
    #fig = plt.figure()
    #plt.plot(ecg)
    #plt.plot(ecg_f)
    #plt.plot(ecg_d)
    #plt.plot(ecg_s)
    #plt.plot(ecg_fir)
    #plt.plot(ecg_med)
    #plt.scatter(locs, amps, marker = 'o')
    
    return locs, amps, nlocs,
# Detect cardiac cycles
qrs, n_int = qrs_detect(ecg_ff,fs,5)

fig = plt.figure()
plt.title('QRS detection') 
plt.plot(t, ecg_ff, color = 'black')
plt.scatter(start_seconds + qrs/fs, ecg_ff[qrs], color = 'red', marker = 'o')
<matplotlib.collections.PathCollection at 0x7fd635e46f10>
../../_images/qrs-detection_13_1.png