Skip to content

Commit

Permalink
[Metricbeat] Remove EventFetcher and EventsFetcher interface
Browse files Browse the repository at this point in the history
After migrating all metricsets (elastic#10774) to the Reporter interfaces, the EventFetcher and EventsFetcher interface can be removed.
  • Loading branch information
ruflin committed Apr 11, 2019
1 parent c6f3a09 commit 688b92e
Show file tree
Hide file tree
Showing 9 changed files with 12 additions and 292 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
==== Breaking changes

- Move Fields from package libbeat/common to libbeat/mapping. {pull}11198[11198]
- Removing Metricbeat EventFetcher and EventsFetcher interface. Use the reporter interface instead. {pull}[]

==== Bugfixes

Expand Down
82 changes: 0 additions & 82 deletions metricbeat/helper/prometheus/ptest/ptest.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
mbtest "github.com/elastic/beats/metricbeat/mb/testing"

Expand All @@ -47,87 +46,6 @@ type TestCases []struct {
ExpectedFile string
}

// TestMetricSetEventsFetcher goes over the given TestCases and ensures that source Prometheus metrics gets converted
// into the expected events when passed by the given metricset.
// If -update_expected flag is passed, the expected JSON file will be updated with the result
func TestMetricSetEventsFetcher(t *testing.T, module, metricset string, cases TestCases) {
for _, test := range cases {
t.Logf("Testing %s file\n", test.MetricsFile)

file, err := os.Open(test.MetricsFile)
assert.NoError(t, err, "cannot open test file "+test.MetricsFile)

body, err := ioutil.ReadAll(file)
assert.NoError(t, err, "cannot read test file "+test.MetricsFile)

server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Header().Set("Content-Type", "text/plain; charset=ISO-8859-1")
w.Write([]byte(body))
}))

server.Start()
defer server.Close()

config := map[string]interface{}{
"module": module,
"metricsets": []string{metricset},
"hosts": []string{server.URL},
}

f := mbtest.NewEventsFetcher(t, config)
events, err := f.Fetch()
assert.Nil(t, err, "Errors while fetching metrics")

if *expectedFlag {
sort.SliceStable(events, func(i, j int) bool {
h1, _ := hashstructure.Hash(events[i], nil)
h2, _ := hashstructure.Hash(events[j], nil)
return h1 < h2
})
eventsJSON, _ := json.MarshalIndent(events, "", "\t")
err = ioutil.WriteFile(test.ExpectedFile, eventsJSON, 0644)
assert.NoError(t, err)
}

// Read expected events from reference file
expected, err := ioutil.ReadFile(test.ExpectedFile)
if err != nil {
t.Fatal(err)
}

var expectedEvents []common.MapStr
err = json.Unmarshal(expected, &expectedEvents)
if err != nil {
t.Fatal(err)
}

for _, event := range events {
// ensure the event is in expected list
found := -1
for i, expectedEvent := range expectedEvents {
if event.String() == expectedEvent.String() {
found = i
break
}
}
if found > -1 {
expectedEvents = append(expectedEvents[:found], expectedEvents[found+1:]...)
} else {
t.Errorf("Event was not expected: %+v", event)
}
}

if len(expectedEvents) > 0 {
t.Error("Some events were missing:")
for _, e := range expectedEvents {
t.Error(e)
}
t.Fatal()
}
}
}

