Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/copy_probe_features.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ jobs:
curl -o src/probeinterface/resources/neuropixels_probe_features.json \
https://raw.githubusercontent.com/billkarsh/ProbeTable/refs/heads/main/Tables/probe_features.json

- name: Derive IMRO type mappings from catalogue
run: python resources/postprocess_neuropixels_probe_features.py

- name: Commit changes if any
id: commit
run: |
Expand Down
131 changes: 131 additions & 0 deletions resources/postprocess_neuropixels_probe_features.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""
Post-process neuropixels_probe_features.json after syncing from ProbeTable.

Derives two mappings from the catalogue and writes them back into the JSON:

- z_imro_format_type_to_imro_format: IMRO type code -> IMRO format name
(e.g. "0" -> "imro_np1000", "1110" -> "imro_np1110")

- z_imro_format_type_to_part_number: IMRO type code -> canonical probe part number
(e.g. "0" -> "NP1000", "1110" -> "NP1110")

This script is called by the GitHub Action workflow that syncs probe_features.json
from billkarsh/ProbeTable, and can also be run standalone.
"""

import json
import re
from pathlib import Path

# Catalogue file synced from ProbeTable, located relative to this script's directory.
PROBE_FEATURES_PATH = Path(__file__).absolute().parent.joinpath(
    "..",
    "src",
    "probeinterface",
    "resources",
    "neuropixels_probe_features.json",
)


def _parse_type_values_from_val_def(val_def: str) -> list[str]:
    """Extract the IMRO type code(s) declared at the start of a val_def string.

    Two patterns in ProbeTable:
        type:{0,1020,1030,...}  -> set of values
        type:1110               -> single value

    Parameters
    ----------
    val_def : str
        Value-definition string; it must begin with the ``type:`` field.

    Returns
    -------
    list[str]
        The type codes as decimal strings, in the order they are listed.

    Raises
    ------
    ValueError
        If ``val_def`` does not start with a ``type:`` field in one of the two
        patterns above, or if a listed code is not a non-negative integer.
    """
    set_match = re.match(r"type:\{([^}]+)\}", val_def)
    if set_match:
        # Drop empty tokens (e.g. produced by a trailing comma) rather than
        # returning "" as a type code.
        type_codes = [
            token.strip() for token in set_match.group(1).split(",") if token.strip()
        ]
        # Type codes are later sorted with int(); reject anything int() cannot
        # read here, where the offending val_def can still be reported.
        invalid = [code for code in type_codes if not code.isdecimal()]
        if invalid:
            raise ValueError(
                f"Non-integer IMRO type code(s) {invalid!r} in val_def: {val_def!r}"
            )
        if type_codes:
            return type_codes
        raise ValueError(f"Cannot parse type from val_def: {val_def!r}")

    single_match = re.match(r"type:(\d+)", val_def)
    if single_match:
        return [single_match.group(1)]

    raise ValueError(f"Cannot parse type from val_def: {val_def!r}")


def build_derived_mappings(probe_features: dict) -> tuple[dict, dict]:
    """Build type-to-format and type-to-part-number mappings from the catalogue.

    Parameters
    ----------
    probe_features : dict
        Parsed catalogue. Must contain ``"z_imro_formats"`` (of which only the
        ``*_val_def`` entries are read) and ``"neuropixels_probes"``
        (part number -> probe spec).

    Returns
    -------
    type_to_format : dict
        IMRO type code (str) -> IMRO format name, e.g. ``"0" -> "imro_np1000"``.
    type_to_part_number : dict
        IMRO type code (str) -> representative part number. A type code is
        omitted when no probe declares the matching ``imro_table_format_type``.

    Raises
    ------
    ValueError
        If the same type code is listed by two ``*_val_def`` entries, or if a
        ``val_def`` string cannot be parsed.
    """
    imro_formats = probe_features["z_imro_formats"]
    probes = probe_features["neuropixels_probes"]

    # 1. Build type -> format mapping from val_def entries
    type_to_format = {}
    for key, val_def in imro_formats.items():
        if not key.endswith("_val_def"):
            continue
        # e.g. "imro_np1000_val_def" -> "imro_np1000"
        format_name = key.removesuffix("_val_def")
        for type_code in _parse_type_values_from_val_def(val_def):
            if type_code in type_to_format:
                raise ValueError(
                    f"IMRO type {type_code!r} maps to both "
                    f"{type_to_format[type_code]!r} and {format_name!r}"
                )
            type_to_format[type_code] = format_name

    # 2. Build type -> representative part number mapping.
    #    Candidates are the probes declaring the type's IMRO format. Many probes
    #    share one format while differing in geometry, and the catalogue does not
    #    record a per-probe type code, so the choice is a name-based heuristic:
    #      a) the part number that is exactly "NP<type_code>",
    #      b) else the first NP-prefixed part number containing the type code,
    #      c) else the first NP-prefixed part number alphabetically,
    #      d) else the first remaining part number alphabetically.
    type_to_part_number = {}
    for type_code, format_name in sorted(type_to_format.items()):
        candidates = [
            pn
            for pn, spec in probes.items()
            if spec.get("imro_table_format_type") == format_name
        ]
        np_candidates = sorted(pn for pn in candidates if pn.startswith("NP"))

        # (a) A true exact match must win over a name that merely contains the
        #     code: alphabetical order alone would rank "NP1020" before "NP20".
        exact_name = f"NP{type_code}"
        if exact_name in np_candidates:
            type_to_part_number[type_code] = exact_name
            continue

        # (b), (c), (d) in priority order; the first available name is used.
        containing = [pn for pn in np_candidates if type_code in pn]
        other_candidates = sorted(pn for pn in candidates if not pn.startswith("NP"))
        ordered = containing + np_candidates + other_candidates

        if ordered:
            type_to_part_number[type_code] = ordered[0]

    return type_to_format, type_to_part_number


def postprocess(filepath: Path = PROBE_FEATURES_PATH) -> None:
    """Add the derived IMRO mappings to the catalogue JSON file, in place.

    Loads the JSON at ``filepath``, computes the mappings with
    ``build_derived_mappings``, stores them under the keys
    ``"z_imro_format_type_to_imro_format"`` and
    ``"z_imro_format_type_to_part_number"`` (both ordered by numeric type
    code), rewrites the file with 4-space indentation and a trailing newline,
    and prints a summary.

    Parameters
    ----------
    filepath : Path or str
        Location of the catalogue JSON file. Defaults to ``PROBE_FEATURES_PATH``.
    """
    # Path(...) lets callers pass a plain string as well as a Path.
    filepath = Path(filepath).resolve()

    # JSON is UTF-8; do not depend on the platform's locale encoding.
    with open(filepath, encoding="utf-8") as f:
        probe_features = json.load(f)

    type_to_format, type_to_part_number = build_derived_mappings(probe_features)

    def _numeric_type_code(item):
        # Type codes are decimal strings; order them as numbers, not as text.
        return int(item[0])

    probe_features["z_imro_format_type_to_imro_format"] = dict(
        sorted(type_to_format.items(), key=_numeric_type_code)
    )
    probe_features["z_imro_format_type_to_part_number"] = dict(
        sorted(type_to_part_number.items(), key=_numeric_type_code)
    )

    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(probe_features, f, indent=4)
        f.write("\n")

    print(f"Wrote derived mappings to {filepath}")
    print(f"  z_imro_format_type_to_imro_format: {len(type_to_format)} entries")
    print(f"  z_imro_format_type_to_part_number: {len(type_to_part_number)} entries")
    for type_code in sorted(type_to_format, key=int):
        # "???" marks a type code for which no part number could be resolved.
        pn = type_to_part_number.get(type_code, "???")
        print(f"    type {type_code:>5s} -> format={type_to_format[type_code]}, part_number={pn}")


if __name__ == "__main__":
    postprocess()
138 changes: 62 additions & 76 deletions src/probeinterface/neuropixels_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,23 @@
# Utils zone #
###############

# Map imDatPrb_pn (probe number) to imDatPrb_type (probe type) when the latter is missing
# ONLY needed for `read_imro` function
probe_part_number_to_probe_type = {
# for old version without a probe number we assume NP1.0
None: "0",
# NP1.0
"PRB_1_4_0480_1": "0",
"PRB_1_4_0480_1_C": "0", # This is the metal cap version
"PRB_1_2_0480_2": "0",
"NP1010": "0",
# NHP probes lin
"NP1015": "1015",
"NP1016": "1015",
"NP1017": "1015",
# NHP probes stag med
"NP1020": "1020",
"NP1021": "1021",
"NP1022": "1022",
# NHP probes stag long
"NP1030": "1030",
"NP1031": "1031",
"NP1032": "1032",
# NP2.0
"NP2000": "21",
"NP2010": "24",
"NP2013": "2013",
"NP2014": "2014",
"NP2003": "2003",
"NP2004": "2004",
"PRB2_1_2_0640_0": "21",
"PRB2_4_2_0640_0": "24",
# NXT
"NP2020": "2020",
# Ultra
"NP1100": "1100", # Ultra probe - 1 bank
"NP1110": "1110", # Ultra probe - 16 banks no handle because
"NP1121": "1121", # Ultra probe - beta configuration
# Opto
"NP1300": "1300", # Opto probe
# IMRO type codes not listed in any val_def entry in the ProbeTable catalogue.
# These probes all use the imro_np1000 or imro_np2003/imro_np2013 format, but their
# type codes are not in the corresponding val_def type sets.
# We don't know if SpikeGLX actually produces IMRO files with these type codes
# (there is no test data for them). They are kept here for backwards compatibility.
# Values are (imro_format_name, canonical_part_number).
#
# TODO: @team - Should these be added to ProbeTable's val_def, or can they be removed?
# If SpikeGLX never produces these type codes, this dict can be deleted entirely.
_imro_format_type_fallback = {
"1015": ("imro_np1000", "NP1015"),
"1021": ("imro_np1000", "NP1021"),
"1022": ("imro_np1000", "NP1022"),
"1031": ("imro_np1000", "NP1031"),
"1032": ("imro_np1000", "NP1032"),
"2004": ("imro_np2003", "NP2004"),
"2014": ("imro_np2013", "NP2014"),
}

# Map from imro format to ProbeInterface naming conventions
Expand Down Expand Up @@ -439,24 +417,20 @@ def _annotate_probe_with_adc_sampling_info(probe: Probe, adc_sampling_table: str
#########################


def _parse_imro_string(imro_table_string: str, probe_part_number: str) -> dict:
def _parse_imro_string(imro_table_string: str) -> dict:
"""
Parse IMRO (Imec ReadOut) table string into structured per-channel data.

