diff --git a/internal/coreinternal/scraperinttest/scraperint.go b/internal/coreinternal/scraperinttest/scraperint.go index bd6971bc2a1c..7712533cf8a3 100644 --- a/internal/coreinternal/scraperinttest/scraperint.go +++ b/internal/coreinternal/scraperinttest/scraperint.go @@ -30,13 +30,6 @@ import ( const errExposedPort = "exposed container port should not be hardcoded to host port. Use ContainerInfo.MappedPort() instead" -func EqualsLatestMetrics(expected pmetric.Metrics, sink *consumertest.MetricsSink, compareOpts []pmetrictest.CompareMetricsOption) func() bool { - return func() bool { - allMetrics := sink.AllMetrics() - return len(allMetrics) > 0 && nil == pmetrictest.CompareMetrics(expected, allMetrics[len(allMetrics)-1], compareOpts...) - } -} - func NewIntegrationTest(f receiver.Factory, opts ...TestOption) *IntegrationTest { it := &IntegrationTest{ factory: f, diff --git a/receiver/aerospikereceiver/go.mod b/receiver/aerospikereceiver/go.mod index 331b18064204..9f00d0bf63d3 100644 --- a/receiver/aerospikereceiver/go.mod +++ b/receiver/aerospikereceiver/go.mod @@ -4,6 +4,7 @@ go 1.19 require ( github.com/aerospike/aerospike-client-go/v6 v6.12.0 + github.com/docker/go-connections v0.4.0 github.com/google/go-cmp v0.5.9 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.77.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.77.0 @@ -29,7 +30,6 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/distribution v2.8.2+incompatible // indirect github.com/docker/docker v23.0.6+incompatible // indirect - github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect diff --git a/receiver/aerospikereceiver/integration_test.go b/receiver/aerospikereceiver/integration_test.go index a1ce489c259e..d2d13525d186 100644 --- a/receiver/aerospikereceiver/integration_test.go +++ b/receiver/aerospikereceiver/integration_test.go @@ -8,99 +8,186 @@ package aerospikereceiver import ( "context" + "errors" "fmt" - "net" "os" "path/filepath" - "strconv" "testing" "time" as "github.com/aerospike/aerospike-client-go/v6" - "github.com/stretchr/testify/require" + "github.com/docker/go-connections/nat" testcontainers "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/collector/component" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/scraperinttest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) +var aerospikePort = "3000" + +func TestAerospikeIntegration(t *testing.T) { + t.Run("6.2", integrationTest(func(*Config) {})) + t.Run("6.2-cluster", integrationTest(func(cfg *Config) { + cfg.CollectClusterMetrics = true + })) +} + +func integrationTest(cfgMod func(*Config)) func(*testing.T) { + return scraperinttest.NewIntegrationTest( + NewFactory(), + scraperinttest.WithContainerRequest( + testcontainers.ContainerRequest{ + Image: "aerospike:ce-6.2.0.2", + ExposedPorts: []string{aerospikePort}, + WaitingFor: waitStrategy{}, + LifecycleHooks: []testcontainers.ContainerLifecycleHooks{{ + PostStarts: []testcontainers.ContainerHook{ + func(ctx context.Context, container testcontainers.Container) error { + host, err := aerospikeHost(ctx, container) + if err != nil { + return err + } + return populateMetrics(host) + }, + }, + }}, + }), + scraperinttest.WithCustomConfig( + func(t *testing.T, cfg component.Config, ci *scraperinttest.ContainerInfo) { + rCfg := cfg.(*Config) + rCfg.Endpoint = fmt.Sprintf("%s:%s", ci.Host(t), ci.MappedPort(t, aerospikePort)) + rCfg.ScraperControllerSettings.CollectionInterval = 100 * time.Millisecond + cfgMod(rCfg) + }), + scraperinttest.WithCompareOptions( + pmetrictest.IgnoreMetricValues(), + pmetrictest.IgnoreResourceAttributeValue("aerospike.node.name"), + pmetrictest.IgnoreMetricDataPointsOrder(), + pmetrictest.IgnoreStartTimestamp(), + pmetrictest.IgnoreTimestamp(), + ), + ).Run +} + +type waitStrategy struct{} + +func (ws waitStrategy) WaitUntilReady(ctx context.Context, st wait.StrategyTarget) error { + if err := wait.ForListeningPort(nat.Port(aerospikePort)). + WithStartupTimeout(time.Minute). + WaitUntilReady(ctx, st); err != nil { + return err + } + host, err := aerospikeHost(ctx, st) + if err != nil { + return err + } + var clientErr error + for { + select { + case <-ctx.Done(): + return clientErr + default: + _, clientErr = as.NewClientWithPolicyAndHost(clientPolicy(), host) + if clientErr == nil { + return nil + } + } + } +} + +func aerospikeHost(ctx context.Context, st wait.StrategyTarget) (*as.Host, error) { + host, err := st.Host(ctx) + if err != nil { + return nil, err + } + port, err := st.MappedPort(ctx, nat.Port(aerospikePort)) + if err != nil { + return nil, err + } + return as.NewHost(host, port.Int()), nil +} + type doneCheckable interface { IsDone() (bool, as.Error) } -type RecordsCheckable interface { +type recordsCheckable interface { Results() <-chan *as.Result } type aeroDoneFunc func() (doneCheckable, as.Error) -type aeroRecordsFunc func() (RecordsCheckable, as.Error) +type aeroRecordsFunc func() (recordsCheckable, as.Error) -func doneWaitAndCheck(f aeroDoneFunc, t *testing.T) { - t.Log("starting doneWaitAndCheck") +func doneWaitAndCheck(f aeroDoneFunc) error { chk, err := f() - require.NoError(t, err) + if err != nil { + return err + } for res := false; !res; res, err = chk.IsDone() { - require.NoError(t, err) + if err != nil { + return err + } time.Sleep(time.Second / 3) } - - t.Log("leaving doneWaitAndCheck") + return nil } -func RecordsWaitAndCheck(f aeroRecordsFunc, t *testing.T) { - t.Log("starting RecordsWaitAndCheck") +func recordsWaitAndCheck(f aeroRecordsFunc) error { chk, err := f() - require.NoError(t, err) + if err != nil { + return err + } // consume all records for range chk.Results() { } - - t.Log("leaving RecordsWaitAndCheck") + return nil } -func populateMetrics(t *testing.T, host *as.Host) { +func clientPolicy() *as.ClientPolicy { clientPolicy := as.NewClientPolicy() clientPolicy.Timeout = 60 * time.Second // minconns is used to populate the client connections metric clientPolicy.MinConnectionsPerNode = 50 + return clientPolicy +} - var c *as.Client - var clientErr error - require.Eventually(t, func() bool { - c, clientErr = as.NewClientWithPolicyAndHost(clientPolicy, host) - return clientErr == nil - }, 2*time.Minute, 1*time.Second, "failed to populate metrics") +func populateMetrics(host *as.Host) error { + errSetFilter := errors.New("failed to set filter") + errCreateSindex := errors.New("failed to create sindex") + errRunningCreateSindex := errors.New("failed running create index") + + c, err := as.NewClientWithPolicyAndHost(clientPolicy(), host) + if err != nil { + return err + } ns := "test" set := "integration" - pibin := "bin1" sibin := "bin2" // write 100 records to get some memory usage for i := 0; i < 100; i++ { - key, err := as.NewKey(ns, set, i) - require.NoError(t, err, "failed to create key") - - bins := as.BinMap{ - pibin: i, - sibin: i, + var key *as.Key + key, err = as.NewKey(ns, set, i) + if err != nil { + return errors.New("failed to create key") + } + err = c.Put(nil, key, as.BinMap{pibin: i, sibin: i}) + if err != nil { + return errors.New("failed to write record") } - - err = c.Put(nil, key, bins) - require.NoError(t, err, "failed to write record") } // register UDFs for aggregation queries cwd, wderr := os.Getwd() - require.NoError(t, wderr, "can't get working directory") + if wderr != nil { + return errors.New("can't get working directory") + } udfFile := "udf" udfFunc := "sum_single_bin" @@ -108,8 +195,12 @@ func populateMetrics(t *testing.T, host *as.Host) { as.SetLuaPath(luaPath) task, err := c.RegisterUDFFromFile(nil, filepath.Join(luaPath, udfFile+".lua"), udfFile+".lua", as.LUA) - require.NoError(t, err, "failed registering udf file") - require.NoError(t, <-task.OnComplete(), "failed while registering udf file") + if err != nil { + return errors.New("failed registering udf file") + } + if nil != <-task.OnComplete() { + return errors.New("failed while registering udf file") + } queryPolicy := as.NewQueryPolicy() queryPolicyShort := as.NewQueryPolicy() @@ -120,85 +211,117 @@ func populateMetrics(t *testing.T, host *as.Host) { // *** Primary Index Queries *** // // perform a basic primary index query - s1 := as.NewStatement(ns, set) - RecordsWaitAndCheck(func() (RecordsCheckable, as.Error) { + if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) { return c.Query(queryPolicy, s1) - }, t) + }); err != nil { + return err + } // aggregation query on primary index s2 := as.NewStatement(ns, set) - RecordsWaitAndCheck(func() (RecordsCheckable, as.Error) { + if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) { return c.QueryAggregate(queryPolicy, s2, "/"+udfFile, udfFunc, as.StringValue(pibin)) - }, t) - // c.QueryAggregate(queryPolicy, s2, "/"+udfFile, udfFunc, as.StringValue(pibin)) + }); err != nil { + return err + } // background udf query on primary index s3 := as.NewStatement(ns, set) - doneWaitAndCheck(func() (doneCheckable, as.Error) { + if err := doneWaitAndCheck(func() (doneCheckable, as.Error) { return c.ExecuteUDF(queryPolicy, s3, "/"+udfFile, udfFunc, as.StringValue(pibin)) - }, t) + }); err != nil { + return err + } // ops query on primary index s4 := as.NewStatement(ns, set) wbin := as.NewBin(pibin, 200) ops := as.PutOp(wbin) - doneWaitAndCheck(func() (doneCheckable, as.Error) { + if err := doneWaitAndCheck(func() (doneCheckable, as.Error) { return c.QueryExecute(queryPolicy, writePolicy, s4, ops) - }, t) + }); err != nil { + return err + } // perform a basic short primary index query s5 := as.NewStatement(ns, set) - RecordsWaitAndCheck(func() (RecordsCheckable, as.Error) { + if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) { return c.Query(queryPolicyShort, s5) - }, t) + }); err != nil { + return err + } // *** Secondary Index Queries *** // // create secondary index for SI queries itask, err := c.CreateIndex(writePolicy, ns, set, "sitest", "bin2", as.NUMERIC) - require.NoError(t, err, "failed to create sindex") - require.NoError(t, <-itask.OnComplete(), "failed running create index") + if err != nil { + return errCreateSindex + } + if err = <-itask.OnComplete(); err != nil { + return errRunningCreateSindex + } // SI filter filt := as.NewRangeFilter(sibin, 0, 100) // perform a basic secondary index query s6 := as.NewStatement(ns, set) - require.NoError(t, s6.SetFilter(filt)) - RecordsWaitAndCheck(func() (RecordsCheckable, as.Error) { + if sferr := s6.SetFilter(filt); sferr != nil { + return errSetFilter + } + if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) { return c.Query(queryPolicy, s6) - }, t) + }); err != nil { + return err + } // aggregation query on secondary index s7 := as.NewStatement(ns, set) - require.NoError(t, s7.SetFilter(filt)) - RecordsWaitAndCheck(func() (RecordsCheckable, as.Error) { + if sferr := s7.SetFilter(filt); sferr != nil { + return errSetFilter + } + if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) { return c.QueryAggregate(queryPolicy, s7, "/"+udfFile, udfFunc, as.StringValue(sibin)) - }, t) + }); err != nil { + return err + } // background udf query on secondary index s8 := as.NewStatement(ns, set) - require.NoError(t, s8.SetFilter(filt)) - doneWaitAndCheck(func() (doneCheckable, as.Error) { + if sferr := s8.SetFilter(filt); sferr != nil { + return errSetFilter + } + if err := doneWaitAndCheck(func() (doneCheckable, as.Error) { return c.ExecuteUDF(queryPolicy, s8, "/"+udfFile, udfFunc, as.StringValue(sibin)) - }, t) + }); err != nil { + return err + } // ops query on secondary index s9 := as.NewStatement(ns, set) - require.NoError(t, s9.SetFilter(filt)) + if sferr := s9.SetFilter(filt); sferr != nil { + return errSetFilter + } siwbin := as.NewBin("bin4", 400) siops := as.PutOp(siwbin) - doneWaitAndCheck(func() (doneCheckable, as.Error) { + if err := doneWaitAndCheck(func() (doneCheckable, as.Error) { return c.QueryExecute(queryPolicy, writePolicy, s9, siops) - }, t) + }); err != nil { + return err + } // perform a basic short secondary index query s10 := as.NewStatement(ns, set) - require.NoError(t, s10.SetFilter(filt)) - RecordsWaitAndCheck(func() (RecordsCheckable, as.Error) { + if sferr := s10.SetFilter(filt); sferr != nil { + return errSetFilter + } + if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) { return c.Query(queryPolicyShort, s10) - }, t) + }); err != nil { + return err + } // *** GeoJSON *** // @@ -239,124 +362,30 @@ func populateMetrics(t *testing.T, host *as.Host) { for i, b := range bins { key, _ := as.NewKey(ns, geoSet, i) err = c.Put(nil, key, b) - require.NoError(t, err, "failed to write geojson record") + if err != nil { + return errors.New("failed to write geojson record") + } } // create secondary index for geo queries itask, err = c.CreateIndex(writePolicy, ns, geoSet, "testset_geo_index", "coord", as.GEO2DSPHERE) - require.NoError(t, err, "failed to create sindex") - require.NoError(t, <-itask.OnComplete(), "failed running create index") + if err != nil { + return errCreateSindex + } + if err := <-itask.OnComplete(); err != nil { + return errRunningCreateSindex + } // run geoJSON query geoStm1 := as.NewStatement(ns, geoSet) geoFilt1 := as.NewGeoWithinRadiusFilter("coord", float64(13.009318762), float64(80.003157854), float64(50000)) - require.NoError(t, geoStm1.SetFilter(geoFilt1)) - RecordsWaitAndCheck(func() (RecordsCheckable, as.Error) { - return c.Query(queryPolicy, geoStm1) - }, t) -} - -func TestAerospikeIntegration(t *testing.T) { - t.Run("6.2", test6_2.run) - t.Run("6.2-cluster", test6_2Cluster.run) -} - -type testCase struct { - name string - container testcontainers.ContainerRequest - cfgMod func(defaultCfg *Config, endpoint string) -} - -var ( - test6_2 = testCase{ - name: "6.2", - container: testcontainers.ContainerRequest{ - Image: "aerospike:ce-6.2.0.2", - ExposedPorts: []string{"3000/tcp"}, - WaitingFor: wait.ForListeningPort("3000/tcp"), - }, - cfgMod: func(defaultCfg *Config, endpoint string) { - defaultCfg.Endpoint = endpoint - defaultCfg.ScraperControllerSettings.CollectionInterval = 100 * time.Millisecond - }, + if sferr := geoStm1.SetFilter(geoFilt1); sferr != nil { + return errSetFilter } - test6_2Cluster = testCase{ - name: "6.2", - container: testcontainers.ContainerRequest{ - Image: "aerospike:ce-6.2.0.2", - ExposedPorts: []string{"3000/tcp"}, - WaitingFor: wait.ForListeningPort("3000/tcp"), - }, - cfgMod: func(defaultCfg *Config, endpoint string) { - defaultCfg.Endpoint = endpoint - defaultCfg.ScraperControllerSettings.CollectionInterval = 100 * time.Millisecond - defaultCfg.CollectClusterMetrics = true - }, - } -) - -func (tt testCase) run(t *testing.T) { - container, host := getContainer(t, tt.container) - defer func() { - require.NoError(t, container.Terminate(context.Background())) - }() - - f := NewFactory() - cfg := f.CreateDefaultConfig().(*Config) - tt.cfgMod(cfg, host) - - consumer := new(consumertest.MetricsSink) - settings := receivertest.NewNopCreateSettings() - rcvr, err := f.CreateMetricsReceiver(context.Background(), settings, cfg, consumer) - require.NoError(t, err, "failed creating metrics receiver") - - require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()), "failed starting metrics receiver") - defer func() { - require.NoError(t, rcvr.Shutdown(context.Background())) - }() - - expectedFile := filepath.Join("testdata", "integration", "expected.yaml") - expectedMetrics, err := golden.ReadMetrics(expectedFile) - require.NoError(t, err, "failed reading expected metrics") - - compareOpts := []pmetrictest.CompareMetricsOption{ - pmetrictest.IgnoreMetricValues(), - pmetrictest.IgnoreResourceAttributeValue("aerospike.node.name"), - pmetrictest.IgnoreMetricDataPointsOrder(), - pmetrictest.IgnoreStartTimestamp(), - pmetrictest.IgnoreTimestamp(), + if err := recordsWaitAndCheck(func() (recordsCheckable, as.Error) { + return c.Query(queryPolicy, geoStm1) + }); err != nil { + return err } - - require.Eventually(t, scraperinttest.EqualsLatestMetrics(expectedMetrics, consumer, compareOpts), 30*time.Second, time.Second) -} - -func getContainer(t *testing.T, req testcontainers.ContainerRequest) (testcontainers.Container, string) { - require.NoError(t, req.Validate()) - ctx := context.Background() - - container, err := testcontainers.GenericContainer( - ctx, - testcontainers.GenericContainerRequest{ - ContainerRequest: req, - Started: true, - }) - require.NoError(t, err) - - mappedPort, err := container.MappedPort(ctx, "3000") - require.Nil(t, err) - - hostIP, err := container.Host(ctx) - require.Nil(t, err) - - host := fmt.Sprintf("%s:%s", hostIP, mappedPort.Port()) - ip, portStr, err := net.SplitHostPort(host) - require.NoError(t, err) - - port, err := strconv.Atoi(portStr) - require.NoError(t, err) - - asHost := as.NewHost(ip, port) - populateMetrics(t, asHost) - - return container, host + return nil }