Skip to content

Commit

Permalink
Added params connection_name which used to define amqp connection n…
Browse files Browse the repository at this point in the history
…ame.
  • Loading branch information
PandaLIU-1111 authored Jan 8, 2025
1 parent 45f1c42 commit db5e248
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 3 deletions.
1 change: 1 addition & 0 deletions publish/amqp.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
'channel_rpc_timeout' => 0.0,
'close_on_destruct' => false,
'max_idle_channels' => 10,
'connection_name' => null,
],
],
];
6 changes: 4 additions & 2 deletions src/AMQPConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Channel\Frame;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Connection\AMQPConnectionConfig;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Wire\AMQPWriter;
Expand Down Expand Up @@ -68,13 +69,14 @@ public function __construct(
?AbstractIO $io = null,
int $heartbeat = 0,
int $connection_timeout = 0,
float $channel_rpc_timeout = 0.0
float $channel_rpc_timeout = 0.0,
?AMQPConnectionConfig $config = null
) {
$this->channelManager = new ChannelManager(16);
$this->channelManager->get(0, true);
$this->chan = $this->channelManager->make(65535);

parent::__construct($user, $password, $vhost, $insist, $login_method, $login_response, $locale, $io, $heartbeat, $connection_timeout, $channel_rpc_timeout);
parent::__construct($user, $password, $vhost, $insist, $login_method, $login_response, $locale, $io, $heartbeat, $connection_timeout, $channel_rpc_timeout, $config);

$this->pool = new Channel(static::CHANNEL_POOL_LENGTH);
$this->confirmPool = new Channel(static::CONFIRM_CHANNEL_POOL_LENGTH);
Expand Down
10 changes: 9 additions & 1 deletion src/ConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Coroutine\Locker;
use InvalidArgumentException;
use PhpAmqpLib\Connection\AMQPConnectionConfig;
use PhpAmqpLib\Wire\IO\AbstractIO;
use Psr\Container\ContainerInterface;

Expand Down Expand Up @@ -91,6 +92,12 @@ public function make(array $config): AMQPConnection
$params = new Params(Arr::get($config, 'params', []));
$io = $this->makeIO($config, $params);

$amqpConfig = null;
if (! empty($params->getConnectionName())) {
$amqpConfig = new AMQPConnectionConfig();
$amqpConfig->setConnectionName($params->getConnectionName());
}

$connection = new AMQPConnection(
$user,
$password,
Expand All @@ -102,7 +109,8 @@ public function make(array $config): AMQPConnection
$io,
$params->getHeartbeat(),
$params->getConnectionTimeout(),
$params->getChannelRpcTimeout()
$params->getChannelRpcTimeout(),
$amqpConfig
);

return $connection->setParams($params)
Expand Down
17 changes: 17 additions & 0 deletions src/Params.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class Params

protected int $maxIdleChannels = 10;

protected ?string $connectionName = null;

public function __construct(array $data)
{
if (isset($data['insist'])) {
Expand Down Expand Up @@ -75,6 +77,10 @@ public function __construct(array $data)
if (isset($data['max_idle_channels'])) {
$this->setMaxIdleChannels((int) $data['max_idle_channels']);
}

if (isset($data['connection_name'])) {
$this->setConnectionName($data['connection_name']);
}
}

public function isInsist(): bool
Expand Down Expand Up @@ -186,4 +192,15 @@ public function setMaxIdleChannels(int $maxIdleChannels): static
$this->maxIdleChannels = $maxIdleChannels;
return $this;
}

public function getConnectionName(): ?string
{
return $this->connectionName;
}

public function setConnectionName(?string $connectionName): static
{
$this->connectionName = $connectionName;
return $this;
}
}

0 comments on commit db5e248

Please sign in to comment.