Skip to content

Commit

Permalink
Optimized code for hyperf/amqp. (#6045)
Browse files Browse the repository at this point in the history
Co-authored-by: 李铭昕 <[email protected]>
  • Loading branch information
gaichao168 and limingxinleo authored Aug 21, 2023
1 parent 7311f48 commit ad457f5
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 56 deletions.
6 changes: 3 additions & 3 deletions src/AMQPConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public function __construct(
$this->confirmPool = new Channel(static::CONFIRM_CHANNEL_POOL_LENGTH);
}

public function write($data)
public function write($data): void
{
$this->loop();

Expand All @@ -105,7 +105,7 @@ public function setParams(Params $params): static
return $this;
}

public function getIO()
public function getIO(): AbstractIO
{
return $this->io;
}
Expand All @@ -124,7 +124,7 @@ public function getChannel(): AMQPChannel
return $this->channel($id);
}

public function channel($channel_id = null)
public function channel($channel_id = null): AMQPChannel
{
$this->channelManager->close($channel_id);
$this->channelManager->get($channel_id, true);
Expand Down
4 changes: 2 additions & 2 deletions src/ConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ class ConnectionFactory
/**
* @var AMQPConnection[][]
*/
protected $connections = [];
protected array $connections = [];

public function __construct(protected ContainerInterface $container)
{
$this->config = $this->container->get(ConfigInterface::class);
}

public function refresh(string $pool)
public function refresh(string $pool): void
{
$config = $this->getConfig($pool);
$count = $config['pool']['connections'] ?? 1;
Expand Down
2 changes: 1 addition & 1 deletion src/ConsumerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

class ConsumerFactory
{
public function __invoke(ContainerInterface $container)
public function __invoke(ContainerInterface $container): Consumer
{
return new Consumer(
$container,
Expand Down
12 changes: 3 additions & 9 deletions src/ConsumerManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public function __construct(private ContainerInterface $container)
{
}

public function run()
public function run(): void
{
$classes = AnnotationCollector::getClassesByAnnotation(ConsumerAnnotation::class);
/**
Expand Down Expand Up @@ -57,15 +57,9 @@ public function run()
private function createProcess(ConsumerMessageInterface $consumerMessage): AbstractProcess
{
return new class($this->container, $consumerMessage) extends AbstractProcess {
/**
* @var \Hyperf\Amqp\Consumer
*/
private $consumer;
private Consumer $consumer;

/**
* @var ConsumerMessageInterface
*/
private $consumerMessage;
private ConsumerMessageInterface $consumerMessage;

public function __construct(ContainerInterface $container, ConsumerMessageInterface $consumerMessage)
{
Expand Down
24 changes: 14 additions & 10 deletions src/IO/SwooleIO.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use InvalidArgumentException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Wire\AMQPWriter;
use PhpAmqpLib\Wire\IO\AbstractIO;
Expand Down Expand Up @@ -58,12 +59,12 @@ public function __construct(
*
* @throws AMQPRuntimeException
*/
public function connect()
public function connect(): void
{
$this->sock = $this->makeClient();
}

public function read($len)
public function read($len): string
{
$data = $this->sock->recvAll($len, $this->readWriteTimeout);
if ($data === false || strlen($data) !== $len) {
Expand All @@ -73,7 +74,7 @@ public function read($len)
return $data;
}

public function write($data)
public function write($data): void
{
$len = $this->sock->sendAll($data, $this->readWriteTimeout);

Expand All @@ -87,27 +88,27 @@ public function check_heartbeat()
{
}

public function close()
public function close(): void
{
$this->sock && $this->sock->close();
}

public function select(?int $sec, int $usec = 0)
public function select(?int $sec, int $usec = 0): int
{
return 1;
}

public function disableHeartbeat()
public function disableHeartbeat(): AbstractIO
{
return $this;
}

public function reenableHeartbeat()
public function reenableHeartbeat(): AbstractIO
{
return $this;
}

protected function makeClient()
protected function makeClient(): Socket
{
$sock = new Socket(AF_INET, SOCK_STREAM, 0);

Expand All @@ -124,7 +125,10 @@ protected function makeClient()
return $sock;
}

protected function write_heartbeat()
/**
* @throws AMQPIOException
*/
protected function write_heartbeat(): void
{
$pkt = new AMQPWriter();
$pkt->write_octet(8);
Expand All @@ -134,7 +138,7 @@ protected function write_heartbeat()
$this->write($pkt->getvalue());
}

protected function do_select($sec, $usec)
protected function do_select($sec, $usec): int
{
return 1;
}
Expand Down
20 changes: 10 additions & 10 deletions src/IO/SwowIO.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ public function __construct(
*
* @throws AMQPRuntimeException
*/
public function connect()
public function connect(): void
{
$this->sock = $this->makeClient();
}

public function read($len)
public function read($len): string
{
$data = $this->sock->recvAll($len, $this->readWriteTimeout);
if ($data === false || strlen($data) !== $len) {
Expand All @@ -74,7 +74,7 @@ public function read($len)
return $data;
}

public function write($data)
public function write($data): void
{
$len = $this->sock->sendAll($data, $this->readWriteTimeout);

Expand All @@ -88,27 +88,27 @@ public function check_heartbeat()
{
}

public function close()
public function close(): void
{
$this->sock && $this->sock->close();
}

public function select(?int $sec, int $usec = 0)
public function select(?int $sec, int $usec = 0): int
{
return 1;
}

public function disableHeartbeat()
public function disableHeartbeat(): AbstractIO
{
return $this;
}

public function reenableHeartbeat()
public function reenableHeartbeat(): AbstractIO
{
return $this;
}

protected function makeClient()
protected function makeClient(): Socket
{
$sock = new Socket(Socket::TYPE_TCP);

Expand All @@ -125,7 +125,7 @@ protected function makeClient()
return $sock;
}

protected function write_heartbeat()
protected function write_heartbeat(): void
{
$pkt = new AMQPWriter();
$pkt->write_octet(8);
Expand All @@ -135,7 +135,7 @@ protected function write_heartbeat()
$this->write($pkt->getvalue());
}

protected function do_select($sec, $usec)
protected function do_select($sec, $usec): int
{
return 1;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Message/ConsumerMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public function getContainer(): ?ContainerInterface
return $this->container;
}

protected function reply($data, AMQPMessage $message)
protected function reply(mixed $data, AMQPMessage $message): void
{
$packer = ApplicationContext::getContainer()->get(Packer::class);

Expand Down
2 changes: 1 addition & 1 deletion src/Message/ProducerDelayedMessageTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ trait ProducerDelayedMessageTrait
* Set the delay time.
* @return $this
*/
public function setDelayMs(int $millisecond, string $name = 'x-delay'): self
public function setDelayMs(int $millisecond, string $name = 'x-delay'): static
{
$this->properties['application_headers'] = new AMQPTable([$name => $millisecond]);
return $this;
Expand Down
29 changes: 19 additions & 10 deletions src/Params.php
Original file line number Diff line number Diff line change
Expand Up @@ -116,64 +116,73 @@ public function isCloseOnDestruct(): bool
return $this->closeOnDestruct;
}

public function setCloseOnDestruct(bool $closeOnDestruct)
public function setCloseOnDestruct(bool $closeOnDestruct): static
{
$this->closeOnDestruct = $closeOnDestruct;
return $this;
}

public function setInsist(bool $insist)
public function setInsist(bool $insist): static
{
$this->insist = $insist;
return $this;
}

public function setLoginMethod(string $loginMethod)
public function setLoginMethod(string $loginMethod): static
{
$this->loginMethod = $loginMethod;
return $this;
}

public function setLocale(string $locale)
public function setLocale(string $locale): static
{
$this->locale = $locale;
return $this;
}

public function setConnectionTimeout(int $connectionTimeout)
public function setConnectionTimeout(int $connectionTimeout): static
{
$this->connectionTimeout = $connectionTimeout;
return $this;
}

public function setReadWriteTimeout(int $readWriteTimeout)
public function setReadWriteTimeout(int $readWriteTimeout): static
{
$this->readWriteTimeout = $readWriteTimeout;
return $this;
}

public function setKeepalive(bool $keepalive)
public function setKeepalive(bool $keepalive): static
{
$this->keepalive = $keepalive;
return $this;
}

public function setHeartbeat(int $heartbeat)
public function setHeartbeat(int $heartbeat): static
{
$this->heartbeat = $heartbeat;
return $this;
}

public function getChannelRpcTimeout(): float
{
return $this->channelRpcTimeout;
}

public function setChannelRpcTimeout(float $channelRpcTimeout)
public function setChannelRpcTimeout(float $channelRpcTimeout): static
{
$this->channelRpcTimeout = $channelRpcTimeout;
return $this;
}

public function getMaxIdleChannels(): int
{
return $this->maxIdleChannels;
}

public function setMaxIdleChannels(int $maxIdleChannels)
public function setMaxIdleChannels(int $maxIdleChannels): static
{
$this->maxIdleChannels = $maxIdleChannels;
return $this;
}
}
4 changes: 2 additions & 2 deletions src/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public function produce(ProducerMessageInterface $producerMessage, bool $confirm
return retry(1, fn () => $this->produceMessage($producerMessage, $confirm, $timeout));
}

private function produceMessage(ProducerMessageInterface $producerMessage, bool $confirm = false, int $timeout = 5)
private function produceMessage(ProducerMessageInterface $producerMessage, bool $confirm = false, int $timeout = 5): bool
{
$result = false;

Expand Down Expand Up @@ -61,7 +61,7 @@ private function produceMessage(ProducerMessageInterface $producerMessage, bool
return $result;
}

private function injectMessageProperty(ProducerMessageInterface $producerMessage)
private function injectMessageProperty(ProducerMessageInterface $producerMessage): void
{
if (class_exists(AnnotationCollector::class)) {
/** @var null|\Hyperf\Amqp\Annotation\Producer $annotation */
Expand Down
7 changes: 2 additions & 5 deletions src/RpcChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,14 @@ public function setQueue(?string $queue): static
return $this;
}

/**
* @return AMQPMessage|false
*/
public function wait(int $timeout)
public function wait(int $timeout): bool|AMQPMessage
{
$this->channel->wait(null, false, $timeout);

return $this->chan->pop(0.001);
}

public function close()
public function close(): void
{
$this->chan?->close();
$this->channel->close();
Expand Down
4 changes: 2 additions & 2 deletions src/RpcClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

class RpcClient extends Builder
{
protected $poolChannels = [];
protected array $poolChannels = [];

public function __construct(ContainerInterface $container, ConnectionFactory $factory, protected int $maxChannels = 64)
{
Expand Down Expand Up @@ -75,7 +75,7 @@ public function call(RpcMessageInterface $rpcMessage, int $timeout = 5)
return $result;
}

protected function initChannel(RpcChannel $channel, QueueBuilder $builder)
protected function initChannel(RpcChannel $channel, QueueBuilder $builder): void
{
[$queue] = $channel->getChannel()->queue_declare(
$builder->getQueue(),
Expand Down

0 comments on commit ad457f5

Please sign in to comment.