From 0477eaa68f608956cd437bb30788f1b0e9edf440 Mon Sep 17 00:00:00 2001
From: Jaime Soriano Pastor <jaime.soriano@elastic.co>
Date: Mon, 29 Apr 2019 19:26:24 +0200
Subject: [PATCH 1/8] Update changelog

---
 CHANGELOG-developer.next.asciidoc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc
index 6704596f3171..2db1d19ce2f3 100644
--- a/CHANGELOG-developer.next.asciidoc
+++ b/CHANGELOG-developer.next.asciidoc
@@ -37,3 +37,4 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
 - Update Jinja2 version to 2.10.1. {pull}11817[11817]
 - Reduce idxmgmt.Supporter interface and rework export commands to reuse logic. {pull}11777[11777]
 - Update urllib3 version to 1.24.2 {pull}11930[11930]
+- New ReporterV2 interfaces that can receive a context on `Fetch(ctx, reporter)`, or `Run(ctx, reporter)`. {pull}11981[11981]

From da007c52fe5d00deefc1d6abd104583d03c90fb5 Mon Sep 17 00:00:00 2001
From: Jaime Soriano Pastor <jaime.soriano@elastic.co>
Date: Mon, 29 Apr 2019 19:10:55 +0200
Subject: [PATCH 2/8] PushMetricSetV2WithContext

---
 metricbeat/mb/builders.go                     |  5 +++
 metricbeat/mb/mb.go                           | 10 +++++
 metricbeat/mb/module/wrapper.go               | 20 +++++++++
 metricbeat/mb/testing/modules.go              | 44 ++++++++++++++++++-
 metricbeat/module/docker/event/event.go       |  8 ++--
 .../docker/event/event_integration_test.go    |  6 +--
 6 files changed, 83 insertions(+), 10 deletions(-)

