diff --git a/Command/ConsumeCommand.php b/Command/ConsumeCommand.php index 9bc5fc1b..21e9203c 100644 --- a/Command/ConsumeCommand.php +++ b/Command/ConsumeCommand.php @@ -17,6 +17,8 @@ class ConsumeCommand extends ContainerAwareCommand { const OPTION_MAX_MESSAGES = 'killAfter'; const OPTION_MAX_MESSAGES_DEFAULT_VALUE = -1; // -1 = No limit + const OPTION_MAX_TIME = 'killAfterTime'; + const OPTION_MAX_TIME_DEFAULT_VALUE = 0; // 0 = No limit /** @var EndpointInterface */ protected $endpoint; @@ -64,6 +66,13 @@ protected function configure() 'How many messages should be processed before the worker is killed? -1 for never, default value is '.self::OPTION_MAX_MESSAGES_DEFAULT_VALUE.'.', self::OPTION_MAX_MESSAGES_DEFAULT_VALUE ); + $this->addOption( + self::OPTION_MAX_TIME, + 't', + InputOption::VALUE_REQUIRED, + 'How long before the worker is killed? 0 for never, default value is '.self::OPTION_MAX_TIME_DEFAULT_VALUE.'.', + self::OPTION_MAX_TIME_DEFAULT_VALUE + ); } @@ -80,12 +89,15 @@ protected function execute(InputInterface $input, OutputInterface $output) if ($input->getOption(self::OPTION_MAX_MESSAGES) > 0) { $message .= ' limited to '.$input->getOption(self::OPTION_MAX_MESSAGES).' messages'; } + if ($input->getOption(self::OPTION_MAX_TIME) > 0) { + $message .= ' limited to '.$input->getOption(self::OPTION_MAX_TIME).' seconds'; + } $message .= '.'; $output->writeln($message); pcntl_signal(SIGINT, [$this, 'handleSignal']); pcntl_signal(SIGTERM, [$this, 'handleSignal']); - $this->endpoint->consume($input->getOption(self::OPTION_MAX_MESSAGES)); + $this->endpoint->consume($input->getOption(self::OPTION_MAX_MESSAGES), $input->getOption(self::OPTION_MAX_TIME)); $output->writeln('Consumer was gracefully stopped for: '.$this->endpoint->getURI().''); } diff --git a/Core/Consumers/ConsumerInterface.php b/Core/Consumers/ConsumerInterface.php index 0fbc0999..7e792a3b 100644 --- a/Core/Consumers/ConsumerInterface.php +++ b/Core/Consumers/ConsumerInterface.php @@ -21,6 +21,11 @@ public function stop(); */ public function setExpirationCount($count); + /** + * @param $time + */ + public function setExpirationTime($time); + /** * Consumes messages from the given $endpoint until either the expirationCount reaches 0 or ::stop() is called. * diff --git a/Core/Consumers/IsStopableConsumer.php b/Core/Consumers/IsStopableConsumer.php index 0aa91527..153531c9 100644 --- a/Core/Consumers/IsStopableConsumer.php +++ b/Core/Consumers/IsStopableConsumer.php @@ -10,6 +10,9 @@ trait IsStopableConsumer { /** @var int */ protected $expirationCount = -1; + /** @var int */ + protected $expirationTime = 100 * 365 * 24 * 60 * 60; + /** * {@inheritDoc} */ @@ -26,6 +29,18 @@ public function setExpirationCount($count) $this->expirationCount = $count; } + /** + * {@inheritDoc} + */ + public function setExpirationTime($time) + { + $this->expirationTime = 100 * 365 * 24 * 60 * 60; // 100 years after Unix Epoch + if ($time > 0) { + $this->expirationTime = time() + $time; // $time seconds after now + } + + } + /** * Checks if it should stop at the current iteration. * @@ -34,6 +49,6 @@ public function setExpirationCount($count) protected function shouldStop() { pcntl_signal_dispatch(); - return $this->stop || $this->expirationCount == 0; + return $this->stop || $this->expirationCount == 0 || time() > $this->expirationTime; } } \ No newline at end of file diff --git a/Core/Endpoints/Endpoint.php b/Core/Endpoints/Endpoint.php index badadbe4..aa1271ed 100644 --- a/Core/Endpoints/Endpoint.php +++ b/Core/Endpoints/Endpoint.php @@ -118,12 +118,16 @@ public function getProducer() /** * {@inheritdoc} */ - public function consume($maxAmount = 0) + public function consume($maxAmount = 0, $maxTime = 0) { if ($maxAmount > 0) { $this->getConsumer()->setExpirationCount($maxAmount); } + if ($maxTime > 0) { + $this->getConsumer()->setExpirationTime($maxTime); + } + $this->getConsumer()->consume($this); } diff --git a/Core/Endpoints/EndpointInterface.php b/Core/Endpoints/EndpointInterface.php index 4fd8fc5d..a0aba834 100644 --- a/Core/Endpoints/EndpointInterface.php +++ b/Core/Endpoints/EndpointInterface.php @@ -61,10 +61,12 @@ public function getProducer(); /** * Consumes $maxAmount of messages, if $maxAmount is 0, then it consumes indefinitely in a loop. + * $maxTime before it is stopped, if $maxTime is 0, consumes indefinitely in a loop unless it is stopped before by $maxAmount. * * @param int $maxAmount + * @param int $maxTime */ - public function consume($maxAmount = 0); + public function consume($maxAmount = 0, $maxTime = 0); /** * @return bool