Skip to content

Commit

Permalink
Merge branch 'SSM-116'
Browse files Browse the repository at this point in the history
  • Loading branch information
mmartinezf committed Jan 11, 2019
2 parents 76b8a72 + 8bb5091 commit 47120dd
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 18 deletions.
1 change: 1 addition & 0 deletions src/Cmp/Queues/Domain/Event/DomainEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class DomainEvent implements Message
* @param string $id
* @param bool $isDeprecated
* @param string|null $correlationId
* @throws DomainEventException
*/
public function __construct(
$origin,
Expand Down
32 changes: 29 additions & 3 deletions src/Cmp/Queues/Domain/Event/Subscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class Subscriber
/**
* Subscriber constructor.
* @param QueueReader $queueReader
* @param LoggerInterface $logger
*/
public function __construct(QueueReader $queueReader, LoggerInterface $logger)
{
Expand All @@ -44,11 +45,14 @@ public function subscribe(EventSubscriptor $eventSubscriptor)
return $this;
}

/**
* @param int $timeout
* @throws DomainEventException
* @throws \Cmp\Queues\Domain\Queue\Exception\ReaderException
*/
public function start($timeout=0)
{
if(!isset($this->subscriptors[0])) {
throw new DomainEventException('You must add at least 1 EventSubscriptor in order to publish start reading from queue.');
}
$this->checkHasSubscriptors();

while(true) {
try {
Expand All @@ -61,6 +65,18 @@ public function start($timeout=0)
}
}

/**
* @param int $timeout
* @throws DomainEventException
* @throws \Cmp\Queues\Domain\Queue\Exception\ReaderException
*/
public function batch($timeout=0)
{
$this->checkHasSubscriptors();

$this->queueReader->read(array($this, 'notify'), $timeout);
}

/**
* @param DomainEvent $domainEvent
*/
Expand All @@ -81,4 +97,14 @@ public function getSubscriptors()
{
return $this->subscriptors;
}

/**
* @throws DomainEventException
*/
private function checkHasSubscriptors()
{
if(empty($this->getSubscriptors())) {
throw new DomainEventException('You must add at least 1 EventSubscriptor in order to publish start reading from queue.');
}
}
}
9 changes: 2 additions & 7 deletions src/Cmp/Queues/Domain/Queue/QueueReader.php
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
<?php
/**
* Created by PhpStorm.
* User: quimmanrique
* Date: 13/02/17
* Time: 17:32
*/

namespace Cmp\Queues\Domain\Queue;

Expand All @@ -14,8 +8,9 @@ interface QueueReader
{
/**
* @param callable $callback
* @throws ReaderException
* @param int $timeout
* @return void
* @throws ReaderException
*/
public function read(callable $callback, $timeout=0);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

namespace Cmp\Queues\Domain\Task\Exception;

class ParseMessageException extends \Exception
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

namespace Cmp\Queues\Infrastructure\AWS\v20121105\Queue;

use Cmp\Queues\Domain\Event\Exception\InvalidJSONDomainEventException;
use Cmp\Queues\Domain\Queue\Exception\InvalidJSONMessageException;
use Cmp\Queues\Domain\Queue\Exception\ReaderException;
use Cmp\Queues\Domain\Queue\JSONMessageFactory;
use Cmp\Queues\Domain\Task\Exception\ParseMessageException;
use Exception;

class MessageHandler
{
Expand All @@ -28,6 +32,7 @@ public function __construct(JSONMessageFactory $jsonMessageFactory)
/**
* @param array $message
*
* @throws ParseMessageException
* @throws ReaderException
*/
public function handleMessage(array $message)
Expand All @@ -36,9 +41,23 @@ public function handleMessage(array $message)
throw new ReaderException("Handling a message with no callback set");
}

$body = json_decode($message['Body'], true);
$task = $this->jsonMessageFactory->create($body['Message']);
call_user_func($this->callback, $task);
try{

if (!isset($message['Body'])) {
throw new InvalidJSONMessageException('Undefined index key Body: ' . print_r($message, true));
}

$body = json_decode($message['Body'], true);

if (!isset($body['Message'])) {
throw new InvalidJSONMessageException('Undefined index key Message: ' . print_r($body, true));
}

call_user_func($this->callback, $this->jsonMessageFactory->create($body['Message']));

} catch(InvalidJSONMessageException $e) {
throw new ParseMessageException(json_encode($message),0, $e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
use Cmp\Queues\Domain\Queue\Exception\GracefulStopException;
use Cmp\Queues\Domain\Queue\Exception\ReaderException;
use Cmp\Queues\Domain\Queue\Exception\TimeoutReaderException;
use Cmp\Queues\Domain\Task\Exception\ParseMessageException;
use Cmp\Queues\Domain\Queue\QueueReader as DomainQueueReader;

use Psr\Log\LoggerInterface;

class QueueReader implements DomainQueueReader
Expand Down Expand Up @@ -59,18 +61,21 @@ public function __construct(

/**
* @param callable $callback
* @param int $timeout
* @param int $timeout
*
* @throws GracefulStopException
* @throws TimeoutReaderException
* @throws ParseMessageException
* @throws ReaderException
* @throws TimeoutReaderException
*/
public function read(callable $callback, $timeout=0)
{
$this->messageHandler->setCallback($callback);

try {
$this->consume($timeout);
} catch(ParseMessageException $e){
throw $e;
} catch(GracefulStopException $e) {
$this->logger->info("Gracefully stopping the AWS queue reader", ["exception" => $e]);
throw $e;
Expand All @@ -94,7 +99,9 @@ public function purge()
/**
* @param int $timeout
*
* @throws ReaderException
* @throws TimeoutReaderException
* @throws ParseMessageException
*/
protected function consume($timeout)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
<?php
namespace Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue;

use Cmp\Queues\Domain\Queue\Exception\InvalidJSONMessageException;
use Cmp\Queues\Domain\Queue\Exception\ReaderException;
use Cmp\Queues\Domain\Queue\JSONMessageFactory;
use Cmp\Queues\Domain\Task\Exception\ParseMessageException;
use Exception;
use PhpAmqpLib\Message\AMQPMessage;

class MessageHandler
Expand Down Expand Up @@ -36,9 +39,13 @@ public function handleMessage(AMQPMessage $message)
throw new ReaderException("Handling a message with no callback set");
}

$task = $this->jsonMessageFactory->create($message->body);
call_user_func($this->callback, $task);
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
try {
$task = $this->jsonMessageFactory->create($message->body);
call_user_func($this->callback, $task);
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
} catch(InvalidJSONMessageException $e) {
throw new ParseMessageException(json_encode($message->getBody()), 0, $e);
}
}

public function setCallback(callable $callback)
Expand Down

0 comments on commit 47120dd

Please sign in to comment.