IMRO format: "(probe_type,num_chans)(ch0 bank0 ref0 ...)(ch1 bank1 ref1 ...)..."
Example: "(0,384)(0 1 0 500 250 1)(1 0 0 500 250 1)..."

Note: The IMRO header contains a probe_type field (e.g., "0", "21", "24"), which is
a numeric format version identifier that specifies which IMRO table structure was used.
Different probe generations use different IMRO formats. This is a file format detail,
not a physical probe property.
The IMRO type is extracted from the header and used to look up the field schema
from the catalogue (z_imro_format_type_to_imro_format). No probe part number is needed.

Parameters
----------
imro_table_string : str
IMRO table string from SpikeGLX metadata file
probe_part_number : str
Probe part number (e.g., "NP1000", "NP2000")

Returns
-------
Expand All @@ -473,22 +447,41 @@ def _parse_imro_string(imro_table_string: str, probe_part_number: str) -> dict:
Example for NP1110: {"header": {"type": 1110, "col_mode": 2, "ref_id": 0, ...},
"group": [0,1,...], "bankA": [0,0,...], "bankB": [0,0,...]} # 24 entries, not 384
"""
# Get IMRO field format from catalogue
# Parse IMRO header and per-entry values
header_str, *imro_table_values_list, _ = imro_table_string.strip().split(")")
header_values = tuple(map(int, header_str[1:].split(",")))

# Extract IMRO type from header. Phase3A probes have a 3-field header; all others
# have 2+ fields with type as the first. Phase3A is treated as type 0.
if len(header_values) == 3:
imro_format_type = "0"
else:
imro_format_type = str(header_values[0])

# Look up the IMRO format schema from the catalogue's derived mappings
probe_features = _load_np_probe_features()
probe_spec = probe_features["neuropixels_probes"][probe_part_number]
imro_format = probe_spec["imro_table_format_type"]
type_to_format = probe_features["z_imro_format_type_to_imro_format"]

if imro_format_type in type_to_format:
imro_format = type_to_format[imro_format_type]
elif imro_format_type in _imro_format_type_fallback:
imro_format = _imro_format_type_fallback[imro_format_type][0]
else:
valid_types = sorted(set(type_to_format) | set(_imro_format_type_fallback), key=int)
raise ValueError(f"Unknown IMRO type '{imro_format_type}'. Valid types: {valid_types}")

imro_fields_string = probe_features["z_imro_formats"][imro_format + "_elm_flds"]
imro_fields = tuple(imro_fields_string.replace("(", "").replace(")", "").split(" "))

# Parse IMRO header and per-entry values
header_str, *imro_table_values_list, _ = imro_table_string.strip().split(")")

# Parse header fields using the catalogue schema
imro_header_fields_string = probe_features["z_imro_formats"][imro_format + "_hdr_flds"]
imro_header_fields = tuple(imro_header_fields_string.replace("(", "").replace(")", "").split(","))
header_values = tuple(map(int, header_str[1:].split(",")))
# Initialize with parsed header and empty lists for per-entry fields (filled below)
# Initialize with parsed header and empty lists for per-entry fields (filled below).
# For Phase3A (3-field header), zip silently drops the extra value, which is correct.
imro_per_channel = {"header": dict(zip(imro_header_fields, header_values))}
# Normalize Phase3A header type to 0 so downstream code reads it consistently
if len(header_values) == 3:
imro_per_channel["header"]["type"] = 0
for field in imro_fields:
imro_per_channel[field] = []
for field_values_str in imro_table_values_list:
Expand Down Expand Up @@ -716,34 +709,27 @@ def read_imro(file_path: str | Path) -> Probe:
https://billkarsh.github.io/SpikeGLX/help/imroTables/

"""
# ===== 1. Read file and determine probe part number from IMRO header =====
# ===== 1. Read file =====
meta_file = Path(file_path)
assert meta_file.suffix == ".imro", "'file' should point to the .imro file"
with meta_file.open(mode="r") as f:
imro_str = str(f.read())

