diff --git a/spec/Extraload/Extractor/QueuedExtractorSpec.php b/spec/Extraload/Extractor/QueuedExtractorSpec.php index 1716849..a425f17 100644 --- a/spec/Extraload/Extractor/QueuedExtractorSpec.php +++ b/spec/Extraload/Extractor/QueuedExtractorSpec.php @@ -4,15 +4,16 @@ use PhpSpec\ObjectBehavior; use Prophecy\Argument; -use PhpAmqpLib\Channel\AMQPChannel; +use Ko\AmqpBroker; +use Ko\RabbitMq\Producer; use Extraload\Extractor\ExtractorInterface; use Extraload\Extractor\ExtractorIteratorInterface; class QueuedExtractorSpec extends ObjectBehavior { - function let(ExtractorIteratorInterface $extractor, AMQPChannel $channel) + function let(ExtractorIteratorInterface $extractor, AmqpBroker $broker) { - $this->beConstructedWith($extractor, $channel, 'extracted'); + $this->beConstructedWith($extractor, $broker, 'extracted'); } function it_is_initializable() @@ -25,11 +26,11 @@ function it_implements_extractor_interface() $this->shouldHaveType('Extraload\Extractor\ExtractorInterface'); } - function it_publihes_messages_to_given_channel(ExtractorIteratorInterface $extractor, AMQPChannel $channel) + function it_publihes_messages_to_given_channel(ExtractorIteratorInterface $extractor, AmqpBroker $broker, Producer $producer) { $extractor->extract()->shouldBeCalled()->willReturn(['foo' => 'bar'], null); - $channel->basic_publish(Argument::any(), null, 'extracted')->shouldBeCalled(); - $channel->close()->shouldBeCalled(); + $broker->getProducer('extracted')->shouldBeCalled()->willReturn($producer); + $producer->publish(serialize(['foo' => 'bar'])); $this->extract(); } diff --git a/spec/Extraload/Loader/ConsoleLoaderSpec.php b/spec/Extraload/Loader/ConsoleLoaderSpec.php index f9398a6..c62db30 100644 --- a/spec/Extraload/Loader/ConsoleLoaderSpec.php +++ b/spec/Extraload/Loader/ConsoleLoaderSpec.php @@ -32,6 +32,7 @@ function it_loads_data_in_console_using_table_helper(Table $table) function it_renders_data_in_console_on_flush(Table $table) { $table->render()->shouldBeCalled(); + $table->setRows([])->shouldBeCalled(); $this->flush(); } diff --git a/spec/Extraload/Loader/MessageLoaderSpec.php b/spec/Extraload/Loader/MessageLoaderSpec.php index 4f63983..d46c5ed 100644 --- a/spec/Extraload/Loader/MessageLoaderSpec.php +++ b/spec/Extraload/Loader/MessageLoaderSpec.php @@ -5,7 +5,6 @@ use PhpSpec\ObjectBehavior; use Prophecy\Argument; use Extraload\Loader\LoaderInterface; -use PhpAmqpLib\Message\AMQPMessage; class MessageLoaderSpec extends ObjectBehavior { @@ -29,11 +28,12 @@ function it_throws_exception_if_not_loading_message() $this->shouldThrow('InvalidArgumentException')->duringLoad(['foo' => 'bar']); } - function it_loads_data_from_message_using_given_loader(LoaderInterface $loader) + function it_loads_data_from_message_using_given_loader(LoaderInterface $loader, \AMQPEnvelope $envelope) { - $message = new AMQPMessage(serialize(['foo' => 'bar'])); + $envelope->getBody()->shouldBeCalled()->willReturn(serialize(['foo' => 'bar'])); $loader->load(['foo' => 'bar'])->shouldBeCalled(); + $loader->flush()->shouldBeCalled(); - $this->load($message); + $this->load($envelope); } } diff --git a/spec/Extraload/Loader/QueuedLoaderSpec.php b/spec/Extraload/Loader/QueuedLoaderSpec.php index e02c68e..399e022 100644 --- a/spec/Extraload/Loader/QueuedLoaderSpec.php +++ b/spec/Extraload/Loader/QueuedLoaderSpec.php @@ -5,13 +5,14 @@ use PhpSpec\ObjectBehavior; use Prophecy\Argument; use Extraload\Loader\LoaderInterface; -use PhpAmqpLib\Channel\AMQPChannel; +use Ko\AmqpBroker; +use Ko\RabbitMq\Consumer; class QueuedLoaderSpec extends ObjectBehavior { - function let(LoaderInterface $loader, AMQPChannel $channel) + function let(LoaderInterface $loader, AmqpBroker $broker) { - $this->beConstructedWith($loader, $channel, 'transformed'); + $this->beConstructedWith($loader, $broker, 'transformed'); } function it_is_initializable() @@ -24,8 +25,11 @@ function it_implements_loader_interface() $this->shouldImplement('Extraload\Loader\LoaderInterface'); } - function it_publihes_messages_from_given_channel(LoaderInterface $loader, AMQPChannel $channel) + function it_publihes_messages_from_given_channel(LoaderInterface $loader, AmqpBroker $broker, Consumer $consumer) { - $channel->basic_consume('transformed', '', false, false, false, false, [$loader, 'load']); + $broker->getConsumer('transformed')->shouldBeCalled()->willReturn($consumer); + $consumer->consume([$loader, 'load'], AMQP_AUTOACK); + + $this->load(); } } diff --git a/spec/Extraload/Pipeline/QueuedPipelineSpec.php b/spec/Extraload/Pipeline/QueuedPipelineSpec.php index 3998a59..646b512 100644 --- a/spec/Extraload/Pipeline/QueuedPipelineSpec.php +++ b/spec/Extraload/Pipeline/QueuedPipelineSpec.php @@ -7,7 +7,6 @@ use Extraload\Extractor\QueuedExtractor; use Extraload\Transformer\TransformerInterface;; use Extraload\Loader\QueuedLoader; -use PhpAmqpLib\Connection\AbstractConnection; use Ko\ProcessManager; class QueuedPipelineSpec extends ObjectBehavior @@ -16,11 +15,10 @@ function let( QueuedExtractor $extractor, TransformerInterface $transformer, QueuedLoader $loader, - ProcessManager $processManager, - AbstractConnection $connection + ProcessManager $processManager ) { - $this->beConstructedWith($extractor, $transformer, $loader, $processManager, $connection); + $this->beConstructedWith($extractor, $transformer, $loader, $processManager); } function it_is_initializable() diff --git a/src/Extraload/Extractor/QueuedExtractor.php b/src/Extraload/Extractor/QueuedExtractor.php index 1cd161b..3a7d8b4 100644 --- a/src/Extraload/Extractor/QueuedExtractor.php +++ b/src/Extraload/Extractor/QueuedExtractor.php @@ -2,8 +2,6 @@ namespace Extraload\Extractor; -use PhpAmqpLib\Channel\AMQPChannel; -use PhpAmqpLib\Message\AMQPMessage; use Ko\AmqpBroker; class QueuedExtractor implements ExtractorInterface diff --git a/src/Extraload/Loader/QueuedLoader.php b/src/Extraload/Loader/QueuedLoader.php index 68ada98..bb49b42 100644 --- a/src/Extraload/Loader/QueuedLoader.php +++ b/src/Extraload/Loader/QueuedLoader.php @@ -2,7 +2,6 @@ namespace Extraload\Loader; -use PhpAmqpLib\Channel\AMQPChannel; use Ko\AmqpBroker; class QueuedLoader extends AutoFlushLoader implements LoaderInterface