Skip to content

Commit

Permalink
fix: Fix update subscription method (#2122)
Browse files Browse the repository at this point in the history
* fix: Fix update subscription method

* Fix system test failure
  • Loading branch information
jdpedrie authored and dwsupplee committed Jul 19, 2019
1 parent df8d42b commit a0ae202
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 73 deletions.
45 changes: 13 additions & 32 deletions PubSub/src/Connection/Grpc.php
Original file line number Diff line number Diff line change
Expand Up @@ -185,18 +185,23 @@ public function createSubscription(array $args)
*/
public function updateSubscription(array $args)
{
// Get a list of keys used before building subscription, which modifies $args
$mask = array_keys($args);

// Remove immutable properties.
$mask = array_values(array_diff($mask, ['name', 'topic']));
$updateMaskPaths = [];
foreach (explode(',', $this->pluck('updateMask', $args)) as $path) {
$updateMaskPaths[] = Serializer::toSnakeCase($path);
}

$fieldMask = $this->serializer->decodeMessage(new FieldMask(), ['paths' => $mask]);
$fieldMask = new FieldMask([
'paths' => $updateMaskPaths
]);

$subscriptionObject = $this->buildSubscription($args);
$subscription = $this->serializer->decodeMessage(
new Subscription,
$this->pluck('subscription', $args)
);

unset($args['name']);
return $this->send([$this->subscriberClient, 'updateSubscription'], [
$subscriptionObject,
$subscription,
$fieldMask,
$args
]);
Expand Down Expand Up @@ -434,28 +439,4 @@ private function buildPushConfig(array $pushConfig)
{
return $this->serializer->decodeMessage(new PushConfig(), $pushConfig);
}

/**
* Create a Subscription proto message from an array of arguments.
*
* @param array $args
* @param bool $required
* @return Subscription
*/
private function buildSubscription(array &$args, $required = false)
{
$pushConfig = $this->pluck('pushConfig', $args, $required);
$pushConfig = $pushConfig
? $this->buildPushConfig($pushConfig)
: null;

return $this->serializer->decodeMessage(new Subscription(), array_filter([
'name' => $this->pluck('name', $args, $required),
'topic' => $this->pluck('topic', $args, $required),
'pushConfig' => $pushConfig,
'ackDeadlineSeconds' => $this->pluck('ackDeadlineSeconds', $args, $required),
'retainAckedMessages' => $this->pluck('retainAckedMessages', $args, $required),
'messageRetentionDuration' => $this->pluck('messageRetentionDuration', $args, $required),
]));
}
}
56 changes: 52 additions & 4 deletions PubSub/src/Subscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

namespace Google\Cloud\PubSub;

use Google\Cloud\Core\Exception\NotFoundException;
use Google\Cloud\Core\ArrayTrait;
use Google\Cloud\Core\Duration;
use Google\Cloud\Core\Exception\NotFoundException;
use Google\Cloud\Core\Iam\Iam;
use Google\Cloud\Core\Timestamp;
use Google\Cloud\Core\ValidateTrait;
use Google\Cloud\PubSub\Connection\ConnectionInterface;
use Google\Cloud\PubSub\Connection\IamSubscription;
use Google\Cloud\PubSub\IncomingMessageTrait;
use Google\Cloud\Core\ValidateTrait;
use InvalidArgumentException;

/**
Expand Down Expand Up @@ -57,6 +58,7 @@
*/
class Subscription
{
use ArrayTrait;
use IncomingMessageTrait;
use ResourceNameTrait;
use ValidateTrait;
Expand Down Expand Up @@ -235,6 +237,25 @@ public function create(array $options = [])
* ]);
* ```
*
* ```
* // Updating labels and push config attributes with explicit update masks.
* $subscription->update([
* 'labels' => [
* 'label-1' => 'value'
* ],
* 'pushConfig' => [
* 'attributes' => [
* 'x-goog-version' => 1
* ]
* ]
* ], [
* 'updateMask' => [
* 'labels',
* 'pushConfig.attributes'
* ]
* ]);
* ```
*
* @param array $subscription {
* The Subscription data.
*
Expand All @@ -257,15 +278,42 @@ public function create(array $options = [])
* messages, and thus configures how far back in time a `Seek`
* can be done. Cannot be more than 7 days or less than 10 minutes.
* **Defaults to** 7 days.
* @type array $updateMask A list of field paths to be modified. Nested
* key names should be dot-separated, e.g. `pushConfig.pushEndpoint`.
* Google Cloud PHP will attempt to infer this value on your
* behalf, however modification of map fields with arbitrary keys
* (such as labels or push config attributes) requires an explicit
* update mask.
* }
* @param array $options [optional] Configuration options.
* @return array The subscription info.
*/
public function update(array $subscription, array $options = [])
{
$updateMaskPaths = $this->pluck('updateMask', $options, false) ?: [];
if (!$updateMaskPaths) {
$excludes = ['name', 'topic'];
$iterator = new \RecursiveIteratorIterator(new \RecursiveArrayIterator($subscription));
foreach ($iterator as $leafValue) {
$keys = [];
foreach (range(0, $iterator->getDepth()) as $depth) {
$keys[] = $iterator->getSubIterator($depth)->key();
}

$path = implode('.', $keys);
if (!in_array($path, $excludes)) {
$updateMaskPaths[] = $path;
}
}
}

return $this->info = $this->connection->updateSubscription([
'name' => $this->name
] + $options + $subscription);
'name' => $this->name,
'subscription' => [
'name' => $this->name
] + $subscription,
'updateMask' => implode(',', $updateMaskPaths)
] + $options);
}

/**
Expand Down
15 changes: 15 additions & 0 deletions PubSub/tests/Snippet/SubscriptionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,21 @@ public function testUpdate()
$snippet->invoke();
}

public function testUpdateWithMask()
{
$snippet = $this->snippetFromMethod(Subscription::class, 'update', 1);
$snippet->addLocal('subscription', $this->subscription);

$this->connection->updateSubscription(Argument::allOf(
Argument::withKey('subscription'),
Argument::withKey('updateMask')
))->shouldBeCalled();

$this->subscription->___setProperty('connection', $this->connection->reveal());

$snippet->invoke();
}

public function testDelete()
{
$snippet = $this->snippetFromMethod(Subscription::class, 'delete');
Expand Down
96 changes: 77 additions & 19 deletions PubSub/tests/System/ManageSubscriptionsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class ManageSubscriptionsTest extends PubSubTestCase
*/
public function testCreateAndListSubscriptions($client)
{
$topicName = uniqid(self::TESTING_PREFIX);
$topic = $client->createTopic($topicName);
$topicId = uniqid(self::TESTING_PREFIX);
$topic = $client->createTopic($topicId);
self::$deletionQueue->add($topic);

$subsToCreate = [
Expand All @@ -41,7 +41,7 @@ public function testCreateAndListSubscriptions($client)
];

foreach ($subsToCreate as $subToCreate) {
self::$deletionQueue->add($client->subscribe($subToCreate, $topicName));
self::$deletionQueue->add($client->subscribe($subToCreate, $topicId));
}

$this->assertSubsFound($client, $subsToCreate);
Expand All @@ -51,20 +51,28 @@ public function testCreateAndListSubscriptions($client)
/**
* @dataProvider clientProvider
*/
public function testReloadSub($client)
public function testSubscribeAndReload($client)
{
$topicName = uniqid(self::TESTING_PREFIX);
$topic = $client->createTopic($topicName);
self::$deletionQueue->add($topic);
$topicId = uniqid(self::TESTING_PREFIX);
$topic = $client->createTopic($topicId);

$subscriptionId = uniqid(self::TESTING_PREFIX);
$this->assertFalse($topic->subscription($subscriptionId)->exists());

$shortName = uniqid(self::TESTING_PREFIX);
$this->assertFalse($topic->subscription($shortName)->exists());
// Subscribe via the topic.
$subscription = $topic->subscribe($subscriptionId);
$this->assertTrue($subscription->exists());

$sub = $client->subscribe($shortName, $topic->name());
self::$deletionQueue->add($sub);
$subscriptionId2 = uniqid(self::TESTING_PREFIX);
$this->assertFalse($topic->subscription($subscriptionId2)->exists());

$this->assertTrue($topic->subscription($shortName)->exists());
$this->assertEquals($sub->name(), $sub->reload()['name']);
// Subscribe via pubsubclient
$subscription2 = $client->subscribe($subscriptionId2, $topicId);
$this->assertTrue($subscription2->exists());

self::$deletionQueue->add($topic);
self::$deletionQueue->add($subscription);
self::$deletionQueue->add($subscription2);
}

/**
Expand All @@ -75,18 +83,18 @@ public function testCreateAndListSnapshots($client)
$subs = $client->subscriptions();
$sub = $subs->current();

$snapName = uniqid(self::TESTING_PREFIX);
$snapshotId = uniqid(self::TESTING_PREFIX);

$snap = $client->createSnapshot($snapName, $sub);
$snap = $client->createSnapshot($snapshotId, $sub);
self::$deletionQueue->add($snap);

$this->assertInstanceOf(Snapshot::class, $snap);

$backoff = new ExponentialBackoff(8);
$hasFoundSub = $backoff->execute(function () use ($client, $snapName) {
$hasFoundSub = $backoff->execute(function () use ($client, $snapshotId) {
$snaps = $client->snapshots();
$filtered = array_filter(iterator_to_array($snaps), function ($snap) use ($snapName) {
return strpos($snap->name(), $snapName) !== false;
$filtered = array_filter(iterator_to_array($snaps), function ($snap) use ($snapshotId) {
return strpos($snap->name(), $snapshotId) !== false;
});

if (count($filtered) === 1) {
Expand All @@ -98,11 +106,61 @@ public function testCreateAndListSnapshots($client)

$this->assertTrue($hasFoundSub);

$sub->seekToSnapshot($client->snapshot($snapName));
$sub->seekToSnapshot($client->snapshot($snapshotId));

$sub->seekToTime($client->timestamp(new \DateTime));
}

/**
* @dataProvider clientProvider
*/
public function testUpdateSubscription($client)
{
$subs = $client->subscriptions();
$sub = $subs->current();
$ackDeadlineSeconds = isset($sub->info()['ackDeadlineSeconds'])
? $sub->info()['ackDeadlineSeconds']
: false;

$newDeadline = rand(10, 200);
$sub->update([
'ackDeadlineSeconds' => $newDeadline
]);

$this->assertEquals($newDeadline, $sub->info()['ackDeadlineSeconds']);
}

/**
* @dataProvider clientProvider
*/
public function testUpdateSubscriptionWithUpdateMask($client)
{
$subs = $client->subscriptions();
$sub = $subs->current();

$labels = [
'foo' => 'bar',
'bat' => 'baz'
];

$sub->update([
'labels' => $labels,
'pushConfig' => [
'attributes' => [
'x-goog-version' => 'v1beta1'
]
]
], [
'updateMask' => [
'labels',
'pushConfig.attributes'
]
]);

$this->assertEquals($labels, $sub->info()['labels']);
$this->assertEquals('v1beta1', $sub->info()['pushConfig']['attributes']['x-goog-version']);
}

private function assertSubsFound($class, $expectedSubs)
{
$backoff = new ExponentialBackoff(8);
Expand Down
4 changes: 2 additions & 2 deletions PubSub/tests/System/PubSubTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public function clientProvider()
self::setUpBeforeClass();

return [
[self::$restClient],
[self::$grpcClient]
'grpc' => [self::$restClient],
'rest' => [self::$grpcClient]
];
}

Expand Down
11 changes: 9 additions & 2 deletions PubSub/tests/Unit/Connection/GrpcTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

/**
* @group pubsub
* @group pubsub-connection
*/
class GrpcTest extends TestCase
{
Expand Down Expand Up @@ -108,15 +109,21 @@ public function methodProvider()
$subscription->setRetainAckedMessages(true);

$serializer = new Serializer();
$fieldMask = $serializer->decodeMessage(new FieldMask(), ['paths' => ['retainAckedMessages']]);
$fieldMask = $serializer->decodeMessage(new FieldMask(), ['paths' => ['retain_acked_messages']]);

$time = (new \DateTime)->format('Y-m-d\TH:i:s.u\Z');
$timestamp = $serializer->decodeMessage(new Timestamp(), $this->formatTimestampForApi($time));

return [
[
'updateSubscription',
['name' => 'projects/foo/subscriptions/bar', 'retainAckedMessages' => true],
[
'subscription' => [
'name' => 'projects/foo/subscriptions/bar',
'retainAckedMessages' => true
],
'updateMask' => 'retainAckedMessages'
],
[$subscription, $fieldMask, []]
],
[
Expand Down
Loading

0 comments on commit a0ae202

Please sign in to comment.