From 72aa45f5a6192df8b0e68d9fa63e6fa257a4aa40 Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Thu, 23 Apr 2026 12:34:03 +0200 Subject: [PATCH 1/3] feat(resampling): add configurable tick delay before processing - Add `tick_delay` to resampler configs with validation in both config variants. - Delay processing after each timer tick while preserving the original window boundary timestamp. - Restrict tick_delay to be non-negative and strictly smaller than resampling_period. This mitigates boundary races in cascaded resampling by delaying downstream processing. Signed-off-by: Malte Schaaf --- .../sdk/timeseries/_resampling/_config.py | 33 +++++++++++++++++++ .../sdk/timeseries/_resampling/_resampler.py | 5 +++ 2 files changed, 38 insertions(+) diff --git a/src/frequenz/sdk/timeseries/_resampling/_config.py b/src/frequenz/sdk/timeseries/_resampling/_config.py index 55de6854b..b420a55ee 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_config.py +++ b/src/frequenz/sdk/timeseries/_resampling/_config.py @@ -201,6 +201,25 @@ class ResamplerConfig: time the resampler is created. """ + tick_delay: timedelta = field(default=timedelta(0), kw_only=True) + """Delay before processing each resampling tick. + + This delays when resampling computation happens, while keeping the + resampling windows aligned to the original timer tick boundaries. This + delay is only a time-based buffer, not a strict synchronization mechanism. + + Warning: + This is an experimental option and may be changed or deprecated in + the future. + + Example: + This can be used in cascaded resampling setups to reduce timing races + where downstream windows are processed before upstream resampled values + are emitted. + + It must be non-negative and smaller than `resampling_period`. + """ + def __post_init__(self) -> None: """Check that config values are valid. @@ -245,6 +264,13 @@ def __post_init__(self) -> None: raise ValueError( f"align_to ({self.align_to}) should be a timezone aware datetime" ) + if self.tick_delay < timedelta(0): + raise ValueError(f"tick_delay ({self.tick_delay}) should be non-negative") + if self.tick_delay >= self.resampling_period: + raise ValueError( + f"tick_delay ({self.tick_delay}) should be smaller than " + f"resampling_period ({self.resampling_period})" + ) class ResamplingFunction2(Protocol): @@ -415,3 +441,10 @@ def __post_init__(self) -> None: raise ValueError( f"align_to ({self.align_to}) must be specified via timer_config" ) + if self.tick_delay < timedelta(0): + raise ValueError(f"tick_delay ({self.tick_delay}) should be non-negative") + if self.tick_delay >= self.resampling_period: + raise ValueError( + f"tick_delay ({self.tick_delay}) should be smaller than " + f"resampling_period ({self.resampling_period})" + ) diff --git a/src/frequenz/sdk/timeseries/_resampling/_resampler.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py index db2631c2e..7ac3b72a4 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_resampler.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -200,6 +200,11 @@ async def resample(self, *, one_shot: bool = False) -> None: case unexpected: assert_never(unexpected) + # Delay processing to let upstream cascaded resamplers emit their + # boundary samples first; window boundaries still use next_tick_time. + if self._config.tick_delay: + await asyncio.sleep(self._config.tick_delay.total_seconds()) + # We need to make a copy here because we need to match the results to the # current resamplers, and since we await here, new resamplers could be added # or removed from the dict while we awaiting the resampling, which would From 5aceb13d50c82cebb8f725a63a175c41c8c6cf60 Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Thu, 23 Apr 2026 12:34:34 +0200 Subject: [PATCH 2/3] test(resampling): extend tick_delay coverage for timing semantics - Add validation tests for negative and oversized `tick_delay` values in both config variants. - Add fixture-based boundary tests that separate sample arrival time from sample timestamp filtering. - Add a with/without-delay late-arrival comparison to show that delayed processing can include in-window samples without shifting window boundaries. Signed-off-by: Malte Schaaf --- tests/timeseries/test_resampling.py | 345 +++++++++++++++++++++++++++- 1 file changed, 344 insertions(+), 1 deletion(-) diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 627cefe0b..6d94a68bc 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -7,12 +7,13 @@ import logging from collections.abc import AsyncIterator from datetime import datetime, timedelta, timezone +from typing import Any, cast from unittest.mock import AsyncMock, MagicMock import async_solipsism import pytest import time_machine -from frequenz.channels import Broadcast, SenderError +from frequenz.channels import Broadcast, Receiver, Sender, SenderError from frequenz.quantities import Quantity from frequenz.sdk.timeseries import ( @@ -25,6 +26,7 @@ Sink, Source, SourceProperties, + TickInfo, WindowSide, ) from frequenz.sdk.timeseries._resampling._exceptions import ( @@ -37,6 +39,7 @@ # We relax some pylint checks as for tests they don't make a lot of sense for this test. # pylint: disable=too-many-lines,disable=too-many-locals +# pylint: disable=too-many-arguments,disable=too-many-positional-arguments @pytest.fixture(autouse=True) @@ -158,6 +161,31 @@ async def test_resampler_config_len_error( ) +@pytest.mark.parametrize("config_class", [ResamplerConfig, ResamplerConfig2]) +async def test_resampler_config_tick_delay_negative_error( + config_class: type[ResamplerConfig], +) -> None: + """Test that negative tick_delay values are rejected.""" + with pytest.raises(ValueError, match="tick_delay"): + _ = config_class( + resampling_period=timedelta(seconds=1.0), + tick_delay=timedelta(milliseconds=-1), + ) + + +@pytest.mark.parametrize("config_class", [ResamplerConfig, ResamplerConfig2]) +@pytest.mark.parametrize("tick_delay", [timedelta(seconds=1.0), timedelta(seconds=1.1)]) +async def test_resampler_config_tick_delay_too_big_error( + config_class: type[ResamplerConfig], tick_delay: timedelta +) -> None: + """Test that tick_delay must be smaller than resampling_period.""" + with pytest.raises(ValueError, match="smaller than resampling_period"): + _ = config_class( + resampling_period=timedelta(seconds=1.0), + tick_delay=tick_delay, + ) + + @pytest.mark.parametrize("config_class", [ResamplerConfig, ResamplerConfig2]) async def test_helper_buffer_too_big( config_class: type[ResamplerConfig], @@ -190,6 +218,321 @@ async def test_helper_buffer_too_big( assert helper._buffer.maxlen == DEFAULT_BUFFER_LEN_MAX +# Tick-delay test fixtures: time constants +@pytest.fixture +def window_end() -> datetime: + """Define the logical end of the resampling window.""" + return datetime(2020, 1, 1, 0, 0, 10, tzinfo=timezone.utc) + + +@pytest.fixture +def resampling_period() -> timedelta: + """Define the resampling period used by the test resampler.""" + return timedelta(seconds=10) + + +@pytest.fixture +def tick_delay() -> timedelta: + """Define the delay between timer tick and resampling processing.""" + return timedelta(milliseconds=200) + + +@pytest.fixture +def after_tick_delay(tick_delay: timedelta) -> timedelta: + """Define a deterministic delay that happens after tick_delay has elapsed.""" + return tick_delay + timedelta(milliseconds=10) + + +# Tick-delay test fixtures: channels and mocks +@pytest.fixture +def source_receiver( + source_chan: Broadcast[Sample[Quantity]], +) -> Receiver[Sample[Quantity]]: + """Define a receiver for samples sent to the test source channel.""" + return source_chan.new_receiver() + + +@pytest.fixture +def source_sender( + source_chan: Broadcast[Sample[Quantity]], +) -> Sender[Sample[Quantity]]: + """Define a sender for the test source channel.""" + return source_chan.new_sender() + + +@pytest.fixture +def sink_mock() -> AsyncMock: + """Define a sink mock used to collect resampled output samples.""" + return AsyncMock(spec=Sink, return_value=True) + + +@pytest.fixture +def resampling_fun_mock() -> MagicMock: + """Define a resampling function mock returning a fixed value.""" + return MagicMock(spec=ResamplingFunction, return_value=42.0) + + +# Tick-delay test fixtures: resampler setup +@pytest.fixture +async def tick_delay_resampler( + window_end: datetime, + source_receiver: Receiver[Sample[Quantity]], + sink_mock: AsyncMock, + resampling_fun_mock: MagicMock, + resampling_period: timedelta, + tick_delay: timedelta, +) -> AsyncIterator[Resampler]: + """Create a resampler configured with tick_delay and one deterministic tick.""" + + async def timer() -> AsyncIterator[TickInfo]: + yield TickInfo(expected_tick_time=window_end, sleep_infos=[]) + + config = ResamplerConfig2( + resampling_period=resampling_period, + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + closed=WindowSide.LEFT, + tick_delay=tick_delay, + ) + resampler = Resampler(config) + + # Use a deterministic timer tick so tests can control sample arrival time + # independently from the logical window boundary. + # pylint: disable=protected-access + resampler._timer = cast(Any, timer()) + + resampler.add_timeseries("test", source_receiver, sink_mock) + + try: + yield resampler + finally: + await resampler.stop() + + +# Tick-delay test fixtures: sample timestamps +@pytest.fixture +def sample_at_window_start( + window_end: datetime, + resampling_period: timedelta, +) -> Sample[Quantity]: + """Define a sample exactly at the included left boundary of the window.""" + return Sample(window_end - resampling_period, value=Quantity(1.0)) + + +@pytest.fixture +def sample_inside_window(window_end: datetime) -> Sample[Quantity]: + """Define a sample inside the selected resampling window.""" + return Sample(window_end - timedelta(milliseconds=100), value=Quantity(2.0)) + + +@pytest.fixture +def sample_at_window_end(window_end: datetime) -> Sample[Quantity]: + """Define a sample exactly at the excluded right boundary of the window.""" + return Sample(window_end, value=Quantity(3.0)) + + +@pytest.fixture +def sample_inside_tick_delay( + window_end: datetime, + tick_delay: timedelta, +) -> Sample[Quantity]: + """Define a sample timestamped after the window end but before tick_delay ends.""" + return Sample(window_end + (tick_delay / 2), value=Quantity(4.0)) + + +@pytest.fixture +def sample_at_tick_delay_end( + window_end: datetime, + tick_delay: timedelta, +) -> Sample[Quantity]: + """Define a sample timestamped exactly at window_end + tick_delay.""" + return Sample(window_end + tick_delay, value=Quantity(5.0)) + + +@pytest.fixture +def sample_after_tick_delay_end( + window_end: datetime, + after_tick_delay: timedelta, +) -> Sample[Quantity]: + """Define a sample timestamped after window_end + tick_delay.""" + return Sample(window_end + after_tick_delay, value=Quantity(6.0)) + + +async def test_tick_delay_prebuffered_samples_follow_timestamp_boundaries( + tick_delay_resampler: Resampler, + source_receiver: Receiver[Sample[Quantity]], + source_sender: Sender[Sample[Quantity]], + sink_mock: AsyncMock, + resampling_fun_mock: MagicMock, + window_end: datetime, + sample_at_window_start: Sample[Quantity], + sample_inside_window: Sample[Quantity], + sample_at_window_end: Sample[Quantity], + sample_inside_tick_delay: Sample[Quantity], + sample_at_tick_delay_end: Sample[Quantity], +) -> None: + """Prebuffered samples are selected strictly by timestamp window boundaries. + + All samples are received before tick handling starts, so arrival time is + intentionally not a factor in this scenario. The test verifies that only + timestamps inside the configured [start, end) window are used, while + boundary and post-window timestamps are excluded. + """ + await source_sender.send(sample_at_window_start) + await source_sender.send(sample_inside_window) + await source_sender.send(sample_at_window_end) + await source_sender.send(sample_inside_tick_delay) + await source_sender.send(sample_at_tick_delay_end) + + # Let the resampler's background receiving task buffer the samples before + # the timer tick is processed. + await asyncio.sleep(0) + + await tick_delay_resampler.resample(one_shot=True) + + resampling_fun_mock.assert_called_once_with( + a_sequence( + as_float_tuple(sample_at_window_start), + as_float_tuple(sample_inside_window), + ), + tick_delay_resampler.config, + tick_delay_resampler.get_source_properties(source_receiver), + ) + sink_mock.assert_called_once_with(Sample(window_end, Quantity(42.0))) + + +async def test_without_tick_delay_late_window_samples_are_missed( + window_end: datetime, + source_receiver: Receiver[Sample[Quantity]], + source_sender: Sender[Sample[Quantity]], + sink_mock: AsyncMock, + resampling_fun_mock: MagicMock, + resampling_period: timedelta, + sample_at_window_start: Sample[Quantity], + sample_inside_window: Sample[Quantity], +) -> None: + """Regression test: without `tick_delay`, late in-window samples are missed. + + The samples have timestamps inside the selected window, but they arrive + after the timer tick. With zero delay, processing happens immediately, so + they are not buffered in time for this resampling run. + """ + + async def timer() -> AsyncIterator[TickInfo]: + yield TickInfo(expected_tick_time=window_end, sleep_infos=[]) + + config = ResamplerConfig2( + resampling_period=resampling_period, + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + closed=WindowSide.LEFT, + tick_delay=timedelta(0), + ) + resampler = Resampler(config) + # pylint: disable=protected-access + resampler._timer = cast(Any, timer()) + resampler.add_timeseries("test", source_receiver, sink_mock) + + async def send_samples_after_tick() -> None: + await asyncio.sleep(0.05) + + await source_sender.send(sample_at_window_start) + await source_sender.send(sample_inside_window) + + await asyncio.sleep(0) + + try: + async with asyncio.TaskGroup() as task_group: + task_group.create_task(send_samples_after_tick()) + await resampler.resample(one_shot=True) + finally: + await resampler.stop() + + resampling_fun_mock.assert_not_called() + sink_mock.assert_called_once_with(Sample(window_end, None)) + + +async def test_tick_delay_includes_only_window_samples_arriving_during_delay( + tick_delay_resampler: Resampler, + source_receiver: Receiver[Sample[Quantity]], + source_sender: Sender[Sample[Quantity]], + sink_mock: AsyncMock, + resampling_fun_mock: MagicMock, + window_end: datetime, + tick_delay: timedelta, + sample_at_window_start: Sample[Quantity], + sample_inside_window: Sample[Quantity], + sample_at_window_end: Sample[Quantity], + sample_inside_tick_delay: Sample[Quantity], + sample_at_tick_delay_end: Sample[Quantity], + sample_after_tick_delay_end: Sample[Quantity], +) -> None: + """Late arrivals are included only when their timestamps are in-window. + + Samples arrive after the timer tick but before delayed processing happens. + The in-window samples should be included; timestamps at or after the window + end should still be excluded. + """ + + async def send_samples_during_tick_delay() -> None: + await asyncio.sleep((tick_delay / 2).total_seconds()) + + await source_sender.send(sample_at_window_start) + await source_sender.send(sample_inside_window) + await source_sender.send(sample_at_window_end) + await source_sender.send(sample_inside_tick_delay) + await source_sender.send(sample_at_tick_delay_end) + await source_sender.send(sample_after_tick_delay_end) + + await asyncio.sleep(0) + + async with asyncio.TaskGroup() as task_group: + task_group.create_task(send_samples_during_tick_delay()) + await tick_delay_resampler.resample(one_shot=True) + + resampling_fun_mock.assert_called_once_with( + a_sequence( + as_float_tuple(sample_at_window_start), + as_float_tuple(sample_inside_window), + ), + tick_delay_resampler.config, + tick_delay_resampler.get_source_properties(source_receiver), + ) + sink_mock.assert_called_once_with(Sample(window_end, Quantity(42.0))) + + +async def test_tick_delay_excludes_window_samples_arriving_after_delay( + tick_delay_resampler: Resampler, + source_sender: Sender[Sample[Quantity]], + sink_mock: AsyncMock, + resampling_fun_mock: MagicMock, + window_end: datetime, + after_tick_delay: timedelta, + sample_at_window_start: Sample[Quantity], + sample_inside_window: Sample[Quantity], +) -> None: + """In-window samples arriving after tick_delay are not considered. + + The sample timestamps belong to the selected window, but their arrival time + is after delayed processing has already started. + """ + + async def send_samples_after_tick_delay() -> None: + await asyncio.sleep(after_tick_delay.total_seconds()) + + await source_sender.send(sample_at_window_start) + await source_sender.send(sample_inside_window) + + await asyncio.sleep(0) + + async with asyncio.TaskGroup() as task_group: + task_group.create_task(send_samples_after_tick_delay()) + await tick_delay_resampler.resample(one_shot=True) + + resampling_fun_mock.assert_not_called() + sink_mock.assert_called_once_with(Sample(window_end, None)) + + @pytest.mark.parametrize( "resampling_period_s,now,align_to,result", ( From 56c48683c8882dabf7332ddf5509d3afa109900e Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Thu, 23 Apr 2026 12:44:18 +0200 Subject: [PATCH 3/3] docs(release-notes): update release notes Signed-off-by: Malte Schaaf --- RELEASE_NOTES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 61ee6f2ad..f75ff8ae9 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,7 +10,7 @@ ## New Features - +* A new `tick_delay` option was added to `ResamplerConfig` and `ResamplerConfig2` to delay resampling execution after each timer tick. The delay was designed to postpone processing while keeping window boundaries aligned to the original tick times, which can be used for cascaded resampling pipelines. This option is experimental and may be changed or deprecated in a future release. ## Bug Fixes