imro_table_header_str, *imro_table_values_list, _ = imro_str.strip().split(")")
imro_table_header = tuple(map(int, imro_table_header_str[1:].split(",")))
# ===== 2. Parse IMRO table (type is extracted from the header automatically) =====
imro_per_channel = _parse_imro_string(imro_str)

if len(imro_table_header) == 3:
# In older versions of neuropixel arrays (phase 3A), imro tables were structured differently.
# We use probe_type "0", which maps to probe_part_number NP1010 as a proxy for Phase3a.
imDatPrb_type = "0"
elif len(imro_table_header) == 2:
imDatPrb_type, _ = imro_table_header
# ===== 3. Resolve probe part number and build full probe =====
imro_format_type = str(imro_per_channel["header"]["type"])
probe_features = _load_np_probe_features()
type_to_pn = probe_features["z_imro_format_type_to_part_number"]
if imro_format_type in type_to_pn:
probe_part_number = type_to_pn[imro_format_type]
elif imro_format_type in _imro_format_type_fallback:
probe_part_number = _imro_format_type_fallback[imro_format_type][1]
else:
raise ValueError(f"read_imro error, the header has a strange length: {imro_table_header}")
imDatPrb_type = str(imDatPrb_type)

for probe_part_number, probe_type in probe_part_number_to_probe_type.items():
if imDatPrb_type == probe_type:
imDatPrb_pn = probe_part_number

