vendor/shopware/core/Framework/MessageQueue/ScheduledTask/Scheduler/TaskScheduler.php line 144

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\ScheduledTask\Scheduler;
  3. use Shopware\Core\Defaults;
  4. use Shopware\Core\Framework\Context;
  5. use Shopware\Core\Framework\DataAbstractionLayer\EntityRepositoryInterface;
  6. use Shopware\Core\Framework\DataAbstractionLayer\Search\Aggregation\Metric\MinAggregation;
  7. use Shopware\Core\Framework\DataAbstractionLayer\Search\AggregationResult\AggregationResult;
  8. use Shopware\Core\Framework\DataAbstractionLayer\Search\AggregationResult\Metric\MinResult;
  9. use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria;
  10. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\EqualsFilter;
  11. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\NotFilter;
  12. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\RangeFilter;
  13. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTask;
  14. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTaskDefinition;
  15. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTaskEntity;
  16. use Symfony\Component\DependencyInjection\ParameterBag\ParameterBagInterface;
  17. use Symfony\Component\Messenger\MessageBusInterface;
  18. class TaskScheduler
  19. {
  20.     /**
  21.      * @var EntityRepositoryInterface
  22.      */
  23.     private $scheduledTaskRepository;
  24.     /**
  25.      * @var MessageBusInterface
  26.      */
  27.     private $bus;
  28.     private ParameterBagInterface $parameterBag;
  29.     /**
  30.      * @internal
  31.      */
  32.     public function __construct(
  33.         EntityRepositoryInterface $scheduledTaskRepository,
  34.         MessageBusInterface $bus,
  35.         ParameterBagInterface $parameterBag
  36.     ) {
  37.         $this->scheduledTaskRepository $scheduledTaskRepository;
  38.         $this->bus $bus;
  39.         $this->parameterBag $parameterBag;
  40.     }
  41.     public function queueScheduledTasks(): void
  42.     {
  43.         $criteria $this->buildCriteriaForAllScheduledTask();
  44.         $context Context::createDefaultContext();
  45.         $tasks $this->scheduledTaskRepository->search($criteria$context)->getEntities();
  46.         if (\count($tasks) === 0) {
  47.             return;
  48.         }
  49.         // Tasks **must not** be queued before their state in the database has been updated. Otherwise,
  50.         // a worker could have already fetched the task and set its state to running before it gets set to
  51.         // queued, thus breaking the task.
  52.         /** @var ScheduledTaskEntity $task */
  53.         foreach ($tasks as $task) {
  54.             $this->scheduledTaskRepository->update([
  55.                 [
  56.                     'id' => $task->getId(),
  57.                     'status' => ScheduledTaskDefinition::STATUS_QUEUED,
  58.                 ],
  59.             ], $context);
  60.             $this->queueTask($task);
  61.         }
  62.     }
  63.     public function getNextExecutionTime(): ?\DateTimeInterface
  64.     {
  65.         $criteria $this->buildCriteriaForNextScheduledTask();
  66.         /** @var AggregationResult $aggregation */
  67.         $aggregation $this->scheduledTaskRepository
  68.             ->aggregate($criteriaContext::createDefaultContext())
  69.             ->get('nextExecutionTime');
  70.         /** @var MinResult $aggregation */
  71.         if (!$aggregation instanceof MinResult) {
  72.             return null;
  73.         }
  74.         if ($aggregation->getMin() === null) {
  75.             return null;
  76.         }
  77.         return new \DateTime((string) $aggregation->getMin());
  78.     }
  79.     public function getMinRunInterval(): ?int
  80.     {
  81.         $criteria $this->buildCriteriaForMinRunInterval();
  82.         $aggregation $this->scheduledTaskRepository
  83.             ->aggregate($criteriaContext::createDefaultContext())
  84.             ->get('runInterval');
  85.         /** @var MinResult $aggregation */
  86.         if (!$aggregation instanceof MinResult) {
  87.             return null;
  88.         }
  89.         if ($aggregation->getMin() === null) {
  90.             return null;
  91.         }
  92.         return (int) $aggregation->getMin();
  93.     }
  94.     private function buildCriteriaForAllScheduledTask(): Criteria
  95.     {
  96.         $criteria = new Criteria();
  97.         $criteria->addFilter(
  98.             new RangeFilter(
  99.                 'nextExecutionTime',
  100.                 [
  101.                     RangeFilter::LT => (new \DateTime())->format(Defaults::STORAGE_DATE_TIME_FORMAT),
  102.                 ]
  103.             ),
  104.             new EqualsFilter('status'ScheduledTaskDefinition::STATUS_SCHEDULED)
  105.         );
  106.         return $criteria;
  107.     }
  108.     private function queueTask(ScheduledTaskEntity $taskEntity): void
  109.     {
  110.         $taskClass $taskEntity->getScheduledTaskClass();
  111.         if (!\is_a($taskClassScheduledTask::class, true)) {
  112.             throw new \RuntimeException(sprintf(
  113.                 'Tried to schedule "%s", but class does not extend ScheduledTask',
  114.                 $taskClass
  115.             ));
  116.         }
  117.         if (!$taskClass::shouldRun($this->parameterBag)) {
  118.             return;
  119.         }
  120.         $task = new $taskClass();
  121.         $task->setTaskId($taskEntity->getId());
  122.         $this->bus->dispatch($task);
  123.     }
  124.     private function buildCriteriaForNextScheduledTask(): Criteria
  125.     {
  126.         $criteria = new Criteria();
  127.         $criteria->addFilter(
  128.             new EqualsFilter('status'ScheduledTaskDefinition::STATUS_SCHEDULED)
  129.         )
  130.         ->addAggregation(new MinAggregation('nextExecutionTime''nextExecutionTime'));
  131.         return $criteria;
  132.     }
  133.     private function buildCriteriaForMinRunInterval(): Criteria
  134.     {
  135.         $criteria = new Criteria();
  136.         $criteria->addFilter(
  137.             new NotFilter(NotFilter::CONNECTION_AND, [
  138.                 new EqualsFilter('status'ScheduledTaskDefinition::STATUS_INACTIVE),
  139.             ])
  140.         )
  141.         ->addAggregation(new MinAggregation('runInterval''runInterval'));
  142.         return $criteria;
  143.     }
  144. }