Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Version 5.5.0
* Added cleanup commands

# Version 5.4.1
* Fix File Exceptions integration

Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ implementing `DataflowTypeInterface`.

Otherwise, manually add the tag `coderhapsodie.dataflow.type` in your dataflow type service configuration:

```yaml
```yaml
CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType:
tags:
Expand Down Expand Up @@ -598,6 +597,10 @@ the messenger component instead.

`code-rhapsodie:dataflow:dump-schema` Generates schema create / update SQL queries

`code-rhapsodie:dataflow:set_crashed` Jobs that have been in the "running" status for too long will be set in the "crashed" status.

`code-rhapsodie:dataflow:job_cleanup` Remove old completed or crashed jobs
Comment thread
jeremycr marked this conversation as resolved.
Outdated

### Work with many databases

All commands have a `--connection` option to define what Doctrine DBAL connection to use during execution.
Expand Down
32 changes: 32 additions & 0 deletions src/Command/JobCleanupCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

namespace CodeRhapsodie\DataflowBundle\Command;

use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(name: 'code-rhapsodie:job_cleanup', description: 'Cleanup job history.')]
Comment thread
jeremycr marked this conversation as resolved.
Outdated
class JobCleanupCommand extends Command
{
public function __construct(private JobRepository $jobRepository, private int $retention)
{
parent::__construct();
}

protected function configure()
{
$this->setHelp('Job retention can be configured with the "job_history.retention" configuration.');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->jobRepository->deleteOld($this->retention);

return Command::SUCCESS;
}
}
2 changes: 1 addition & 1 deletion src/Command/JobShowCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$io->table(['Field', 'Value'], $display);
if ($input->getOption('details')) {
$io->section('Exceptions');
$exceptions = array_map(fn (string $exception) => substr($exception, 0, 900).'…', $job->getExceptions());
$exceptions = array_map(static fn (string $exception) => substr($exception, 0, 900).'…', $job->getExceptions());

$io->write($exceptions);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Command/SchemaCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int

// add -- before each keys
$options = array_combine(
array_map(fn ($key) => '--'.$key, array_keys($options)),
array_map(static fn ($key) => '--'.$key, array_keys($options)),
array_values($options)
);

Expand Down
32 changes: 32 additions & 0 deletions src/Command/SetCrashedCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

namespace CodeRhapsodie\DataflowBundle\Command;

use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(name: 'code-rhapsodie:set_crashed', description: 'Set long running jobs as crashed.')]
Comment thread
jeremycr marked this conversation as resolved.
Outdated
class SetCrashedCommand extends Command
{
public function __construct(private JobRepository $jobRepository, private int $crashedDelay)
{
parent::__construct();
}

protected function configure()
{
$this->setHelp('How long jobs have to run before they are set as crashed can be configured with the "job_history.crashed_delay" configuration.');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->jobRepository->crashLongRunning($this->crashedDelay);

return Command::SUCCESS;
}
}
2 changes: 1 addition & 1 deletion src/DataflowType/Dataflow/Dataflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public function setCustomExceptionIndex(callable $callable): self
*/
public function setAfterItemProcessors(array $processors): self
{
$this->afterItemProcessors = array_map(fn (callable $callable) => \Closure::fromCallable($callable), $processors);
$this->afterItemProcessors = array_map(static fn (callable $callable) => \Closure::fromCallable($callable), $processors);

return $this;
}
Expand Down
2 changes: 2 additions & 0 deletions src/DependencyInjection/CodeRhapsodieDataflowExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public function load(array $configs, ContainerBuilder $container): void
$container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']);
$container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']);
$container->setParameter('coderhapsodie.dataflow.exceptions_mode.type', $config['exceptions_mode']['type']);
$container->setParameter('coderhapsodie.dataflow.job_history.retention', $config['job_history']['retention']);
$container->setParameter('coderhapsodie.dataflow.job_history.crashed_delay', $config['job_history']['crashed_delay']);

if ($config['exceptions_mode']['type'] === 'file') {
$container->setParameter('coderhapsodie.dataflow.flysystem_service', $config['exceptions_mode']['flysystem_service']);
Expand Down
14 changes: 14 additions & 0 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,20 @@ public function getConfigTreeBuilder(): TreeBuilder
->thenInvalid('You need "league/flysystem" to use Dataflow file exception mode.')
->end()
->end()
->arrayNode('job_history')
->addDefaultsIfNotSet()
->children()
->integerNode('retention')
->defaultValue(30)
->min(0)
->info('How many days completed and crashed jobs are kept when running the cleanup command.')
->end()
->integerNode('crashed_delay')
->defaultValue(24)
->min(24)
->info('Jobs running for more than this many hours will be set as crashed when running the cleanup command.')
->end()
->end()
->end()
;

Expand Down
1 change: 1 addition & 0 deletions src/Entity/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class Job
public const STATUS_RUNNING = 1;
public const STATUS_COMPLETED = 2;
public const STATUS_QUEUED = 3;
public const STATUS_CRASHED = 4;

private const KEYS = [
'id',
Expand Down
27 changes: 27 additions & 0 deletions src/Repository/JobRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,33 @@ public function createQueryBuilder($alias = null): QueryBuilder
return $qb;
}

public function crashLongRunning(int $hours): void
{
$qb = $this->connection->createQueryBuilder();
$qb->update(static::TABLE_NAME, 'j')
->set('j.status', ':new_status')
->set('j.end_time', ':now')
->andWhere('j.status = :status')
->andWhere('j.start_time < :date')
->setParameter('status', Job::STATUS_RUNNING)
->setParameter('date', new \DateTime("- {$hours} hours"), 'datetime')
->setParameter('new_status', Job::STATUS_CRASHED)
->setParameter('now', new \DateTime(), 'datetime')
->executeStatement()
;
}

public function deleteOld(int $days): void
{
$qb = $this->connection->createQueryBuilder();
$qb->delete(static::TABLE_NAME, 'j')
->andWhere($qb->expr()->in('j.status', [Job::STATUS_COMPLETED, Job::STATUS_CRASHED]))
->andWhere('j.end_time < :date')
->setParameter('date', new \DateTime("- {$days} days"), 'datetime')
->executeStatement()
;
}

private function returnFirstOrNull(QueryBuilder $qb): ?Job
{
$stmt = $qb->executeQuery();
Expand Down
14 changes: 13 additions & 1 deletion src/Resources/config/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ services:
public: false

CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry'
CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry:
CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry: ~

CodeRhapsodie\DataflowBundle\Command\AddScheduledDataflowCommand:
arguments:
Expand Down Expand Up @@ -100,3 +100,15 @@ services:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$exceptionHandler: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface'

CodeRhapsodie\DataflowBundle\Command\JobCleanupCommand:
arguments:
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$retention: '%coderhapsodie.dataflow.job_history.retention%'
tags: ['console.command']

CodeRhapsodie\DataflowBundle\Command\SetCrashedCommand:
arguments:
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$crashedDelay: '%coderhapsodie.dataflow.job_history.crashed_delay%'
tags: ['console.command']