Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow for topics to updated via a new method #2256

Merged
merged 13 commits into from
Aug 20, 2019
Merged
5 changes: 5 additions & 0 deletions PubSub/src/Connection/ConnectionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public function getTopic(array $args);
*/
public function deleteTopic(array $args);

/**
* @param array $args
*/
public function updateTopic(array $args);

/**
* @param array $args
*/
Expand Down
28 changes: 28 additions & 0 deletions PubSub/src/Connection/Grpc.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
use Google\Cloud\PubSub\V1\PushConfig;
use Google\Cloud\PubSub\V1\SubscriberClient;
use Google\Cloud\PubSub\V1\Subscription;
use Google\Cloud\PubSub\V1\Topic;
use Google\Protobuf\Duration;
use Google\Protobuf\FieldMask;
use Google\Protobuf\Timestamp;
Expand Down Expand Up @@ -142,6 +143,33 @@ public function deleteTopic(array $args)
]);
}

/**
* @param array $args
*/
public function updateTopic(array $args)
{
$updateMaskPaths = [];
foreach (explode(',', $this->pluck('updateMask', $args)) as $path) {
$updateMaskPaths[] = Serializer::toSnakeCase($path);
}

$fieldMask = new FieldMask([
'paths' => $updateMaskPaths
]);

$topic = $this->serializer->decodeMessage(
new Topic,
$this->pluck('topic', $args)
);

unset($args['name']);
return $this->send([$this->publisherClient, 'updateTopic'], [
$topic,
$fieldMask,
$args
]);
}

/**
* @param array $args
*/
Expand Down
8 changes: 8 additions & 0 deletions PubSub/src/Connection/Rest.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ public function createTopic(array $args)
return $this->send('topics', 'create', $args);
}

/**
* @param array $args
*/
public function updateTopic(array $args)
{
return $this->send('topics', 'patch', $args);
}

/**
* @param array $args
*/
Expand Down
91 changes: 91 additions & 0 deletions PubSub/src/Topic.php
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,97 @@ public function create(array $options = [])
return $this->info;
}

/**
* Update the topic.
*
* Note that the topic's name and kms key name are immutable properties and may not be modified.
*
* Example:
* ```
* $topic->update([
* 'messageStoragePolicy' => [
* 'allowedPersistenceRegions' => ['us-central1']
* ]
* ]);
* ```
*
* ```
* // Updating labels with an explicit update mask
* $topic->update([
* 'labels' => [
* 'foo' => 'bar'
* ]
* ], [
* 'updateMask' => [
* 'labels'
* ]
* ]);
* ```
*
* @see https://cloud.google.com/pubsub/docs/reference/rest/v1/UpdateTopicRequest Update Topic
*
* @param array $topic {
* The Topic data.
*
* @type array $labels Key value pairs used to organize your resources.
* @type array $messageStoragePolicy Policy constraining the set of
* Google Cloud Platform regions where messages published to the
* topic may be stored. If not present, then no constraints are in
* effect.
* @type string[] $messageStoragePolicy.allowedPersistenceRegions A list
* of IDs of GCP regions where messages that are published to the
* topic may be persisted in storage. Messages published by
* publishers running in non-allowed GCP regions (or running
* outside of GCP altogether) will be routed for storage in one of
* the allowed regions. An empty list means that no regions are
* allowed, and is not a valid configuration.
* }
* @param array $options [optional] {
* Configuration options.
*
* @type array $updateMask A list of field paths to be modified. Nested
* key names should be dot-separated, e.g.
* `messageStoragePolicy.allowedPersistenceRegions`. Google Cloud
* PHP will attempt to infer this value on your behalf, however
* modification of map fields with arbitrary keys (such as labels
* or message storage policy) requires an explicit update mask.
* }
*
* @return array The topic info.
*/
public function update(array $topic, array $options = [])
{
$updateMaskPaths = $this->pluck('updateMask', $options, false) ?: [];

if (!$updateMaskPaths) {
$iterator = new \RecursiveIteratorIterator(new \RecursiveArrayIterator($topic));
foreach ($iterator as $leafValue) {
$excludes = ['name'];
$keys = [];
foreach (range(0, $iterator->getDepth()) as $depth) {
$key = $iterator->getSubIterator($depth)->key();
if (!is_string($key)) {
break;
}
$keys[] = $key;
}

$path = implode('.', $keys);
if (!in_array($path, $excludes)) {
$updateMaskPaths[] = $path;
}
}
}

return $this->info = $this->connection->updateTopic([
'name' => $this->name,
'topic' => [
'name' => $this->name,
] + $topic,
'updateMask' => implode(',', $updateMaskPaths)
] + $options);
}

