-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathLockService.php
More file actions
169 lines (139 loc) · 4.8 KB
/
LockService.php
File metadata and controls
169 lines (139 loc) · 4.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
<?php
declare(strict_types=1);
namespace PhpList\Core\Bounce\Service;
use PhpList\Core\Domain\Messaging\Repository\SendProcessRepository;
use PhpList\Core\Domain\Messaging\Service\Manager\SendProcessManager;
use Psr\Log\LoggerInterface;
use Symfony\Component\String\UnicodeString;
/**
 * Coordinates page-level locks that are represented by send-process rows.
 *
 * A lock is taken by creating a row through SendProcessManager::create() and is
 * considered held while SendProcessRepository::countAliveByPage() reports at least
 * as many alive rows as the page allows. Row storage and the exact meaning of
 * "alive" live in the repository/manager and are not visible from this class.
 *
 * NOTE(review): counting and creating are two separate calls, so two concurrent
 * callers can both pass the check; confirm whether the repository guards this.
 */
class LockService
{
    private SendProcessRepository $repo;
    private SendProcessManager $manager;
    private LoggerInterface $logger;
    private int $staleAfterSeconds;
    private int $sleepSeconds;
    private int $maxWaitCycles;

    /**
     * @param SendProcessRepository $repo              Row-level access to the lock rows.
     * @param SendProcessManager    $manager           Creates rows and looks up the newest alive one.
     * @param LoggerInterface       $logger            Receives informational progress messages.
     * @param int                   $staleAfterSeconds Age above which the newest alive row is marked dead and replaced.
     * @param int                   $sleepSeconds      Pause between retries; negative values are clamped to 0.
     * @param int                   $maxWaitCycles     Waiting stops once the number of completed sleeps exceeds this value.
     */
    public function __construct(
        SendProcessRepository $repo,
        SendProcessManager $manager,
        LoggerInterface $logger,
        int $staleAfterSeconds = 600,
        int $sleepSeconds = 20,
        int $maxWaitCycles = 10
    ) {
        $this->repo = $repo;
        $this->manager = $manager;
        $this->logger = $logger;
        $this->staleAfterSeconds = $staleAfterSeconds;
        // sleep() throws a ValueError for negative durations on PHP 8, so a bad
        // configuration value must not be allowed to reach it.
        $this->sleepSeconds = \max(0, $sleepSeconds);
        $this->maxWaitCycles = $maxWaitCycles;
    }

    /**
     * Tries to obtain a lock for the given page.
     *
     * While the page is at its process limit this either takes over a stale
     * holder, returns immediately (CLI), or sleeps and retries (non-CLI) until
     * the configured number of wait cycles is exhausted.
     *
     * @param string      $page             Page name; reduced to word characters before use.
     * @param bool        $force            When true, every existing row for the page is deleted first.
     * @param bool        $isCli            CLI callers never wait and get a host:pid identifier.
     * @param bool        $multiSend        Allows more than one concurrent process (only honoured for CLI).
     * @param int         $maxSendProcesses Upper bound used when $multiSend applies.
     * @param string|null $clientIp         Identifier for non-CLI callers; '0.0.0.0' is used when null.
     *
     * @return int|null Id of the newly created send process, or null when the lock could not be obtained.
     */
    public function acquirePageLock(
        string $page,
        bool $force = false,
        bool $isCli = false,
        bool $multiSend = false,
        int $maxSendProcesses = 1,
        ?string $clientIp = null,
    ): ?int {
        $page = $this->sanitizePage($page);
        $max = $this->resolveMax($isCli, $multiSend, $maxSendProcesses);

        if ($force) {
            $this->logger->info('Force set, killing other send processes (deleting lock rows).');
            $this->repo->deleteByPage($page);
        }

        $waited = 0;
        while ($this->repo->countAliveByPage($page) >= $max) {
            // The newest holder is only needed once the page is saturated, so the
            // lookup is skipped entirely on the uncontended path.
            $running = $this->manager->findNewestAliveWithAge($page);

            if ($this->tryStealIfStale($running)) {
                // A stale holder was marked dead; re-count straight away.
                continue;
            }

            $this->logAliveAge($running);

            if ($isCli) {
                $this->logger->info("Running commandline, quitting. We'll find out what to do in the next run.");

                return null;
            }

            if (!$this->waitOrGiveUp($waited)) {
                $this->logger->info('We have been waiting too long, I guess the other process is still going ok');

                return null;
            }
        }

        $processIdentifier = $this->buildProcessIdentifier($isCli, $clientIp);
        $sendProcess = $this->manager->create($page, $processIdentifier);

        return $sendProcess->getId();
    }

