251 lines
7.8 KiB
Python
251 lines
7.8 KiB
Python
|
|
# -*- coding: utf-8 -*-
|
||
|
|
"""
|
||
|
|
bdf_analyzer.py
|
||
|
|
|
||
|
|
Analyze .bdf files - print data amplitude range and mean values.
|
||
|
|
Supports single file or batch processing of all .bdf files in a directory.
|
||
|
|
"""
|
||
|
|
import os
|
||
|
|
import glob
|
||
|
|
import numpy as np
|
||
|
|
import mne
|
||
|
|
import scipy.signal as signal
|
||
|
|
|
||
|
|
|
||
|
|
def analyze_bdf(filepath: str, unit: str = "uV") -> dict:
|
||
|
|
"""
|
||
|
|
Analyze a single .bdf file and compute statistics.
|
||
|
|
|
||
|
|
Parameters
|
||
|
|
----------
|
||
|
|
filepath : str
|
||
|
|
Path to .bdf file
|
||
|
|
unit : str, optional
|
||
|
|
Display unit (default: uV for microvolts)
|
||
|
|
|
||
|
|
Returns
|
||
|
|
-------
|
||
|
|
dict
|
||
|
|
Dictionary containing statistics
|
||
|
|
"""
|
||
|
|
print("=" * 60)
|
||
|
|
print(f"File: {os.path.basename(filepath)}")
|
||
|
|
print("=" * 60)
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Read BDF file
|
||
|
|
raw = mne.io.read_raw_bdf(filepath, preload=True, verbose=False)
|
||
|
|
|
||
|
|
# Get data (n_channels, n_times) in V
|
||
|
|
data = raw.get_data()
|
||
|
|
n_channels, n_times = data.shape
|
||
|
|
sfreq = raw.info["sfreq"]
|
||
|
|
|
||
|
|
# Convert to microvolts (uV)
|
||
|
|
data_uv = data * 1e6
|
||
|
|
|
||
|
|
# Raw data statistics (V)
|
||
|
|
raw_all = data.flatten()
|
||
|
|
raw_min = float(np.min(raw_all))
|
||
|
|
raw_max = float(np.max(raw_all))
|
||
|
|
raw_mean = float(np.mean(raw_all))
|
||
|
|
raw_std = float(np.std(raw_all))
|
||
|
|
|
||
|
|
# Overall statistics
|
||
|
|
all_values = data_uv.flatten()
|
||
|
|
min_val = np.min(all_values)
|
||
|
|
max_val = np.max(all_values)
|
||
|
|
mean_val = np.mean(all_values)
|
||
|
|
std_val = np.std(all_values)
|
||
|
|
|
||
|
|
print(f"Sampling rate: {sfreq:.2f} Hz")
|
||
|
|
print(f"Channels: {n_channels}")
|
||
|
|
print(f"Samples: {n_times:,}")
|
||
|
|
print(f"Duration: {n_times / sfreq:.2f} sec")
|
||
|
|
print("-" * 40)
|
||
|
|
print(f"[RAW - V]")
|
||
|
|
print(f"Amplitude range: [{raw_min:.6f}, {raw_max:.6f}] V")
|
||
|
|
print(f"Mean value: {raw_mean:.6f} V")
|
||
|
|
print(f"Std deviation: {raw_std:.6f} V")
|
||
|
|
print(f"[RAW - uV]")
|
||
|
|
print(f"Amplitude range: [{min_val:.4f}, {max_val:.4f}] uV")
|
||
|
|
print(f"Mean value: {mean_val:.4f} uV")
|
||
|
|
print(f"Std deviation: {std_val:.4f} uV")
|
||
|
|
print("-" * 40)
|
||
|
|
|
||
|
|
# Per-channel statistics
|
||
|
|
print("\nPer-channel statistics:")
|
||
|
|
print(f"{'Channel':<15} {'Min (uV)':<15} {'Max (uV)':<15} {'Mean (uV)':<15} {'PSD Peak (Hz)':<15}")
|
||
|
|
print("-" * 75)
|
||
|
|
|
||
|
|
channel_stats = []
|
||
|
|
for i, ch_name in enumerate(raw.ch_names):
|
||
|
|
ch_data = data_uv[i, :]
|
||
|
|
ch_min = np.min(ch_data)
|
||
|
|
ch_max = np.max(ch_data)
|
||
|
|
ch_mean = np.mean(ch_data)
|
||
|
|
|
||
|
|
# PSD peak frequency
|
||
|
|
nperseg = min(1024, n_times)
|
||
|
|
freqs, pxx = signal.welch(ch_data, fs=sfreq, nperseg=nperseg)
|
||
|
|
peak_idx = np.argmax(pxx)
|
||
|
|
peak_freq = freqs[peak_idx]
|
||
|
|
|
||
|
|
print(f"{ch_name:<15} {ch_min:<15.4f} {ch_max:<15.4f} {ch_mean:<15.4f} {peak_freq:<15.2f}")
|
||
|
|
channel_stats.append({
|
||
|
|
"name": ch_name,
|
||
|
|
"min": ch_min,
|
||
|
|
"max": ch_max,
|
||
|
|
"mean": ch_mean,
|
||
|
|
"psd_peak_hz": peak_freq
|
||
|
|
})
|
||
|
|
|
||
|
|
print("=" * 60)
|
||
|
|
print()
|
||
|
|
|
||
|
|
return {
|
||
|
|
"filepath": filepath,
|
||
|
|
"sfreq": sfreq,
|
||
|
|
"n_channels": n_channels,
|
||
|
|
"n_times": n_times,
|
||
|
|
"duration": n_times / sfreq,
|
||
|
|
"raw_min": raw_min,
|
||
|
|
"raw_max": raw_max,
|
||
|
|
"raw_mean": raw_mean,
|
||
|
|
"raw_std": raw_std,
|
||
|
|
"min": min_val,
|
||
|
|
"max": max_val,
|
||
|
|
"mean": mean_val,
|
||
|
|
"std": std_val,
|
||
|
|
"channels": channel_stats
|
||
|
|
}
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
print(f"[ERROR] Failed to read file: {e}")
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
def analyze_directory(dir_path: str) -> list:
|
||
|
|
"""
|
||
|
|
Analyze all .bdf files in a directory.
|
||
|
|
|
||
|
|
Parameters
|
||
|
|
----------
|
||
|
|
dir_path : str
|
||
|
|
Directory path
|
||
|
|
|
||
|
|
Returns
|
||
|
|
-------
|
||
|
|
list
|
||
|
|
List of analysis results for all files
|
||
|
|
"""
|
||
|
|
# Find all .bdf files
|
||
|
|
bdf_files = sorted(glob.glob(os.path.join(dir_path, "*.bdf")))
|
||
|
|
|
||
|
|
if not bdf_files:
|
||
|
|
print(f"[WARNING] No .bdf files found in: {dir_path}")
|
||
|
|
return []
|
||
|
|
|
||
|
|
print(f"Found {len(bdf_files)} .bdf file(s)\n")
|
||
|
|
|
||
|
|
results = []
|
||
|
|
for filepath in bdf_files:
|
||
|
|
result = analyze_bdf(filepath)
|
||
|
|
if result:
|
||
|
|
results.append(result)
|
||
|
|
|
||
|
|
# Summary statistics
|
||
|
|
if results:
|
||
|
|
print("\n" + "=" * 60)
|
||
|
|
print("Summary")
|
||
|
|
print("=" * 60)
|
||
|
|
|
||
|
|
all_means = [r["mean"] for r in results]
|
||
|
|
all_mins = [r["min"] for r in results]
|
||
|
|
all_maxs = [r["max"] for r in results]
|
||
|
|
|
||
|
|
print(f"File count: {len(results)}")
|
||
|
|
print(f"[RAW - V] Overall range: [{min(r['raw_min'] for r in results):.6f}, {max(r['raw_max'] for r in results):.6f}] V")
|
||
|
|
print(f"[RAW - V] Avg mean: {np.mean([r['raw_mean'] for r in results]):.6f} V")
|
||
|
|
print(f"[RAW - uV] Overall range: [{min(all_mins):.4f}, {max(all_maxs):.4f}] uV")
|
||
|
|
print(f"[RAW - uV] Avg mean: {np.mean(all_means):.4f} uV")
|
||
|
|
print(f"Max value file: {results[np.argmax(all_maxs)]['filepath']}")
|
||
|
|
print(f"Min value file: {results[np.argmin(all_mins)]['filepath']}")
|
||
|
|
|
||
|
|
# Per-channel mean summary across all files
|
||
|
|
n_channels = len(results[0]["channels"])
|
||
|
|
ch_names = [results[0]["channels"][i]["name"] for i in range(n_channels)]
|
||
|
|
ch_mean_over_files = []
|
||
|
|
for ch_idx in range(n_channels):
|
||
|
|
ch_means = [results[f_idx]["channels"][ch_idx]["mean"] for f_idx in range(len(results))]
|
||
|
|
ch_mean_over_files.append(np.mean(ch_means))
|
||
|
|
|
||
|
|
ch_peak_over_files = []
|
||
|
|
for ch_idx in range(n_channels):
|
||
|
|
ch_peaks = [results[f_idx]["channels"][ch_idx]["psd_peak_hz"] for f_idx in range(len(results))]
|
||
|
|
ch_peak_over_files.append(np.mean(ch_peaks))
|
||
|
|
|
||
|
|
print("\nPer-channel mean across all files:")
|
||
|
|
print(f"{'Channel':<15} {'Mean (uV)':<15} {'PSD Peak (Hz)':<15}")
|
||
|
|
print("-" * 45)
|
||
|
|
for ch_name, ch_mean, ch_peak in zip(ch_names, ch_mean_over_files, ch_peak_over_files):
|
||
|
|
print(f"{ch_name:<15} {ch_mean:<15.4f} {ch_peak:<15.2f}")
|
||
|
|
|
||
|
|
return results
|
||
|
|
|
||
|
|
|
||
|
|
def main():
|
||
|
|
"""Main function with CLI support."""
|
||
|
|
import argparse
|
||
|
|
|
||
|
|
# Default analysis directory
|
||
|
|
default_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "raw_data")
|
||
|
|
|
||
|
|
parser = argparse.ArgumentParser(
|
||
|
|
description="Analyze .bdf files - print amplitude range and mean values",
|
||
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
|
|
epilog=f"""
|
||
|
|
Examples:
|
||
|
|
python bdf_analyzer.py # Analyze all .bdf in raw_data/
|
||
|
|
python bdf_analyzer.py data/test.bdf # Analyze single file
|
||
|
|
python bdf_analyzer.py data/ # Analyze all .bdf in directory
|
||
|
|
python bdf_analyzer.py . -u mV # Current dir, unit mV
|
||
|
|
"""
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"path",
|
||
|
|
nargs="?",
|
||
|
|
default=default_dir,
|
||
|
|
help="Path to BDF file or directory containing BDF files (default: raw_data/)"
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"-u", "--unit",
|
||
|
|
choices=["uV", "mV", "V"],
|
||
|
|
default="uV",
|
||
|
|
help="Display unit (default: uV)"
|
||
|
|
)
|
||
|
|
|
||
|
|
args = parser.parse_args()
|
||
|
|
|
||
|
|
filepath = args.path
|
||
|
|
|
||
|
|
# Determine file or directory mode
|
||
|
|
if os.path.isfile(filepath):
|
||
|
|
# Single file mode
|
||
|
|
result = analyze_bdf(filepath, unit=args.unit)
|
||
|
|
if result:
|
||
|
|
print("Analysis complete!")
|
||
|
|
elif os.path.isdir(filepath):
|
||
|
|
# Directory mode
|
||
|
|
results = analyze_directory(filepath)
|
||
|
|
if results:
|
||
|
|
print("\nBatch analysis complete!")
|
||
|
|
else:
|
||
|
|
print("No analyzable files found")
|
||
|
|
else:
|
||
|
|
print(f"[ERROR] File does not exist: {filepath}")
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
main()
|