vendor/shopware/core/Framework/MessageQueue/Api/ConsumeMessagesController.php line 96

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\Api;
  3. use Shopware\Core\Framework\MessageQueue\Subscriber\CountHandledMessagesListener;
  4. use Shopware\Core\Framework\MessageQueue\Subscriber\EarlyReturnMessagesListener;
  5. use Shopware\Core\Framework\Routing\Annotation\RouteScope;
  6. use Shopware\Core\Framework\Routing\Annotation\Since;
  7. use Shopware\Core\Framework\Util\MemorySizeCalculator;
  8. use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
  9. use Symfony\Component\DependencyInjection\ServiceLocator;
  10. use Symfony\Component\EventDispatcher\EventDispatcher;
  11. use Symfony\Component\HttpFoundation\JsonResponse;
  12. use Symfony\Component\HttpFoundation\Request;
  13. use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
  14. use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
  15. use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;
  16. use Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener;
  17. use Symfony\Component\Messenger\MessageBusInterface;
  18. use Symfony\Component\Messenger\Worker;
  19. use Symfony\Component\Routing\Annotation\Route;
  20. /**
  21.  * @Route(defaults={"_routeScope"={"api"}})
  22.  */
  23. class ConsumeMessagesController extends AbstractController
  24. {
  25.     /**
  26.      * @var ServiceLocator
  27.      */
  28.     private $receiverLocator;
  29.     /**
  30.      * @var MessageBusInterface
  31.      */
  32.     private $bus;
  33.     /**
  34.      * @var int
  35.      */
  36.     private $pollInterval;
  37.     /**
  38.      * @var StopWorkerOnRestartSignalListener
  39.      */
  40.     private $stopWorkerOnRestartSignalListener;
  41.     /**
  42.      * @var StopWorkerOnSigtermSignalListener
  43.      */
  44.     private $stopWorkerOnSigtermSignalListener;
  45.     /**
  46.      * @var DispatchPcntlSignalListener
  47.      */
  48.     private $dispatchPcntlSignalListener;
  49.     /**
  50.      * @var EarlyReturnMessagesListener
  51.      */
  52.     private $earlyReturnListener;
  53.     private string $defaultTransportName;
  54.     private string $memoryLimit;
  55.     /**
  56.      * @internal
  57.      */
  58.     public function __construct(
  59.         ServiceLocator $receiverLocator,
  60.         MessageBusInterface $bus,
  61.         int $pollInterval,
  62.         StopWorkerOnRestartSignalListener $stopWorkerOnRestartSignalListener,
  63.         StopWorkerOnSigtermSignalListener $stopWorkerOnSigtermSignalListener,
  64.         DispatchPcntlSignalListener $dispatchPcntlSignalListener,
  65.         EarlyReturnMessagesListener $earlyReturnListener,
  66.         string $defaultTransportName,
  67.         string $memoryLimit
  68.     ) {
  69.         $this->receiverLocator $receiverLocator;
  70.         $this->bus $bus;
  71.         $this->pollInterval $pollInterval;
  72.         $this->stopWorkerOnRestartSignalListener $stopWorkerOnRestartSignalListener;
  73.         $this->stopWorkerOnSigtermSignalListener $stopWorkerOnSigtermSignalListener;
  74.         $this->dispatchPcntlSignalListener $dispatchPcntlSignalListener;
  75.         $this->earlyReturnListener $earlyReturnListener;
  76.         $this->defaultTransportName $defaultTransportName;
  77.         $this->memoryLimit $memoryLimit;
  78.     }
  79.     /**
  80.      * @Since("6.0.0.0")
  81.      * @Route("/api/_action/message-queue/consume", name="api.action.message-queue.consume", methods={"POST"})
  82.      */
  83.     public function consumeMessages(Request $request): JsonResponse
  84.     {
  85.         $receiverName $request->get('receiver');
  86.         if (!$receiverName || !$this->receiverLocator->has($receiverName)) {
  87.             throw new \RuntimeException('No receiver name provided.');
  88.         }
  89.         $receiver $this->receiverLocator->get($receiverName);
  90.         $workerDispatcher = new EventDispatcher();
  91.         $listener = new CountHandledMessagesListener($this->pollInterval);
  92.         $workerDispatcher->addSubscriber($listener);
  93.         $workerDispatcher->addSubscriber($this->stopWorkerOnRestartSignalListener);
  94.         $workerDispatcher->addSubscriber($this->stopWorkerOnSigtermSignalListener);
  95.         $workerDispatcher->addSubscriber($this->dispatchPcntlSignalListener);
  96.         $workerDispatcher->addSubscriber($this->earlyReturnListener);
  97.         if ($this->memoryLimit !== '-1') {
  98.             $workerDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener(
  99.                 MemorySizeCalculator::convertToBytes($this->memoryLimit)
  100.             ));
  101.         }
  102.         $worker = new Worker([$this->defaultTransportName => $receiver], $this->bus$workerDispatcher);
  103.         $worker->run(['sleep' => 50]);
  104.         return $this->json(['handledMessages' => $listener->getHandledMessages()]);
  105.     }
  106. }