Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new MetricSet interfaces for Module Developers #3908

Merged
merged 2 commits into from
Apr 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Add experimental metricset `perfmon` to Windows module. {pull}3758[3758]
- Add memcached module with stats metricset. {pull}3693[3693]
- Add the `process.cmdline.cache.enabled` config option to the System Process Metricset. {pull}3891[3891]
- Add new MetricSet interfaces for developers (`Closer`, `ReportingFetcher`, and `PushMetricSet`). {pull}3908[3908]

*Packetbeat*
- Add `fields` and `fields_under_root` to packetbeat protocols configurations. {pull}3518[3518]
Expand Down
16 changes: 13 additions & 3 deletions metricbeat/mb/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,25 @@ func mustImplementFetcher(ms MetricSet) error {
ifcs = append(ifcs, "EventsFetcher")
}

if _, ok := ms.(ReportingMetricSet); ok {
ifcs = append(ifcs, "ReportingMetricSet")
}

if _, ok := ms.(PushMetricSet); ok {
ifcs = append(ifcs, "PushMetricSet")
}

switch len(ifcs) {
case 0:
return fmt.Errorf("MetricSet '%s/%s' does not implement a Fetcher "+
"interface", ms.Module().Name(), ms.Name())
return fmt.Errorf("MetricSet '%s/%s' does not implement an event "+
"producing interface (EventFetcher, EventsFetcher, "+
"ReportingMetricSet, or PushMetricSet)",
ms.Module().Name(), ms.Name())
case 1:
return nil
default:
return fmt.Errorf("MetricSet '%s/%s' can only implement a single "+
"Fetcher interface, but implements %v", ms.Module().Name(),
"event producing interface, but implements %v", ms.Module().Name(),
ms.Name(), ifcs)
}
}
Expand Down
31 changes: 21 additions & 10 deletions metricbeat/mb/example_metricset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"github.com/elastic/beats/metricbeat/mb/parse"
)

var hostParser = parse.URLHostParserBuilder{DefaultScheme: "http"}.Build()
var hostParser = parse.URLHostParserBuilder{
DefaultScheme: "http",
}.Build()

func init() {
// Register the MetricSetFactory function for the "status" MetricSet.
Expand All @@ -26,14 +28,23 @@ func NewMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{BaseMetricSet: base}, nil
}

