diff --git a/doc/changes/dev/13869.bugfix.rst b/doc/changes/dev/13869.bugfix.rst new file mode 100644 index 00000000000..259f7aaff79 --- /dev/null +++ b/doc/changes/dev/13869.bugfix.rst @@ -0,0 +1 @@ +Fix SourceChGain/SourceChOffset calibration in BCI2000 reader so EEG data is returned in volts, by `Hansuja Budhiraja`_. \ No newline at end of file diff --git a/mne/io/bci2k/bci2k.py b/mne/io/bci2k/bci2k.py index 39cd709a11a..4ee0e65e60c 100644 --- a/mne/io/bci2k/bci2k.py +++ b/mne/io/bci2k/bci2k.py @@ -11,16 +11,23 @@ from ...utils import verbose from ..base import BaseRaw +_VOLT_SCALE = {"v": 1.0, "mv": 1e-3, "muv": 1e-6, "uv": 1e-6, "nv": 1e-9, "": 1e-6} +_FREQ_SCALE = {"hz": 1.0, "khz": 1e3, "": 1.0} -def _parse_sampling_rate(val): - # Accept e.g. "256", "256Hz", "256.0 Hz" - text = str(val).strip() - text = re.sub(r"\s*Hz\s*$", "", text, flags=re.IGNORECASE) - # Grab the first float-looking token - m = re.search(r"[-+]?\d*\.?\d+(?:[eE][-+]?\d+)?", text) - if m is None: - raise ValueError(f"Could not parse SamplingRate from {val!r}") - return float(m.group(0)) + +def _parse_value_with_unit(token, unit_scale): + """Split a numeric token with optional unit into value and scale.""" + text = str(token).strip().replace("µ", "u") + m = re.search(r"[a-zA-Z]+$", text) + if m: + num = float(text[: m.start()]) + unit = m.group().lower() + else: + num = float(text) + unit = "" + if unit not in unit_scale: + raise ValueError(f"Unrecognized unit '{unit}' in token '{token}'.") + return num, unit_scale[unit] def _parse_bci2k_header(fname): @@ -73,8 +80,20 @@ def _parse_bci2k_header(fname): if "Parameter Definition" in current_section: if "=" in line: left, right = line.split("=", 1) - name = left.strip().split()[-1] - value = right.strip().split()[0] + left_tokens = left.strip().split() + name = left_tokens[-1] + param_type = ( + left_tokens[-2].lower() if len(left_tokens) >= 2 else "" + ) + rhs = right.split("//", 1)[0].strip() + rhs_tokens = rhs.split() + if not rhs_tokens: + continue + if param_type.endswith("list"): + n_vals = int(rhs_tokens[0]) + value = rhs_tokens[1 : n_vals + 1] + else: + value = rhs_tokens[0] params[name] = value continue @@ -101,7 +120,10 @@ def _parse_bci2k_header(fname): "Could not find 'SamplingRate' in the BCI2000 Parameter Definition section." ) - sfreq = _parse_sampling_rate(params["SamplingRate"]) + sfreq_val, sfreq_scale = _parse_value_with_unit( + params["SamplingRate"], unit_scale=_FREQ_SCALE + ) + sfreq = sfreq_val * sfreq_scale return { "header_len": header_len, @@ -154,6 +176,23 @@ def _read_bci2k_data(fname, info_dict): ) signal = signal.T.astype(np.float64) # (n_channels, n_samples) + params = info_dict["params"] + if "SourceChOffset" in params and "SourceChGain" in params: + offsets = params["SourceChOffset"] + gains = params["SourceChGain"] + if len(offsets) != n_channels or len(gains) != n_channels: + raise ValueError( + "Expected SourceChOffset and SourceChGain lengths to match SourceCh." + ) + offsets_arr = np.array([float(val) for val in offsets]) + gain_parsed = [ + _parse_value_with_unit(val, unit_scale=_VOLT_SCALE) for val in gains + ] + gains_arr = np.array([val for val, _ in gain_parsed]) + gain_scales = np.array([scale for _, scale in gain_parsed]) + signal = (signal + offsets_arr[:, np.newaxis]) * ( + gains_arr[:, np.newaxis] * gain_scales[:, np.newaxis] + ) state_bytes = state_bytes.T # (state_vec_len, n_samples), dtype=uint8 return signal, state_bytes diff --git a/mne/io/bci2k/tests/test_bci2k.py b/mne/io/bci2k/tests/test_bci2k.py index 3aad8661740..44bf99c8c8c 100644 --- a/mne/io/bci2k/tests/test_bci2k.py +++ b/mne/io/bci2k/tests/test_bci2k.py @@ -4,6 +4,10 @@ import mne from mne.datasets import testing +from mne.io.bci2k.bci2k import ( + _parse_bci2k_header, + _parse_value_with_unit, +) data_path = testing.data_path(download=False) bci2k_fname = data_path / "BCI2k" / "bci2k_test.dat" @@ -25,3 +29,21 @@ def test_read_raw_bci2k(): assert events.ndim == 2 assert events.shape[1] == 3 assert "RawBCI2k" in repr(raw) + + info_dict = _parse_bci2k_header(bci2k_fname) + assert info_dict["params"]["SourceChOffset"] == ["0", "0"] + assert info_dict["params"]["SourceChGain"] == ["0.1muV", "0.1muV"] + + +def test_parse_value_with_unit(): + """Test numeric token parsing with embedded unit suffixes.""" + volt_scale = {"v": 1.0, "mv": 1e-3, "muv": 1e-6, "uv": 1e-6, "nv": 1e-9} + assert _parse_value_with_unit("0.1muV", unit_scale=volt_scale) == (0.1, 1e-6) + assert _parse_value_with_unit("2mV", unit_scale=volt_scale) == (2.0, 1e-3) + assert _parse_value_with_unit("-3.5µV", unit_scale=volt_scale) == (-3.5, 1e-6) + + freq_scale = {"hz": 1.0, "khz": 1e3} + value, scale = _parse_value_with_unit("256Hz", unit_scale=freq_scale) + assert value * scale == 256 + value, scale = _parse_value_with_unit("0.5kHz", unit_scale=freq_scale) + assert value * scale == 500