From 4f75c1084daee76f83c88f34a2cedbdf031848a4 Mon Sep 17 00:00:00 2001 From: Chimdi Azubuike Date: Sat, 20 Jun 2015 23:04:09 -0700 Subject: [PATCH] added example 6 rpc client/server --- php-amqp/rpc_client.php | 109 ++++++++++++++++++++++++++++++++++++++++ php-amqp/rpc_server.php | 88 ++++++++++++++++++++++++++++++++ 2 files changed, 197 insertions(+) create mode 100644 php-amqp/rpc_client.php create mode 100644 php-amqp/rpc_server.php diff --git a/php-amqp/rpc_client.php b/php-amqp/rpc_client.php new file mode 100644 index 00000000..c438023e --- /dev/null +++ b/php-amqp/rpc_client.php @@ -0,0 +1,109 @@ +#!/usr/bin/env php + + */ + +class FibonacciRpcClient { + private $connection; + private $channel; + + private $callbackQueueName; + private $queueName = 'rpc_queue'; + private $rpcQueue = 'rpc_queue'; + + + private $response; + + protected $queue; + protected $corrId; + + public function __construct() { + $this->connection = $this->getAMQPConnection(); + $this->setChannel(); + $this->setExchange(); + } + + /** + AMQP Connection + */ + protected function getAMQPConnection() { + $connection = new AMQPConnection(); + $connection->setHost('127.0.0.1'); + $connection->setLogin('guest'); + $connection->setPassword('guest'); + $connection->connect(); + return $connection; + } + + /** + Declare Channel + */ + protected function setChannel() { + $this->channel = new AMQPChannel($this->connection); + $this->channel->setPrefetchCount(1); + } + + /** + Declare Exchange + */ + protected function setExchange() { + $this->exchange = new AMQPExchange($this->channel); + } + + public function on_response(AMQPEnvelope $message, AMQPQueue $queue) { + print_r(func_get_args()); + } + + public function call($value) { + $this->response = NULL; + $this->corrId = uniqid(); + + try { + //Declare an nonymous channel + $this->queue = new AMQPQueue($this->channel); + $this->queue->setFlags(AMQP_EXCLUSIVE); + $this->queue->declareQueue(); + $this->callbackQueueName = $this->queue->getName(); + + //Set Publish Attributes + $attributes = array( + 'correlation_id' => $this->corrId, + 'reply_to' => $this->callbackQueueName + ); + + $this->exchange->publish( + $value, + $this->rpcQueue, + AMQP_NOPARAM, + $attributes + ); + + $callback = function(AMQPEnvelope $message, AMQPQueue $q) { + if($message->getCorrelationId() == $this->corrId) { + //echo sprintf("CorrelationID: %s",$message->getCorrelationId()), PHP_EOL; + //echo sprintf("Response: %s",$message->getBody()), PHP_EOL; + $this->response = $message->getBody(); + $q->nack($message->getDeliveryTag()); + return false; + } + }; + + $this->queue->consume($callback); + + //Return RPC Results + return $this->response; + } catch(AMQPQueueException $ex) { + print_r($ex); + } catch(Exception $ex) { + print_r($ex); + } + } +} + +$value = (isset($argv[1]))? $argv[1] : 5; +$fibonacciRpc = new FibonacciRpcClient(); +echo sprintf(" [x] Requesting fib(%s)",$value), PHP_EOL; +$response = $fibonacciRpc->call($value); +echo sprintf(" [.] Received: %s",$response), PHP_EOL; diff --git a/php-amqp/rpc_server.php b/php-amqp/rpc_server.php new file mode 100644 index 00000000..b4d1e78a --- /dev/null +++ b/php-amqp/rpc_server.php @@ -0,0 +1,88 @@ +#!/usr/bin/env php + + */ + +function fib($n) { + if($n == 0) + return 0; + if($n == 1) + return 1; + return fib($n - 1) + fib($n - 2); +} + +function fast_fib($n) { + if ($n < 0) + throw new Exception('Negative number not implemented'); + else + return fast_fib_calc($n)[0]; +} + + +function fast_fib_calc($n) { + if ($n == 0) + return array(0, 1); + else { + list($a,$b) = fast_fib_calc(floor($n/2)); + $c = $a * ($b * 2 - $a); + $d = $a * $a + $b * $b; + if (($n % 2) == 0) + return array($c, $d); + else + return array($d, $c + $d); + } +} + +//Establish connection to AMQP +$connection = new AMQPConnection(); +$connection->setHost('127.0.0.1'); +$connection->setLogin('guest'); +$connection->setPassword('guest'); +$connection->connect(); + + +//Declare Channel +$channel = new AMQPChannel($connection); +$channel->setPrefetchCount(1); + +$exchange = new AMQPExchange($channel); + +$queueName = 'rpc_queue'; +$queue = new AMQPQueue($channel); +$queue->setName($queueName); +$queue->declareQueue(); + + + +echo " [x] Awaiting RPC requests", PHP_EOL; +$callback_func = function(AMQPEnvelope $message, AMQPQueue $q) use (&$exchange) { + $n = intval($message->getBody()); + echo " [.] fib({$n})", PHP_EOL; + + $attributes = array( + 'correlation_id' => $message->getCorrelationId() + ); + + echo sprintf(" QueueName: %s", $q->getName()), PHP_EOL; + echo sprintf(" ReplyTo: %s", $message->getReplyTo()), PHP_EOL; + echo sprintf(" CorrelationID: %s", $message->getCorrelationId()), PHP_EOL; + + $exchange->publish( (string)fast_fib($n), + $message->getReplyTo(), + AMQP_NOPARAM, + $attributes + ); + + $q->nack($message->getDeliveryTag()); +}; + + +try { + $queue->consume($callback_func); +} catch(AMQPQueueException $ex) { + print_r($ex); +} catch(Exception $ex) { + print_r($ex); +}