vendor/shopware/core/Content/Product/DataAbstractionLayer/StockUpdater.php line 126

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Content\Product\DataAbstractionLayer;
  3. use Doctrine\DBAL\Connection;
  4. use Shopware\Core\Checkout\Cart\Event\CheckoutOrderPlacedEvent;
  5. use Shopware\Core\Checkout\Cart\LineItem\LineItem;
  6. use Shopware\Core\Checkout\Order\Aggregate\OrderLineItem\OrderLineItemDefinition;
  7. use Shopware\Core\Checkout\Order\OrderEvents;
  8. use Shopware\Core\Checkout\Order\OrderStates;
  9. use Shopware\Core\Content\Product\Events\ProductNoLongerAvailableEvent;
  10. use Shopware\Core\Defaults;
  11. use Shopware\Core\Framework\Context;
  12. use Shopware\Core\Framework\DataAbstractionLayer\Doctrine\RetryableQuery;
  13. use Shopware\Core\Framework\DataAbstractionLayer\EntityWriteResult;
  14. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenEvent;
  15. use Shopware\Core\Framework\DataAbstractionLayer\Write\Command\ChangeSetAware;
  16. use Shopware\Core\Framework\DataAbstractionLayer\Write\Command\DeleteCommand;
  17. use Shopware\Core\Framework\DataAbstractionLayer\Write\Command\InsertCommand;
  18. use Shopware\Core\Framework\DataAbstractionLayer\Write\Command\UpdateCommand;
  19. use Shopware\Core\Framework\DataAbstractionLayer\Write\Command\WriteCommand;
  20. use Shopware\Core\Framework\DataAbstractionLayer\Write\Validation\PreWriteValidationEvent;
  21. use Shopware\Core\Framework\Uuid\Uuid;
  22. use Shopware\Core\Profiling\Profiler;
  23. use Shopware\Core\System\StateMachine\Event\StateMachineTransitionEvent;
  24. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  25. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  26. class StockUpdater implements EventSubscriberInterface
  27. {
  28.     private Connection $connection;
  29.     private EventDispatcherInterface $dispatcher;
  30.     /**
  31.      * @internal
  32.      */
  33.     public function __construct(
  34.         Connection $connection,
  35.         EventDispatcherInterface $dispatcher
  36.     ) {
  37.         $this->connection $connection;
  38.         $this->dispatcher $dispatcher;
  39.     }
  40.     /**
  41.      * Returns a list of custom business events to listen where the product maybe changed
  42.      *
  43.      * @return array<string, string|array{0: string, 1: int}|list<array{0: string, 1?: int}>>
  44.      */
  45.     public static function getSubscribedEvents()
  46.     {
  47.         return [
  48.             CheckoutOrderPlacedEvent::class => 'orderPlaced',
  49.             StateMachineTransitionEvent::class => 'stateChanged',
  50.             PreWriteValidationEvent::class => 'triggerChangeSet',
  51.             OrderEvents::ORDER_LINE_ITEM_WRITTEN_EVENT => 'lineItemWritten',
  52.             OrderEvents::ORDER_LINE_ITEM_DELETED_EVENT => 'lineItemWritten',
  53.         ];
  54.     }
  55.     public function triggerChangeSet(PreWriteValidationEvent $event): void
  56.     {
  57.         if ($event->getContext()->getVersionId() !== Defaults::LIVE_VERSION) {
  58.             return;
  59.         }
  60.         foreach ($event->getCommands() as $command) {
  61.             if (!$command instanceof ChangeSetAware) {
  62.                 continue;
  63.             }
  64.             /** @var ChangeSetAware|InsertCommand|UpdateCommand $command */
  65.             if ($command->getDefinition()->getEntityName() !== OrderLineItemDefinition::ENTITY_NAME) {
  66.                 continue;
  67.             }
  68.             if ($command instanceof InsertCommand) {
  69.                 continue;
  70.             }
  71.             if ($command instanceof DeleteCommand) {
  72.                 $command->requestChangeSet();
  73.                 continue;
  74.             }
  75.             /** @var WriteCommand&ChangeSetAware $command */
  76.             if ($command->hasField('referenced_id') || $command->hasField('product_id') || $command->hasField('quantity')) {
  77.                 $command->requestChangeSet();
  78.             }
  79.         }
  80.     }
  81.     public function orderPlaced(CheckoutOrderPlacedEvent $event): void
  82.     {
  83.         $ids = [];
  84.         foreach ($event->getOrder()->getLineItems() as $lineItem) {
  85.             if ($lineItem->getType() !== LineItem::PRODUCT_LINE_ITEM_TYPE) {
  86.                 continue;
  87.             }
  88.             if (!\array_key_exists($lineItem->getReferencedId(), $ids)) {
  89.                 $ids[$lineItem->getReferencedId()] = 0;
  90.             }
  91.             $ids[$lineItem->getReferencedId()] += $lineItem->getQuantity();
  92.         }
  93.         // order placed event is a high load event. Because of the high load, we simply reduce the quantity here instead of executing the high costs `update` function
  94.         $query = new RetryableQuery(
  95.             $this->connection,
  96.             $this->connection->prepare('UPDATE product SET available_stock = available_stock - :quantity WHERE id = :id')
  97.         );
  98.         Profiler::trace('order::update-stock', static function () use ($query$ids): void {
  99.             foreach ($ids as $id => $quantity) {
  100.                 $query->execute(['id' => Uuid::fromHexToBytes((string) $id), 'quantity' => $quantity]);
  101.             }
  102.         });
  103.         Profiler::trace('order::update-flag', function () use ($ids$event): void {
  104.             $this->updateAvailableFlag(\array_keys($ids), $event->getContext());
  105.         });
  106.     }
  107.     /**
  108.      * If the product of an order item changed, the stocks of the old product and the new product must be updated.
  109.      */
  110.     public function lineItemWritten(EntityWrittenEvent $event): void
  111.     {
  112.         $ids = [];
  113.         // we don't want to trigger to `update` method when we are inside the order process
  114.         if ($event->getContext()->hasState('checkout-order-route')) {
  115.             return;
  116.         }
  117.         foreach ($event->getWriteResults() as $result) {
  118.             if ($result->hasPayload('referencedId') && $result->getProperty('type') === LineItem::PRODUCT_LINE_ITEM_TYPE) {
  119.                 $ids[] = $result->getProperty('referencedId');
  120.             }
  121.             if ($result->getOperation() === EntityWriteResult::OPERATION_INSERT) {
  122.                 continue;
  123.             }
  124.             $changeSet $result->getChangeSet();
  125.             if (!$changeSet) {
  126.                 continue;
  127.             }
  128.             $type $changeSet->getBefore('type');
  129.             if ($type !== LineItem::PRODUCT_LINE_ITEM_TYPE) {
  130.                 continue;
  131.             }
  132.             if (!$changeSet->hasChanged('referenced_id') && !$changeSet->hasChanged('quantity')) {
  133.                 continue;
  134.             }
  135.             $ids[] = $changeSet->getBefore('referenced_id');
  136.             $ids[] = $changeSet->getAfter('referenced_id');
  137.         }
  138.         $ids array_filter(array_unique($ids));
  139.         if (empty($ids)) {
  140.             return;
  141.         }
  142.         $this->update($ids$event->getContext());
  143.     }
  144.     public function stateChanged(StateMachineTransitionEvent $event): void
  145.     {
  146.         if ($event->getContext()->getVersionId() !== Defaults::LIVE_VERSION) {
  147.             return;
  148.         }
  149.         if ($event->getEntityName() !== 'order') {
  150.             return;
  151.         }
  152.         if ($event->getToPlace()->getTechnicalName() === OrderStates::STATE_COMPLETED) {
  153.             $this->decreaseStock($event);
  154.             return;
  155.         }
  156.         if ($event->getFromPlace()->getTechnicalName() === OrderStates::STATE_COMPLETED) {
  157.             $this->increaseStock($event);
  158.             return;
  159.         }
  160.         if ($event->getToPlace()->getTechnicalName() === OrderStates::STATE_CANCELLED || $event->getFromPlace()->getTechnicalName() === OrderStates::STATE_CANCELLED) {
  161.             $products $this->getProductsOfOrder($event->getEntityId());
  162.             $ids array_column($products'referenced_id');
  163.             $this->updateAvailableStockAndSales($ids$event->getContext());
  164.             $this->updateAvailableFlag($ids$event->getContext());
  165.             return;
  166.         }
  167.     }
  168.     public function update(array $idsContext $context): void
  169.     {
  170.         if ($context->getVersionId() !== Defaults::LIVE_VERSION) {
  171.             return;
  172.         }
  173.         $this->updateAvailableStockAndSales($ids$context);
  174.         $this->updateAvailableFlag($ids$context);
  175.     }
  176.     private function increaseStock(StateMachineTransitionEvent $event): void
  177.     {
  178.         $products $this->getProductsOfOrder($event->getEntityId());
  179.         $ids array_column($products'referenced_id');
  180.         $this->updateStock($products, +1);
  181.         $this->updateAvailableStockAndSales($ids$event->getContext());
  182.         $this->updateAvailableFlag($ids$event->getContext());
  183.     }
  184.     private function decreaseStock(StateMachineTransitionEvent $event): void
  185.     {
  186.         $products $this->getProductsOfOrder($event->getEntityId());
  187.         $ids array_column($products'referenced_id');
  188.         $this->updateStock($products, -1);
  189.         $this->updateAvailableStockAndSales($ids$event->getContext());
  190.         $this->updateAvailableFlag($ids$event->getContext());
  191.     }
  192.     private function updateAvailableStockAndSales(array $idsContext $context): void
  193.     {
  194.         $ids array_filter(array_keys(array_flip($ids)));
  195.         if (empty($ids)) {
  196.             return;
  197.         }
  198.         $sql '
  199. SELECT LOWER(HEX(order_line_item.product_id)) as product_id,
  200.     IFNULL(
  201.         SUM(IF(state_machine_state.technical_name = :completed_state, 0, order_line_item.quantity)),
  202.         0
  203.     ) as open_quantity,
  204.     IFNULL(
  205.         SUM(IF(state_machine_state.technical_name = :completed_state, order_line_item.quantity, 0)),
  206.         0
  207.     ) as sales_quantity
  208. FROM order_line_item
  209.     INNER JOIN `order`
  210.         ON `order`.id = order_line_item.order_id
  211.         AND `order`.version_id = order_line_item.order_version_id
  212.     INNER JOIN state_machine_state
  213.         ON state_machine_state.id = `order`.state_id
  214.         AND state_machine_state.technical_name <> :cancelled_state
  215. WHERE order_line_item.product_id IN (:ids)
  216.     AND order_line_item.type = :type
  217.     AND order_line_item.version_id = :version
  218.     AND order_line_item.product_id IS NOT NULL
  219. GROUP BY product_id;
  220.         ';
  221.         $rows $this->connection->fetchAllAssociative(
  222.             $sql,
  223.             [
  224.                 'type' => LineItem::PRODUCT_LINE_ITEM_TYPE,
  225.                 'version' => Uuid::fromHexToBytes($context->getVersionId()),
  226.                 'completed_state' => OrderStates::STATE_COMPLETED,
  227.                 'cancelled_state' => OrderStates::STATE_CANCELLED,
  228.                 'ids' => Uuid::fromHexToBytesList($ids),
  229.             ],
  230.             [
  231.                 'ids' => Connection::PARAM_STR_ARRAY,
  232.             ]
  233.         );
  234.         $fallback array_column($rows'product_id');
  235.         $fallback array_diff($ids$fallback);
  236.         $update = new RetryableQuery(
  237.             $this->connection,
  238.             $this->connection->prepare('UPDATE product SET available_stock = stock - :open_quantity, sales = :sales_quantity, updated_at = :now WHERE id = :id')
  239.         );
  240.         foreach ($fallback as $id) {
  241.             $update->execute([
  242.                 'id' => Uuid::fromHexToBytes((string) $id),
  243.                 'open_quantity' => 0,
  244.                 'sales_quantity' => 0,
  245.                 'now' => (new \DateTime())->format(Defaults::STORAGE_DATE_TIME_FORMAT),
  246.             ]);
  247.         }
  248.         foreach ($rows as $row) {
  249.             $update->execute([
  250.                 'id' => Uuid::fromHexToBytes($row['product_id']),
  251.                 'open_quantity' => $row['open_quantity'],
  252.                 'sales_quantity' => $row['sales_quantity'],
  253.                 'now' => (new \DateTime())->format(Defaults::STORAGE_DATE_TIME_FORMAT),
  254.             ]);
  255.         }
  256.     }
  257.     private function updateAvailableFlag(array $idsContext $context): void
  258.     {
  259.         $ids array_filter(array_unique($ids));
  260.         if (empty($ids)) {
  261.             return;
  262.         }
  263.         $bytes Uuid::fromHexToBytesList($ids);
  264.         $sql '
  265.             UPDATE product
  266.             LEFT JOIN product parent
  267.                 ON parent.id = product.parent_id
  268.                 AND parent.version_id = product.version_id
  269.             SET product.available = IFNULL((
  270.                 IFNULL(product.is_closeout, parent.is_closeout) * product.available_stock
  271.                 >=
  272.                 IFNULL(product.is_closeout, parent.is_closeout) * IFNULL(product.min_purchase, parent.min_purchase)
  273.             ), 0)
  274.             WHERE product.id IN (:ids)
  275.             AND product.version_id = :version
  276.         ';
  277.         RetryableQuery::retryable($this->connection, function () use ($sql$context$bytes): void {
  278.             $this->connection->executeUpdate(
  279.                 $sql,
  280.                 ['ids' => $bytes'version' => Uuid::fromHexToBytes($context->getVersionId())],
  281.                 ['ids' => Connection::PARAM_STR_ARRAY]
  282.             );
  283.         });
  284.         $updated $this->connection->fetchFirstColumn(
  285.             'SELECT LOWER(HEX(id)) FROM product WHERE available = 0 AND id IN (:ids) AND product.version_id = :version',
  286.             ['ids' => $bytes'version' => Uuid::fromHexToBytes($context->getVersionId())],
  287.             ['ids' => Connection::PARAM_STR_ARRAY]
  288.         );
  289.         if (!empty($updated)) {
  290.             $this->dispatcher->dispatch(new ProductNoLongerAvailableEvent($updated$context));
  291.         }
  292.     }
  293.     private function updateStock(array $productsint $multiplier): void
  294.     {
  295.         $query = new RetryableQuery(
  296.             $this->connection,
  297.             $this->connection->prepare('UPDATE product SET stock = stock + :quantity WHERE id = :id AND version_id = :version')
  298.         );
  299.         foreach ($products as $product) {
  300.             $query->execute([
  301.                 'quantity' => (int) $product['quantity'] * $multiplier,
  302.                 'id' => Uuid::fromHexToBytes($product['referenced_id']),
  303.                 'version' => Uuid::fromHexToBytes(Defaults::LIVE_VERSION),
  304.             ]);
  305.         }
  306.     }
  307.     private function getProductsOfOrder(string $orderId): array
  308.     {
  309.         $query $this->connection->createQueryBuilder();
  310.         $query->select(['referenced_id''quantity']);
  311.         $query->from('order_line_item');
  312.         $query->andWhere('type = :type');
  313.         $query->andWhere('order_id = :id');
  314.         $query->andWhere('version_id = :version');
  315.         $query->setParameter('id'Uuid::fromHexToBytes($orderId));
  316.         $query->setParameter('version'Uuid::fromHexToBytes(Defaults::LIVE_VERSION));
  317.         $query->setParameter('type'LineItem::PRODUCT_LINE_ITEM_TYPE);
  318.         return $query->execute()->fetchAll(\PDO::FETCH_ASSOC);
  319.     }
  320. }