diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 61ee6f2ad..f75ff8ae9 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,7 +10,7 @@ ## New Features - +* A new `tick_delay` option was added to `ResamplerConfig` and `ResamplerConfig2` to delay resampling execution after each timer tick. The delay postpones processing while keeping window boundaries aligned to the original tick times, which makes it useful for cascaded resampling pipelines. This option is experimental and may be changed or deprecated in a future release. ## Bug Fixes diff --git a/src/frequenz/sdk/timeseries/_resampling/_config.py b/src/frequenz/sdk/timeseries/_resampling/_config.py index 55de6854b..b420a55ee 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_config.py +++ b/src/frequenz/sdk/timeseries/_resampling/_config.py @@ -201,6 +201,25 @@ class ResamplerConfig: time the resampler is created. """ + tick_delay: timedelta = field(default=timedelta(0), kw_only=True) + """Delay before processing each resampling tick. + + This delays when resampling computation happens, while keeping the + resampling windows aligned to the original timer tick boundaries. This + delay is only a time-based buffer, not a strict synchronization mechanism. + + Warning: + This is an experimental option and may be changed or deprecated in + the future. + + Example: + This can be used in cascaded resampling setups to reduce timing races + where downstream windows are processed before upstream resampled values + are emitted. + + It must be non-negative and smaller than `resampling_period`. + """ + def __post_init__(self) -> None: """Check that config values are valid. 
@@ -245,6 +264,13 @@ def __post_init__(self) -> None: raise ValueError( f"align_to ({self.align_to}) should be a timezone aware datetime" ) + if self.tick_delay < timedelta(0): + raise ValueError(f"tick_delay ({self.tick_delay}) should be non-negative") + if self.tick_delay >= self.resampling_period: + raise ValueError( + f"tick_delay ({self.tick_delay}) should be smaller than " + f"resampling_period ({self.resampling_period})" + ) class ResamplingFunction2(Protocol): @@ -415,3 +441,10 @@ def __post_init__(self) -> None: raise ValueError( f"align_to ({self.align_to}) must be specified via timer_config" ) + if self.tick_delay < timedelta(0): + raise ValueError(f"tick_delay ({self.tick_delay}) should be non-negative") + if self.tick_delay >= self.resampling_period: + raise ValueError( + f"tick_delay ({self.tick_delay}) should be smaller than " + f"resampling_period ({self.resampling_period})" + ) diff --git a/src/frequenz/sdk/timeseries/_resampling/_resampler.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py index db2631c2e..7ac3b72a4 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_resampler.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -200,6 +200,11 @@ async def resample(self, *, one_shot: bool = False) -> None: case unexpected: assert_never(unexpected) + # Delay processing to let upstream cascaded resamplers emit their + # boundary samples first; window boundaries still use next_tick_time. 
+ if self._config.tick_delay: + await asyncio.sleep(self._config.tick_delay.total_seconds()) + # We need to make a copy here because we need to match the results to the # current resamplers, and since we await here, new resamplers could be added # or removed from the dict while we awaiting the resampling, which would diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 627cefe0b..6d94a68bc 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -7,12 +7,13 @@ import logging from collections.abc import AsyncIterator from datetime import datetime, timedelta, timezone +from typing import Any, cast from unittest.mock import AsyncMock, MagicMock import async_solipsism import pytest import time_machine -from frequenz.channels import Broadcast, SenderError +from frequenz.channels import Broadcast, Receiver, Sender, SenderError from frequenz.quantities import Quantity from frequenz.sdk.timeseries import ( @@ -25,6 +26,7 @@ Sink, Source, SourceProperties, + TickInfo, WindowSide, ) from frequenz.sdk.timeseries._resampling._exceptions import ( @@ -37,6 +39,7 @@ # We relax some pylint checks as for tests they don't make a lot of sense for this test. 
# pylint: disable=too-many-lines,disable=too-many-locals +# pylint: disable=too-many-arguments,disable=too-many-positional-arguments @pytest.fixture(autouse=True) @@ -158,6 +161,31 @@ async def test_resampler_config_len_error( ) +@pytest.mark.parametrize("config_class", [ResamplerConfig, ResamplerConfig2]) +async def test_resampler_config_tick_delay_negative_error( + config_class: type[ResamplerConfig], +) -> None: + """Test that negative tick_delay values are rejected.""" + with pytest.raises(ValueError, match="tick_delay"): + _ = config_class( + resampling_period=timedelta(seconds=1.0), + tick_delay=timedelta(milliseconds=-1), + ) + + +@pytest.mark.parametrize("config_class", [ResamplerConfig, ResamplerConfig2]) +@pytest.mark.parametrize("tick_delay", [timedelta(seconds=1.0), timedelta(seconds=1.1)]) +async def test_resampler_config_tick_delay_too_big_error( + config_class: type[ResamplerConfig], tick_delay: timedelta +) -> None: + """Test that tick_delay must be smaller than resampling_period.""" + with pytest.raises(ValueError, match="smaller than resampling_period"): + _ = config_class( + resampling_period=timedelta(seconds=1.0), + tick_delay=tick_delay, + ) + + @pytest.mark.parametrize("config_class", [ResamplerConfig, ResamplerConfig2]) async def test_helper_buffer_too_big( config_class: type[ResamplerConfig], @@ -190,6 +218,321 @@ async def test_helper_buffer_too_big( assert helper._buffer.maxlen == DEFAULT_BUFFER_LEN_MAX +# Tick-delay test fixtures: time constants +@pytest.fixture +def window_end() -> datetime: + """Define the logical end of the resampling window.""" + return datetime(2020, 1, 1, 0, 0, 10, tzinfo=timezone.utc) + + +@pytest.fixture +def resampling_period() -> timedelta: + """Define the resampling period used by the test resampler.""" + return timedelta(seconds=10) + + +@pytest.fixture +def tick_delay() -> timedelta: + """Define the delay between timer tick and resampling processing.""" + return timedelta(milliseconds=200) + + 
+@pytest.fixture +def after_tick_delay(tick_delay: timedelta) -> timedelta: + """Define a deterministic delay that happens after tick_delay has elapsed.""" + return tick_delay + timedelta(milliseconds=10) + + +# Tick-delay test fixtures: channels and mocks +@pytest.fixture +def source_receiver( + source_chan: Broadcast[Sample[Quantity]], +) -> Receiver[Sample[Quantity]]: + """Define a receiver for samples sent to the test source channel.""" + return source_chan.new_receiver() + + +@pytest.fixture +def source_sender( + source_chan: Broadcast[Sample[Quantity]], +) -> Sender[Sample[Quantity]]: + """Define a sender for the test source channel.""" + return source_chan.new_sender() + + +@pytest.fixture +def sink_mock() -> AsyncMock: + """Define a sink mock used to collect resampled output samples.""" + return AsyncMock(spec=Sink, return_value=True) + + +@pytest.fixture +def resampling_fun_mock() -> MagicMock: + """Define a resampling function mock returning a fixed value.""" + return MagicMock(spec=ResamplingFunction, return_value=42.0) + + +# Tick-delay test fixtures: resampler setup +@pytest.fixture +async def tick_delay_resampler( + window_end: datetime, + source_receiver: Receiver[Sample[Quantity]], + sink_mock: AsyncMock, + resampling_fun_mock: MagicMock, + resampling_period: timedelta, + tick_delay: timedelta, +) -> AsyncIterator[Resampler]: + """Create a resampler configured with tick_delay and one deterministic tick.""" + + async def timer() -> AsyncIterator[TickInfo]: + yield TickInfo(expected_tick_time=window_end, sleep_infos=[]) + + config = ResamplerConfig2( + resampling_period=resampling_period, + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + closed=WindowSide.LEFT, + tick_delay=tick_delay, + ) + resampler = Resampler(config) + + # Use a deterministic timer tick so tests can control sample arrival time + # independently from the logical window boundary. 
+ # pylint: disable=protected-access + resampler._timer = cast(Any, timer()) + + resampler.add_timeseries("test", source_receiver, sink_mock) + + try: + yield resampler + finally: + await resampler.stop() + + +# Tick-delay test fixtures: sample timestamps +@pytest.fixture +def sample_at_window_start( + window_end: datetime, + resampling_period: timedelta, +) -> Sample[Quantity]: + """Define a sample exactly at the included left boundary of the window.""" + return Sample(window_end - resampling_period, value=Quantity(1.0)) + + +@pytest.fixture +def sample_inside_window(window_end: datetime) -> Sample[Quantity]: + """Define a sample inside the selected resampling window.""" + return Sample(window_end - timedelta(milliseconds=100), value=Quantity(2.0)) + + +@pytest.fixture +def sample_at_window_end(window_end: datetime) -> Sample[Quantity]: + """Define a sample exactly at the excluded right boundary of the window.""" + return Sample(window_end, value=Quantity(3.0)) + + +@pytest.fixture +def sample_inside_tick_delay( + window_end: datetime, + tick_delay: timedelta, +) -> Sample[Quantity]: + """Define a sample timestamped after the window end but before tick_delay ends.""" + return Sample(window_end + (tick_delay / 2), value=Quantity(4.0)) + + +@pytest.fixture +def sample_at_tick_delay_end( + window_end: datetime, + tick_delay: timedelta, +) -> Sample[Quantity]: + """Define a sample timestamped exactly at window_end + tick_delay.""" + return Sample(window_end + tick_delay, value=Quantity(5.0)) + + +@pytest.fixture +def sample_after_tick_delay_end( + window_end: datetime, + after_tick_delay: timedelta, +) -> Sample[Quantity]: + """Define a sample timestamped after window_end + tick_delay.""" + return Sample(window_end + after_tick_delay, value=Quantity(6.0)) + + +async def test_tick_delay_prebuffered_samples_follow_timestamp_boundaries( + tick_delay_resampler: Resampler, + source_receiver: Receiver[Sample[Quantity]], + source_sender: Sender[Sample[Quantity]], + 
sink_mock: AsyncMock, + resampling_fun_mock: MagicMock, + window_end: datetime, + sample_at_window_start: Sample[Quantity], + sample_inside_window: Sample[Quantity], + sample_at_window_end: Sample[Quantity], + sample_inside_tick_delay: Sample[Quantity], + sample_at_tick_delay_end: Sample[Quantity], +) -> None: + """Prebuffered samples are selected strictly by timestamp window boundaries. + + All samples are received before tick handling starts, so arrival time is + intentionally not a factor in this scenario. The test verifies that only + timestamps inside the configured [start, end) window are used, while + boundary and post-window timestamps are excluded. + """ + await source_sender.send(sample_at_window_start) + await source_sender.send(sample_inside_window) + await source_sender.send(sample_at_window_end) + await source_sender.send(sample_inside_tick_delay) + await source_sender.send(sample_at_tick_delay_end) + + # Let the resampler's background receiving task buffer the samples before + # the timer tick is processed. + await asyncio.sleep(0) + + await tick_delay_resampler.resample(one_shot=True) + + resampling_fun_mock.assert_called_once_with( + a_sequence( + as_float_tuple(sample_at_window_start), + as_float_tuple(sample_inside_window), + ), + tick_delay_resampler.config, + tick_delay_resampler.get_source_properties(source_receiver), + ) + sink_mock.assert_called_once_with(Sample(window_end, Quantity(42.0))) + + +async def test_without_tick_delay_late_window_samples_are_missed( + window_end: datetime, + source_receiver: Receiver[Sample[Quantity]], + source_sender: Sender[Sample[Quantity]], + sink_mock: AsyncMock, + resampling_fun_mock: MagicMock, + resampling_period: timedelta, + sample_at_window_start: Sample[Quantity], + sample_inside_window: Sample[Quantity], +) -> None: + """Regression test: without `tick_delay`, late in-window samples are missed. + + The samples have timestamps inside the selected window, but they arrive + after the timer tick. 
With zero delay, processing happens immediately, so + they are not buffered in time for this resampling run. + """ + + async def timer() -> AsyncIterator[TickInfo]: + yield TickInfo(expected_tick_time=window_end, sleep_infos=[]) + + config = ResamplerConfig2( + resampling_period=resampling_period, + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + closed=WindowSide.LEFT, + tick_delay=timedelta(0), + ) + resampler = Resampler(config) + # pylint: disable=protected-access + resampler._timer = cast(Any, timer()) + resampler.add_timeseries("test", source_receiver, sink_mock) + + async def send_samples_after_tick() -> None: + await asyncio.sleep(0.05) + + await source_sender.send(sample_at_window_start) + await source_sender.send(sample_inside_window) + + await asyncio.sleep(0) + + try: + async with asyncio.TaskGroup() as task_group: + task_group.create_task(send_samples_after_tick()) + await resampler.resample(one_shot=True) + finally: + await resampler.stop() + + resampling_fun_mock.assert_not_called() + sink_mock.assert_called_once_with(Sample(window_end, None)) + + +async def test_tick_delay_includes_only_window_samples_arriving_during_delay( + tick_delay_resampler: Resampler, + source_receiver: Receiver[Sample[Quantity]], + source_sender: Sender[Sample[Quantity]], + sink_mock: AsyncMock, + resampling_fun_mock: MagicMock, + window_end: datetime, + tick_delay: timedelta, + sample_at_window_start: Sample[Quantity], + sample_inside_window: Sample[Quantity], + sample_at_window_end: Sample[Quantity], + sample_inside_tick_delay: Sample[Quantity], + sample_at_tick_delay_end: Sample[Quantity], + sample_after_tick_delay_end: Sample[Quantity], +) -> None: + """Late arrivals are included only when their timestamps are in-window. + + Samples arrive after the timer tick but before delayed processing happens. + The in-window samples should be included; timestamps at or after the window + end should still be excluded. 
+ """ + + async def send_samples_during_tick_delay() -> None: + await asyncio.sleep((tick_delay / 2).total_seconds()) + + await source_sender.send(sample_at_window_start) + await source_sender.send(sample_inside_window) + await source_sender.send(sample_at_window_end) + await source_sender.send(sample_inside_tick_delay) + await source_sender.send(sample_at_tick_delay_end) + await source_sender.send(sample_after_tick_delay_end) + + await asyncio.sleep(0) + + async with asyncio.TaskGroup() as task_group: + task_group.create_task(send_samples_during_tick_delay()) + await tick_delay_resampler.resample(one_shot=True) + + resampling_fun_mock.assert_called_once_with( + a_sequence( + as_float_tuple(sample_at_window_start), + as_float_tuple(sample_inside_window), + ), + tick_delay_resampler.config, + tick_delay_resampler.get_source_properties(source_receiver), + ) + sink_mock.assert_called_once_with(Sample(window_end, Quantity(42.0))) + + +async def test_tick_delay_excludes_window_samples_arriving_after_delay( + tick_delay_resampler: Resampler, + source_sender: Sender[Sample[Quantity]], + sink_mock: AsyncMock, + resampling_fun_mock: MagicMock, + window_end: datetime, + after_tick_delay: timedelta, + sample_at_window_start: Sample[Quantity], + sample_inside_window: Sample[Quantity], +) -> None: + """In-window samples arriving after tick_delay are not considered. + + The sample timestamps belong to the selected window, but their arrival time + is after delayed processing has already started. 
+ """ + + async def send_samples_after_tick_delay() -> None: + await asyncio.sleep(after_tick_delay.total_seconds()) + + await source_sender.send(sample_at_window_start) + await source_sender.send(sample_inside_window) + + await asyncio.sleep(0) + + async with asyncio.TaskGroup() as task_group: + task_group.create_task(send_samples_after_tick_delay()) + await tick_delay_resampler.resample(one_shot=True) + + resampling_fun_mock.assert_not_called() + sink_mock.assert_called_once_with(Sample(window_end, None)) + + @pytest.mark.parametrize( "resampling_period_s,now,align_to,result", (