Skip to content

Commit

Permalink
subcription name updated with id
Browse files Browse the repository at this point in the history
  • Loading branch information
Rajan Joshi committed Mar 13, 2020
1 parent 8a8e633 commit 70f3051
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 9 deletions.
8 changes: 4 additions & 4 deletions beater/pubsubbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
return nil, err
}

logger := logp.NewLogger(fmt.Sprintf("PubSub: %s/%s/%s", config.Project, config.Topic, config.Subscription.Name))
logger := logp.NewLogger(fmt.Sprintf("PubSub: %s/%s/%s", config.Project, config.Topic, config.Subscription.Id))
logger.Infof("config retrieved: %+v", config)

client, err := createPubsubClient(config)
Expand Down Expand Up @@ -202,14 +202,14 @@ func createPubsubClient(config *config.Config) (*pubsub.Client, error) {

func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pubsub.Subscription, error) {
if !config.Subscription.Create {
subscription := client.Subscription(config.Subscription.Name)
subscription := client.Subscription(config.Subscription.Id)
return subscription, nil
}

topic := client.Topic(config.Topic)
ctx := context.Background()

subscription, err := client.CreateSubscription(ctx, config.Subscription.Name, pubsub.SubscriptionConfig{
subscription, err := client.CreateSubscription(ctx, config.Subscription.Id, pubsub.SubscriptionConfig{
Topic: topic,
RetainAckedMessages: config.Subscription.RetainAckedMessages,
RetentionDuration: config.Subscription.RetentionDuration,
Expand All @@ -218,7 +218,7 @@ func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pub
st, ok := status.FromError(err)
if ok && st.Code() == codes.AlreadyExists {
// The subscription already exists.
subscription = client.Subscription(config.Subscription.Name)
subscription = client.Subscription(config.Subscription.Id)
} else if err != nil {
return nil, fmt.Errorf(st.Message())
} else if ok && st.Code() == codes.NotFound {
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Config struct {
Topic string `config:"topic" validate:"required"`
CredentialsFile string `config:"credentials_file"`
Subscription struct {
Name string `config:"name" validate:"required"`
Id string `config:"id" validate:"required"`
RetainAckedMessages bool `config:"retain_acked_messages"`
RetentionDuration time.Duration `config:"retention_duration"`
Create bool `config:"create"`
Expand Down
4 changes: 2 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestGetAndValidateConfigMissingRequiredFields(t *testing.T) {
c.SetString("topic", -1, tc.Topic)

sConfig, _ := c.Child("subscription", -1)
sConfig.SetString("name", -1, tc.Subscription)
sConfig.SetString("id", -1, tc.Subscription)

_, err := GetAndValidateConfig(c)

Expand Down Expand Up @@ -179,7 +179,7 @@ func createDefaultTestConfig() *common.Config {
c.SetString("topic", -1, "a-topic")

sConfig := common.NewConfig()
sConfig.SetString("name", -1, "a-subscription")
sConfig.SetString("id", -1, "a-subscription")
sConfig.SetBool("retain_acked_messages", -1, false)
sConfig.SetString("retention_duration", -1, "10m")
c.SetChild("subscription", -1, sConfig)
Expand Down
4 changes: 2 additions & 2 deletions pubsubbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ pubsubbeat:
# The Pub/Sub topic name. You must first create this topic.
topic: my-topic

# The Pub/Sub subscription name.
subscription.name: my-subscription
# The Pub/Sub subscription id.
subscription.id: my-subscription

# Attempt to create the subscription.
subscription.create: true # Defaults to true
Expand Down

0 comments on commit 70f3051

Please sign in to comment.