func (ms *MetricSet) Fetch() (common.MapStr, error) {
// Fetch data from the host (using ms.HostData().URI) and return the data.
return common.MapStr{
"someParam": "value",
"otherParam": 42,
}, nil
// Fetch will be called periodically by the framework.
func (ms *MetricSet) Fetch(report mb.Reporter) {
// Fetch data from the host at ms.HostData().URI and return the data.
data, err := common.MapStr{
"some_metric": 18.0,
"answer_to_everything": 42,
}, error(nil)
if err != nil {
// Report an error if it occurs.
report.Error(err)
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That pattern we will probably see quite often. If Error would not return anything, we could use return report.Error(err) probably.

}

// Otherwise report the collected data.
report.Event(data)
}

// ExampleMetricSetFactory demonstrates how to register a MetricSetFactory
// and unpack additional configuration data.
func ExampleMetricSetFactory() {}
// ExampleReportingMetricSet demonstrates how to register a MetricSetFactory
// and implement a ReportingMetricSet.
func ExampleReportingMetricSet() {}
45 changes: 44 additions & 1 deletion metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,62 @@ type MetricSet interface {
HostData() HostData // HostData returns the parsed host data.
}

// Closer is an optional interface that a MetricSet can implement in order to
// cleanup any resources it has open at shutdown.
type Closer interface {
Close() error
}

// EventFetcher is a MetricSet that returns a single event when collecting data.
// Use ReportingMetricSet for new MetricSet implementations.
type EventFetcher interface {
MetricSet
Fetch() (common.MapStr, error)
}

// EventsFetcher is a MetricSet that returns a multiple events when collecting
// data.
// data. Use ReportingMetricSet for new MetricSet implementations.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case we decided to keep both, we need to rename it later to something like EventsMetricSet to be consistent. I still like the old Fetch interfaces so not sure yet if Reporter will replace it or just be an other option.

type EventsFetcher interface {
MetricSet
Fetch() ([]common.MapStr, error)
}

// Reporter is used by a MetricSet to report events, errors, or errors with
// metadata. The methods return false if and only if publishing failed because
// the MetricSet is being closed.
type Reporter interface {
Event(event common.MapStr) bool // Event reports a single successful event.
ErrorWith(err error, meta common.MapStr) bool // ErrorWith reports a single error event with the additional metadata.
Error(err error) bool // Error reports a single error event.
}

// ReportingMetricSet is a MetricSet that reports events or errors through the
// Reporter interface. Fetch is called periodically to collect events.
type ReportingMetricSet interface {
MetricSet
Fetch(r Reporter)
}

// PushReporter is used by a MetricSet to report events, errors, or errors with
// metadata. It provides a done channel used to signal that reporter should
// stop.
type PushReporter interface {
Reporter

// Done returns a channel that's closed when work done on behalf of this
// reporter should be canceled.
Done() <-chan struct{}
}

// PushMetricSet is a MetricSet that pushes events (rather than pulling them
// periodically via a Fetch callback). Run is invoked to start the event
// subscription and it should block until the MetricSet is ready to stop or
// the PushReporter's done channel is closed.
type PushMetricSet interface {
MetricSet
Run(r PushReporter)
}

// HostData contains values parsed from the 'host' configuration. Other
// configuration data like protocols, usernames, and passwords may also be
// used to construct this HostData data.
Expand Down
155 changes: 154 additions & 1 deletion metricbeat/mb/mb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func (m testModule) ParseHost(host string) (HostData, error) {
return m.hostParser(host)
}

// EventFetcher

type testMetricSet struct {
BaseMetricSet
}
Expand All @@ -28,6 +30,32 @@ func (m *testMetricSet) Fetch() (common.MapStr, error) {
return nil, nil
}

// EventsFetcher

type testMetricSetEventsFetcher struct {
BaseMetricSet
}

func (m *testMetricSetEventsFetcher) Fetch() ([]common.MapStr, error) {
return nil, nil
}

// ReportingFetcher

type testMetricSetReportingFetcher struct {
BaseMetricSet
}

func (m *testMetricSetReportingFetcher) Fetch(r Reporter) {}

// PushMetricSet

type testPushMetricSet struct {
BaseMetricSet
}

func (m *testPushMetricSet) Run(r PushReporter) {}

func TestModuleConfig(t *testing.T) {
tests := []struct {
in interface{}
Expand Down Expand Up @@ -169,7 +197,7 @@ func TestNewModulesDuplicateHosts(t *testing.T) {
assert.Error(t, err)
}

func TestNewModules(t *testing.T) {
func TestNewModulesHostParser(t *testing.T) {
const (
name = "HostParser"
host = "example.com"
Expand Down Expand Up @@ -235,6 +263,131 @@ func TestNewModules(t *testing.T) {
}
assert.FailNow(t, "no modules found")
})

}

func TestNewModulesMetricSetTypes(t *testing.T) {
r := newTestRegistry(t)

factory := func(base BaseMetricSet) (MetricSet, error) {
return &testMetricSet{base}, nil
}

name := "EventFetcher"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}

t.Run(name+" MetricSet", func(t *testing.T) {
c := newConfig(t, map[string]interface{}{
"module": moduleName,
"metricsets": []string{name},
})

modules, err := NewModules(c, r)
if err != nil {
t.Fatal(err)
}
assert.Len(t, modules, 1)

for _, metricSets := range modules {
if assert.Len(t, metricSets, 1) {
metricSet := metricSets[0]
_, ok := metricSet.(EventFetcher)
assert.True(t, ok, name+" not implemented")
}
}
})

factory = func(base BaseMetricSet) (MetricSet, error) {
return &testMetricSetEventsFetcher{base}, nil
}

name = "EventsFetcher"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}

t.Run(name+" MetricSet", func(t *testing.T) {
c := newConfig(t, map[string]interface{}{
"module": moduleName,
"metricsets": []string{name},
})

modules, err := NewModules(c, r)
if err != nil {
t.Fatal(err)
}
assert.Len(t, modules, 1)

for _, metricSets := range modules {
if assert.Len(t, metricSets, 1) {
metricSet := metricSets[0]
_, ok := metricSet.(EventsFetcher)
assert.True(t, ok, name+" not implemented")
}
}
})

factory = func(base BaseMetricSet) (MetricSet, error) {
return &testMetricSetReportingFetcher{base}, nil
}

name = "ReportingFetcher"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}

t.Run(name+" MetricSet", func(t *testing.T) {
c := newConfig(t, map[string]interface{}{
"module": moduleName,
"metricsets": []string{name},
})

modules, err := NewModules(c, r)
if err != nil {
t.Fatal(err)
}
assert.Len(t, modules, 1)

for _, metricSets := range modules {
if assert.Len(t, metricSets, 1) {
metricSet := metricSets[0]
_, ok := metricSet.(ReportingMetricSet)
assert.True(t, ok, name+" not implemented")
}
}
})

factory = func(base BaseMetricSet) (MetricSet, error) {
return &testPushMetricSet{base}, nil
}

name = "Push"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}

