Skip to content

Commit c62dda5

Browse files
authored
Feat: Timing improvements (#624)
* feat: timing improvements * with current time * allow to change time using duration * fixes * fixes * clock fixes * fixes * fixes * simplification
1 parent 2a8bc02 commit c62dda5

9 files changed

Lines changed: 360 additions & 34 deletions

File tree

packages/Dbal/tests/Integration/DbalBackedMessageChannelTest.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -253,11 +253,11 @@ public function test_delaying_the_message_with_native_clock()
253253
->build()
254254
);
255255

256-
$ecotoneLite->waitTill(TimeSpan::withSeconds(1));
256+
$ecotoneLite->advanceTimeTo(Duration::seconds(1));
257257

258258
$this->assertNull($messageChannel->receive());
259259

260-
$ecotoneLite->waitTill(TimeSpan::withSeconds(3));
260+
$ecotoneLite->advanceTimeTo(Duration::seconds(3));
261261

262262
$this->assertNotNull($messageChannel->receive());
263263
}
@@ -289,11 +289,11 @@ public function test_delaying_the_message_with_native_clock_using_date_time()
289289

290290
/** @var EcotoneClockInterface $clock */
291291
$clock = $ecotoneLite->getServiceFromContainer(EcotoneClockInterface::class);
292-
$ecotoneLite->waitTill($clock->now()->add(Duration::seconds(1)));
292+
$ecotoneLite->changeTimeTo($clock->now()->add(Duration::seconds(1)));
293293

294294
$this->assertNull($messageChannel->receive());
295295

296-
$ecotoneLite->waitTill($clock->now()->add(Duration::seconds(3)));
296+
$ecotoneLite->changeTimeTo($clock->now()->add(Duration::seconds(3)));
297297

298298
$this->assertNotNull($messageChannel->receive());
299299
}

packages/Ecotone/src/Lite/Test/ConfiguredMessagingSystemWithTestSupport.php

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace Ecotone\Lite\Test;
66

7+
use DateTimeImmutable;
78
use Ecotone\Messaging\Config\ConfiguredMessagingSystem;
89
use Ecotone\Messaging\Config\Container\GatewayProxyMethodReference;
910
use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata;
@@ -12,12 +13,16 @@
1213
use Ecotone\Messaging\MessageChannel;
1314
use Ecotone\Messaging\MessageHeaders;
1415
use Ecotone\Messaging\MessagePublisher;
16+
use Ecotone\Messaging\Scheduling\Duration;
1517
use Ecotone\Messaging\Scheduling\EcotoneClockInterface;
1618
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\AggregateDefinitionRegistry;
1719
use Ecotone\Modelling\CommandBus;
1820
use Ecotone\Modelling\DistributedBus;
1921
use Ecotone\Modelling\EventBus;
2022
use Ecotone\Modelling\QueryBus;
23+
use Ecotone\Test\StaticPsrClock;
24+
use InvalidArgumentException;
25+
use Psr\Clock\ClockInterface;
2126