# ===== 2. Interpret IMRO table =====
imro_per_channel = _parse_imro_string(imro_str, imDatPrb_pn)

# ===== 3. Build full probe with all possible contacts =====
full_probe = build_neuropixels_probe(probe_part_number=imDatPrb_pn)
valid_types = sorted(set(type_to_pn) | set(_imro_format_type_fallback), key=int)
raise ValueError(f"Unknown IMRO type '{imro_format_type}'. Valid types: {valid_types}")
full_probe = build_neuropixels_probe(probe_part_number=probe_part_number)

# ===== 4. Slice full probe to active electrodes =====
active_contact_ids = _get_imro_active_contact_ids(imro_per_channel)
Expand Down Expand Up @@ -820,7 +806,7 @@ def read_spikeglx(file: str | Path) -> Probe:
# Specifies which electrodes were selected for recording (e.g., 384 of 960) plus their
# acquisition settings (gains, references, filters). See: https://billkarsh.github.io/SpikeGLX/help/imroTables/
imro_table_string = meta["imroTbl"]
imro_per_channel = _parse_imro_string(imro_table_string, imDatPrb_pn)
imro_per_channel = _parse_imro_string(imro_table_string)

# ===== 4. Slice full probe to active electrodes =====
active_contact_ids = _get_imro_active_contact_ids(imro_per_channel)
Expand Down
Loading
Loading