/**
* Delete a topic.
*
Expand Down
28 changes: 28 additions & 0 deletions PubSub/tests/Snippet/TopicTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,34 @@ public function testCreate()
$this->assertEquals([], $res->returnVal());
}

public function testUpdate()
{
$snippet = $this->snippetFromMethod(Topic::class, 'update');
$snippet->addLocal('topic', $this->topic);

$this->connection->updateTopic(Argument::any())
->shouldBeCalled();

$this->topic->___setProperty('connection', $this->connection->reveal());

$snippet->invoke();
}

public function testUpdateWithMask()
{
$snippet = $this->snippetFromMethod(Topic::class, 'update', 1);
$snippet->addLocal('topic', $this->topic);

$this->connection->updateTopic(Argument::allOf(
Argument::withKey('topic'),
Argument::withKey('updateMask')
))->shouldBeCalled();

$this->topic->___setProperty('connection', $this->connection->reveal());

$snippet->invoke();
}

public function testDelete()
{
$snippet = $this->snippetFromMethod(Topic::class, 'delete');
Expand Down
44 changes: 44 additions & 0 deletions PubSub/tests/System/ManageTopicsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,50 @@ public function testReloadTopic($client)
$this->assertEquals($topic->name(), $topic->reload()['name']);
}

/**
* @dataProvider clientProvider
*/
public function testUpdateTopic($client)
{
$shortName = uniqid(self::TESTING_PREFIX);
$this->assertFalse($client->topic($shortName)->exists());
$topic = $client->createTopic($shortName);
self::$deletionQueue->add($topic);

$policy = [
'allowedPersistenceRegions' => ['us-central1', 'us-east1']
];

$topic->update([
'messageStoragePolicy' => $policy
]);

$this->assertEquals($policy, $topic->info()['messageStoragePolicy']);
}

/**
* @dataProvider clientProvider
*/
public function testUpdateTopicWithUpdateMask($client)
{
$shortName = uniqid(self::TESTING_PREFIX);
$this->assertFalse($client->topic($shortName)->exists());
$topic = $client->createTopic($shortName);
self::$deletionQueue->add($topic);

$labels = [
'foo' => 'bar'
];

$topic->update([
'labels' => $labels
], [
'updateMask' => [ 'labels' ]
]);

$this->assertEquals($labels, $topic->info()['labels']);
}

/**
* @dataProvider clientProvider
*/
Expand Down
23 changes: 21 additions & 2 deletions PubSub/tests/Unit/Connection/GrpcTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use Google\Cloud\PubSub\Connection\Grpc;
use Google\Cloud\Core\Testing\GrpcTestTrait;
use Google\ApiCore\Serializer;
use Google\Cloud\PubSub\V1\Topic;
use Google\Protobuf\FieldMask;
use Google\Protobuf\Timestamp;
use Prophecy\Argument;
Expand Down Expand Up @@ -118,12 +119,30 @@ public function methodProvider()
$subscription->setRetainAckedMessages(true);

$serializer = new Serializer();
$fieldMask = $serializer->decodeMessage(new FieldMask(), ['paths' => ['retain_acked_messages']]);
$subscriptionFieldMask = $serializer->decodeMessage(new FieldMask(), ['paths' => ['retain_acked_messages']]);

$time = (new \DateTime)->format('Y-m-d\TH:i:s.u\Z');
$timestamp = $serializer->decodeMessage(new Timestamp(), $this->formatTimestampForApi($time));

$topic = new Topic;
$topic->setLabels(['foo' => 'bar']);
$topic->setName('projects/foo/topics/bar');
$topicFieldMask = $serializer->decodeMessage(new FieldMask(), ['paths' => ['labels']]);

return [
[
'updateTopic',
[
'topic' => [
'name' => 'projects/foo/topics/bar',
'labels' => [
'foo' => 'bar'
]
],
'updateMask' => 'labels'
],
[$topic, $topicFieldMask, []]
],
[
'updateSubscription',
[
Expand All @@ -133,7 +152,7 @@ public function methodProvider()
],
'updateMask' => 'retainAckedMessages'
],
[$subscription, $fieldMask, []]
[$subscription, $subscriptionFieldMask, []]
],
[
'listSnapshots',
Expand Down
1 change: 1 addition & 0 deletions PubSub/tests/Unit/Connection/RestTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public function methodProvider()
['createTopic'],
['getTopic'],
['deleteTopic'],
['updateTopic'],
['listTopics'],
['publishMessage'],
['listSubscriptionsByTopic'],
Expand Down
20 changes: 20 additions & 0 deletions PubSub/tests/Unit/TopicTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,26 @@ public function testCreate()
$this->assertEquals('projects/project-name/topics/topic-name', $res['name']);
}

public function testUpdate()
{
$this->connection->updateTopic(Argument::allOf(
Argument::withEntry('topic', [
'name' => $this->topic->name(),
'foo' => 'bar'
]),
Argument::withEntry('updateMask', 'foo')
))->shouldBeCalled()->willReturn([
'foo' => 'bar'
]);

$this->topic->___setProperty('connection', $this->connection->reveal());

$res = $this->topic->update(['foo' => 'bar']);

$this->assertEquals(['foo' => 'bar'], $res);
$this->assertEquals('bar', $this->topic->info()['foo']);
}

public function testDelete()
{
$this->connection->deleteTopic(Argument::withEntry('foo', 'bar'))
Expand Down