From 123cfd17a5082db4df09960c94a387f0aa912959 Mon Sep 17 00:00:00 2001 From: Sofia Monteiro <81264130+sofia3ms@users.noreply.github.com> Date: Fri, 20 Feb 2026 12:53:57 +0000 Subject: [PATCH] Add EAM support Small changes to egm.py Adds the spatial.eam module for visualization and processing of electroanatomical maps Adds the load_carto function to storage.py, for extracting EAM data from raw files of the CARTO mapping system. --- biosppy/signals/egm.py | 13 +- biosppy/spatial/eam.py | 246 ++++++++++++++++++++++++++++ biosppy/storage.py | 363 +++++++++++++++++++++++++++++++++++++++++ requirements.txt | 3 +- 4 files changed, 618 insertions(+), 7 deletions(-) create mode 100644 biosppy/spatial/eam.py diff --git a/biosppy/signals/egm.py b/biosppy/signals/egm.py index dfa351e5..9a8d0a1f 100644 --- a/biosppy/signals/egm.py +++ b/biosppy/signals/egm.py @@ -5,15 +5,17 @@ This module provides methods to process intracardiac EGM (Electrogram) signals. -:copyright: (c) 2015-2016 by Instituto de Telecomunicacoes +:copyright: (c) 2015-2026 by Instituto de Telecomunicacoes :license: BSD 3-clause, see LICENSE for more details. """ # 3rd party import numpy as np import scipy import matplotlib.pyplot as plt +import matplotlib.colors as colors import inspect import warnings +import pyvista as pv # local from .. import plotting, utils @@ -767,14 +769,13 @@ def dominant_frequency(signal=None, sampling_rate=1000., plot=False): nfft = 2**np.ceil(np.log2(n)).astype(int) # pad signal signal = np.pad(signal, (0, nfft - n), mode='constant') - from scipy.signal.windows import hann - from scipy.fft import fft, fftfreq - w = hann(nfft) - fft_vals = fft(signal*w) + + w = scipy.signal.windows.hann(nfft) + fft_vals = scipy.fft.fft(signal*w) # sample spacing T = 1.0 / sampling_rate - fft_freqs = fftfreq(nfft, T)[:nfft // 2] + fft_freqs = scipy.fft.fftfreq(nfft, T)[:nfft // 2] fft_vals = fft_vals[:nfft // 2] psd = 2.0 / (nfft * T) * np.abs(fft_vals)**2 diff --git a/biosppy/spatial/eam.py b/biosppy/spatial/eam.py new file mode 100644 index 00000000..635af95e --- /dev/null +++ b/biosppy/spatial/eam.py @@ -0,0 +1,246 @@ + +# -*- coding: utf-8 -*- +""" +biosppy.spatial.eam +------------------- + +This module provides functions for computing and visualizing electrophysiological activation maps (EAMs) +from electrogram (EGM) data. It includes functions for calculating conduction velocity (CV) using triangulation +of activation times, as well as interpolation of values across a 3D mesh. +The module also provides a function for plotting the geometry of the mesh with the computed values using PyVista. + +:copyright: (c) 2015-2026 by Instituto de Telecomunicacoes +:license: BSD 3-clause, see LICENSE for more details. +""" +# 3rd party +import numpy as np +import scipy +import matplotlib.pyplot as plt +import pyvista as pv + +# local +from .. import utils + +def plot_geometry(X, triang, values=None, cmap = plt.get_cmap('gist_rainbow', 200), lims = None, type = 'activation', center = None, vec = None): + + """ + Plots a 3D mesh using PyVista. + + Parameters + ---------- + X : np.ndarray + An Nx3 array of vertex coordinates. + triang : np.ndarray + An Mx3 array of triangle indices. + values : np.ndarray, optional + An array of scalar values for coloring the mesh. + cmap : matplotlib.colors.Colormap, optional + A colormap for coloring the mesh. + lims : tuple, optional + A tuple specifying the min and max limits for truncating the colormap. + type : str, optional + A string specifying the type of map (e.g., 'activation', 'voltage', 'cv', 'cv_vec' or any other custom label). Defaults to 'activation'. + center : np.ndarray, optional + An Nx3 array of coordinates for the center points of the vectors (required if type is 'cv_vec'). + vec : np.ndarray, optional + An Nx3 array of vector components (required if type is 'cv_vec'). + """ + + if lims is not None: + range = [lims[0], lims[1]] + else: + range = [np.min(values), np.max(values)] + + if np.shape(triang)[1] == 3: + triang = np.hstack((np.ones((triang.shape[0], 1)) * 3,triang)).astype(int) + + if type == 'activation': + label = 'Activation Time (ms)' + elif type == 'voltage': + label = 'Voltage (mV)' + elif type == 'cv': + label = 'Conduction Velocity (m/s)' + elif type == 'cv_vec': + label = type + + mesh = pv.PolyData(X, triang) + + plotter = pv.Plotter() + plotter.set_background('white') + if type == 'cv_vec': + glyph_points = pv.PolyData(center) + glyph_points["vectors"] = vec + arrows = glyph_points.glyph( + orient="vectors", + scale=True, + factor=10.0 + ) + plotter.add_mesh(mesh, color='lightgray', opacity=0.5, scalars = values, cmap=cmap, clim=[range[0], range[1]], scalar_bar_args={'title': label},) + plotter.add_mesh(arrows, color='red') + elif values is not None: + plotter.add_mesh(mesh, scalars=values, cmap=cmap, clim=[range[0], range[1]], scalar_bar_args={'title': label},) + else: + plotter.add_mesh(mesh, color='black') + + plotter.view_xy() # Force XY view + plotter.show() + + +def _conduction_velocity(p, latp, q, latq, r, latr): + """Computes the conduction velocity vector using triangulation of three points. + + Parameters + ---------- + p : array + Coordinates of point p (x, y, z). + latp : float + Activation time at point p (ms). + q : array + Coordinates of point q (x, y, z). + latq : float + Activation time at point q (ms). + r : array + Coordinates of point r (x, y, z). + latr : float + Activation time at point r (ms). + + Returns + ------- + cv : float + Conduction velocity (m/s). + vec : array + Conduction velocity vector (unit vector in the direction of propagation). + """ + + # compute arrays of triangle edges + a = np.array([q[0]-p[0], q[1]-p[1], q[2]-p[2]]) + b = np.array([r[0]-p[0], r[1]-p[1], r[2]-p[2]]) + c = np.array([r[0]-q[0], r[1]-q[1], r[2]-q[2]]) + + # compute relative activation times + + lat_a = abs(latq - latp) + lat_b = abs(latr - latp) + + + theta = np.arccos((np.linalg.norm(a)**2 + np.linalg.norm(b)**2 - np.linalg.norm(c)**2) / (2*(np.linalg.norm(a) * np.linalg.norm(b)))) + alpha = np.arctan2(lat_b*np.linalg.norm(a) - lat_a*np.linalg.norm(b)*np.cos(theta), lat_a*np.linalg.norm(b)*np.sin(theta)) + + # compute conduction velocity (scalar) + cv = np.linalg.norm(a) * np.cos(alpha) / lat_a + + # compute conduction velocity (vector) + + n = np.cross(b, a) + n = n / np.linalg.norm(n) + + x_ps = np.cross(n, a) * np.tan(alpha) + vec_full = a - x_ps + vec = vec_full / np.linalg.norm(vec_full) + + return utils.ReturnTuple((cv, vec), + ('cv', 'vec')) + +def cv_triangulation(egmX, lat, min_dist = 2, min_lat = 2, verbose = 1): + """Computes the conduction velocity (CV) and CV vector at each point in a 3D mesh using triangulation. + + Parameters + ---------- + egmX : np.ndarray + An Nx3 array of vertex coordinates. + lat : np.ndarray + An array of activation times corresponding to each vertex in egmX. + min_dist : float, optional + The minimum distance (in mm) between points to be considered for CV calculation. Defaults to 2 mm. + min_lat : float, optional + The minimum difference in activation time (in ms) between points to be considered for CV calculation. Defaults to 2 ms. + verbose : int, optional + If set to 1, displays a progress bar during the CV calculation. Defaults to 1. + + Returns + ------- + cvs : np.ndarray + An array of conduction velocities (in m/s) corresponding to each vertex in egmX + vecs : np.ndarray + An Nx3 array of conduction velocity vectors corresponding to each vertex in egmX. + cv_X : np.ndarray + An Nx3 array of vertex coordinates corresponding to the calculated conduction velocities and vectors. + """ + + # triangulate + mesh = pv.PolyData(egmX) + mesh = mesh.delaunay_2d() + + faces = mesh.faces.reshape((-1,4))[:, 1:4] + + found = [] + # nan vector same length as egmX + cvs = np.full(len(egmX), np.nan) + # nan vector same shape as egmX + vecs = np.full((len(egmX), 3), np.nan) + # nan vector same shape as egmX + cv_X = np.full((len(egmX), 3), np.nan) + count = 0 + for i in range(len(mesh.points)): + + neighbors = [j for j in faces if i in j] + + if verbose: + print(f"\rCV calculation. Progress: {i+1}/{len(mesh.points)}", end="", flush=True) + + for neighbor in neighbors: + + # check if triangle has already been used with previous vertex + if not bool(set(neighbor) & set(found)): + # check constraints + delta_pq = lat[neighbor[1]] - lat[neighbor[0]] + delta_pr = lat[neighbor[2]] - lat[neighbor[0]] + + dist_pq = np.linalg.norm(mesh.points[neighbor[1]] - mesh.points[neighbor[0]]) + dist_pr = np.linalg.norm(mesh.points[neighbor[2]] - mesh.points[neighbor[0]]) + + if abs(delta_pq) > min_lat and abs(delta_pr) > min_lat and dist_pq > min_dist and dist_pr > min_dist: + count += 1 + cv, vec = _conduction_velocity(mesh.points[neighbor[0]], lat[neighbor[0]], mesh.points[neighbor[1]], lat[neighbor[1]], mesh.points[neighbor[2]], lat[neighbor[2]]) + cvs[neighbor[0]] = cv[0] + vecs[neighbor[0]] = vec + cv_X[neighbor[0]] = mesh.points[neighbor[0]] + + + found.append(i) + print() + return utils.ReturnTuple((cvs, vecs, cv_X), + ('cvs', 'vecs', 'cv_X')) + + +def interpolator(X, y, X_new, kernel='multiquadric'): + """Interpolates values at new points using Radial Basis Function (RBF) interpolation. + + Parameters + ---------- + X : np.ndarray + An NxD array of original data points. + y : np.ndarray + An array of values corresponding to each point in X. + X_new : np.ndarray + An MxD array of new data points where interpolation is to be performed. + kernel : str, optional + The type of RBF kernel to use. Defaults to 'multiquadric'. + + Returns + ------- + y_new : np.ndarray + An array of interpolated values corresponding to each point in X_new. + """ + + + # remove nans from y + y = y.ravel() + mask = ~np.isnan(y) + X = X[mask] + y = y[mask] + + y_new = scipy.interpolate.RBFInterpolator(X, y, kernel=kernel, epsilon=1.0)(X_new) + + return utils.ReturnTuple((y_new,), + ('y_new',)) \ No newline at end of file diff --git a/biosppy/storage.py b/biosppy/storage.py index 96fd6c0c..44b1131d 100644 --- a/biosppy/storage.py +++ b/biosppy/storage.py @@ -1204,3 +1204,366 @@ def close(self): # close self._file.close() + + +def _read_mesh_vertices(filepath): + """ + Auxiliary function to read the data of a Biosense Webster .mesh file + Returns the vertices, triangles, and data. + + Parameters + ---------- + filepath : str + Path to the .mesh file. + + Returns + ------- + vertices : pandas.DataFrame + DataFrame containing the vertices information. + triangles : pandas.DataFrame + DataFrame containing the triangles information. + data : pandas.DataFrame + DataFrame containing the vertices colors information. + + """ + vertices = [] + triangles = [] + data = [] + section = "empty" + + import pandas as pd + + with open(filepath, "r", errors="ignore") as f: + for line in f: + line = line.strip() + + # Start after VerticesSection header + if line == "[VerticesSection]": + section = "vertices" + next(f) # skip comment line + + if line == "[TrianglesSection]": + section = "triangles" + next(f) # skip comment line + + if line == "[VerticesColorsSection]": + section = "data" + next(f) # skip comment line + next(f) # skip another comment line + + if line == "[VerticesAttributesSection]": + break # End of relevant sections + + if section == "vertices": + values = line.split() + temp = [float(v) for v in values[2:]] + if len(temp) > 0: + vertices.append([float(v) for v in values[2:]]) + + elif section == "triangles": + values = line.split() + temp = [float(v) for v in values[2:]] + if len(temp) > 0: + triangles.append(temp) + + elif section == "data": + values = line.split() + temp = [float(v) for v in values[2:]] + if len(temp) > 0: + data.append(temp) + + columns_vertices = [ + "X", "Y", "Z", + "NormalX", "NormalY", "NormalZ", + "GroupID" + ] + columns_triangles = ["Vertex0", "Vertex1", "Vertex2", "NormalX", "NormalY", "NormalZ", "GroupID"] + columns_data = ["Unipolar", "Bipolar", "LAT", "Impedance", "A1", "A2", "A2-A1", "SCI", "ICL", "ACL", "Force", "Paso", "µBi"] + return pd.DataFrame(vertices, columns=columns_vertices), pd.DataFrame(triangles, columns=columns_triangles), pd.DataFrame(data, columns=columns_data) + +def _read_ecg_files(ECG_file): + """ Auxiliary function to read the data of a Biosense Webster ECG_Export.txt file. + Returns the channel names, reference channel, and voltages. + + Parameters + ---------- + ECG_file : str + Path to the ECG_Export.txt file. + + Returns + ---------- + channel_names : list + List of channel names. + reference_channel : str + Name of the reference channel. + voltages : numpy.ndarray + Signals of each channel. + + """ + if ECG_file is not None: + with open(ECG_file, 'r') as f: + data = f.readlines() + line1 = data[0].strip().split(',') + line2 = data[1].strip().split(',') + line3 = data[2].strip().split(',') + + lib_chars = ('m1', 'wc') + + if not line3[:2] in lib_chars: + # another version of the ecg file + # try the next line + reference_channel = data[2].split('=')[-1].split('\n')[0] + line3 = data[3].strip().split(',') + + # check if line1 matches string + if line1[0] != 'ECG_Export_4.0': + print(f"Warning: Unexpected file format in {ECG_file}") + + gain = float(line2[0].split('= ')[1]) + + channel_names = line3[0].split() + + voltages = data[4:] + + for i in range(len(voltages)): + voltages[i] = voltages[i].split() + + voltages = np.array(voltages, dtype=float) * gain # apply gain + + return channel_names, reference_channel, voltages + + +def _find_file(directory, start_str=None, end_str=None): + """ Auxiliary function to find a file in a directory that starts with start_str and ends with end_str. + If start_str or end_str is None, it will be ignored in the search. + + Parameters + ---------- + directory : str + Directory to search in. + start_str : str, optional + String that the file should start with. + end_str : str, optional + String that the file should end with. + + Returns + ---------- + str + Path to the found file, or None if no file is found. + """ + for file in os.listdir(directory): + if start_str is not None and end_str is not None: + if file.startswith(start_str) and file.endswith(end_str): + return os.path.join(directory, file) + elif start_str is not None: + if file.startswith(start_str): + return os.path.join(directory, file) + elif end_str is not None: + if file.endswith(end_str): + return os.path.join(directory, file) + return None + + +def _find_all_files(directory, start_str=None, end_str=None): + """ Auxiliary function to find all files in a directory that start with start_str and end with end_str. + If start_str or end_str is None, it will be ignored in the search. + + Parameters + ---------- + directory : str + Directory to search in. + start_str : str, optional + String that the file should start with. + end_str : str, optional + String that the file should end with. + + Returns + ---------- + list of str + List of paths to the found files, or an empty list if no file is found. + """ + files_found = [] + for file in os.listdir(directory): + if start_str is not None and end_str is not None: + if file.startswith(start_str) and file.endswith(end_str): + files_found.append(os.path.join(directory, file)) + elif start_str is not None: + if file.startswith(start_str): + files_found.append(os.path.join(directory, file)) + elif end_str is not None: + if file.endswith(end_str): + files_found.append(os.path.join(directory, file)) + return files_found + + +def load_carto_study(filename, verbose = 1): + """ + Loads a CARTO study from a .xml file. The function extracts the relevant information about the maps, + points, and signals, and saves them as .csv files in a subfolder for each map. + + Adapted to Python from original MATLAB code written by the OpenEP team [OpenEP]_ [OpenEP2]_. + Available at: https://github.com/openep/openep-core + + Parameters: + filename : str + Path to the CARTO .xml file. + + References + ---------- + .. [OpenEP] Williams SE and Linton NWF (Feb. 2026). OpenEP/openep-core: v1.0.03 (Version v1.0.03). Zenodo. https://doi.org/10.5281/zenodo.4471318 + .. [OpenEP2] Williams SE, Roney CH, Connolly A, Sim I, Whitaker J, O’Hare D, Kotadia I, O’Neill L, Corrado C, Bishop M, Niederer SA, Wright M, O’Neill M and Linton NWF (2021) OpenEP: A Cross-Platform Electroanatomic Mapping Data Format and Analysis Platform for Electrophysiology Research. Front. Physiol. 12:646023. doi: 10.3389/fphys.2021.646023 + + """ + import pandas as pd + import xml.etree.ElementTree as ET + + directory = filename.rsplit('\\', 1)[0] + + # load and parse the XML file + + tree = ET.parse(filename) + + map_branch = tree.find('Maps') + + nmaps = int(map_branch.attrib['Count']) + + for i in range(nmaps): + + map = map_branch.findall('Map')[i] + map_name = map.attrib['Name'] + map_meshfile = map.attrib['FileNames'] + + if verbose > 0: + + print(f"Loading map {i+1}/{nmaps}: {map_name}") + + n_points = int(map.findall('CartoPoints')[0].attrib['Count']) + + if int(n_points) > 0: + ECG_file = _find_file(directory, map_name, "ECG_Export.txt") + + [channel_names, reference_channel, voltages] = _read_ecg_files(ECG_file) + + map_coordinates = np.zeros((n_points, 3)) + map_ids = np.zeros((n_points,), dtype=int) + + points_woi = np.zeros((n_points, 2)) + points_reference = np.zeros((n_points, 1)) + points_unipolar = np.zeros((n_points, 1)) + points_bipolar = np.zeros((n_points, 1)) + points_map_annotation = np.zeros((n_points, 1)) + + signals = np.zeros((n_points, voltages.shape[0], voltages.shape[1])) + + points_filename = map_name + '_Points_Export.xml' + all_points = ET.parse(os.path.join(directory, points_filename)) + + for k in range(n_points): + map_coordinates[k,:] = [float(j) for j in map.findall('CartoPoints')[0].findall('Point')[k].attrib['Position3D'].split()] + map_ids[k] = int(map.findall('CartoPoints')[0].findall('Point')[k].attrib['Id']) + + point_filename = map_name + '_P' + str(map_ids[k]) + '_Point_Export.xml' + point_tree = ET.parse(os.path.join(directory, point_filename)) + points_woi[k,0] = float(point_tree.find("WOI").attrib['From']) + points_woi[k,1] = float(point_tree.find("WOI").attrib['To']) + points_reference[k,0] = int(point_tree.find("Annotations").attrib['Reference_Annotation']) + points_map_annotation[k,0] = int(point_tree.find("Annotations").attrib['Map_Annotation']) + points_unipolar[k,0] = float(point_tree.find("Voltages").attrib['Unipolar']) + points_bipolar[k,0] = float(point_tree.find("Voltages").attrib['Bipolar']) + + + point_ecg_filename = (os.path.join(directory, map_name + '_P' + str(map_ids[k]) + '_ECG_Export.txt')) + + [channel_names, reference_channel, voltages] = _read_ecg_files(point_ecg_filename) + + for m in range(len(channel_names)): + try: + signals[k, :, m] = voltages[:, m] + except: + print(f"Warning: Could not load signal for point {map_ids[k]}, channel {channel_names[m]}") + + point_filename = all_points.findall('Point')[k].attrib['File_Name'] + point_filename = os.path.join(directory, point_filename) + + # check if RF files exist + RF_files = _find_all_files(directory, start_str='RF_'+map_name) + + contact_force = _find_all_files(directory, start_str='ContactForceInRF_'+map_name) + + for j in range(len(RF_files)): + if j == 0: + RF_df = pd.read_table(RF_files[j]) + else: + RF_df = pd.concat([RF_df, pd.read_table(RF_files[j])]) + + for j in range(len(contact_force)): + if j == 0: + CF_df = pd.read_table(contact_force[j]) + else: + CF_df = pd.concat([CF_df, pd.read_table(contact_force[j])]) + + try: + df_vertices, df_triangles, df_data = _read_mesh_vertices(os.path.join(directory,map_meshfile)) + except: + print(f"Could not read mesh file: {map_meshfile} for map {map_name}") + continue + + # create a subfolder for each map + + subfolder = os.path.join(directory, map_name) + if not os.path.exists(subfolder): + os.makedirs(subfolder) + df_vertices.to_csv(os.path.join(subfolder, map_name+'_vertices.csv'), index=False) + df_triangles.to_csv(os.path.join(subfolder, map_name+'_triangles.csv'), index=False) + df_data.to_csv(os.path.join(subfolder, map_name+'_data.csv'), index=False) + + # save channel names (point 1 header) as csv + channel_names_df = pd.DataFrame(channel_names, columns=['Channel_Name']) + channel_names_df.to_csv(os.path.join(subfolder, map_name+'_channel_names.csv'), index=False) + + # save woi as csv + points_woi = pd.DataFrame(points_woi, columns=['From', 'To']) + points_woi.to_csv(os.path.join(subfolder, map_name+'_woi.csv'), index=False) + + # save reference annotations as csv + points_reference = pd.DataFrame(points_reference, columns=['Reference_Annotation']) + points_reference.to_csv(os.path.join(subfolder, map_name+'_reference_annotations.csv'), index=False) + + # save map annotations as csv + points_map_annotation = pd.DataFrame(points_map_annotation, columns=['Map_Annotation']) + points_map_annotation.to_csv(os.path.join(subfolder, map_name+'_map_annotations.csv'), index=False) + + # save point ids as csv + points_df = pd.DataFrame(map_ids, columns=['Point_ID']) + points_df.to_csv(os.path.join(subfolder, map_name+'_point_ids.csv'), index=False) + + # save map coordinates as csv + map_coords_df = pd.DataFrame(map_coordinates, columns=['X','Y','Z']) + map_coords_df.to_csv(os.path.join(subfolder, map_name+'_point_coords.csv'), index=False) + + # save unipolar and bipolar voltages as csv + points_unipolar_df = pd.DataFrame(points_unipolar, columns=['Unipolar']) + points_unipolar_df.to_csv(os.path.join(subfolder, map_name+'_unipolar.csv'), index=False) + points_bipolar_df = pd.DataFrame(points_bipolar, columns=['Bipolar']) + points_bipolar_df.to_csv(os.path.join(subfolder, map_name+'_bipolar.csv'), index=False) + + # new subfolder inside each map's directory for signals per point + subsubfolder = os.path.join(subfolder, 'signals_per_point') + if not os.path.exists(subsubfolder): + os.makedirs(subsubfolder) + for p in range(n_points): + point_signals = pd.DataFrame(signals[p, :, :], columns=channel_names) + point_signals.to_csv(os.path.join(subsubfolder, f'Point_{map_ids[p]}_signals.csv'), index=False) + + try: + CF_df.to_csv(os.path.join(subfolder, map_name+'_contact_force.csv'), index=False) + except: + pass + try: + RF_df.to_csv(os.path.join(subfolder, map_name+'_rf_data.csv'), index=False) + except: + pass + + if verbose > 0: + print("Done") + diff --git a/requirements.txt b/requirements.txt index 83b105de..eff613dc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,5 @@ shortuuid>=0.5.0 six>=1.11.0 joblib>=0.11 pywavelets>=1.4.1 -PeakUtils>=1.3.5 \ No newline at end of file +PeakUtils>=1.3.5 +pyvista \ No newline at end of file