From b1ceef9dacb012d0df531c848fe492991e1808e6 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Mon, 29 Apr 2019 19:24:33 +0200 Subject: [PATCH] ReportingMetricSetV2WithContext --- metricbeat/mb/builders.go | 8 +++++-- metricbeat/mb/mb.go | 7 ++++++ metricbeat/mb/module/wrapper.go | 19 ++++++++++----- metricbeat/mb/testing/data_generator.go | 21 ++++++++++++++++ metricbeat/mb/testing/modules.go | 24 +++++++++++++++++++ .../module/docker/container/container.go | 4 ++-- .../container/container_integration_test.go | 12 ++-------- 7 files changed, 75 insertions(+), 20 deletions(-) diff --git a/metricbeat/mb/builders.go b/metricbeat/mb/builders.go index 87765ecdaac..04b32f5638a 100644 --- a/metricbeat/mb/builders.go +++ b/metricbeat/mb/builders.go @@ -241,6 +241,10 @@ func mustImplementFetcher(ms MetricSet) error { ifcs = append(ifcs, "ReportingMetricSetV2Error") } + if _, ok := ms.(ReportingMetricSetV2WithContext); ok { + ifcs = append(ifcs, "ReportingMetricSetV2WithContext") + } + if _, ok := ms.(PushMetricSetV2); ok { ifcs = append(ifcs, "PushMetricSetV2") } @@ -253,8 +257,8 @@ func mustImplementFetcher(ms MetricSet) error { case 0: return fmt.Errorf("MetricSet '%s/%s' does not implement an event "+ "producing interface (EventFetcher, EventsFetcher, "+ - "ReportingMetricSet, ReportingMetricSetV2, ReportingMetricSetV2Error, PushMetricSet, or "+ - "PushMetricSetV2)", + "ReportingMetricSet, ReportingMetricSetV2, ReportingMetricSetV2Error, ReportingMetricSetV2WithContext"+ + "PushMetricSet, PushMetricSetV2, or PushMetricSetV2WithContext)", ms.Module().Name(), ms.Name()) case 1: return nil diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index bc14fb57c2b..674333eb62b 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -209,6 +209,13 @@ type ReportingMetricSetV2Error interface { Fetch(r ReporterV2) error } +// ReportingMetricSetV2WithContext is a MetricSet that reports events or errors through the +// ReporterV2 interface. Fetch is called periodically to collect events. +type ReportingMetricSetV2WithContext interface { + MetricSet + Fetch(ctx context.Context, r ReporterV2) error +} + // PushMetricSetV2 is a MetricSet that pushes events (rather than pulling them // periodically via a Fetch callback). Run is invoked to start the event // subscription and it should block until the MetricSet is ready to stop or diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index 535042d9446..061e7987b81 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -195,8 +195,8 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) { case mb.PushMetricSetV2WithContext: ms.Run(&channelContext{done}, reporter.V2()) case mb.EventFetcher, mb.EventsFetcher, - mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error: - msw.startPeriodicFetching(reporter) + mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext: + msw.startPeriodicFetching(&channelContext{done}, reporter) default: // Earlier startup stages prevent this from happening. logp.Err("MetricSet '%s/%s' does not implement an event producing interface", @@ -207,9 +207,9 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) { // startPeriodicFetching performs an immediate fetch for the MetricSet then it // begins a continuous timer scheduled loop to fetch data. To stop the loop the // done channel should be closed. -func (msw *metricSetWrapper) startPeriodicFetching(reporter reporter) { +func (msw *metricSetWrapper) startPeriodicFetching(ctx context.Context, reporter reporter) { // Fetch immediately. - msw.fetch(reporter) + msw.fetch(ctx, reporter) // Start timer for future fetches. t := time.NewTicker(msw.Module().Config().Period) @@ -219,7 +219,7 @@ func (msw *metricSetWrapper) startPeriodicFetching(reporter reporter) { case <-reporter.V2().Done(): return case <-t.C: - msw.fetch(reporter) + msw.fetch(ctx, reporter) } } } @@ -227,7 +227,7 @@ func (msw *metricSetWrapper) startPeriodicFetching(reporter reporter) { // fetch invokes the appropriate Fetch method for the MetricSet and publishes // the result using the publisher client. This method will recover from panics // and log a stack track if one occurs. -func (msw *metricSetWrapper) fetch(reporter reporter) { +func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) { switch fetcher := msw.MetricSet.(type) { case mb.EventFetcher: msw.singleEventFetch(fetcher, reporter) @@ -246,6 +246,13 @@ func (msw *metricSetWrapper) fetch(reporter reporter) { reporter.V2().Error(err) logp.Info("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) } + case mb.ReportingMetricSetV2WithContext: + reporter.StartFetchTimer() + err := fetcher.Fetch(ctx, reporter.V2()) + if err != nil { + reporter.V2().Error(err) + logp.Info("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) + } default: panic(fmt.Sprintf("unexpected fetcher type for %v", msw)) } diff --git a/metricbeat/mb/testing/data_generator.go b/metricbeat/mb/testing/data_generator.go index 1bdb6c69e5d..1f75f44e9d7 100644 --- a/metricbeat/mb/testing/data_generator.go +++ b/metricbeat/mb/testing/data_generator.go @@ -99,6 +99,12 @@ func WriteEventsReporterV2Error(f mb.ReportingMetricSetV2Error, t testing.TB, pa return WriteEventsReporterV2ErrorCond(f, t, path, nil) } +// WriteEventsReporterV2WithContext fetches events and writes the first event to a ./_meta/data.json +// file. +func WriteEventsReporterV2WithContext(f mb.ReportingMetricSetV2WithContext, t testing.TB, path string) error { + return WriteEventsReporterV2WithContextCond(f, t, path, nil) +} + // WriteEventsReporterV2Cond fetches events and writes the first event that matches // the condition to a file. func WriteEventsReporterV2Cond(f mb.ReportingMetricSetV2, t testing.TB, path string, cond func(common.MapStr) bool) error { @@ -129,6 +135,21 @@ func WriteEventsReporterV2ErrorCond(f mb.ReportingMetricSetV2Error, t testing.TB return writeEvent(events, f, t, path, cond) } +// WriteEventsReporterV2WithContextCond fetches events and writes the first event that matches +// the condition to a file. +func WriteEventsReporterV2WithContextCond(f mb.ReportingMetricSetV2WithContext, t testing.TB, path string, cond func(common.MapStr) bool) error { + if !*dataFlag { + t.Skip("skip data generation tests") + } + + events, errs := ReportingFetchV2WithContext(f) + if len(errs) > 0 { + return errs[0] + } + + return writeEvent(events, f, t, path, cond) +} + func writeEvent(events []mb.Event, f mb.MetricSet, t testing.TB, path string, cond func(common.MapStr) bool) error { if len(events) == 0 { return fmt.Errorf("no events were generated") diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 45e0edfc506..6e9ef0aab12 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -182,6 +182,19 @@ func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.Reporting return reportingMetricSetV2Error } +// NewReportingMetricSetV2WithContext returns a new ReportingMetricSetV2WithContext instance. Then +// you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet. +func NewReportingMetricSetV2WithContext(t testing.TB, config interface{}) mb.ReportingMetricSetV2WithContext { + metricSet := NewMetricSet(t, config) + + reportingMetricSet, ok := metricSet.(mb.ReportingMetricSetV2WithContext) + if !ok { + t.Fatal("MetricSet does not implement ReportingMetricSetV2WithContext") + } + + return reportingMetricSet +} + // CapturingReporterV2 is a reporter used for testing which stores all events and errors type CapturingReporterV2 struct { events []mb.Event @@ -229,6 +242,17 @@ func ReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error) ([]mb.Event, return r.events, r.errs } +// ReportingFetchV2WithContext runs the given reporting metricset and returns all of the +// events and errors that occur during that period. +func ReportingFetchV2WithContext(metricSet mb.ReportingMetricSetV2WithContext) ([]mb.Event, []error) { + r := &CapturingReporterV2{} + err := metricSet.Fetch(context.Background(), r) + if err != nil { + r.errs = append(r.errs, err) + } + return r.events, r.errs +} + // NewPushMetricSet instantiates a new PushMetricSet using the given // configuration. The ModuleFactory and MetricSetFactory are obtained from the // global Registry. diff --git a/metricbeat/module/docker/container/container.go b/metricbeat/module/docker/container/container.go index 8f84dc3eda2..940d6e333f1 100644 --- a/metricbeat/module/docker/container/container.go +++ b/metricbeat/module/docker/container/container.go @@ -63,9 +63,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch returns a list of all containers as events. // This is based on https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/list-containers. -func (m *MetricSet) Fetch(r mb.ReporterV2) error { +func (m *MetricSet) Fetch(ctx context.Context, r mb.ReporterV2) error { // Fetch a list of all containers. - containers, err := m.dockerClient.ContainerList(context.Background(), types.ContainerListOptions{}) + containers, err := m.dockerClient.ContainerList(ctx, types.ContainerListOptions{}) if err != nil { return errors.Wrap(err, "failed to get docker containers list") } diff --git a/metricbeat/module/docker/container/container_integration_test.go b/metricbeat/module/docker/container/container_integration_test.go index cd1dd4afcc3..8802c30f6c3 100644 --- a/metricbeat/module/docker/container/container_integration_test.go +++ b/metricbeat/module/docker/container/container_integration_test.go @@ -22,20 +22,12 @@ package container import ( "testing" - "github.com/stretchr/testify/assert" - mbtest "github.com/elastic/beats/metricbeat/mb/testing" ) func TestData(t *testing.T) { - f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) - events, errs := mbtest.ReportingFetchV2Error(f) - if len(errs) > 0 { - t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) - } - assert.NotEmpty(t, events) - - if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil { + f := mbtest.NewReportingMetricSetV2WithContext(t, getConfig()) + if err := mbtest.WriteEventsReporterV2WithContext(f, t, ""); err != nil { t.Fatal("write", err) } }