Skip to content

Commit

Permalink
fix(inputs.gnmi): Allow to disable using first namespace as origin (#…
Browse files Browse the repository at this point in the history
…16507)

(cherry picked from commit 0dcdbe4)
  • Loading branch information
srebhan committed Feb 25, 2025
1 parent 597377b commit f38ef51
Show file tree
Hide file tree
Showing 12 changed files with 677 additions and 95 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
<!-- markdownlint-disable MD024 -->
# Changelog

## Unreleased

### Important Changes

- PR [#16507](https://github.com/influxdata/telegraf/pull/16507) adds the
`enforce_first_namespace_as_origin` to the GNMI input plugin. This option
allows to disable mangling of the response `path` tag by _not_ using namespaces
as origin. It is highly recommended to disable the option.
However, disabling the behavior might change the `path` tag and
thus might break existing queries. Furthermore, the tag modification might
increase cardinality in your database.

## v1.33.2 [2025-02-10]

### Important Changes
Expand Down
8 changes: 8 additions & 0 deletions plugins/inputs/gnmi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ details on how to use them.
## Remove leading slashes and dots in field-name
# trim_field_names = false

## Only receive updates for the state, also suppresses receiving the initial state
# updates_only = false

## Enforces the namespace of the first element as origin for aliases and
## response paths, required for backward compatibility.
## NOTE: Set to 'false' if possible but be aware that this might change the path tag!
# enforce_first_namespace_as_origin = true

## Guess the path-tag if an update does not contain a prefix-path
## Supported values are
## none -- do not add a 'path' tag
Expand Down
125 changes: 73 additions & 52 deletions plugins/inputs/gnmi/gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,31 +40,32 @@ including your device model and the following response data:
This message is only printed once.`

type GNMI struct {
Addresses []string `toml:"addresses"`
Subscriptions []subscription `toml:"subscription"`
TagSubscriptions []tagSubscription `toml:"tag_subscription"`
Aliases map[string]string `toml:"aliases"`
Encoding string `toml:"encoding"`
Origin string `toml:"origin"`
Prefix string `toml:"prefix"`
Target string `toml:"target"`
UpdatesOnly bool `toml:"updates_only"`
VendorSpecific []string `toml:"vendor_specific"`
Username config.Secret `toml:"username"`
Password config.Secret `toml:"password"`
Redial config.Duration `toml:"redial"`
MaxMsgSize config.Size `toml:"max_msg_size"`
Trace bool `toml:"dump_responses"`
CanonicalFieldNames bool `toml:"canonical_field_names"`
TrimFieldNames bool `toml:"trim_field_names"`
PrefixTagKeyWithPath bool `toml:"prefix_tag_key_with_path"`
GuessPathTag bool `toml:"guess_path_tag" deprecated:"1.30.0;1.35.0;use 'path_guessing_strategy' instead"`
GuessPathStrategy string `toml:"path_guessing_strategy"`
EnableTLS bool `toml:"enable_tls" deprecated:"1.27.0;1.35.0;use 'tls_enable' instead"`
KeepaliveTime config.Duration `toml:"keepalive_time"`
KeepaliveTimeout config.Duration `toml:"keepalive_timeout"`
YangModelPaths []string `toml:"yang_model_paths"`
Log telegraf.Logger `toml:"-"`
Addresses []string `toml:"addresses"`
Subscriptions []subscription `toml:"subscription"`
TagSubscriptions []tagSubscription `toml:"tag_subscription"`
Aliases map[string]string `toml:"aliases"`
Encoding string `toml:"encoding"`
Origin string `toml:"origin"`
Prefix string `toml:"prefix"`
Target string `toml:"target"`
UpdatesOnly bool `toml:"updates_only"`
VendorSpecific []string `toml:"vendor_specific"`
Username config.Secret `toml:"username"`
Password config.Secret `toml:"password"`
Redial config.Duration `toml:"redial"`
MaxMsgSize config.Size `toml:"max_msg_size"`
Trace bool `toml:"dump_responses"`
CanonicalFieldNames bool `toml:"canonical_field_names"`
TrimFieldNames bool `toml:"trim_field_names"`
PrefixTagKeyWithPath bool `toml:"prefix_tag_key_with_path"`
GuessPathTag bool `toml:"guess_path_tag" deprecated:"1.30.0;1.35.0;use 'path_guessing_strategy' instead"`
GuessPathStrategy string `toml:"path_guessing_strategy"`
EnableTLS bool `toml:"enable_tls" deprecated:"1.27.0;1.35.0;use 'tls_enable' instead"`
KeepaliveTime config.Duration `toml:"keepalive_time"`
KeepaliveTimeout config.Duration `toml:"keepalive_timeout"`
YangModelPaths []string `toml:"yang_model_paths"`
EnforceFirstNamespaceAsOrigin bool `toml:"enforce_first_namespace_as_origin"`
Log telegraf.Logger `toml:"-"`
common_tls.ClientConfig

// Internal state
Expand Down Expand Up @@ -99,6 +100,15 @@ func (*GNMI) SampleConfig() string {

func (c *GNMI) Init() error {
// Check options
switch c.Encoding {
case "":
c.Encoding = "proto"
case "proto", "json", "json_ietf", "bytes":
// Do nothing, those are valid
default:
return fmt.Errorf("unsupported encoding %s", c.Encoding)
}

if time.Duration(c.Redial) <= 0 {
return errors.New("redial duration must be positive")
}
Expand Down Expand Up @@ -184,17 +194,21 @@ func (c *GNMI) Init() error {
// Invert explicit alias list and prefill subscription names
c.internalAliases = make(map[*pathInfo]string, len(c.Subscriptions)+len(c.Aliases)+len(c.TagSubscriptions))
for _, s := range c.Subscriptions {
if err := s.buildAlias(c.internalAliases); err != nil {
if err := s.buildAlias(c.internalAliases, c.EnforceFirstNamespaceAsOrigin); err != nil {
return err
}
}
for _, s := range c.TagSubscriptions {
if err := s.buildAlias(c.internalAliases); err != nil {
if err := s.buildAlias(c.internalAliases, c.EnforceFirstNamespaceAsOrigin); err != nil {
return err
}
}
for alias, encodingPath := range c.Aliases {
c.internalAliases[newInfoFromString(encodingPath)] = alias
path := newInfoFromString(encodingPath)
if c.EnforceFirstNamespaceAsOrigin {
path.enforceFirstNamespaceAsOrigin()
}
c.internalAliases[path] = alias
}
c.Log.Debugf("Internal alias mapping: %+v", c.internalAliases)

Expand Down Expand Up @@ -279,20 +293,21 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
return
}
h := handler{
host: host,
port: port,
aliases: c.internalAliases,
tagsubs: c.TagSubscriptions,
maxMsgSize: int(c.MaxMsgSize),
vendorExt: c.VendorSpecific,
tagStore: newTagStore(c.TagSubscriptions),
trace: c.Trace,
canonicalFieldNames: c.CanonicalFieldNames,
trimSlash: c.TrimFieldNames,
tagPathPrefix: c.PrefixTagKeyWithPath,
guessPathStrategy: c.GuessPathStrategy,
decoder: c.decoder,
log: c.Log,
host: host,
port: port,
aliases: c.internalAliases,
tagsubs: c.TagSubscriptions,
maxMsgSize: int(c.MaxMsgSize),
vendorExt: c.VendorSpecific,
tagStore: newTagStore(c.TagSubscriptions),
trace: c.Trace,
canonicalFieldNames: c.CanonicalFieldNames,
trimSlash: c.TrimFieldNames,
tagPathPrefix: c.PrefixTagKeyWithPath,
guessPathStrategy: c.GuessPathStrategy,
decoder: c.decoder,
enforceFirstNamespaceAsOrigin: c.EnforceFirstNamespaceAsOrigin,
log: c.Log,
ClientParameters: keepalive.ClientParameters{
Time: time.Duration(c.KeepaliveTime),
Timeout: time.Duration(c.KeepaliveTimeout),
Expand Down Expand Up @@ -420,13 +435,16 @@ func (s *subscription) buildFullPath(c *GNMI) error {
return nil
}

func (s *subscription) buildAlias(aliases map[*pathInfo]string) error {
func (s *subscription) buildAlias(aliases map[*pathInfo]string, enforceFirstNamespaceAsOrigin bool) error {
// Build the subscription path without keys
path, err := parsePath(s.Origin, s.Path, "")
if err != nil {
return err
}
info := newInfoFromPathWithoutKeys(path)
if enforceFirstNamespaceAsOrigin {
info.enforceFirstNamespaceAsOrigin()
}

// If the user didn't provide a measurement name, use last path element
name := s.Name
Expand All @@ -439,15 +457,18 @@ func (s *subscription) buildAlias(aliases map[*pathInfo]string) error {
return nil
}

func newGNMI() telegraf.Input {
return &GNMI{
Encoding: "proto",
Redial: config.Duration(10 * time.Second),
}
}

func init() {
inputs.Add("gnmi", newGNMI)
inputs.Add("gnmi", func() telegraf.Input {
return &GNMI{
Redial: config.Duration(10 * time.Second),
EnforceFirstNamespaceAsOrigin: true,
}
})
// Backwards compatible alias:
inputs.Add("cisco_telemetry_gnmi", newGNMI)
inputs.Add("cisco_telemetry_gnmi", func() telegraf.Input {
return &GNMI{
Redial: config.Duration(10 * time.Second),
EnforceFirstNamespaceAsOrigin: true,
}
})
}
29 changes: 15 additions & 14 deletions plugins/inputs/gnmi/gnmi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,9 +779,10 @@ func TestNotification(t *testing.T) {
{
name: "issue #12257 Sonic",
plugin: &GNMI{
Log: testutil.Logger{},
Encoding: "proto",
Redial: config.Duration(1 * time.Second),
Log: testutil.Logger{},
Encoding: "proto",
Redial: config.Duration(1 * time.Second),
EnforceFirstNamespaceAsOrigin: true,
Subscriptions: []subscription{
{
Name: "temperature",
Expand Down Expand Up @@ -910,10 +911,11 @@ func TestNotification(t *testing.T) {
{
name: "Juniper Extension",
plugin: &GNMI{
Log: testutil.Logger{},
Encoding: "proto",
VendorSpecific: []string{"juniper_header"},
Redial: config.Duration(1 * time.Second),
Log: testutil.Logger{},
Encoding: "proto",
VendorSpecific: []string{"juniper_header"},
Redial: config.Duration(1 * time.Second),
EnforceFirstNamespaceAsOrigin: true,
Subscriptions: []subscription{
{
Name: "type",
Expand Down Expand Up @@ -1105,7 +1107,12 @@ func TestCases(t *testing.T) {
require.NoError(t, err)

// Register the plugin
inputs.Add("gnmi", newGNMI)
inputs.Add("gnmi", func() telegraf.Input {
return &GNMI{
Redial: config.Duration(10 * time.Second),
EnforceFirstNamespaceAsOrigin: true,
}
})

for _, f := range folders {
// Only handle folders
Expand Down Expand Up @@ -1158,12 +1165,6 @@ func TestCases(t *testing.T) {

// Prepare the server response
responseFunction := func(server gnmi.GNMI_SubscribeServer) error {
sync := &gnmi.SubscribeResponse{
Response: &gnmi.SubscribeResponse_SyncResponse{
SyncResponse: true,
},
}
_ = sync
for i := range responses {
if err := server.Send(&responses[i]); err != nil {
return err
Expand Down
41 changes: 26 additions & 15 deletions plugins/inputs/gnmi/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,22 @@ import (
const eidJuniperTelemetryHeader = 1

type handler struct {
host string
port string
aliases map[*pathInfo]string
tagsubs []tagSubscription
maxMsgSize int
emptyNameWarnShown bool
vendorExt []string
tagStore *tagStore
trace bool
canonicalFieldNames bool
trimSlash bool
tagPathPrefix bool
guessPathStrategy string
decoder *yangmodel.Decoder
log telegraf.Logger
host string
port string
aliases map[*pathInfo]string
tagsubs []tagSubscription
maxMsgSize int
emptyNameWarnShown bool
vendorExt []string
tagStore *tagStore
trace bool
canonicalFieldNames bool
trimSlash bool
tagPathPrefix bool
guessPathStrategy string
decoder *yangmodel.Decoder
enforceFirstNamespaceAsOrigin bool
log telegraf.Logger
keepalive.ClientParameters
}

Expand Down Expand Up @@ -161,6 +162,9 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon

// Extract the path part valid for the whole set of updates if any
prefix := newInfoFromPath(response.Update.Prefix)
if h.enforceFirstNamespaceAsOrigin {
prefix.enforceFirstNamespaceAsOrigin()
}

// Add info to the tags
headerTags["source"] = h.host
Expand All @@ -173,6 +177,9 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon
var valueFields []updateField
for _, update := range response.Update.Update {
fullPath := prefix.append(update.Path)
if h.enforceFirstNamespaceAsOrigin {
prefix.enforceFirstNamespaceAsOrigin()
}
if update.Path.Origin != "" {
fullPath.origin = update.Path.Origin
}
Expand Down Expand Up @@ -251,7 +258,11 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon
h.emptyNameWarnShown = true
}
}

aliasInfo := newInfoFromString(aliasPath)
if h.enforceFirstNamespaceAsOrigin {
aliasInfo.enforceFirstNamespaceAsOrigin()
}

if tags["path"] == "" && h.guessPathStrategy == "subscription" {
tags["path"] = aliasInfo.String()
Expand Down
Loading

0 comments on commit f38ef51

Please sign in to comment.