vendor/shopware/core/Framework/DataAbstractionLayer/Indexing/EntityIndexerRegistry.php line 99

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\DataAbstractionLayer\Indexing;
  3. use Shopware\Core\Framework\Context;
  4. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
  5. use Shopware\Core\Framework\DataAbstractionLayer\Indexing\MessageQueue\IterateEntityIndexerMessage;
  6. use Shopware\Core\Framework\Event\ProgressAdvancedEvent;
  7. use Shopware\Core\Framework\Event\ProgressFinishedEvent;
  8. use Shopware\Core\Framework\Event\ProgressStartedEvent;
  9. use Shopware\Core\Framework\MessageQueue\Handler\AbstractMessageHandler;
  10. use Shopware\Core\Framework\Struct\ArrayStruct;
  11. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  12. use Symfony\Component\Messenger\MessageBusInterface;
  13. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  14. class EntityIndexerRegistry extends AbstractMessageHandler implements EventSubscriberInterface
  15. {
  16.     public const EXTENSION_INDEXER_SKIP 'indexer-skip';
  17.     /**
  18.      * @deprecated tag:v6.5.0 - `$context->addExtension(EntityIndexerRegistry::USE_INDEXING_QUEUE, ...)` will be ignored, use `context->addState(EntityIndexerRegistry::USE_INDEXING_QUEUE)` instead
  19.      */
  20.     public const USE_INDEXING_QUEUE 'use-queue-indexing';
  21.     /**
  22.      * @deprecated tag:v6.5.0 - `$context->addExtension(EntityIndexerRegistry::DISABLE_INDEXING, ...)` will be ignored, use `context->addState(EntityIndexerRegistry::DISABLE_INDEXING)` instead
  23.      */
  24.     public const DISABLE_INDEXING 'disable-indexing';
  25.     /**
  26.      * @var EntityIndexer[]
  27.      */
  28.     private iterable $indexer;
  29.     private MessageBusInterface $messageBus;
  30.     private bool $working false;
  31.     private EventDispatcherInterface $dispatcher;
  32.     /**
  33.      * @internal
  34.      */
  35.     public function __construct(iterable $indexerMessageBusInterface $messageBusEventDispatcherInterface $dispatcher)
  36.     {
  37.         $this->indexer $indexer;
  38.         $this->messageBus $messageBus;
  39.         $this->dispatcher $dispatcher;
  40.     }
  41.     public static function getSubscribedEvents(): array
  42.     {
  43.         return [
  44.             EntityWrittenContainerEvent::class => [
  45.                 ['refresh'1000],
  46.             ],
  47.         ];
  48.     }
  49.     public static function getHandledMessages(): iterable
  50.     {
  51.         return [
  52.             EntityIndexingMessage::class,
  53.             IterateEntityIndexerMessage::class,
  54.         ];
  55.     }
  56.     public function index(bool $useQueue, array $skip = []): void
  57.     {
  58.         foreach ($this->indexer as $indexer) {
  59.             if (\in_array($indexer->getName(), $skiptrue)) {
  60.                 continue;
  61.             }
  62.             $offset null;
  63.             $this->dispatcher->dispatch(new ProgressStartedEvent($indexer->getName(), $indexer->getTotal()));
  64.             while ($message $indexer->iterate($offset)) {
  65.                 $message->setIndexer($indexer->getName());
  66.                 $message->addSkip(...$skip);
  67.                 $this->sendOrHandle($message$useQueue);
  68.                 $offset $message->getOffset();
  69.                 try {
  70.                     $count = \is_array($message->getData()) ? \count($message->getData()) : 1;
  71.                     $this->dispatcher->dispatch(new ProgressAdvancedEvent($count));
  72.                 } catch (\Exception $e) {
  73.                 }
  74.             }
  75.             $this->dispatcher->dispatch(new ProgressFinishedEvent($indexer->getName()));
  76.         }
  77.     }
  78.     public function refresh(EntityWrittenContainerEvent $event): void
  79.     {
  80.         $context $event->getContext();
  81.         if ($this->working) {
  82.             return;
  83.         }
  84.         $this->working true;
  85.         if ($this->disabled($context)) {
  86.             $this->working false;
  87.             return;
  88.         }
  89.         $useQueue $this->useQueue($context);
  90.         foreach ($this->indexer as $indexer) {
  91.             $message $indexer->update($event);
  92.             if (!$message) {
  93.                 continue;
  94.             }
  95.             $message->setIndexer($indexer->getName());
  96.             self::addSkips($message$context);
  97.             $this->sendOrHandle($message$useQueue);
  98.         }
  99.         $this->working false;
  100.     }
  101.     public static function addSkips(EntityIndexingMessage $messageContext $context): void
  102.     {
  103.         if (!$context->hasExtension(self::EXTENSION_INDEXER_SKIP)) {
  104.             return;
  105.         }
  106.         /** @var ArrayStruct<string, mixed> $skip */
  107.         $skip $context->getExtension(self::EXTENSION_INDEXER_SKIP);
  108.         $message->addSkip(...$skip->all());
  109.     }
  110.     /**
  111.      * @param mixed $message
  112.      */
  113.     public function handle($message): void
  114.     {
  115.         if ($message instanceof EntityIndexingMessage) {
  116.             $indexer $this->getIndexer($message->getIndexer());
  117.             if ($indexer) {
  118.                 $indexer->handle($message);
  119.             }
  120.             return;
  121.         }
  122.         if ($message instanceof IterateEntityIndexerMessage) {
  123.             $next $this->iterateIndexer($message->getIndexer(), $message->getOffset(), true$message->getSkip());
  124.             if (!$next) {
  125.                 return;
  126.             }
  127.             $this->messageBus->dispatch(new IterateEntityIndexerMessage($message->getIndexer(), $next->getOffset(), $message->getSkip()));
  128.         }
  129.     }
  130.     public function sendIndexingMessage(array $indexer = [], array $skip = []): void
  131.     {
  132.         if (empty($indexer)) {
  133.             $indexer = [];
  134.             foreach ($this->indexer as $loop) {
  135.                 $indexer[] = $loop->getName();
  136.             }
  137.         }
  138.         if (empty($indexer)) {
  139.             return;
  140.         }
  141.         foreach ($indexer as $name) {
  142.             if (\in_array($name$skiptrue)) {
  143.                 continue;
  144.             }
  145.             $this->messageBus->dispatch(new IterateEntityIndexerMessage($namenull$skip));
  146.         }
  147.     }
  148.     public function has(string $name): bool
  149.     {
  150.         return $this->getIndexer($name) !== null;
  151.     }
  152.     public function getIndexer(string $name): ?EntityIndexer
  153.     {
  154.         foreach ($this->indexer as $indexer) {
  155.             if ($indexer->getName() === $name) {
  156.                 return $indexer;
  157.             }
  158.         }
  159.         return null;
  160.     }
  161.     private function useQueue(Context $context): bool
  162.     {
  163.         return $context->hasExtension(self::USE_INDEXING_QUEUE) || $context->hasState(self::USE_INDEXING_QUEUE);
  164.     }
  165.     private function disabled(Context $context): bool
  166.     {
  167.         return $context->hasExtension(self::DISABLE_INDEXING) || $context->hasState(self::DISABLE_INDEXING);
  168.     }
  169.     private function sendOrHandle(EntityIndexingMessage $messagebool $useQueue): void
  170.     {
  171.         if ($useQueue || $message->forceQueue()) {
  172.             $this->messageBus->dispatch($message);
  173.             return;
  174.         }
  175.         $this->handle($message);
  176.     }
  177.     private function iterateIndexer(string $name, ?array $offsetbool $useQueue, array $skip): ?EntityIndexingMessage
  178.     {
  179.         $indexer $this->getIndexer($name);
  180.         if (!$indexer instanceof EntityIndexer) {
  181.             throw new \RuntimeException(sprintf('Entity indexer with name %s not found'$name));
  182.         }
  183.         $message $indexer->iterate($offset);
  184.         if (!$message) {
  185.             return null;
  186.         }
  187.         $message->setIndexer($indexer->getName());
  188.         $message->addSkip(...$skip);
  189.         $this->sendOrHandle($message$useQueue);
  190.         return $message;
  191.     }
  192. }