t.Run(name+" MetricSet", func(t *testing.T) {
c := newConfig(t, map[string]interface{}{
"module": moduleName,
"metricsets": []string{name},
})

modules, err := NewModules(c, r)
if err != nil {
t.Fatal(err)
}
assert.Len(t, modules, 1)

for _, metricSets := range modules {
if assert.Len(t, metricSets, 1) {
metricSet := metricSets[0]
_, ok := metricSet.(PushMetricSet)
assert.True(t, ok, name+" not implemented")
}
}
})
}

// TestNewBaseModuleFromModuleConfigStruct tests the creation a new BaseModule.
Expand Down
4 changes: 3 additions & 1 deletion metricbeat/mb/module/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ func (b EventBuilder) Build() (common.MapStr, error) {
metricsetData := common.MapStr{
"module": b.ModuleName,
"name": b.MetricSetName,
"rtt": b.FetchDuration.Nanoseconds() / int64(time.Microsecond),
}
if b.FetchDuration != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, that means we also remove this value in the system module case 👍

metricsetData["rtt"] = b.FetchDuration.Nanoseconds() / int64(time.Microsecond)
}

namespace := b.MetricSetName
Expand Down
14 changes: 14 additions & 0 deletions metricbeat/mb/module/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,17 @@ func TestEventBuilderNoHost(t *testing.T) {
_, found := event["metricset-host"]
assert.False(t, found)
}

func TestEventBuildNoRTT(t *testing.T) {
b := builder
b.FetchDuration = 0

event, err := b.Build()
if err != nil {
t.Fatal(err)
}

metricset := event["metricset"].(common.MapStr)
_, found := metricset["rtt"]
assert.False(t, found, "found rtt")
}
8 changes: 4 additions & 4 deletions metricbeat/mb/module/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func ExampleWrapper() {
// Build a configuration object.
config, err := common.NewConfigFrom(map[string]interface{}{
"module": moduleName,
"metricsets": []string{metricSetName},
"metricsets": []string{eventFetcherName},
})
if err != nil {
fmt.Println("Error:", err)
Expand Down Expand Up @@ -68,13 +68,13 @@ func ExampleWrapper() {
// "Tags": null
// },
// "fake": {
// "status": {
// "eventfetcher": {
// "metric": 1
// }
// },
// "metricset": {
// "module": "fake",
// "name": "status",
// "name": "eventfetcher",
// "rtt": 111
// },
// "type": "metricsets"
Expand All @@ -91,7 +91,7 @@ func ExampleRunner() {

config, err := common.NewConfigFrom(map[string]interface{}{
"module": moduleName,
"metricsets": []string{metricSetName},
"metricsets": []string{eventFetcherName},
})
if err != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/mb/module/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestRunner(t *testing.T) {

config, err := common.NewConfigFrom(map[string]interface{}{
"module": moduleName,
"metricsets": []string{metricSetName},
"metricsets": []string{eventFetcherName},
})
if err != nil {
t.Fatal(err)
Expand Down
Loading