Skip to content

Commit

Permalink
Automatic declare exchange when produce message (#6696)
Browse files Browse the repository at this point in the history
  • Loading branch information
huangdijia authored Apr 18, 2024
1 parent a8f7788 commit 2436f4f
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 1 deletion.
33 changes: 33 additions & 0 deletions src/DeclaredExchanges.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/

namespace Hyperf\Amqp;

class DeclaredExchanges
{
private static array $exchanges = [];

public static function add(string $exchange): void
{
self::$exchanges[$exchange] = true;
}

public static function remove(string $exchange): void
{
unset(self::$exchanges[$exchange]);
}

public static function has(string $exchange): bool
{
return isset(self::$exchanges[$exchange]);
}
}
3 changes: 3 additions & 0 deletions src/Listener/MainWorkerStartListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use Doctrine\Instantiator\Instantiator;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\DeclaredExchanges;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Contract\StdoutLoggerInterface;
Expand Down Expand Up @@ -70,13 +71,15 @@ public function process(object $event): void
$annotation->routingKey && $instance->setRoutingKey($annotation->routingKey);
try {
$producer->declare($instance);
DeclaredExchanges::add($instance->getExchange());
$routingKey = $instance->getRoutingKey();
if (is_array($routingKey)) {
$routingKey = implode(',', $routingKey);
}
$this->logger->debug(sprintf('AMQP exchange[%s] and routingKey[%s] were created successfully.', $instance->getExchange(), $routingKey));
} catch (AMQPProtocolChannelException $e) {
$this->logger->debug('AMQPProtocolChannelException: ' . $e->getMessage());
DeclaredExchanges::remove($instance->getExchange());
// Do nothing.
} catch (Throwable $exception) {
$this->logger->error((string) $exception);
Expand Down
14 changes: 13 additions & 1 deletion src/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,22 @@ private function produceMessage(ProducerMessageInterface $producerMessage, bool
$channel = $connection->getChannel();
}

$exchange = $producerMessage->getExchange();

if (! DeclaredExchanges::has($exchange)) {
try {
DeclaredExchanges::add($exchange);
$this->declare($producerMessage, $channel);
} catch (Throwable $exception) {
DeclaredExchanges::remove($exchange);
throw $exception;
}
}

$channel->set_ack_handler(function () use (&$result) {
$result = true;
});
$channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
$channel->basic_publish($message, $exchange, $producerMessage->getRoutingKey());
$channel->wait_for_pending_acks_returns($timeout);
} catch (Throwable $exception) {
isset($channel) && $channel->close();
Expand Down

0 comments on commit 2436f4f

Please sign in to comment.