Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to automatically create a topic and re-use a subscription. #465

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions lib/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,18 +196,25 @@ PubSub.prototype.createTopic = function(name, callback) {
* @throws {Error} If a name is not provided.
*
* @param {string} name - The name of the topic.
* @param {object=} options - Configuration object.
* @param {bool=} options.autoCreate - Automatically create topic if
* it doesn't exist. Note that messages published to a topic with
* no subscribers will not be delivered.
* @return {module:pubsub/topic}
*
* @example
* var topic = pubsub.topic('my-existing-topic');
* var topic = pubsub.topic('topic-that-maybe-exists', {autoCreate: true});
* topic.publish('New message!');

This comment was marked as spam.

*/
PubSub.prototype.topic = function(name) {
PubSub.prototype.topic = function(name, options) {
if (!name) {
throw new Error('A name must be specified for a new topic.');
}
options = options || {};
return new Topic(this, {
name: name
name: name,
autoCreate: options.autoCreate
});
};

Expand Down
42 changes: 41 additions & 1 deletion lib/pubsub/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ function Topic(pubsub, options) {
this.name = Topic.formatName_(pubsub.projectId, options.name);
this.projectId = pubsub.projectId;
this.pubsub = pubsub;

if (options.autoCreate) {
this.unformattedName = options.name;
this.origMakeReq_ = this.makeReq_;
this.makeReq_ = this.autoCreateWrapper_;
}
}

/**
Expand Down Expand Up @@ -95,6 +101,37 @@ Topic.formatName_ = function(projectId, name) {
return 'projects/' + projectId + '/topics/' + name;
};

/**
* Wrapper for makeReq_ that automatically attempts to create a topic if it
* does not yet exist.
*
* @private
*/
Topic.prototype.autoCreateWrapper_ = function(method, path, q, body, callback) {
var self = this;

var createAndRetry = function(){
self.pubsub.createTopic(self.unformattedName, function(err) {
if (err) {
callback(err);
return;
}
self.origMakeReq_(method, path, q, body, callback);
});
};

this.origMakeReq_(method, path, q, body, function(err, res) {
if (err && err.code === 404 && method !== 'DELETE') {
createAndRetry();
return;
}
else {
callback(err, res);
return;
}
});
};

/**
* Publish the provided message or array of messages. A message can be of any
* type. On success, an array of messageIds is returned in the response.
Expand Down Expand Up @@ -276,7 +313,10 @@ Topic.prototype.subscribe = function(name, options, callback) {

var path = Subscription.formatName_(this.projectId, name);
this.makeReq_('PUT', path, null, body, function(err) {
if (err) {
if (options.reuseExisting && err && err.code === 409) {

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

callback(null, self.subscription(name, options));
}
else if (err) {
callback(err);
return;
}
Expand Down
38 changes: 38 additions & 0 deletions test/pubsub/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,44 @@ describe('Topic', function() {
});
});

describe('publish to non-existing topic', function(){
var messageObject = { data: 'howdy' };

it('should generate 404 error without autoCreate', function(done) {
topic.makeReq_ = function(method, path, query, body, callback) {
callback({code: 404});
};

topic.publish(messageObject, function(err){
assert.equal(err.code, 404);
done();
});
});

it('should publish successfully with autoCreate', function(done) {
var acTopic = new Topic(pubsubMock, {
name: TOPIC_NAME, autoCreate: true
});
var created = false;

acTopic.origMakeReq_ = function(method, path, query, body, callback) {
if (!created) {
callback({code: 404});
}
else {
callback(null);
}
};

pubsubMock.createTopic = function(name, callback) {
created = true;
callback();
};

acTopic.publish(messageObject, done);
});
});

describe('delete', function() {
it('should delete a topic', function(done) {
topic.makeReq_ = function(method, path) {
Expand Down