2227
/**
2328
* licence Apache-2.0
@@ -129,4 +134,51 @@ public function replaceWith(ConfiguredMessagingSystem $messagingSystem): void
129134
{
130135
$this->configuredMessagingSystem->replaceWith($messagingSystem);
131136
}
137+
138+
public function changeTime(DateTimeImmutable|Duration $time): self
139+
{
140+
$psrClock = $this->getStaticPsrClockFromContainer();
141+
142+
if ($time instanceof Duration) {
143+
$psrClock->setCurrentTime(
144+
DateTimeImmutable::createFromInterface($psrClock->now())->modify("+{$time->inMicroseconds()} microseconds")
145+
);
146+
return $this;
147+
}
148+
149+
if ($psrClock->hasBeenChanged() && $time <= $psrClock->now()) {
150+
throw new InvalidArgumentException(
151+
sprintf(
152+
'Cannot move time backwards. Current clock time: %s, requested time: %s',
153+
$psrClock->now()->format('Y-m-d H:i:s.u'),
154+
$time->format('Y-m-d H:i:s.u')
155+
)
156+
);
157+
}
158+
159+
$psrClock->setCurrentTime($time);
160+
161+
return $this;
162+
}
163+
164+
private function getStaticPsrClockFromContainer(): StaticPsrClock
165+
{
166+
try {
167+
$psrClock = $this->configuredMessagingSystem->getServiceFromContainer(ClockInterface::class);
168+
} catch (\Throwable) {
169+
throw new InvalidArgumentException(
170+
'Changing time is only possible when using StaticPsrClock as the ClockInterface. ' .
171+
'Register ClockInterface::class => new StaticPsrClock() in your container services.'
172+
);
173+
}
174+
175+
if (! $psrClock instanceof StaticPsrClock) {
176+
throw new InvalidArgumentException(
177+
'Changing time is only possible when using StaticPsrClock as the ClockInterface. ' .
178+
'Register ClockInterface::class => new StaticPsrClock() in your container services.'
179+
);
180+
}
181+
182+
return $psrClock;
183+
}
132184
}

packages/Ecotone/src/Lite/Test/FlowTestSupport.php

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace Ecotone\Lite\Test;
66

7+
use DateTimeImmutable;
78
use DateTimeInterface;
89
use Ecotone\EventSourcing\EventStore;
910
use Ecotone\EventSourcing\ProjectionManager;
@@ -18,6 +19,7 @@
1819
use Ecotone\Messaging\MessagingException;
1920
use Ecotone\Messaging\PollableChannel;
2021
use Ecotone\Messaging\Scheduling\Clock;
22+
use Ecotone\Messaging\Scheduling\Duration;
2123
use Ecotone\Messaging\Scheduling\EcotoneClockInterface;
2224
use Ecotone\Messaging\Scheduling\TimeSpan;
2325
use Ecotone\Messaging\Support\Assert;
@@ -32,6 +34,8 @@
3234
use Ecotone\Modelling\EventBus;
3335
use Ecotone\Modelling\QueryBus;
3436
use Ecotone\Projecting\ProjectionRegistry;
37+
use Ecotone\Test\StaticPsrClock;
38+
use InvalidArgumentException;
3539

3640
/**
3741
* @template T
@@ -191,23 +195,65 @@ public function getEventStreamEvents(string $streamName): array
191195
return $this->getGateway(EventStore::class)->load($streamName);
192196
}
193197

194-
public function waitTill(TimeSpan|DateTimeInterface $time): self
198+
public function changeTimeTo(DateTimeImmutable $time): self
195199
{
196-
if ($time instanceof DateTimeInterface) {
197-
if ($time < $this->clock->now()) {
198-
throw new MessagingException("Time to wait is in the past. Now: {$this->clock->now()}, time to wait: {$time}");
199-
}
200+
$psrClock = $this->getStaticPsrClockFromContainer();
201+
202+
if ($psrClock->hasBeenChanged() && $time <= $psrClock->now()) {
203+
throw new InvalidArgumentException(
204+
\sprintf(
205+
'Cannot move time backwards. Current clock time: %s, requested time: %s',
206+
$psrClock->now()->format('Y-m-d H:i:s.u'),
207+
$time->format('Y-m-d H:i:s.u')
208+
)
209+
);
200210
}
201211

202-
$this->clock->sleep(
203-
$time instanceof TimeSpan
204-
? $time->toDuration()
205-
: TimeSpan::fromDateInterval($time->diff($this->clock->now()))->toDuration()
212+
$psrClock->setCurrentTime($time);
213+
214+
return $this;
215+
}
216+
217+
public function advanceTimeTo(Duration $duration): self
218+
{
219+
$psrClock = $this->getStaticPsrClockFromContainer();
220+
$psrClock->setCurrentTime(
221+
DateTimeImmutable::createFromInterface($psrClock->now())->modify("+{$duration->inMicroseconds()} microseconds")
206222
);
207223

208224
return $this;
209225
}
210226

227+
private function getStaticPsrClockFromContainer(): StaticPsrClock
228+
{
229+
try {
230+
/** @var Clock $clock */
231+
$clock = $this->configuredMessagingSystem->getServiceFromContainer(EcotoneClockInterface::class);
232+
} catch (\Throwable) {
233+
throw new InvalidArgumentException(
234+
'Changing time is only possible when using StaticPsrClock as the ClockInterface. ' .
235+
'Register ClockInterface::class => new StaticPsrClock() in your container services.'
236+
);
237+
}
238+
239+
if (! $clock instanceof Clock) {
240+
throw new InvalidArgumentException(
241+
'Changing time is only possible when using Clock as the EcotoneClockInterface. ' .
242+
'Register EcotoneClockInterface::class => new Clock(new StaticPsrClock()) in your container services.'
243+
);
244+
}
245+
246+
$clock = $clock->internalClock();
247+
if (! $clock instanceof StaticPsrClock) {
248+
throw new InvalidArgumentException(
249+
'Changing time is only possible when using StaticPsrClock as the ClockInterface. ' .
250+
'Register ClockInterface::class => new StaticPsrClock() in your container services.'
251+
);
252+
}
253+
254+
return $clock;
255+
}
256+
211257
/**
212258
* @param Event[]|object[]|array[] $events
213259
*/

packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/EndpointHeaders/EndpointHeadersInterceptor.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@
44

55
namespace Ecotone\Messaging\Config\Annotation\ModuleConfiguration\EndpointHeaders;
66