    /**
     * Increments the alive value of the given process row.
     * NOTE(review): presumably acts as a heartbeat for the lock holder; confirm in the repository.
     */
    public function keepLock(int $processId): void
    {
        $this->repo->incrementAlive($processId);
    }

    /**
     * Returns the alive value the repository currently reports for the process row.
     */
    public function checkLock(int $processId): int
    {
        return $this->repo->getAliveValue($processId);
    }

    /**
     * Marks the given process row as dead, giving up the lock.
     */
    public function release(int $processId): void
    {
        $this->repo->markDeadById($processId);
    }

    /**
     * Reduces a page name to the characters matched by \w, falling back to 'default'.
     */
    private function sanitizePage(string $page): string
    {
        // NOTE(review): the UnicodeString round-trip is kept from the original;
        // presumably it validates/normalises the input - confirm it is still wanted.
        $unicodeString = new UnicodeString($page);

        // preg_replace() yields null on a PCRE failure. Returning that from a
        // method declared ": string" would raise a TypeError, so treat it as an
        // empty result and let the 'default' fallback apply.
        $clean = \preg_replace('/\W/', '', (string) $unicodeString) ?? '';

        return $clean === '' ? 'default' : $clean;
    }

    /**
     * Resolves how many alive processes the page may have at once.
     *
     * Non-CLI callers are always limited to one; CLI callers may use
     * $maxSendProcesses (never less than one) when $multiSend is enabled.
     */
    private function resolveMax(bool $isCli, bool $multiSend, int $maxSendProcesses): int
    {
        if ($isCli && $multiSend) {
            return \max(1, $maxSendProcesses);
        }

        return 1;
    }

    /**
     * Returns true if it detected a stale process and killed it (so caller should loop again).
     *
     * @param array{id?: int, age?: int}|null $running
     */
    private function tryStealIfStale(?array $running): bool
    {
        // Without an id there is nothing that could be marked dead.
        if (!isset($running['id'])) {
            return false;
        }

        $age = (int) ($running['age'] ?? 0);
        if ($age <= $this->staleAfterSeconds) {
            return false;
        }

        $this->repo->markDeadById((int) $running['id']);

        return true;
    }

    /**
     * Logs the age of the current holder; a missing age is reported as 0.
     *
     * @param array{id?: int, age?: int}|null $running
     */
    private function logAliveAge(?array $running): void
    {
        $age = (int) ($running['age'] ?? 0);
        $this->logger->info(
            \sprintf(
                'A process for this page is already running and it was still alive %d seconds ago',
                $age
            )
        );
    }

    /**
     * Sleeps once and increments $waited. Returns false if we exceeded max wait cycles.
     *
     * The limit is checked after sleeping, so the call that returns false has
     * already slept as well.
     */
    private function waitOrGiveUp(int &$waited): bool
    {
        $this->logger->info(\sprintf('Sleeping for %d seconds, aborting will quit', $this->sleepSeconds));
        \sleep($this->sleepSeconds);
        $waited++;

        return $waited <= $this->maxWaitCycles;
    }

    /**
     * Builds the identifier stored with a newly created send process.
     *
     * CLI callers are identified as "<hostname>:<pid>" ('localhost' when the
     * host name is empty); other callers by $clientIp, or '0.0.0.0' when null.
     */
    private function buildProcessIdentifier(bool $isCli, ?string $clientIp): string
    {
        if (!$isCli) {
            return $clientIp ?? '0.0.0.0';
        }

        $host = \php_uname('n') ?: 'localhost';

        return $host . ':' . \getmypid();
    }
}