// TestMetricSet goes over the given TestCases and ensures that source Prometheus metrics gets converted into the expected
// events when passed by the given metricset.
// If -update_expected flag is passed, the expected JSON file will be updated with the result
Expand Down
8 changes: 0 additions & 8 deletions metricbeat/mb/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,6 @@ func mustHaveModule(ms MetricSet, base BaseMetricSet) error {
// of them.
func mustImplementFetcher(ms MetricSet) error {
var ifcs []string
if _, ok := ms.(EventFetcher); ok {
ifcs = append(ifcs, "EventFetcher")
}

if _, ok := ms.(EventsFetcher); ok {
ifcs = append(ifcs, "EventsFetcher")
}

if _, ok := ms.(ReportingMetricSet); ok {
ifcs = append(ifcs, "ReportingMetricSet")
}
Expand Down
14 changes: 0 additions & 14 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,20 +116,6 @@ type Closer interface {
Close() error
}

// EventFetcher is a MetricSet that returns a single event when collecting data.
// Use ReportingMetricSet for new MetricSet implementations.
type EventFetcher interface {
MetricSet
Fetch() (common.MapStr, error)
}

// EventsFetcher is a MetricSet that returns a multiple events when collecting
// data. Use ReportingMetricSet for new MetricSet implementations.
type EventsFetcher interface {
MetricSet
Fetch() ([]common.MapStr, error)
}

// Reporter is used by a MetricSet to report events, errors, or errors with
// metadata. The methods return false if and only if publishing failed because
// the MetricSet is being closed.
Expand Down
54 changes: 4 additions & 50 deletions metricbeat/mb/mb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,14 @@ func (m testModule) ParseHost(host string) (HostData, error) {
return m.hostParser(host)
}

// EventFetcher
// FetchReporter

type testMetricSet struct {
BaseMetricSet
}

func (m *testMetricSet) Fetch() (common.MapStr, error) {
return nil, nil
}

// EventsFetcher

type testMetricSetEventsFetcher struct {
BaseMetricSet
}

func (m *testMetricSetEventsFetcher) Fetch() ([]common.MapStr, error) {
return nil, nil
func (m *testMetricSet) Fetch(reporter ReporterV2) error {
return nil
}

// ReportingFetcher
Expand Down Expand Up @@ -246,46 +236,10 @@ func TestNewModulesMetricSetTypes(t *testing.T) {
r := newTestRegistry(t)

factory := func(base BaseMetricSet) (MetricSet, error) {
return &testMetricSet{base}, nil
}

name := "EventFetcher"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}

t.Run(name+" MetricSet", func(t *testing.T) {
ms := newTestMetricSet(t, r, map[string]interface{}{
"module": moduleName,
"metricsets": []string{name},
})
_, ok := ms.(EventFetcher)
assert.True(t, ok, name+" not implemented")
})

factory = func(base BaseMetricSet) (MetricSet, error) {
return &testMetricSetEventsFetcher{base}, nil
}

name = "EventsFetcher"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}

t.Run(name+" MetricSet", func(t *testing.T) {
ms := newTestMetricSet(t, r, map[string]interface{}{
"module": moduleName,
"metricsets": []string{name},
})
_, ok := ms.(EventsFetcher)
assert.True(t, ok, name+" not implemented")
})

factory = func(base BaseMetricSet) (MetricSet, error) {
return &testMetricSetReportingFetcher{base}, nil
}

name = "ReportingFetcher"
name := "ReportingFetcher"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}
Expand Down
26 changes: 1 addition & 25 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,6 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
ms.Run(reporter.V1())
case mb.PushMetricSetV2:
ms.Run(reporter.V2())
case mb.EventFetcher, mb.EventsFetcher,
mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error:
msw.startPeriodicFetching(reporter)
default:
// Earlier startup stages prevent this from happening.
logp.Err("MetricSet '%s/%s' does not implement an event producing interface",
Expand All @@ -204,6 +201,7 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
// startPeriodicFetching performs an immediate fetch for the MetricSet then it
// begins a continuous timer scheduled loop to fetch data. To stop the loop the
// done channel should be closed.
// TODO: remove?
func (msw *metricSetWrapper) startPeriodicFetching(reporter reporter) {
// Fetch immediately.
msw.fetch(reporter)
Expand All @@ -226,10 +224,6 @@ func (msw *metricSetWrapper) startPeriodicFetching(reporter reporter) {
// and log a stack track if one occurs.
func (msw *metricSetWrapper) fetch(reporter reporter) {
switch fetcher := msw.MetricSet.(type) {
case mb.EventFetcher:
msw.singleEventFetch(fetcher, reporter)
case mb.EventsFetcher:
msw.multiEventFetch(fetcher, reporter)
case mb.ReportingMetricSet:
reporter.StartFetchTimer()
fetcher.Fetch(reporter.V1())
Expand All @@ -248,24 +242,6 @@ func (msw *metricSetWrapper) fetch(reporter reporter) {
}
}

func (msw *metricSetWrapper) singleEventFetch(fetcher mb.EventFetcher, reporter reporter) {
reporter.StartFetchTimer()
event, err := fetcher.Fetch()
reporter.V1().ErrorWith(err, event)
}

func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher, reporter reporter) {
reporter.StartFetchTimer()
events, err := fetcher.Fetch()
if len(events) == 0 {
reporter.V1().ErrorWith(err, nil)
} else {
for _, event := range events {
reporter.V1().ErrorWith(err, event)
}
}
}

// close closes the underlying MetricSet if it implements the mb.Closer
// interface.
func (msw *metricSetWrapper) close() error {
Expand Down
41 changes: 6 additions & 35 deletions metricbeat/mb/module/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,13 @@ type fakeEventFetcher struct {
mb.BaseMetricSet
}

func (ms *fakeEventFetcher) Fetch() (common.MapStr, error) {
func (ms *fakeEventFetcher) Fetch(reporter mb.ReporterV2) error {
t, _ := time.Parse(time.RFC3339, "2016-05-10T23:27:58.485Z")
return common.MapStr{"@timestamp": common.Time(t), "metric": 1}, nil
reporter.Event(mb.Event{
Timestamp: t,
RootFields: common.MapStr{"metric": 1},
})
return nil
}

func (ms *fakeEventFetcher) Close() error {
Expand Down Expand Up @@ -130,39 +134,6 @@ func newConfig(t testing.TB, moduleConfig interface{}) *common.Config {

// test cases

func TestWrapperOfEventFetcher(t *testing.T) {
hosts := []string{"alpha", "beta"}
c := newConfig(t, map[string]interface{}{
"module": moduleName,
"metricsets": []string{eventFetcherName},
"hosts": hosts,
})

m, err := module.NewWrapper(c, newTestRegistry(t))
if err != nil {
t.Fatal(err)
}

done := make(chan struct{})
output := m.Start(done)

<-output
<-output
close(done)

// Validate that the channel is closed after receiving the two
// initial events.
select {
case _, ok := <-output:
if !ok {
// Channel is closed.
return
} else {
assert.Fail(t, "received unexpected event")
}
}
}

func TestWrapperOfReportingFetcher(t *testing.T) {
hosts := []string{"alpha", "beta"}
c := newConfig(t, map[string]interface{}{
Expand Down
50 changes: 0 additions & 50 deletions metricbeat/mb/testing/data_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,56 +37,6 @@ var (
dataFlag = flag.Bool("data", false, "Write updated data.json files")
)

// WriteEvent fetches a single event writes the output to a ./_meta/data.json
// file.
func WriteEvent(f mb.EventFetcher, t testing.TB) error {
if !*dataFlag {
t.Skip("skip data generation tests")
}

event, err := f.Fetch()
if err != nil {
return err
}

fullEvent := CreateFullEvent(f, event)
WriteEventToDataJSON(t, fullEvent, ".")
return nil
}

// WriteEvents fetches events and writes the first event to a ./_meta/data.json
// file.
func WriteEvents(f mb.EventsFetcher, t testing.TB) error {
return WriteEventsCond(f, t, nil)

}

// WriteEventsCond fetches events and writes the first event that matches the condition
// to a ./_meta/data.json file.
func WriteEventsCond(f mb.EventsFetcher, t testing.TB, cond func(e common.MapStr) bool) error {
if !*dataFlag {
t.Skip("skip data generation tests")
}

events, err := f.Fetch()
if err != nil {
return err
}

if len(events) == 0 {
return fmt.Errorf("no events were generated")
}

event, err := SelectEvent(events, cond)
if err != nil {
return err
}

fullEvent := CreateFullEvent(f, event)
WriteEventToDataJSON(t, fullEvent, "")
return nil
}

// WriteEventsReporterV2 fetches events and writes the first event to a ./_meta/data.json
// file.
func WriteEventsReporterV2(f mb.ReportingMetricSetV2, t testing.TB, path string) error {
Expand Down
Loading

0 comments on commit 688b92e

Please sign in to comment.