diff --git a/metricbeat/mb/builders.go b/metricbeat/mb/builders.go
index deb2e43ec63f..87765ecdaac4 100644
--- a/metricbeat/mb/builders.go
+++ b/metricbeat/mb/builders.go
@@ -244,6 +244,11 @@ func mustImplementFetcher(ms MetricSet) error {
 	if _, ok := ms.(PushMetricSetV2); ok {
 		ifcs = append(ifcs, "PushMetricSetV2")
 	}
+
+	if _, ok := ms.(PushMetricSetV2WithContext); ok {
+		ifcs = append(ifcs, "PushMetricSetV2WithContext")
+	}
+
 	switch len(ifcs) {
 	case 0:
 		return fmt.Errorf("MetricSet '%s/%s' does not implement an event "+
diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go
index e38655a6907f..bc14fb57c2b5 100644
--- a/metricbeat/mb/mb.go
+++ b/metricbeat/mb/mb.go
@@ -22,6 +22,7 @@ to implement Modules and their associated MetricSets.
 package mb
 
 import (
+	"context"
 	"fmt"
 	"net/url"
 	"time"
@@ -217,6 +218,15 @@ type PushMetricSetV2 interface {
 	Run(r PushReporterV2)
 }
 
+// PushMetricSetV2Context is a MetricSet that pushes events (rather than pulling them
+// periodically via a Fetch callback). Run is invoked to start the event
+// subscription and it should block until the MetricSet is ready to stop or
+// the context is closed.
+type PushMetricSetV2WithContext interface {
+	MetricSet
+	Run(ctx context.Context, r ReporterV2)
+}
+
 // HostData contains values parsed from the 'host' configuration. Other
 // configuration data like protocols, usernames, and passwords may also be
 // used to construct this HostData data.
diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go
index 245876cd655b..535042d94464 100644
--- a/metricbeat/mb/module/wrapper.go
+++ b/metricbeat/mb/module/wrapper.go
@@ -18,6 +18,7 @@
 package module
 
 import (
+	"context"
 	"fmt"
 	"math/rand"
 	"sync"
@@ -191,6 +192,8 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
 		ms.Run(reporter.V1())
 	case mb.PushMetricSetV2:
 		ms.Run(reporter.V2())
+	case mb.PushMetricSetV2WithContext:
+		ms.Run(&channelContext{done}, reporter.V2())
 	case mb.EventFetcher, mb.EventsFetcher,
 		mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error:
 		msw.startPeriodicFetching(reporter)
@@ -313,6 +316,23 @@ func (r *eventReporter) V1() mb.PushReporter {
 }
 func (r *eventReporter) V2() mb.PushReporterV2 { return reporterV2{r} }
 
+// channelContext implements context.Context by wrapping a channel
+type channelContext struct {
+	done <-chan struct{}
+}
+
+func (r *channelContext) Deadline() (time.Time, bool) { return time.Time{}, false }
+func (r *channelContext) Done() <-chan struct{}       { return r.done }
+func (r *channelContext) Err() error {
+	select {
+	case <-r.done:
+		return context.Canceled
+	default:
+		return nil
+	}
+}
+func (r *channelContext) Value(key interface{}) interface{} { return nil }
+
 // reporterV1 wraps V2 to provide a v1 interface.
 type reporterV1 struct {
 	v2     mb.PushReporterV2
diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go
index ec5ebc0f5322..45e0edfc506d 100644
--- a/metricbeat/mb/testing/modules.go
+++ b/metricbeat/mb/testing/modules.go
@@ -54,6 +54,7 @@ that Metricbeat does it and with the same validations.
 package testing
 
 import (
+	"context"
 	"sync"
 	"testing"
 	"time"
@@ -301,7 +302,21 @@ func NewPushMetricSetV2(t testing.TB, config interface{}) mb.PushMetricSetV2 {
 
 	pushMetricSet, ok := metricSet.(mb.PushMetricSetV2)
 	if !ok {
-		t.Fatal("MetricSet does not implement PushMetricSet")
+		t.Fatal("MetricSet does not implement PushMetricSetV2")
+	}
+
+	return pushMetricSet
+}
+
+// NewPushMetricSetV2WithContext instantiates a new PushMetricSetV2WithContext
+// using the given configuration. The ModuleFactory and MetricSetFactory are
+// obtained from the global Registry.
+func NewPushMetricSetV2WithContext(t testing.TB, config interface{}) mb.PushMetricSetV2WithContext {
+	metricSet := NewMetricSet(t, config)
+
+	pushMetricSet, ok := metricSet.(mb.PushMetricSetV2WithContext)
+	if !ok {
+		t.Fatal("MetricSet does not implement PushMetricSetV2WithContext")
 	}
 
 	return pushMetricSet
@@ -345,7 +360,7 @@ func (r *capturingPushReporterV2) Done() <-chan struct{} {
 // time and returns all of the events and errors that occur during that period.
 func RunPushMetricSetV2(timeout time.Duration, waitEvents int, metricSet mb.PushMetricSetV2) []mb.Event {
 	var (
-		r      = &capturingPushReporterV2{doneC: make(chan struct{}), eventsC: make(chan mb.Event)}
+		r      = &capturingPushReporterV2{eventsC: make(chan mb.Event)}
 		wg     sync.WaitGroup
 		events []mb.Event
 	)
@@ -387,3 +402,28 @@ func RunPushMetricSetV2(timeout time.Duration, waitEvents int, metricSet mb.Push
 	wg.Wait()
 	return events
 }
+
+// RunPushMetricSetV2WithContext run the given push metricset for the specific amount of
+// time and returns all of the events that occur during that period.
+func RunPushMetricSetV2WithContext(timeout time.Duration, waitEvents int, metricSet mb.PushMetricSetV2WithContext) []mb.Event {
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	r := &capturingPushReporterV2{eventsC: make(chan mb.Event)}
+
+	go metricSet.Run(ctx, r)
+
+	var events []mb.Event
+	for {
+		select {
+		case <-ctx.Done():
+			// Timeout
+			return events
+		case e := <-r.eventsC:
+			events = append(events, e)
+			if len(events) >= waitEvents {
+				cancel()
+				return events
+			}
+		}
+	}
+	return events
+}
diff --git a/metricbeat/module/docker/event/event.go b/metricbeat/module/docker/event/event.go
index cae7ceaa561e..6426ffeb961a 100644
--- a/metricbeat/module/docker/event/event.go
+++ b/metricbeat/module/docker/event/event.go
@@ -76,8 +76,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
 }
 
 // Run listens for docker events and reports them
-func (m *MetricSet) Run(reporter mb.PushReporterV2) {
-	ctx, cancel := context.WithCancel(context.Background())
+func (m *MetricSet) Run(ctx context.Context, reporter mb.ReporterV2) {
 	options := types.EventsOptions{
 		Since: fmt.Sprintf("%d", time.Now().Unix()),
 	}
@@ -100,16 +99,15 @@ func (m *MetricSet) Run(reporter mb.PushReporterV2) {
 				time.Sleep(1 * time.Second)
 				break WATCH
 
-			case <-reporter.Done():
+			case <-ctx.Done():
 				m.logger.Debug("docker", "event watcher stopped")
-				cancel()
 				return
 			}
 		}
 	}
 }
 
-func (m *MetricSet) reportEvent(reporter mb.PushReporterV2, event events.Message) {
+func (m *MetricSet) reportEvent(reporter mb.ReporterV2, event events.Message) {
 	time := time.Unix(event.Time, 0)
 
 	attributes := make(map[string]string, len(event.Actor.Attributes))
diff --git a/metricbeat/module/docker/event/event_integration_test.go b/metricbeat/module/docker/event/event_integration_test.go
index 361cb0e5c95a..3cce1486672c 100644
--- a/metricbeat/module/docker/event/event_integration_test.go
+++ b/metricbeat/module/docker/event/event_integration_test.go
@@ -20,6 +20,7 @@
 package event
 
 import (
+	"context"
 	"io"
 	"os"
 	"testing"
@@ -28,7 +29,6 @@ import (
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types/container"
 	"github.com/docker/docker/client"
-	"golang.org/x/net/context"
 
 	"github.com/elastic/beats/auditbeat/core"
 	"github.com/elastic/beats/metricbeat/mb"
@@ -36,11 +36,11 @@ import (
 )
 
 func TestData(t *testing.T) {
-	ms := mbtest.NewPushMetricSetV2(t, getConfig())
+	ms := mbtest.NewPushMetricSetV2WithContext(t, getConfig())
 	var events []mb.Event
 	done := make(chan interface{})
 	go func() {
-		events = mbtest.RunPushMetricSetV2(10*time.Second, 1, ms)
+		events = mbtest.RunPushMetricSetV2WithContext(10*time.Second, 1, ms)
 		close(done)
 	}()
 

From b1ceef9dacb012d0df531c848fe492991e1808e6 Mon Sep 17 00:00:00 2001
From: Jaime Soriano Pastor <jaime.soriano@elastic.co>
Date: Mon, 29 Apr 2019 19:24:33 +0200
Subject: [PATCH 3/8] ReportingMetricSetV2WithContext

---
 metricbeat/mb/builders.go                     |  8 +++++--
 metricbeat/mb/mb.go                           |  7 ++++++
 metricbeat/mb/module/wrapper.go               | 19 ++++++++++-----
 metricbeat/mb/testing/data_generator.go       | 21 ++++++++++++++++
 metricbeat/mb/testing/modules.go              | 24 +++++++++++++++++++
 .../module/docker/container/container.go      |  4 ++--
 .../container/container_integration_test.go   | 12 ++--------
 7 files changed, 75 insertions(+), 20 deletions(-)

diff --git a/metricbeat/mb/builders.go b/metricbeat/mb/builders.go
index 87765ecdaac4..04b32f5638aa 100644
--- a/metricbeat/mb/builders.go
+++ b/metricbeat/mb/builders.go
@@ -241,6 +241,10 @@ func mustImplementFetcher(ms MetricSet) error {
 		ifcs = append(ifcs, "ReportingMetricSetV2Error")
 	}
 
+	if _, ok := ms.(ReportingMetricSetV2WithContext); ok {
+		ifcs = append(ifcs, "ReportingMetricSetV2WithContext")
+	}
+
 	if _, ok := ms.(PushMetricSetV2); ok {
 		ifcs = append(ifcs, "PushMetricSetV2")
 	}
@@ -253,8 +257,8 @@ func mustImplementFetcher(ms MetricSet) error {
 	case 0:
 		return fmt.Errorf("MetricSet '%s/%s' does not implement an event "+
 			"producing interface (EventFetcher, EventsFetcher, "+
-			"ReportingMetricSet, ReportingMetricSetV2, ReportingMetricSetV2Error, PushMetricSet, or "+
-			"PushMetricSetV2)",
+			"ReportingMetricSet, ReportingMetricSetV2, ReportingMetricSetV2Error, ReportingMetricSetV2WithContext"+
+			"PushMetricSet, PushMetricSetV2, or PushMetricSetV2WithContext)",
 			ms.Module().Name(), ms.Name())
 	case 1:
 		return nil
diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go
index bc14fb57c2b5..674333eb62b3 100644
--- a/metricbeat/mb/mb.go
+++ b/metricbeat/mb/mb.go
@@ -209,6 +209,13 @@ type ReportingMetricSetV2Error interface {
 	Fetch(r ReporterV2) error
 }
 
+// ReportingMetricSetV2WithContext is a MetricSet that reports events or errors through the
+// ReporterV2 interface. Fetch is called periodically to collect events.
+type ReportingMetricSetV2WithContext interface {
+	MetricSet
+	Fetch(ctx context.Context, r ReporterV2) error
+}
+
 // PushMetricSetV2 is a MetricSet that pushes events (rather than pulling them
 // periodically via a Fetch callback). Run is invoked to start the event
 // subscription and it should block until the MetricSet is ready to stop or
diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go
index 535042d94464..061e7987b811 100644
--- a/metricbeat/mb/module/wrapper.go
+++ b/metricbeat/mb/module/wrapper.go
@@ -195,8 +195,8 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
 	case mb.PushMetricSetV2WithContext:
 		ms.Run(&channelContext{done}, reporter.V2())
 	case mb.EventFetcher, mb.EventsFetcher,
-		mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error:
-		msw.startPeriodicFetching(reporter)
+		mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext:
+		msw.startPeriodicFetching(&channelContext{done}, reporter)
 	default:
 		// Earlier startup stages prevent this from happening.
 		logp.Err("MetricSet '%s/%s' does not implement an event producing interface",
@@ -207,9 +207,9 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
 // startPeriodicFetching performs an immediate fetch for the MetricSet then it
 // begins a continuous timer scheduled loop to fetch data. To stop the loop the
 // done channel should be closed.
-func (msw *metricSetWrapper) startPeriodicFetching(reporter reporter) {
+func (msw *metricSetWrapper) startPeriodicFetching(ctx context.Context, reporter reporter) {
 	// Fetch immediately.
-	msw.fetch(reporter)
+	msw.fetch(ctx, reporter)
 
 	// Start timer for future fetches.
 	t := time.NewTicker(msw.Module().Config().Period)
@@ -219,7 +219,7 @@ func (msw *metricSetWrapper) startPeriodicFetching(reporter reporter) {
 		case <-reporter.V2().Done():
 			return
 		case <-t.C:
-			msw.fetch(reporter)
+			msw.fetch(ctx, reporter)
 		}
 	}
 }
@@ -227,7 +227,7 @@ func (msw *metricSetWrapper) startPeriodicFetching(reporter reporter) {
 // fetch invokes the appropriate Fetch method for the MetricSet and publishes
 // the result using the publisher client. This method will recover from panics
 // and log a stack track if one occurs.
-func (msw *metricSetWrapper) fetch(reporter reporter) {
+func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
 	switch fetcher := msw.MetricSet.(type) {
 	case mb.EventFetcher:
 		msw.singleEventFetch(fetcher, reporter)
@@ -246,6 +246,13 @@ func (msw *metricSetWrapper) fetch(reporter reporter) {
 			reporter.V2().Error(err)
 			logp.Info("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
 		}
+	case mb.ReportingMetricSetV2WithContext:
+		reporter.StartFetchTimer()
+		err := fetcher.Fetch(ctx, reporter.V2())
+		if err != nil {
+			reporter.V2().Error(err)
+			logp.Info("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
+		}
 	default:
 		panic(fmt.Sprintf("unexpected fetcher type for %v", msw))
 	}
diff --git a/metricbeat/mb/testing/data_generator.go b/metricbeat/mb/testing/data_generator.go
index 1bdb6c69e5d4..1f75f44e9d7b 100644
--- a/metricbeat/mb/testing/data_generator.go
+++ b/metricbeat/mb/testing/data_generator.go
@@ -99,6 +99,12 @@ func WriteEventsReporterV2Error(f mb.ReportingMetricSetV2Error, t testing.TB, pa
 	return WriteEventsReporterV2ErrorCond(f, t, path, nil)
 }
 
+// WriteEventsReporterV2WithContext fetches events and writes the first event to a ./_meta/data.json
+// file.
+func WriteEventsReporterV2WithContext(f mb.ReportingMetricSetV2WithContext, t testing.TB, path string) error {
+	return WriteEventsReporterV2WithContextCond(f, t, path, nil)
+}
+
 // WriteEventsReporterV2Cond fetches events and writes the first event that matches
 // the condition to a file.
 func WriteEventsReporterV2Cond(f mb.ReportingMetricSetV2, t testing.TB, path string, cond func(common.MapStr) bool) error {
@@ -129,6 +135,21 @@ func WriteEventsReporterV2ErrorCond(f mb.ReportingMetricSetV2Error, t testing.TB
 	return writeEvent(events, f, t, path, cond)
 }
 
+// WriteEventsReporterV2WithContextCond fetches events and writes the first event that matches
+// the condition to a file.
+func WriteEventsReporterV2WithContextCond(f mb.ReportingMetricSetV2WithContext, t testing.TB, path string, cond func(common.MapStr) bool) error {
+	if !*dataFlag {
+		t.Skip("skip data generation tests")
+	}
+
+	events, errs := ReportingFetchV2WithContext(f)
+	if len(errs) > 0 {
+		return errs[0]
+	}
+
+	return writeEvent(events, f, t, path, cond)
+}
+
 func writeEvent(events []mb.Event, f mb.MetricSet, t testing.TB, path string, cond func(common.MapStr) bool) error {
 	if len(events) == 0 {
 		return fmt.Errorf("no events were generated")
diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go
index 45e0edfc506d..6e9ef0aab12a 100644
--- a/metricbeat/mb/testing/modules.go
+++ b/metricbeat/mb/testing/modules.go
@@ -182,6 +182,19 @@ func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.Reporting
 	return reportingMetricSetV2Error
 }
 
+// NewReportingMetricSetV2WithContext returns a new ReportingMetricSetV2WithContext instance. Then
+// you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet.
+func NewReportingMetricSetV2WithContext(t testing.TB, config interface{}) mb.ReportingMetricSetV2WithContext {
+	metricSet := NewMetricSet(t, config)
+
+	reportingMetricSet, ok := metricSet.(mb.ReportingMetricSetV2WithContext)
+	if !ok {
+		t.Fatal("MetricSet does not implement ReportingMetricSetV2WithContext")
+	}
+
+	return reportingMetricSet
+}
+
 // CapturingReporterV2 is a reporter used for testing which stores all events and errors
 type CapturingReporterV2 struct {
 	events []mb.Event
@@ -229,6 +242,17 @@ func ReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error) ([]mb.Event,
 	return r.events, r.errs
 }
 
+// ReportingFetchV2WithContext runs the given reporting metricset and returns all of the
+// events and errors that occur during that period.
+func ReportingFetchV2WithContext(metricSet mb.ReportingMetricSetV2WithContext) ([]mb.Event, []error) {
+	r := &CapturingReporterV2{}
+	err := metricSet.Fetch(context.Background(), r)
+	if err != nil {
+		r.errs = append(r.errs, err)
+	}
+	return r.events, r.errs
+}
+
 // NewPushMetricSet instantiates a new PushMetricSet using the given
 // configuration. The ModuleFactory and MetricSetFactory are obtained from the
 // global Registry.
diff --git a/metricbeat/module/docker/container/container.go b/metricbeat/module/docker/container/container.go
index 8f84dc3eda2d..940d6e333f1e 100644
--- a/metricbeat/module/docker/container/container.go
+++ b/metricbeat/module/docker/container/container.go
@@ -63,9 +63,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
 
 // Fetch returns a list of all containers as events.
 // This is based on https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/list-containers.
-func (m *MetricSet) Fetch(r mb.ReporterV2) error {
+func (m *MetricSet) Fetch(ctx context.Context, r mb.ReporterV2) error {
 	// Fetch a list of all containers.
-	containers, err := m.dockerClient.ContainerList(context.Background(), types.ContainerListOptions{})
+	containers, err := m.dockerClient.ContainerList(ctx, types.ContainerListOptions{})
 	if err != nil {
 		return errors.Wrap(err, "failed to get docker containers list")
 	}
diff --git a/metricbeat/module/docker/container/container_integration_test.go b/metricbeat/module/docker/container/container_integration_test.go
index cd1dd4afcc39..8802c30f6c32 100644
--- a/metricbeat/module/docker/container/container_integration_test.go
+++ b/metricbeat/module/docker/container/container_integration_test.go
@@ -22,20 +22,12 @@ package container
 import (
 	"testing"
 
-	"github.com/stretchr/testify/assert"
-
 	mbtest "github.com/elastic/beats/metricbeat/mb/testing"
 )
 
 func TestData(t *testing.T) {
-	f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
-	events, errs := mbtest.ReportingFetchV2Error(f)
-	if len(errs) > 0 {
-		t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
-	}
-	assert.NotEmpty(t, events)
-
-	if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
+	f := mbtest.NewReportingMetricSetV2WithContext(t, getConfig())
+	if err := mbtest.WriteEventsReporterV2WithContext(f, t, ""); err != nil {
 		t.Fatal("write", err)
 	}
 }

From 26009c3c814e8bda7f17e9543c276f1cdb139202 Mon Sep 17 00:00:00 2001
From: Jaime Soriano Pastor <jaime.soriano@elastic.co>
Date: Mon, 29 Apr 2019 19:42:06 +0200
Subject: [PATCH 4/8] Make hound happy

---
 metricbeat/mb/mb.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go
index 674333eb62b3..d55b8971d4a8 100644
--- a/metricbeat/mb/mb.go
+++ b/metricbeat/mb/mb.go
@@ -225,7 +225,7 @@ type PushMetricSetV2 interface {
 	Run(r PushReporterV2)
 }
 
-// PushMetricSetV2Context is a MetricSet that pushes events (rather than pulling them
+// PushMetricSetV2WithContext is a MetricSet that pushes events (rather than pulling them
 // periodically via a Fetch callback). Run is invoked to start the event
 // subscription and it should block until the MetricSet is ready to stop or
 // the context is closed.

From ba7c1a6502c74b37282997b03a789c30051e97bf Mon Sep 17 00:00:00 2001
From: Jaime Soriano Pastor <jaime.soriano@elastic.co>
Date: Tue, 30 Apr 2019 10:32:49 +0200
Subject: [PATCH 5/8] Call cancel on defer

---
 metricbeat/mb/testing/modules.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go
index 6e9ef0aab12a..f48416e194b3 100644
--- a/metricbeat/mb/testing/modules.go
+++ b/metricbeat/mb/testing/modules.go
@@ -431,6 +431,8 @@ func RunPushMetricSetV2(timeout time.Duration, waitEvents int, metricSet mb.Push
 // time and returns all of the events that occur during that period.
 func RunPushMetricSetV2WithContext(timeout time.Duration, waitEvents int, metricSet mb.PushMetricSetV2WithContext) []mb.Event {
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
 	r := &capturingPushReporterV2{eventsC: make(chan mb.Event)}
 
 	go metricSet.Run(ctx, r)
@@ -444,10 +446,8 @@ func RunPushMetricSetV2WithContext(timeout time.Duration, waitEvents int, metric
 		case e := <-r.eventsC:
 			events = append(events, e)
 			if len(events) >= waitEvents {
-				cancel()
 				return events
 			}
 		}
 	}
-	return events
 }

From 2429aac05e706bd65ba469167c9128764b727a5e Mon Sep 17 00:00:00 2001
From: Jaime Soriano Pastor <jaime.soriano@elastic.co>
Date: Tue, 25 Jun 2019 11:14:33 +0200
Subject: [PATCH 6/8] Fix changelog

---
 CHANGELOG-developer.next.asciidoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc
index 6152145df453..32a90173657d 100644
--- a/CHANGELOG-developer.next.asciidoc
+++ b/CHANGELOG-developer.next.asciidoc
@@ -35,10 +35,10 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
 - Update Jinja2 version to 2.10.1. {pull}11817[11817]
 - Reduce idxmgmt.Supporter interface and rework export commands to reuse logic. {pull}11777[11777],{pull}12065[12065],{pull}12067[12067],{pull}12160[12160]
 - Update urllib3 version to 1.24.2 {pull}11930[11930]
-- New ReporterV2 interfaces that can receive a context on `Fetch(ctx, reporter)`, or `Run(ctx, reporter)`. {pull}11981[11981]
 - Add libbeat/common/cleanup package. {pull}12134[12134]
 - New helper to check for leaked goroutines on tests. {pull}12106[12106]
 - Only Load minimal template if no fields are provided. {pull}12103[12103]
 - Add new option `IgnoreAllErrors` to `libbeat.common.schema` for skipping fields that failed while converting. {pull}12089[12089]
 - Deprecate setup cmds for `template` and `ilm-policy`. Add new setup cmd for `index-management`. {pull}12132[12132]
 - Use the go-lookslike library for testing in heartbeat. Eventually the mapval package will be replaced with it. {pull}12540[12540]
+- New ReporterV2 interfaces that can receive a context on `Fetch(ctx, reporter)`, or `Run(ctx, reporter)`. {pull}11981[11981]

From 1ca4e53e567bc7224823c77d2478ac880e6f1867 Mon Sep 17 00:00:00 2001
From: Jaime Soriano Pastor <jaime.soriano@elastic.co>
Date: Tue, 25 Jun 2019 13:00:26 +0200
Subject: [PATCH 7/8] Refactor capturing push reporter used in tests

---
 metricbeat/mb/testing/modules.go | 87 ++++++++++----------------------
 1 file changed, 27 insertions(+), 60 deletions(-)

diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go
index f48416e194b3..541d0e428c1f 100644
--- a/metricbeat/mb/testing/modules.go
+++ b/metricbeat/mb/testing/modules.go
@@ -349,15 +349,19 @@ func NewPushMetricSetV2WithContext(t testing.TB, config interface{}) mb.PushMetr
 // capturingPushReporterV2 stores all the events and errors from a metricset's
 // Run method.
 type capturingPushReporterV2 struct {
-	doneC   chan struct{}
+	context.Context
 	eventsC chan mb.Event
 }
 
+func newCapturingPushReporterV2(ctx context.Context) *capturingPushReporterV2 {
+	return &capturingPushReporterV2{Context: ctx, eventsC: make(chan mb.Event)}
+}
+
 // report writes an event to the output channel and returns true. If the output
 // is closed it returns false.
 func (r *capturingPushReporterV2) report(event mb.Event) bool {
 	select {
-	case <-r.doneC:
+	case <-r.Done():
 		// Publisher is stopped.
 		return false
 	case r.eventsC <- event:
@@ -375,56 +379,32 @@ func (r *capturingPushReporterV2) Error(err error) bool {
 	return r.report(mb.Event{Error: err})
 }
 
-// Done returns the Done channel for this reporter.
-func (r *capturingPushReporterV2) Done() <-chan struct{} {
-	return r.doneC
+func (r *capturingPushReporterV2) capture(waitEvents int) []mb.Event {
+	var events []mb.Event
+	for {
+		select {
+		case <-r.Done():
+			// Timeout
+			return events
+		case e := <-r.eventsC:
+			events = append(events, e)
+			if len(events) >= waitEvents {
+				return events
+			}
+		}
+	}
 }
 
 // RunPushMetricSetV2 run the given push metricset for the specific amount of
 // time and returns all of the events and errors that occur during that period.
 func RunPushMetricSetV2(timeout time.Duration, waitEvents int, metricSet mb.PushMetricSetV2) []mb.Event {
-	var (
-		r      = &capturingPushReporterV2{eventsC: make(chan mb.Event)}
-		wg     sync.WaitGroup
-		events []mb.Event
-	)
-	wg.Add(2)
-
-	// Producer
-	go func() {
-		defer wg.Done()
-		defer close(r.eventsC)
-		if closer, ok := metricSet.(mb.Closer); ok {
-			defer closer.Close()
-		}
-		metricSet.Run(r)
-	}()
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
 
-	// Consumer
-	go func() {
-		defer wg.Done()
-		defer close(r.doneC)
-
-		timer := time.NewTimer(timeout)
-		defer timer.Stop()
-		for {
-			select {
-			case <-timer.C:
-				return
-			case e, ok := <-r.eventsC:
-				if !ok {
-					return
-				}
-				events = append(events, e)
-				if waitEvents > 0 && waitEvents <= len(events) {
-					return
-				}
-			}
-		}
-	}()
+	r := newCapturingPushReporterV2(ctx)
 
-	wg.Wait()
-	return events
+	go metricSet.Run(r)
+	return r.capture(waitEvents)
 }
 
 // RunPushMetricSetV2WithContext run the given push metricset for the specific amount of
@@ -433,21 +413,8 @@ func RunPushMetricSetV2WithContext(timeout time.Duration, waitEvents int, metric
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 
-	r := &capturingPushReporterV2{eventsC: make(chan mb.Event)}
+	r := newCapturingPushReporterV2(ctx)
 
 	go metricSet.Run(ctx, r)
-
-	var events []mb.Event
-	for {
-		select {
-		case <-ctx.Done():
-			// Timeout
-			return events
-		case e := <-r.eventsC:
-			events = append(events, e)
-			if len(events) >= waitEvents {
-				return events
-			}
-		}
-	}
+	return r.capture(waitEvents)
 }

From fc22fa3673f36caea554e01ea91ff52ae2948eb2 Mon Sep 17 00:00:00 2001
From: Jaime Soriano Pastor <jaime.soriano@elastic.co>
Date: Tue, 25 Jun 2019 13:07:01 +0200
Subject: [PATCH 8/8] Keep waiting for timeout if waiting for 0 events in test
 capturing reporter

---
 metricbeat/mb/testing/modules.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go
index 541d0e428c1f..12ff0e9a1add 100644
--- a/metricbeat/mb/testing/modules.go
+++ b/metricbeat/mb/testing/modules.go
@@ -388,7 +388,7 @@ func (r *capturingPushReporterV2) capture(waitEvents int) []mb.Event {
 			return events
 		case e := <-r.eventsC:
 			events = append(events, e)
-			if len(events) >= waitEvents {
+			if waitEvents > 0 && len(events) >= waitEvents {
 				return events
 			}
 		}