7+
use DateTimeImmutable;
78
use DateTimeInterface;
89
use Ecotone\Messaging\Attribute\Endpoint\AddHeader;
910
use Ecotone\Messaging\Attribute\Endpoint\Delayed;
11+
12+
use function is_string;
13+
use function preg_match;
14+
use function str_ends_with;
1015
use Ecotone\Messaging\Attribute\Endpoint\Priority;
1116
use Ecotone\Messaging\Attribute\Endpoint\RemoveHeader;
1217
use Ecotone\Messaging\Attribute\Endpoint\TimeToLive;
@@ -62,6 +67,10 @@ public function addMetadata(Message $message, ?AddHeader $addHeader, ?Delayed $d
6267
]);
6368
}
6469

70+
if (is_string($metadata[MessageHeaders::DELIVERY_DELAY])) {
71+
$metadata[MessageHeaders::DELIVERY_DELAY] = $this->parseDateTimeStringWithRequiredOffset($metadata[MessageHeaders::DELIVERY_DELAY]);
72+
}
73+
6574
$type = Type::createFromVariable($metadata[MessageHeaders::DELIVERY_DELAY]);
6675
if (! $type->isCompatibleWith(UnionType::createWith([
6776
Type::int(),
@@ -120,4 +129,20 @@ public function getDefinition(): Definition
120129
Reference::to(ExpressionEvaluationService::REFERENCE),
121130
]);
122131
}
132+
133+
private function parseDateTimeStringWithRequiredOffset(string $dateTimeString): DateTimeImmutable
134+
{
135+
if (! $this->hasUtcOffset($dateTimeString)) {
136+
throw ConfigurationException::create("Delivery delay string '{$dateTimeString}' must contain a UTC offset (e.g., '+02:00' or 'Z'). Dates without timezone information are ambiguous.");
137+
}
138+
139+
return new DateTimeImmutable($dateTimeString);
140+
}
141+
142+
private function hasUtcOffset(string $dateTimeString): bool
143+
{
144+
return preg_match('/[+-]\d{2}:\d{2}$/', $dateTimeString) === 1
145+
|| preg_match('/[+-]\d{4}$/', $dateTimeString) === 1
146+
|| str_ends_with($dateTimeString, 'Z');
147+
}
123148
}

packages/Ecotone/src/Messaging/Scheduling/Clock.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ public static function get(): EcotoneClockInterface
3636
return self::$globalClock ?? new self(self::defaultClock());
3737
}
3838

39+
public function internalClock(): PsrClockInterface
40+
{
41+
return $this->clock;
42+
}
43+
3944
public function now(): DatePoint
4045
{
4146
$now = $this->clock->now();

packages/Ecotone/src/Test/StaticPsrClock.php

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,46 @@
1414
*/
1515
final class StaticPsrClock implements ClockInterface, SleepInterface
1616
{
17-
private Duration $sleepDuration;
17+
private bool $hasBeenChanged = false;
18+
private ?DateTimeImmutable $now = null;
1819

19-
public function __construct(private ?string $now = null)
20+
public function __construct(?string $now = null)
2021
{
21-
$this->sleepDuration = Duration::zero();
22+
$this->now = ($now === null || $now === 'now') ? null : new DateTimeImmutable($now);
2223
}
2324

2425
public function now(): DateTimeImmutable
2526
{
26-
$now = $this->now === null ? new DateTimeImmutable() : new DateTimeImmutable($this->now);
27+
if ($this->now !== null) {
28+
return $this->now;
29+
}
2730

28-
return $now->modify("+{$this->sleepDuration->zeroIfNegative()->inMicroseconds()} microseconds");
31+
return new DateTimeImmutable('now');
2932
}
3033

3134
public function sleep(Duration $duration): void
3235
{
33-
$this->sleepDuration = $this->sleepDuration->add($duration);
36+
if ($duration->isNegativeOrZero()) {
37+
return;
38+
}
39+
40+
if ($this->now === null) {
41+
42+
usleep($duration->inMicroseconds());
43+
return;
44+
}
45+
46+
$this->now = $this->now()->modify("+{$duration->inMicroseconds()} microseconds");
47+
}
48+
49+
public function hasBeenChanged(): bool
50+
{
51+
return $this->hasBeenChanged;
52+
}
53+
54+
public function setCurrentTime(DateTimeImmutable $time): void
55+
{
56+
$this->now = $time;
57+
$this->hasBeenChanged = true;
3458
}
3559
}

packages/Ecotone/tests/Messaging/Fixture/AddHeaders/AddingMultipleHeaders.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,12 @@ public function testKeepTtlHeader(): void
6363
{
6464

6565
}
66+
67+
#[Delayed(expression: 'payload.delay')]
68+
#[Asynchronous('async')]
69+
#[CommandHandler('addHeadersWithStringDelayExpression', endpointId: 'addHeadersWithStringDelayExpressionEndpoint')]
70+
public function withStringDelayExpression(): void
71+
{
72+
73+
}
6674
}

0 commit comments

Comments
 (0)