diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index f1f397441..80a3b4c92 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -38,6 +38,11 @@ class DbalConsumer implements Consumer */ private $redeliveryDelay; + /** + * @var int + */ + private $deliveryDelay; + public function __construct(DbalContext $context, DbalDestination $queue) { $this->context = $context; @@ -65,6 +70,19 @@ public function setRedeliveryDelay(int $redeliveryDelay): self return $this; } + /** + * @return int + */ + public function getDeliveryDelay(): ?int + { + return $this->deliveryDelay; + } + + public function setDeliveryDelay(int $deliveryDelay): void + { + $this->deliveryDelay = $deliveryDelay; + } + /** * @return DbalDestination */ @@ -104,6 +122,10 @@ public function reject(Message $message, bool $requeue = false): void $message = clone $message; $message->setRedelivered(false); + if(null !== $this->deliveryDelay && null === $message->getDeliveryDelay()) { + $message->setDeliveryDelay($this->deliveryDelay); + } + $this->getContext()->createProducer()->send($this->queue, $message); } diff --git a/pkg/redis/PhpRedis.php b/pkg/redis/PhpRedis.php index d6f5baffa..d161bd470 100644 --- a/pkg/redis/PhpRedis.php +++ b/pkg/redis/PhpRedis.php @@ -115,8 +115,8 @@ public function connect(): void $this->config['port'], $this->config['timeout'], $this->config['persistent'] ? ($this->config['phpredis_persistent_id'] ?? null) : null, - $this->config['phpredis_retry_interval'] ?? null, - $this->config['read_write_timeout'] + $this->config['phpredis_retry_interval'] ?? 0, + $this->config['read_write_timeout'] ?? 0 ); if (false == $result) { diff --git a/pkg/redis/RedisConsumer.php b/pkg/redis/RedisConsumer.php index 9c93b642a..eb4324637 100644 --- a/pkg/redis/RedisConsumer.php +++ b/pkg/redis/RedisConsumer.php @@ -28,6 +28,11 @@ class RedisConsumer implements Consumer */ private $redeliveryDelay = 300; + /** + * @var int + */ + private $deliveryDelay; + public function __construct(RedisContext $context, RedisDestination $queue) { $this->context = $context; @@ -47,6 +52,19 @@ public function setRedeliveryDelay(int $delay): void $this->redeliveryDelay = $delay; } + /** + * @return int + */ + public function getDeliveryDelay(): ?int + { + return $this->deliveryDelay; + } + + public function setDeliveryDelay(int $deliveryDelay): void + { + $this->deliveryDelay = $deliveryDelay; + } + /** * @return RedisDestination */ @@ -92,7 +110,7 @@ public function acknowledge(Message $message): void /** * @param RedisMessage $message */ - public function reject(Message $message, bool $requeue = false): void + public function reject(Message $message, bool $requeue = false, $delay = 0): void { InvalidMessageException::assertMessageInstanceOf($message, RedisMessage::class); @@ -102,13 +120,22 @@ public function reject(Message $message, bool $requeue = false): void $message = $this->getContext()->getSerializer()->toMessage($message->getReservedKey()); $message->setRedelivered(true); + if (null !== $this->deliveryDelay && null === $message->getDeliveryDelay()) { + $message->setDeliveryDelay($this->deliveryDelay); + } + if ($message->getTimeToLive()) { $message->setHeader('expires_at', time() + $message->getTimeToLive()); } - $payload = $this->getContext()->getSerializer()->toString($message); - - $this->getRedis()->lpush($this->queue->getName(), $payload); + if($message->getDeliveryDelay()) { + $deliveryAt = time() + $message->getDeliveryDelay() / 1000; + $payload = $this->getContext()->getSerializer()->toString($message); + $this->getRedis()->zadd($this->queue->getName().':delayed', $payload, $deliveryAt); + } else { + $payload = $this->getContext()->getSerializer()->toString($message); + $this->getRedis()->lpush($this->queue->getName(), $payload); + } } }