From a0ae2021a4bcc822af1011824659ada12a514c20 Mon Sep 17 00:00:00 2001 From: John Pedrie Date: Fri, 19 Jul 2019 13:28:45 -0400 Subject: [PATCH] fix: Fix update subscription method (#2122) * fix: Fix update subscription method * Fix system test failure --- PubSub/src/Connection/Grpc.php | 45 +++------ PubSub/src/Subscription.php | 56 ++++++++++- PubSub/tests/Snippet/SubscriptionTest.php | 15 +++ .../tests/System/ManageSubscriptionsTest.php | 96 +++++++++++++++---- PubSub/tests/System/PubSubTestCase.php | 4 +- PubSub/tests/Unit/Connection/GrpcTest.php | 11 ++- PubSub/tests/Unit/SubscriptionTest.php | 25 +++-- 7 files changed, 179 insertions(+), 73 deletions(-) diff --git a/PubSub/src/Connection/Grpc.php b/PubSub/src/Connection/Grpc.php index 20c8c0967747..a73f747f2da8 100644 --- a/PubSub/src/Connection/Grpc.php +++ b/PubSub/src/Connection/Grpc.php @@ -185,18 +185,23 @@ public function createSubscription(array $args) */ public function updateSubscription(array $args) { - // Get a list of keys used before building subscription, which modifies $args - $mask = array_keys($args); - - // Remove immutable properties. - $mask = array_values(array_diff($mask, ['name', 'topic'])); + $updateMaskPaths = []; + foreach (explode(',', $this->pluck('updateMask', $args)) as $path) { + $updateMaskPaths[] = Serializer::toSnakeCase($path); + } - $fieldMask = $this->serializer->decodeMessage(new FieldMask(), ['paths' => $mask]); + $fieldMask = new FieldMask([ + 'paths' => $updateMaskPaths + ]); - $subscriptionObject = $this->buildSubscription($args); + $subscription = $this->serializer->decodeMessage( + new Subscription, + $this->pluck('subscription', $args) + ); + unset($args['name']); return $this->send([$this->subscriberClient, 'updateSubscription'], [ - $subscriptionObject, + $subscription, $fieldMask, $args ]); @@ -434,28 +439,4 @@ private function buildPushConfig(array $pushConfig) { return $this->serializer->decodeMessage(new PushConfig(), $pushConfig); } - - /** - * Create a Subscription proto message from an array of arguments. - * - * @param array $args - * @param bool $required - * @return Subscription - */ - private function buildSubscription(array &$args, $required = false) - { - $pushConfig = $this->pluck('pushConfig', $args, $required); - $pushConfig = $pushConfig - ? $this->buildPushConfig($pushConfig) - : null; - - return $this->serializer->decodeMessage(new Subscription(), array_filter([ - 'name' => $this->pluck('name', $args, $required), - 'topic' => $this->pluck('topic', $args, $required), - 'pushConfig' => $pushConfig, - 'ackDeadlineSeconds' => $this->pluck('ackDeadlineSeconds', $args, $required), - 'retainAckedMessages' => $this->pluck('retainAckedMessages', $args, $required), - 'messageRetentionDuration' => $this->pluck('messageRetentionDuration', $args, $required), - ])); - } } diff --git a/PubSub/src/Subscription.php b/PubSub/src/Subscription.php index 0b70412195b4..9e1f4c9eb824 100644 --- a/PubSub/src/Subscription.php +++ b/PubSub/src/Subscription.php @@ -17,14 +17,15 @@ namespace Google\Cloud\PubSub; -use Google\Cloud\Core\Exception\NotFoundException; +use Google\Cloud\Core\ArrayTrait; use Google\Cloud\Core\Duration; +use Google\Cloud\Core\Exception\NotFoundException; use Google\Cloud\Core\Iam\Iam; use Google\Cloud\Core\Timestamp; +use Google\Cloud\Core\ValidateTrait; use Google\Cloud\PubSub\Connection\ConnectionInterface; use Google\Cloud\PubSub\Connection\IamSubscription; use Google\Cloud\PubSub\IncomingMessageTrait; -use Google\Cloud\Core\ValidateTrait; use InvalidArgumentException; /** @@ -57,6 +58,7 @@ */ class Subscription { + use ArrayTrait; use IncomingMessageTrait; use ResourceNameTrait; use ValidateTrait; @@ -235,6 +237,25 @@ public function create(array $options = []) * ]); * ``` * + * ``` + * // Updating labels and push config attributes with explicit update masks. + * $subscription->update([ + * 'labels' => [ + * 'label-1' => 'value' + * ], + * 'pushConfig' => [ + * 'attributes' => [ + * 'x-goog-version' => 1 + * ] + * ] + * ], [ + * 'updateMask' => [ + * 'labels', + * 'pushConfig.attributes' + * ] + * ]); + * ``` + * * @param array $subscription { * The Subscription data. * @@ -257,15 +278,42 @@ public function create(array $options = []) * messages, and thus configures how far back in time a `Seek` * can be done. Cannot be more than 7 days or less than 10 minutes. * **Defaults to** 7 days. + * @type array $updateMask A list of field paths to be modified. Nested + * key names should be dot-separated, e.g. `pushConfig.pushEndpoint`. + * Google Cloud PHP will attempt to infer this value on your + * behalf, however modification of map fields with arbitrary keys + * (such as labels or push config attributes) requires an explicit + * update mask. * } * @param array $options [optional] Configuration options. * @return array The subscription info. */ public function update(array $subscription, array $options = []) { + $updateMaskPaths = $this->pluck('updateMask', $options, false) ?: []; + if (!$updateMaskPaths) { + $excludes = ['name', 'topic']; + $iterator = new \RecursiveIteratorIterator(new \RecursiveArrayIterator($subscription)); + foreach ($iterator as $leafValue) { + $keys = []; + foreach (range(0, $iterator->getDepth()) as $depth) { + $keys[] = $iterator->getSubIterator($depth)->key(); + } + + $path = implode('.', $keys); + if (!in_array($path, $excludes)) { + $updateMaskPaths[] = $path; + } + } + } + return $this->info = $this->connection->updateSubscription([ - 'name' => $this->name - ] + $options + $subscription); + 'name' => $this->name, + 'subscription' => [ + 'name' => $this->name + ] + $subscription, + 'updateMask' => implode(',', $updateMaskPaths) + ] + $options); } /** diff --git a/PubSub/tests/Snippet/SubscriptionTest.php b/PubSub/tests/Snippet/SubscriptionTest.php index ef98c3ec3bfd..e7da11dc5404 100644 --- a/PubSub/tests/Snippet/SubscriptionTest.php +++ b/PubSub/tests/Snippet/SubscriptionTest.php @@ -108,6 +108,21 @@ public function testUpdate() $snippet->invoke(); } + public function testUpdateWithMask() + { + $snippet = $this->snippetFromMethod(Subscription::class, 'update', 1); + $snippet->addLocal('subscription', $this->subscription); + + $this->connection->updateSubscription(Argument::allOf( + Argument::withKey('subscription'), + Argument::withKey('updateMask') + ))->shouldBeCalled(); + + $this->subscription->___setProperty('connection', $this->connection->reveal()); + + $snippet->invoke(); + } + public function testDelete() { $snippet = $this->snippetFromMethod(Subscription::class, 'delete'); diff --git a/PubSub/tests/System/ManageSubscriptionsTest.php b/PubSub/tests/System/ManageSubscriptionsTest.php index 60499ab0bb3d..d18f39ae590a 100644 --- a/PubSub/tests/System/ManageSubscriptionsTest.php +++ b/PubSub/tests/System/ManageSubscriptionsTest.php @@ -31,8 +31,8 @@ class ManageSubscriptionsTest extends PubSubTestCase */ public function testCreateAndListSubscriptions($client) { - $topicName = uniqid(self::TESTING_PREFIX); - $topic = $client->createTopic($topicName); + $topicId = uniqid(self::TESTING_PREFIX); + $topic = $client->createTopic($topicId); self::$deletionQueue->add($topic); $subsToCreate = [ @@ -41,7 +41,7 @@ public function testCreateAndListSubscriptions($client) ]; foreach ($subsToCreate as $subToCreate) { - self::$deletionQueue->add($client->subscribe($subToCreate, $topicName)); + self::$deletionQueue->add($client->subscribe($subToCreate, $topicId)); } $this->assertSubsFound($client, $subsToCreate); @@ -51,20 +51,28 @@ public function testCreateAndListSubscriptions($client) /** * @dataProvider clientProvider */ - public function testReloadSub($client) + public function testSubscribeAndReload($client) { - $topicName = uniqid(self::TESTING_PREFIX); - $topic = $client->createTopic($topicName); - self::$deletionQueue->add($topic); + $topicId = uniqid(self::TESTING_PREFIX); + $topic = $client->createTopic($topicId); + + $subscriptionId = uniqid(self::TESTING_PREFIX); + $this->assertFalse($topic->subscription($subscriptionId)->exists()); - $shortName = uniqid(self::TESTING_PREFIX); - $this->assertFalse($topic->subscription($shortName)->exists()); + // Subscribe via the topic. + $subscription = $topic->subscribe($subscriptionId); + $this->assertTrue($subscription->exists()); - $sub = $client->subscribe($shortName, $topic->name()); - self::$deletionQueue->add($sub); + $subscriptionId2 = uniqid(self::TESTING_PREFIX); + $this->assertFalse($topic->subscription($subscriptionId2)->exists()); - $this->assertTrue($topic->subscription($shortName)->exists()); - $this->assertEquals($sub->name(), $sub->reload()['name']); + // Subscribe via pubsubclient + $subscription2 = $client->subscribe($subscriptionId2, $topicId); + $this->assertTrue($subscription2->exists()); + + self::$deletionQueue->add($topic); + self::$deletionQueue->add($subscription); + self::$deletionQueue->add($subscription2); } /** @@ -75,18 +83,18 @@ public function testCreateAndListSnapshots($client) $subs = $client->subscriptions(); $sub = $subs->current(); - $snapName = uniqid(self::TESTING_PREFIX); + $snapshotId = uniqid(self::TESTING_PREFIX); - $snap = $client->createSnapshot($snapName, $sub); + $snap = $client->createSnapshot($snapshotId, $sub); self::$deletionQueue->add($snap); $this->assertInstanceOf(Snapshot::class, $snap); $backoff = new ExponentialBackoff(8); - $hasFoundSub = $backoff->execute(function () use ($client, $snapName) { + $hasFoundSub = $backoff->execute(function () use ($client, $snapshotId) { $snaps = $client->snapshots(); - $filtered = array_filter(iterator_to_array($snaps), function ($snap) use ($snapName) { - return strpos($snap->name(), $snapName) !== false; + $filtered = array_filter(iterator_to_array($snaps), function ($snap) use ($snapshotId) { + return strpos($snap->name(), $snapshotId) !== false; }); if (count($filtered) === 1) { @@ -98,11 +106,61 @@ public function testCreateAndListSnapshots($client) $this->assertTrue($hasFoundSub); - $sub->seekToSnapshot($client->snapshot($snapName)); + $sub->seekToSnapshot($client->snapshot($snapshotId)); $sub->seekToTime($client->timestamp(new \DateTime)); } + /** + * @dataProvider clientProvider + */ + public function testUpdateSubscription($client) + { + $subs = $client->subscriptions(); + $sub = $subs->current(); + $ackDeadlineSeconds = isset($sub->info()['ackDeadlineSeconds']) + ? $sub->info()['ackDeadlineSeconds'] + : false; + + $newDeadline = rand(10, 200); + $sub->update([ + 'ackDeadlineSeconds' => $newDeadline + ]); + + $this->assertEquals($newDeadline, $sub->info()['ackDeadlineSeconds']); + } + + /** + * @dataProvider clientProvider + */ + public function testUpdateSubscriptionWithUpdateMask($client) + { + $subs = $client->subscriptions(); + $sub = $subs->current(); + + $labels = [ + 'foo' => 'bar', + 'bat' => 'baz' + ]; + + $sub->update([ + 'labels' => $labels, + 'pushConfig' => [ + 'attributes' => [ + 'x-goog-version' => 'v1beta1' + ] + ] + ], [ + 'updateMask' => [ + 'labels', + 'pushConfig.attributes' + ] + ]); + + $this->assertEquals($labels, $sub->info()['labels']); + $this->assertEquals('v1beta1', $sub->info()['pushConfig']['attributes']['x-goog-version']); + } + private function assertSubsFound($class, $expectedSubs) { $backoff = new ExponentialBackoff(8); diff --git a/PubSub/tests/System/PubSubTestCase.php b/PubSub/tests/System/PubSubTestCase.php index 235ac2b7b097..47d8cdc15c30 100644 --- a/PubSub/tests/System/PubSubTestCase.php +++ b/PubSub/tests/System/PubSubTestCase.php @@ -34,8 +34,8 @@ public function clientProvider() self::setUpBeforeClass(); return [ - [self::$restClient], - [self::$grpcClient] + 'grpc' => [self::$restClient], + 'rest' => [self::$grpcClient] ]; } diff --git a/PubSub/tests/Unit/Connection/GrpcTest.php b/PubSub/tests/Unit/Connection/GrpcTest.php index 5822b9d7e8f6..b2b5964e4e6b 100644 --- a/PubSub/tests/Unit/Connection/GrpcTest.php +++ b/PubSub/tests/Unit/Connection/GrpcTest.php @@ -34,6 +34,7 @@ /** * @group pubsub + * @group pubsub-connection */ class GrpcTest extends TestCase { @@ -108,7 +109,7 @@ public function methodProvider() $subscription->setRetainAckedMessages(true); $serializer = new Serializer(); - $fieldMask = $serializer->decodeMessage(new FieldMask(), ['paths' => ['retainAckedMessages']]); + $fieldMask = $serializer->decodeMessage(new FieldMask(), ['paths' => ['retain_acked_messages']]); $time = (new \DateTime)->format('Y-m-d\TH:i:s.u\Z'); $timestamp = $serializer->decodeMessage(new Timestamp(), $this->formatTimestampForApi($time)); @@ -116,7 +117,13 @@ public function methodProvider() return [ [ 'updateSubscription', - ['name' => 'projects/foo/subscriptions/bar', 'retainAckedMessages' => true], + [ + 'subscription' => [ + 'name' => 'projects/foo/subscriptions/bar', + 'retainAckedMessages' => true + ], + 'updateMask' => 'retainAckedMessages' + ], [$subscription, $fieldMask, []] ], [ diff --git a/PubSub/tests/Unit/SubscriptionTest.php b/PubSub/tests/Unit/SubscriptionTest.php index 83a932c4a4af..17ce3060b030 100644 --- a/PubSub/tests/Unit/SubscriptionTest.php +++ b/PubSub/tests/Unit/SubscriptionTest.php @@ -19,7 +19,6 @@ use Google\Cloud\Core\Exception\NotFoundException; use Google\Cloud\Core\Iam\Iam; -use Google\Cloud\Core\Iterator\ItemIterator; use Google\Cloud\Core\Testing\TestHelpers; use Google\Cloud\Core\Timestamp; use Google\Cloud\PubSub\Connection\ConnectionInterface; @@ -94,24 +93,22 @@ public function testCreateWithoutTopicName() public function testUpdate() { - $args = [ + $this->connection->updateSubscription(Argument::allOf( + Argument::withEntry('subscription', [ + 'name' => $this->subscription->name(), + 'foo' => 'bar' + ]), + Argument::withEntry('updateMask', 'foo') + ))->shouldBeCalled()->willReturn([ 'foo' => 'bar' - ]; - - $argsWithName = $args + [ - 'name' => $this->subscription->name() - ]; - - $this->connection->updateSubscription($argsWithName) - ->shouldBeCalled() - ->willReturn($argsWithName); + ]); $this->subscription->___setProperty('connection', $this->connection->reveal()); - $res = $this->subscription->update($args); + $res = $this->subscription->update(['foo' => 'bar']); - $this->assertEquals($res, $argsWithName); - $this->assertEquals($this->subscription->info(), $argsWithName); + $this->assertEquals(['foo' => 'bar'], $res); + $this->assertEquals('bar', $this->subscription->info()['foo']); } public function testDelete()