-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathEventStream.php
More file actions
83 lines (66 loc) · 2.02 KB
/
EventStream.php
File metadata and controls
83 lines (66 loc) · 2.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
<?php
namespace Casper\EventStream;
/**
 * Listens to a node's event stream over HTTP using cURL and hands each
 * received event to a user-supplied listener.
 *
 * Usage: construct with a node URL and one of the STREAM_PATH_* constants,
 * register a listener with onEvent(), then call listen(). listen() blocks
 * until the transfer ends or abort() is called (typically from inside the
 * listener).
 */
class EventStream
{
    public const STREAM_PATH_MAIN = '/events/main';
    public const STREAM_PATH_DEPLOYS = '/events/deploys';
    public const STREAM_PATH_SIGS = '/events/sigs';

    /** Node base URL; the stream path is appended to it verbatim. */
    private string $nodeUrl;

    /** One of the STREAM_PATH_* constants (validated in the constructor). */
    private string $streamPath;

    /** Events whose id is lower than this value are not passed to the listener. */
    private int $startFromEvent;

    /** @var callable|null Listener invoked with every accepted Event. */
    private $onEvent = null;

    /** Set by abort(); polled by the cURL progress callback to stop the transfer. */
    private bool $aborted = false;

    /**
     * @param string $nodeUrl        Base URL of the node (scheme, host and port).
     * @param string $streamPath     One of the STREAM_PATH_* constants.
     * @param int    $startFromEvent Lowest event id that is delivered to the listener.
     *
     * @throws \Exception When $streamPath is not one of the supported paths.
     */
    public function __construct(string $nodeUrl, string $streamPath, int $startFromEvent = 0)
    {
        $this->assertStreamPathIsValid($streamPath);
        $this->nodeUrl = $nodeUrl;
        $this->streamPath = $streamPath;
        $this->startFromEvent = $startFromEvent;
    }

    /**
     * Registers the listener that receives each accepted Event.
     * A later call replaces the previously registered listener.
     */
    public function onEvent(callable $onEvent): void
    {
        $this->onEvent = $onEvent;
    }

    /**
     * Requests that a running listen() call stops. The transfer is cancelled
     * the next time cURL invokes the progress callback, and no further events
     * are dispatched in the meantime.
     */
    public function abort()
    {
        $this->aborted = true;
    }

    /**
     * Opens the stream and blocks, dispatching events to the listener until
     * the transfer ends or abort() is called.
     *
     * cURL hands the response body to the write callback in arbitrary chunks,
     * so one event may be split across several calls and one call may carry
     * several events. Incoming bytes are therefore buffered and only complete
     * frames (terminated by a blank line) are parsed.
     *
     * @throws \Exception When the cURL session cannot be created, or when the
     *                    transfer fails for a reason other than abort().
     */
    public function listen(): void
    {
        $curl = curl_init($this->nodeUrl . $this->streamPath);
        if ($curl === false) {
            throw new \Exception('Unable to initialize cURL session');
        }

        // Bytes received so far that do not yet form a complete frame.
        $buffer = '';

        try {
            curl_setopt_array($curl, [
                CURLOPT_WRITEFUNCTION => function ($_, string $data) use (&$buffer): int {
                    // A blank line ends a frame; accept CRLF, LF and CR line endings.
                    $frames = preg_split('/\r\n\r\n|\n\n|\r\r/', $buffer . $data);
                    if ($frames === false) {
                        // Splitting failed; keep the bytes and retry with the next chunk.
                        $buffer .= $data;

                        return strlen($data);
                    }

                    // The last piece has no terminator yet, so it stays buffered.
                    $buffer = array_pop($frames);

                    foreach ($frames as $frame) {
                        if ($this->aborted) {
                            break;
                        }
                        $this->dispatch($frame);
                    }

                    // Returning the full chunk length tells cURL the data was consumed.
                    return strlen($data);
                },
                // The progress callback is only invoked when NOPROGRESS is disabled.
                CURLOPT_NOPROGRESS => false,
                CURLOPT_PROGRESSFUNCTION => function (): int {
                    // A non-zero return value makes cURL abort the transfer.
                    return $this->aborted ? 1 : 0;
                },
            ]);

            curl_exec($curl);
            $error = curl_error($curl);
        } finally {
            // Release the handle on every path, including when an exception propagates.
            curl_close($curl);
        }

        if ($this->aborted) {
            // The error reported by cURL is just the consequence of abort().
            return;
        }

        if ($error !== '') {
            throw new \Exception($error);
        }

        // The stream ended cleanly: offer whatever is left, even without a terminator.
        $this->dispatch($buffer);
    }

    /**
     * Parses one raw frame and passes it to the listener when a listener is
     * registered, the event id is not below the configured start id and the
     * event carries data. Blank frames are ignored.
     */
    private function dispatch(string $frame): void
    {
        $frame = trim($frame);
        if ($frame === '') {
            return;
        }

        $event = new Event($frame);
        if (is_callable($this->onEvent) && $event->getId() >= $this->startFromEvent && $event->getData()) {
            ($this->onEvent)($event);
        }
    }

    /**
     * Ensures the given path is one of the STREAM_PATH_* constants.
     *
     * @throws \Exception When the path is not supported.
     */
    private function assertStreamPathIsValid(string $streamPath): void
    {
        $validPaths = [self::STREAM_PATH_MAIN, self::STREAM_PATH_DEPLOYS, self::STREAM_PATH_SIGS];

        if (!in_array($streamPath, $validPaths, true)) {
            throw new \Exception('Invalid stream path');
        }
    }
}