diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 242e71bc2..bca538be5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -75,7 +75,7 @@ jobs: sudo apt update --fix-missing --yes sudo apt upgrade --yes sudo apt-get install --yes git - sudo apt-get clean + sudo apt-get clean - uses: actions/checkout@v4 @@ -88,9 +88,9 @@ jobs: python --version pip show setuptools rm -rf ~/.cache/pip - + - name: Print available disk space before graphnet install - run: | + run: | df -h - name: Upgrade packages in virtual environment shell: bash @@ -128,7 +128,7 @@ jobs: pip show torch-scatter pip show jammy_flows - name: Print available disk space after graphnet install - run: | + run: | df -h - name: Run unit tests and generate coverage report shell: bash diff --git a/src/graphnet/models/easy_model.py b/src/graphnet/models/easy_model.py index b43b7d63e..f01baad12 100644 --- a/src/graphnet/models/easy_model.py +++ b/src/graphnet/models/easy_model.py @@ -1,6 +1,5 @@ """Suggested Model subclass that enables simple user syntax.""" -from collections import OrderedDict from typing import Any, Dict, List, Optional, Union, Type import numpy as np @@ -10,7 +9,7 @@ from torch import Tensor from torch.nn import ModuleList from torch.optim import Adam -from torch.utils.data import DataLoader, SequentialSampler +from torch.utils.data import DataLoader from torch_geometric.data import Data import pandas as pd from pytorch_lightning.loggers import Logger as LightningLogger @@ -288,14 +287,59 @@ def train(self, mode: bool = True) -> "Model": task.train_eval() return self + def predict_step(self, *args: Any, **kwargs: Any) -> List[Any]: + """Perform prediction step. + + Returns a list whose first entries are the per-task prediction + tensors and whose trailing entries are numpy arrays for any + attributes requested via `_predict_additional_attributes`. Pulling + attributes here avoids a second pass over the dataloader in + `predict_as_dataframe`. + """ + batch = kwargs.get("batch", args[0]) + pred = list(self(batch)) + + attrs = getattr(self, "_predict_additional_attributes", None) + if not attrs: + return pred + + # Pulse- vs event-level: pred[0] has one row per node when + # predictions are on the pulse level. + pulse_level_predictions = len(pred[0]) > len(batch) + n_pulses = ( + batch.n_pulses.detach().cpu().numpy() + if pulse_level_predictions + else None + ) + for attr in attrs: + value = batch[attr] + if isinstance(value, torch.Tensor): + value = value.detach().cpu().numpy() + else: + value = np.asarray(value) + if ( + pulse_level_predictions + and n_pulses is not None + and len(value) < n_pulses.sum() + ): + value = np.repeat(value, n_pulses) + pred.append(value) + return pred + def predict( self, dataloader: DataLoader, gpus: Optional[Union[List[int], int]] = None, distribution_strategy: Optional[str] = "auto", + additional_attributes: Optional[List[str]] = None, **trainer_kwargs: Any, - ) -> List[Tensor]: - """Return predictions for `dataloader`.""" + ) -> List[Union[Tensor, np.ndarray]]: + """Return predictions for `dataloader`. + + If `additional_attributes` is provided, the returned list has the + per-task prediction tensors followed by one numpy array per + requested attribute, gathered from the same dataloader pass. + """ self.inference() self.train(mode=False) @@ -310,14 +354,30 @@ def predict( **trainer_kwargs, ) - predictions_list = inference_trainer.predict(self, dataloader) + # Stash on self so predict_step can see it; always clear. + self._predict_additional_attributes = additional_attributes or None + try: + predictions_list = inference_trainer.predict(self, dataloader) + finally: + self._predict_additional_attributes = None assert len(predictions_list), "Got no predictions" + # The trailing entries in each batch's output are the gathered + # additional attributes (numpy arrays); everything before is a + # per-task prediction tensor. + nb_attrs = len(additional_attributes or []) nb_outputs = len(predictions_list[0]) - predictions: List[Tensor] = [ - torch.cat([preds[ix] for preds in predictions_list], dim=0) - for ix in range(nb_outputs) - ] + nb_task_outputs = nb_outputs - nb_attrs + predictions: List[Union[Tensor, np.ndarray]] = [] + for ix in range(nb_outputs): + if ix < nb_task_outputs: + predictions.append( + torch.cat([p[ix] for p in predictions_list], dim=0) + ) + else: + predictions.append( + np.concatenate([p[ix] for p in predictions_list], axis=0) + ) return predictions def predict_as_dataframe( @@ -333,79 +393,47 @@ def predict_as_dataframe( """Return predictions for `dataloader` as a DataFrame. Include `additional_attributes` as additional columns in the output - DataFrame. + DataFrame. Attributes are gathered during the prediction pass, so + the dataloader is iterated only once and shuffling is safe. """ if prediction_columns is None: prediction_columns = self.prediction_labels - if additional_attributes is None: additional_attributes = [] assert isinstance(additional_attributes, list) - if ( - not isinstance(dataloader.sampler, SequentialSampler) - and additional_attributes - ): - print(dataloader.sampler) - raise UserWarning( - "DataLoader has a `sampler` that is not `SequentialSampler`, " - "indicating that shuffling is enabled. Using " - "`predict_as_dataframe` with `additional_attributes` assumes " - "that the sequence of batches in `dataloader` are " - "deterministic. Either call this method a `dataloader` which " - "doesn't resample batches; or do not request " - "`additional_attributes`." - ) self.info(f"Column names for predictions are: \n {prediction_columns}") - predictions_torch = self.predict( + + outputs = self.predict( dataloader=dataloader, gpus=gpus, distribution_strategy=distribution_strategy, + additional_attributes=additional_attributes, **trainer_kwargs, ) - predictions = ( - torch.cat(predictions_torch, dim=1).detach().cpu().numpy() - ) + + # `predict` returns task tensors first, then one np.ndarray per + # requested attribute — split on that boundary. + split = len(outputs) - len(additional_attributes) + pred_tensors = outputs[:split] + attr_arrays = outputs[split:] + + predictions = torch.cat(pred_tensors, dim=1).detach().cpu().numpy() assert len(prediction_columns) == predictions.shape[1], ( f"Number of provided column names ({len(prediction_columns)}) and " f"number of output columns ({predictions.shape[1]}) don't match." ) - # Check if predictions are on event- or pulse-level - pulse_level_predictions = len(predictions) > len(dataloader.dataset) - - # Get additional attributes - attributes: Dict[str, List[np.ndarray]] = OrderedDict( - [(attr, []) for attr in additional_attributes] - ) - for batch in dataloader: - for attr in attributes: - attribute = batch[attr] - if isinstance(attribute, torch.Tensor): - attribute = attribute.detach().cpu().numpy() - - # Check if node level predictions - # If true, additional attributes are repeated - # to make dimensions fit - if pulse_level_predictions: - if len(attribute) < np.sum( - batch.n_pulses.detach().cpu().numpy() - ): - attribute = np.repeat( - attribute, batch.n_pulses.detach().cpu().numpy() - ) - attributes[attr].extend(attribute) - - # Confirm that attributes match length of predictions - skip_attributes = [] - for attr in attributes.keys(): - try: - assert len(attributes[attr]) == len(predictions) - except AssertionError: + columns: Dict[str, np.ndarray] = { + name: predictions[:, i] + for i, name in enumerate(prediction_columns) + } + for name, arr in zip(additional_attributes, attr_arrays): + if len(arr) != len(predictions): self.warning_once( "Could not automatically adjust length" - f" of additional attribute '{attr}' to match length of" - f" predictions.This error can be caused by heavy" + f" of additional attribute '{name}' to match length of" + " predictions. This error can be caused by heavy" " disagreement between number of examples in the" " dataset vs. actual events in the dataloader, e.g. " " heavy filtering of events in `collate_fn` passed to" @@ -413,26 +441,19 @@ def predict_as_dataframe( " pulse-level attributes for `Task`s that produce" " event-level predictions. Attribute skipped." ) - skip_attributes.append(attr) - - # Remove bad attributes - for attr in skip_attributes: - attributes.pop(attr) - additional_attributes.remove(attr) - - data = np.concatenate( - [predictions] - + [ - np.asarray(values)[:, np.newaxis] - for values in attributes.values() - ], - axis=1, - ) - - results = pd.DataFrame( - data, columns=prediction_columns + additional_attributes - ) - return results + continue + arr = np.asarray(arr) + if arr.ndim == 1: + columns[name] = arr + else: + # Multi-dim target (e.g. `direction` with x/y/z components): + # expand to one column per component so the DataFrame stays + # tabular. + flat = arr.reshape(len(arr), -1) + for i in range(flat.shape[1]): + columns[f"{name}_{i}"] = flat[:, i] + + return pd.DataFrame(columns) def _create_default_callbacks( self, diff --git a/tests/models/test_easy_model.py b/tests/models/test_easy_model.py new file mode 100644 index 000000000..4ae601a6c --- /dev/null +++ b/tests/models/test_easy_model.py @@ -0,0 +1,243 @@ +"""Unit tests for `EasySyntax.predict_as_dataframe`. + +Uses a minimal `EasySyntax` subclass and a synthetic in-memory dataset so +the tests exercise only the prediction/attribute-gathering machinery, +not GNN/Detector/Task code. +""" + +from typing import Iterator, List, Union + +import numpy as np +import pytest +import torch +from torch import Tensor +from torch_geometric.data import Data +from torch_geometric.loader import DataLoader + +from graphnet.models.easy_model import EasySyntax + + +class _FakeTask(torch.nn.Module): + def __init__(self, prediction_labels: List[str]) -> None: + super().__init__() + self._prediction_labels = list(prediction_labels) + self._target_labels: List[str] = [] + + def inference(self) -> None: + pass + + def train_eval(self) -> None: + pass + + +class _FakeModel(EasySyntax): + """Echoes `event_id` as the prediction so we can verify alignment.""" + + def __init__(self, prediction_labels: List[str]) -> None: + super().__init__(tasks=[_FakeTask(prediction_labels)]) + self._dummy = torch.nn.Linear(1, 1) # so Lightning sees parameters + + def validate_tasks(self) -> None: + pass + + def forward(self, data: Union[Data, List[Data]]) -> List[Tensor]: + if isinstance(data, list): + data = data[0] + # Per-event prediction: repeat event_id across all prediction columns. + eid = data.event_id.to(torch.float32).unsqueeze(1) + n_cols = len(self.prediction_labels) + return [eid.repeat(1, n_cols)] + + def compute_loss( # type: ignore[override] + self, preds: Tensor, data: List[Data], verbose: bool = False + ) -> Tensor: + raise NotImplementedError + + def shared_step( # type: ignore[override] + self, batch: List[Data], batch_idx: int + ) -> Tensor: + raise NotImplementedError + + +def _make_dataset(n_events: int) -> List[Data]: + """One Data per event with event_id, scalar value, pulse-level array.""" + out = [] + for i in range(n_events): + n_pulses = 2 + (i % 3) + out.append( + Data( + x=torch.zeros(n_pulses, 1), + event_id=torch.tensor([i], dtype=torch.long), + value=torch.tensor([float(i) * 10.0]), + # Per-event 3-vector — mimics `direction` (shape (1, 3)) so + # batches concatenate to (N, 3). + vec=torch.tensor( + [[float(i), float(i) + 0.5, float(i) + 0.25]] + ), + pulses=torch.arange(n_pulses, dtype=torch.float32), + n_pulses=torch.tensor([n_pulses]), + ) + ) + return out + + +def _loader( + dataset: List[Data], batch_size: int = 4, shuffle: bool = False +) -> DataLoader: + return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle) + + +@pytest.fixture +def model() -> _FakeModel: + return _FakeModel(prediction_labels=["pred_a", "pred_b"]) + + +def test_predict_as_dataframe_shape_and_columns(model: _FakeModel) -> None: + """No attributes: columns and length match prediction labels/dataset.""" + ds = _make_dataset(10) + df = model.predict_as_dataframe(_loader(ds, batch_size=3)) + assert list(df.columns) == ["pred_a", "pred_b"] + assert len(df) == len(ds) + # _FakeModel echoes event_id, so both columns should equal 0..9. + np.testing.assert_array_equal( + df["pred_a"].to_numpy(), np.arange(10, dtype=np.float32) + ) + np.testing.assert_array_equal( + df["pred_b"].to_numpy(), np.arange(10, dtype=np.float32) + ) + + +def test_predict_as_dataframe_attributes_aligned(model: _FakeModel) -> None: + """Additional attributes line up with predictions row-for-row.""" + ds = _make_dataset(8) + df = model.predict_as_dataframe( + _loader(ds, batch_size=3), + additional_attributes=["event_id", "value"], + ) + assert list(df.columns) == ["pred_a", "pred_b", "event_id", "value"] + np.testing.assert_array_equal(df["event_id"].to_numpy(), np.arange(8)) + np.testing.assert_array_equal( + df["value"].to_numpy(), np.arange(8, dtype=np.float32) * 10.0 + ) + # Prediction (= event_id) must equal the event_id attribute. + np.testing.assert_array_equal( + df["pred_a"].to_numpy().astype(np.int64), + df["event_id"].to_numpy(), + ) + + +class _CountingLoader(DataLoader): + """`DataLoader` that records how many times `__iter__` was called. + + Python's ``iter(obj)`` dispatches via ``type(obj).__iter__``, so the + counter has to live on the class — patching the instance has no effect. + """ + + def __iter__(self) -> Iterator: + type(self).iter_count = getattr(type(self), "iter_count", 0) + 1 + return super().__iter__() + + +def test_predict_as_dataframe_iterates_dataloader_once( + model: _FakeModel, +) -> None: + """Regression: with attributes, the dataloader is iterated exactly once. + + The previous implementation iterated a second time after `predict()` + purely to collect `additional_attributes`. + """ + ds = _make_dataset(6) + loader = _CountingLoader(ds, batch_size=2) + _CountingLoader.iter_count = 0 + + model.predict_as_dataframe( + loader, additional_attributes=["event_id", "value"] + ) + assert _CountingLoader.iter_count == 1 + + +def test_predict_as_dataframe_with_shuffled_loader( + model: _FakeModel, +) -> None: + """Shuffling is allowed now that attributes come from the same pass.""" + torch.manual_seed(0) + ds = _make_dataset(12) + loader = _loader(ds, batch_size=4, shuffle=True) + df = model.predict_as_dataframe( + loader, additional_attributes=["event_id", "value"] + ) + assert len(df) == 12 + # event_id may appear in any order, but pred and attr stay paired. + np.testing.assert_array_equal( + df["pred_a"].to_numpy().astype(np.int64), + df["event_id"].to_numpy(), + ) + np.testing.assert_array_equal( + df["value"].to_numpy(), + df["event_id"].to_numpy().astype(np.float32) * 10.0, + ) + # All events present exactly once. + assert sorted(df["event_id"].to_numpy().tolist()) == list(range(12)) + + +def test_predict_as_dataframe_respects_limit_predict_batches( + model: _FakeModel, +) -> None: + """`limit_predict_batches` truncates predictions and attributes + together.""" + ds = _make_dataset(20) + df = model.predict_as_dataframe( + _loader(ds, batch_size=4), + additional_attributes=["event_id"], + limit_predict_batches=2, + ) + # 2 batches * 4 events = 8 rows; predictions and attrs must still align. + assert len(df) == 8 + np.testing.assert_array_equal( + df["pred_a"].to_numpy().astype(np.int64), + df["event_id"].to_numpy(), + ) + + +def test_predict_as_dataframe_skips_misaligned_attribute( + model: _FakeModel, +) -> None: + """Pulse-level attr requested on event-level model is dropped, not + crashed.""" + ds = _make_dataset(6) + df = model.predict_as_dataframe( + _loader(ds, batch_size=2), + additional_attributes=["event_id", "pulses"], + ) + assert "event_id" in df.columns + assert "pulses" not in df.columns + assert len(df) == 6 + + +def test_predict_as_dataframe_expands_multidim_attribute( + model: _FakeModel, +) -> None: + """Per-event vector attribute is split into one column per component. + + Mirrors how `target_labels = ["direction"]` carries an (N, 3) tensor + on the batch — pandas can't take a 2D array as a single column, so + `predict_as_dataframe` flattens to `_`. + """ + n = 7 + ds = _make_dataset(n) + df = model.predict_as_dataframe( + _loader(ds, batch_size=3), + additional_attributes=["vec", "event_id"], + ) + assert list(df.columns) == [ + "pred_a", + "pred_b", + "vec_0", + "vec_1", + "vec_2", + "event_id", + ] + expected = np.arange(n, dtype=np.float32) + np.testing.assert_array_equal(df["vec_0"].to_numpy(), expected) + np.testing.assert_array_equal(df["vec_1"].to_numpy(), expected + 0.5) + np.testing.assert_array_equal(df["vec_2"].to_numpy(), expected + 0.25)