From 4f7a6b1e32ac5c2c0f5721a18cbd01f4709c5a7b Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Fri, 31 May 2024 16:33:41 -0500 Subject: [PATCH] [chore] Add internal pdatautil.GroupByResourceLogs (#33311) This is a first step towards a [Flatten/Unflatten workaround](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32080) for OTTL's data corruption issue, specifically this would support Unflatten. The workaround was discussed in a recent collector SIG and it sounded like it would be acceptable if available behind a feature gate and only for the transform processor. If this is accepted I'll work on a Flatten utility next, then integrate them into the tranform processor to prove end-to-end. Finally, I'll implement similar features for metrics and traces. This PR adds an internal utility package which simplifies grouping logs by resource and scope. I'm proposing this initially in `internal/pdatautil` but the functionality could eventually be merged into `pkg/pdatautil` if we find it useful elsewhere. --- .github/CODEOWNERS | 1 + .github/ISSUE_TEMPLATE/bug_report.yaml | 1 + .github/ISSUE_TEMPLATE/feature_request.yaml | 1 + .github/ISSUE_TEMPLATE/other.yaml | 1 + .github/ISSUE_TEMPLATE/unmaintained.yaml | 1 + internal/pdatautil/Makefile | 1 + internal/pdatautil/go.mod | 34 ++ internal/pdatautil/go.sum | 81 ++++ internal/pdatautil/logs.go | 103 +++++ internal/pdatautil/logs_test.go | 475 ++++++++++++++++++++ internal/pdatautil/metadata.yaml | 3 + versions.yaml | 1 + 12 files changed, 703 insertions(+) create mode 100644 internal/pdatautil/Makefile create mode 100644 internal/pdatautil/go.mod create mode 100644 internal/pdatautil/go.sum create mode 100644 internal/pdatautil/logs.go create mode 100644 internal/pdatautil/logs_test.go create mode 100644 internal/pdatautil/metadata.yaml diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 935cb4aa35ca..1bdb4f8fe2fc 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -131,6 +131,7 @@ internal/k8stest/ @open-telemetry/collect internal/kafka/ @open-telemetry/collector-contrib-approvers @pavolloffay @MovieStoreGuy internal/kubelet/ @open-telemetry/collector-contrib-approvers @dmitryax internal/metadataproviders/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @dashpole +internal/pdatautil/ @open-telemetry/collector-contrib-approvers @djaglowski internal/sharedcomponent/ @open-telemetry/collector-contrib-approvers @open-telemetry/collector-approvers internal/splunk/ @open-telemetry/collector-contrib-approvers @dmitryax internal/sqlquery/ @open-telemetry/collector-contrib-approvers @crobert-1 @dmitryax diff --git a/.github/ISSUE_TEMPLATE/bug_report.yaml b/.github/ISSUE_TEMPLATE/bug_report.yaml index 1930ed7f5a0a..a84793b9912a 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yaml +++ b/.github/ISSUE_TEMPLATE/bug_report.yaml @@ -129,6 +129,7 @@ body: - internal/kafka - internal/kubelet - internal/metadataproviders + - internal/pdatautil - internal/sharedcomponent - internal/splunk - internal/sqlquery diff --git a/.github/ISSUE_TEMPLATE/feature_request.yaml b/.github/ISSUE_TEMPLATE/feature_request.yaml index 4ae0299d2098..12789d412fe8 100644 --- a/.github/ISSUE_TEMPLATE/feature_request.yaml +++ b/.github/ISSUE_TEMPLATE/feature_request.yaml @@ -123,6 +123,7 @@ body: - internal/kafka - internal/kubelet - internal/metadataproviders + - internal/pdatautil - internal/sharedcomponent - internal/splunk - internal/sqlquery diff --git a/.github/ISSUE_TEMPLATE/other.yaml b/.github/ISSUE_TEMPLATE/other.yaml index 9e0d7ffbcacb..50939096177d 100644 --- a/.github/ISSUE_TEMPLATE/other.yaml +++ b/.github/ISSUE_TEMPLATE/other.yaml @@ -123,6 +123,7 @@ body: - internal/kafka - internal/kubelet - internal/metadataproviders + - internal/pdatautil - internal/sharedcomponent - internal/splunk - internal/sqlquery diff --git a/.github/ISSUE_TEMPLATE/unmaintained.yaml b/.github/ISSUE_TEMPLATE/unmaintained.yaml index fbe662a592e7..7252c63b3fee 100644 --- a/.github/ISSUE_TEMPLATE/unmaintained.yaml +++ b/.github/ISSUE_TEMPLATE/unmaintained.yaml @@ -128,6 +128,7 @@ body: - internal/kafka - internal/kubelet - internal/metadataproviders + - internal/pdatautil - internal/sharedcomponent - internal/splunk - internal/sqlquery diff --git a/internal/pdatautil/Makefile b/internal/pdatautil/Makefile new file mode 100644 index 000000000000..ded7a36092dc --- /dev/null +++ b/internal/pdatautil/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/internal/pdatautil/go.mod b/internal/pdatautil/go.mod new file mode 100644 index 000000000000..8d08bc91d734 --- /dev/null +++ b/internal/pdatautil/go.mod @@ -0,0 +1,34 @@ +module github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil + +go 1.21.0 + +require ( + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.101.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.101.0 + github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/collector/pdata v1.8.1-0.20240529223953-eaab76e46d38 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/net v0.24.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect + google.golang.org/grpc v1.64.0 // indirect + google.golang.org/protobuf v1.34.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/internal/pdatautil/go.sum b/internal/pdatautil/go.sum new file mode 100644 index 000000000000..3b711e6d8071 --- /dev/null +++ b/internal/pdatautil/go.sum @@ -0,0 +1,81 @@ +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/collector/pdata v1.8.1-0.20240529223953-eaab76e46d38 h1:WOatQWY4MmVW4N4J3exbCb0RinS3B5pqKhcRjNWSNk8= +go.opentelemetry.io/collector/pdata v1.8.1-0.20240529223953-eaab76e46d38/go.mod h1:vk7LrfpyVpGZrRWcpjyy0DDZzL3SZiYMQxfap25551w= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= +golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= +google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/pdatautil/logs.go b/internal/pdatautil/logs.go new file mode 100644 index 000000000000..b27557d09e55 --- /dev/null +++ b/internal/pdatautil/logs.go @@ -0,0 +1,103 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pdatautil // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" +) + +// GroupByResourceLogs groups ScopeLogs by Resource. Modifications are made in place. +func GroupByResourceLogs(rls plog.ResourceLogsSlice) { + // Hash each ResourceLogs based on identifying information. + resourceHashes := make([][16]byte, rls.Len()) + for i := 0; i < rls.Len(); i++ { + resourceHashes[i] = pdatautil.MapHash(rls.At(i).Resource().Attributes()) + } + + // Find the first occurrence of each hash and note the index. + firstScopeIndex := make([]int, rls.Len()) + for i := 0; i < rls.Len(); i++ { + firstScopeIndex[i] = i + for j := 0; j < i; j++ { + if resourceHashes[i] == resourceHashes[j] { + firstScopeIndex[i] = j + break + } + } + } + + // Merge Resources with the same hash. + for i := 0; i < rls.Len(); i++ { + if i == firstScopeIndex[i] { + // This is the first occurrence of this hash. + continue + } + rls.At(i).ScopeLogs().MoveAndAppendTo(rls.At(firstScopeIndex[i]).ScopeLogs()) + } + + // Remove the ResourceLogs which were merged onto others. + i := 0 + rls.RemoveIf(func(plog.ResourceLogs) bool { + remove := i != firstScopeIndex[i] + i++ + return remove + }) + + // Merge ScopeLogs within each ResourceLogs. + for i := 0; i < rls.Len(); i++ { + GroupByScopeLogs(rls.At(i).ScopeLogs()) + } +} + +// GroupByScopeLogs groups LogRecords by scope. Modifications are made in place. +func GroupByScopeLogs(sls plog.ScopeLogsSlice) { + // Hash each ScopeLogs based on identifying information. + scopeHashes := make([][16]byte, sls.Len()) + for i := 0; i < sls.Len(); i++ { + scopeHashes[i] = HashScopeLogs(sls.At(i)) + } + + // Find the first occurrence of each hash and note the index. + firstScopeIndex := make([]int, sls.Len()) + for i := 0; i < sls.Len(); i++ { + firstScopeIndex[i] = i + for j := 0; j < i; j++ { + if scopeHashes[i] == scopeHashes[j] { + firstScopeIndex[i] = j + break + } + } + } + + // Merge ScopeLogs with the same hash. + for i := 0; i < sls.Len(); i++ { + if i == firstScopeIndex[i] { + // This is the first occurrence of this hash. + continue + } + sls.At(i).LogRecords().MoveAndAppendTo(sls.At(firstScopeIndex[i]).LogRecords()) + } + + // Remove the ScopeLogs which were merged onto others. + i := 0 + sls.RemoveIf(func(plog.ScopeLogs) bool { + remove := i != firstScopeIndex[i] + i++ + return remove + }) +} + +// Creates a hash based on the ScopeLogs attributes, name, and version +func HashScopeLogs(sl plog.ScopeLogs) [16]byte { + scopeHash := pcommon.NewMap() + scopeHash.PutStr("schema_url", sl.SchemaUrl()) + scopeHash.PutStr("name", sl.Scope().Name()) + scopeHash.PutStr("version", sl.Scope().Version()) + attrHash := pdatautil.MapHash(sl.Scope().Attributes()) + scopeHash.PutStr("attributes_hash", string(attrHash[:])) + return pdatautil.MapHash(scopeHash) +} diff --git a/internal/pdatautil/logs_test.go b/internal/pdatautil/logs_test.go new file mode 100644 index 000000000000..e45632bdafc1 --- /dev/null +++ b/internal/pdatautil/logs_test.go @@ -0,0 +1,475 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pdatautil + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" +) + +func TestGroupByResourceLogs(t *testing.T) { + testCases := []struct { + name string + input []resourceLogs + expected []resourceLogs + }{ + { + name: "empty", + input: []resourceLogs{}, + expected: []resourceLogs{}, + }, + { + name: "single", + input: []resourceLogs{newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + ), + }, + expected: []resourceLogs{newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + ), + }, + }, + { + name: "distinct", + input: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + newScopeLogs(22, 201, 202, 203), + ), + newResourceLogs(2, + newScopeLogs(33, 301, 302, 303), + newScopeLogs(44, 401, 402, 403), + ), + }, + expected: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + newScopeLogs(22, 201, 202, 203), + ), + newResourceLogs(2, + newScopeLogs(33, 301, 302, 303), + newScopeLogs(44, 401, 402, 403), + ), + }, + }, + { + name: "simple_merge_scopes", + input: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + newScopeLogs(11, 104, 105, 106), + ), + newResourceLogs(2, + newScopeLogs(22, 201, 202, 203), + newScopeLogs(22, 204, 205, 206), + ), + }, + expected: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103, 104, 105, 106), + ), + newResourceLogs(2, + newScopeLogs(22, 201, 202, 203, 204, 205, 206), + ), + }, + }, + { + name: "merge_scopes_on_some_resources", + input: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + newScopeLogs(11, 104, 105, 106), + ), + newResourceLogs(2, + newScopeLogs(22, 201, 202, 203), + newScopeLogs(33, 301, 302, 303), + ), + newResourceLogs(3, + newScopeLogs(44, 401, 402, 403), + newScopeLogs(44, 404, 405, 406), + ), + }, + expected: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103, 104, 105, 106), + ), + newResourceLogs(2, + newScopeLogs(22, 201, 202, 203), + newScopeLogs(33, 301, 302, 303), + ), + newResourceLogs(3, + newScopeLogs(44, 401, 402, 403, 404, 405, 406), + ), + }, + }, + { + name: "leave_same_scopes_on_distinct_resources", + input: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + ), + newResourceLogs(2, + newScopeLogs(11, 101, 102, 103), + ), + }, + expected: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + ), + newResourceLogs(2, + newScopeLogs(11, 101, 102, 103), + ), + }, + }, + { + name: "merge_scopes_within_distinct_resources", + input: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + newScopeLogs(11, 104, 105, 106), + ), + newResourceLogs(2, + newScopeLogs(11, 101, 102, 103), + newScopeLogs(11, 104, 105, 106), + ), + }, + expected: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103, 104, 105, 106), + ), + newResourceLogs(2, + newScopeLogs(11, 101, 102, 103, 104, 105, 106), + ), + }, + }, + { + name: "merge_resources_preserve_distinct_scopes", + input: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + newScopeLogs(22, 201, 202, 203), + ), + newResourceLogs(1, + newScopeLogs(33, 301, 302, 303), + newScopeLogs(44, 401, 402, 403), + ), + }, + expected: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + newScopeLogs(22, 201, 202, 203), + newScopeLogs(33, 301, 302, 303), + newScopeLogs(44, 401, 402, 403), + ), + }, + }, + { + name: "merge_interleaved_scopes_within_resource", + input: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + newScopeLogs(22, 201, 202, 203), + newScopeLogs(11, 104, 105, 106), + newScopeLogs(22, 204, 205, 206), + ), + }, + expected: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103, 104, 105, 106), + newScopeLogs(22, 201, 202, 203, 204, 205, 206), + ), + }, + }, + { + name: "merge_interleaved_scopes_across_resources", + input: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + newScopeLogs(22, 201, 202, 203), + ), + newResourceLogs(1, + newScopeLogs(11, 104, 105, 106), + newScopeLogs(22, 204, 205, 206), + ), + }, + expected: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103, 104, 105, 106), + newScopeLogs(22, 201, 202, 203, 204, 205, 206), + ), + }, + }, + { + name: "merge_interleaved_scopes_across_interleaved_resources", + input: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + newScopeLogs(22, 201, 202, 203), + ), + newResourceLogs(2, + newScopeLogs(33, 301, 302, 303), + newScopeLogs(44, 401, 402, 403), + ), + newResourceLogs(1, + newScopeLogs(11, 104, 105, 106), + newScopeLogs(22, 204, 205, 206), + ), + newResourceLogs(2, + newScopeLogs(33, 304, 305, 306), + newScopeLogs(44, 404, 405, 406), + ), + }, + expected: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103, 104, 105, 106), + newScopeLogs(22, 201, 202, 203, 204, 205, 206), + ), + newResourceLogs(2, + newScopeLogs(33, 301, 302, 303, 304, 305, 306), + newScopeLogs(44, 401, 402, 403, 404, 405, 406), + ), + }, + }, + { + name: "merge_some_scopes_across_some_resources", + input: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + newScopeLogs(22, 201, 202, 203), + ), + newResourceLogs(2, + newScopeLogs(33, 301, 302, 303), + newScopeLogs(11, 104, 105, 106), + ), + newResourceLogs(1, + newScopeLogs(33, 301, 302, 303), + newScopeLogs(11, 104, 105, 106), + ), + newResourceLogs(2, + newScopeLogs(33, 304, 305, 306), + newScopeLogs(44, 404, 405, 406), + ), + }, + expected: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103, 104, 105, 106), + newScopeLogs(22, 201, 202, 203), + newScopeLogs(33, 301, 302, 303), + ), + newResourceLogs(2, + newScopeLogs(33, 301, 302, 303, 304, 305, 306), + newScopeLogs(11, 104, 105, 106), + newScopeLogs(44, 404, 405, 406), + ), + }, + }, + { + name: "merge_all_resources_and_scopes", + input: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102), + newScopeLogs(11, 103, 104), + ), + newResourceLogs(1, + newScopeLogs(11, 105, 106), + newScopeLogs(11, 107, 108), + ), + newResourceLogs(1, + newScopeLogs(11, 109, 110), + newScopeLogs(11, 111, 112), + ), + }, + expected: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112), + ), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actual := plog.NewResourceLogsSlice() + for _, r := range tc.input { + r.setup(actual.AppendEmpty()) + } + expected := plog.NewResourceLogsSlice() + for _, r := range tc.expected { + r.setup(expected.AppendEmpty()) + } + + GroupByResourceLogs(actual) + assert.Equal(t, expected.Len(), actual.Len()) + for i := 0; i < expected.Len(); i++ { + assert.NoError(t, plogtest.CompareResourceLogs(expected.At(i), actual.At(i))) + } + }) + } +} + +func TestGroupByScopeLogs(t *testing.T) { + testCases := []struct { + name string + input []scopeLogs + expected []scopeLogs + }{ + { + name: "empty", + input: []scopeLogs{}, + expected: []scopeLogs{}, + }, + { + name: "single", + input: []scopeLogs{ + newScopeLogs(1, 11, 12, 13), + }, + expected: []scopeLogs{ + newScopeLogs(1, 11, 12, 13), + }, + }, + { + name: "distinct", + input: []scopeLogs{ + newScopeLogs(1, 11, 12, 13), + newScopeLogs(2, 21, 22, 23), + }, + expected: []scopeLogs{ + newScopeLogs(1, 11, 12, 13), + newScopeLogs(2, 21, 22, 23), + }, + }, + { + name: "simple_merge", + input: []scopeLogs{ + newScopeLogs(1, 11, 12, 13), + newScopeLogs(1, 14, 15, 16), + }, + expected: []scopeLogs{ + newScopeLogs(1, 11, 12, 13, 14, 15, 16), + }, + }, + { + name: "interleaved", + input: []scopeLogs{ + newScopeLogs(1, 11, 12, 13), + newScopeLogs(2, 21, 22, 23), + newScopeLogs(1, 14, 15, 16), + newScopeLogs(2, 24, 25, 26), + }, + expected: []scopeLogs{ + newScopeLogs(1, 11, 12, 13, 14, 15, 16), + newScopeLogs(2, 21, 22, 23, 24, 25, 26), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actual := plog.NewScopeLogsSlice() + for _, s := range tc.input { + s.setup(actual.AppendEmpty()) + } + expected := plog.NewScopeLogsSlice() + for _, s := range tc.expected { + s.setup(expected.AppendEmpty()) + } + + GroupByScopeLogs(actual) + assert.Equal(t, expected.Len(), actual.Len()) + for i := 0; i < expected.Len(); i++ { + assert.NoError(t, plogtest.CompareScopeLogs(expected.At(i), actual.At(i))) + } + }) + } +} + +func TestHashScopeLogs(t *testing.T) { + schemas := []string{"", "schema_1", "schema_2"} + names := []string{"", "name_1", "name_2"} + versions := []string{"", "version_1", "version_2"} + attributes := []map[string]any{ + {}, + {"attr.name": "attr_1"}, + {"attr.name": "attr_2"}, + {"attr.name": "attr_1", "other.name": "other"}, + } + + distinctScopeLogs := make([]plog.ScopeLogs, 0, len(schemas)*len(names)*len(versions)*len(attributes)) + + for _, schema := range schemas { + for _, name := range names { + for _, version := range versions { + for _, attr := range attributes { + sl := plog.NewScopeLogs() + sl.SetSchemaUrl(schema) + ss := sl.Scope() + ss.SetName(name) + ss.SetVersion(version) + require.NoError(t, ss.Attributes().FromRaw(attr)) + distinctScopeLogs = append(distinctScopeLogs, sl) + } + } + } + } + + for i, slOne := range distinctScopeLogs { + for j, slTwo := range distinctScopeLogs { + if i == j { + assert.Equal(t, HashScopeLogs(slOne), HashScopeLogs(slTwo)) + } else { + assert.NotEqual(t, HashScopeLogs(slOne), HashScopeLogs(slTwo)) + } + } + } +} + +type resourceLogs struct { + num int + scopes []scopeLogs +} + +func newResourceLogs(num int, scopes ...scopeLogs) resourceLogs { + return resourceLogs{ + num: num, + scopes: scopes, + } +} + +func (r resourceLogs) setup(rl plog.ResourceLogs) { + rl.Resource().Attributes().PutStr("attr.name", fmt.Sprintf("attr_%d", r.num)) + for _, s := range r.scopes { + s.setup(rl.ScopeLogs().AppendEmpty()) + } +} + +type scopeLogs struct { + num int + recordNums []int +} + +func newScopeLogs(num int, recordNums ...int) scopeLogs { + return scopeLogs{ + num: num, + recordNums: recordNums, + } +} + +func (s scopeLogs) setup(sl plog.ScopeLogs) { + sl.SetSchemaUrl(fmt.Sprintf("schema_%d", s.num)) + ss := sl.Scope() + ss.SetName(fmt.Sprintf("name_%d", s.num)) + ss.SetVersion(fmt.Sprintf("version_%d", s.num)) + ss.Attributes().PutStr("attr.name", fmt.Sprintf("attr_%d", s.num)) + for _, n := range s.recordNums { + lr := sl.LogRecords().AppendEmpty() + lr.Attributes().PutInt("num", int64(n)) + lr.Body().SetInt(int64(n)) + } +} diff --git a/internal/pdatautil/metadata.yaml b/internal/pdatautil/metadata.yaml new file mode 100644 index 000000000000..2f7e4ffd565a --- /dev/null +++ b/internal/pdatautil/metadata.yaml @@ -0,0 +1,3 @@ +status: + codeowners: + active: [djaglowski] diff --git a/versions.yaml b/versions.yaml index 64068305aae4..99530970c193 100644 --- a/versions.yaml +++ b/versions.yaml @@ -128,6 +128,7 @@ module-sets: - github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka - github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet - github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders + - github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil - github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent - github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk - github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery