From 1ad8190e94477c288eff9090a4ae0f0cf2740110 Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Wed, 24 Jan 2024 11:57:24 -0800 Subject: [PATCH 1/7] Fix issues created by wrapping a BatchLogRecordExportProcessor inside another processor. --- src/OpenTelemetry/Batch.cs | 19 ++++++-- .../Logs/BatchLogRecordExportProcessor.cs | 24 ++++++++-- src/OpenTelemetry/Logs/LogRecord.cs | 10 ++++ .../Logs/Pool/LogRecordSharedPool.cs | 11 ++++- .../Logs/Pool/LogRecordThreadStaticPool.cs | 9 +++- .../BatchLogRecordExportProcessorTests.cs | 48 ++++++++++++++++++- .../Logs/LogRecordSharedPoolTests.cs | 4 +- .../Logs/LogRecordThreadStaticPoolTests.cs | 4 +- 8 files changed, 112 insertions(+), 17 deletions(-) diff --git a/src/OpenTelemetry/Batch.cs b/src/OpenTelemetry/Batch.cs index 7beebfbd95b..b4ddd08cf17 100644 --- a/src/OpenTelemetry/Batch.cs +++ b/src/OpenTelemetry/Batch.cs @@ -71,7 +71,11 @@ public void Dispose() T item = this.circularBuffer.Read(); if (typeof(T) == typeof(LogRecord)) { - LogRecordSharedPool.Current.Return((LogRecord)(object)item); + var logRecord = (LogRecord)(object)item; + if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool) + { + LogRecordSharedPool.Current.Return(logRecord); + } } } } @@ -134,7 +138,11 @@ public struct Enumerator : IEnumerator if (currentItem != null) { - LogRecordSharedPool.Current.Return((LogRecord)(object)currentItem); + var logRecord = (LogRecord)(object)currentItem; + if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool) + { + LogRecordSharedPool.Current.Return(logRecord); + } } if (circularBuffer!.RemovedCount < enumerator.targetCount) @@ -215,7 +223,12 @@ public void Dispose() var currentItem = this.current; if (currentItem != null) { - LogRecordSharedPool.Current.Return((LogRecord)(object)currentItem); + var logRecord = (LogRecord)(object)currentItem; + if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool) + { + LogRecordSharedPool.Current.Return(logRecord); + } + this.current = null; } } diff --git a/src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs b/src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs index f62758dd793..7b930197d9a 100644 --- a/src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs +++ b/src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs @@ -42,13 +42,27 @@ public override void OnEnd(LogRecord data) // happen here. Debug.Assert(data != null, "LogRecord was null."); - data!.Buffer(); + switch (data.Source) + { + case LogRecord.LogRecordSource.FromSharedPool: + data.Buffer(); + data.AddReference(); + if (!this.TryExport(data)) + { + LogRecordSharedPool.Current.Return(data); + } - data.AddReference(); + break; + case LogRecord.LogRecordSource.CreatedManually: + data.Buffer(); + this.TryExport(data); + break; + default: + Debug.Assert(data.Source == LogRecord.LogRecordSource.FromThreadStaticPool, "LogRecord source was something unexpected"); - if (!this.TryExport(data)) - { - LogRecordSharedPool.Current.Return(data); + // Note: If we are using ThreadStatic pool we make a copy of the record. + this.TryExport(data.Copy()); + break; } } } diff --git a/src/OpenTelemetry/Logs/LogRecord.cs b/src/OpenTelemetry/Logs/LogRecord.cs index 877ab83d1dd..603fe452f27 100644 --- a/src/OpenTelemetry/Logs/LogRecord.cs +++ b/src/OpenTelemetry/Logs/LogRecord.cs @@ -21,6 +21,7 @@ public sealed class LogRecord internal IReadOnlyList>? AttributeData; internal List>? AttributeStorage; internal List? ScopeStorage; + internal LogRecordSource Source = LogRecordSource.CreatedManually; internal int PoolReferenceCount = int.MaxValue; private static readonly Action> AddScopeToBufferedList = (object? scope, List state) => @@ -80,6 +81,15 @@ internal LogRecord( } } + internal enum LogRecordSource + { +#pragma warning disable SA1602 // Enumeration items should be documented + CreatedManually, + FromThreadStaticPool, + FromSharedPool, +#pragma warning restore SA1602 // Enumeration items should be documented + } + /// /// Gets or sets the log timestamp. /// diff --git a/src/OpenTelemetry/Logs/Pool/LogRecordSharedPool.cs b/src/OpenTelemetry/Logs/Pool/LogRecordSharedPool.cs index ccaa47678a9..f5daa5a9904 100644 --- a/src/OpenTelemetry/Logs/Pool/LogRecordSharedPool.cs +++ b/src/OpenTelemetry/Logs/Pool/LogRecordSharedPool.cs @@ -1,6 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using OpenTelemetry.Internal; @@ -17,7 +18,7 @@ internal sealed class LogRecordSharedPool : ILogRecordPool private long rentIndex; private long returnIndex; - public LogRecordSharedPool(int capacity) + private LogRecordSharedPool(int capacity) { this.Capacity = capacity; this.pool = new LogRecord?[capacity]; @@ -54,18 +55,24 @@ public LogRecord Rent() continue; } + Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromSharedPool, "logRecord.Source was not FromSharedPool"); logRecord.ResetReferenceCount(); return logRecord; } } - var newLogRecord = new LogRecord(); + var newLogRecord = new LogRecord() + { + Source = LogRecord.LogRecordSource.FromSharedPool, + }; newLogRecord.ResetReferenceCount(); return newLogRecord; } public void Return(LogRecord logRecord) { + Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromSharedPool, "logRecord.Source was not FromSharedPool"); + if (logRecord.RemoveReference() != 0) { return; diff --git a/src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs b/src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs index eba6aad18a3..8d752967d04 100644 --- a/src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs +++ b/src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs @@ -1,6 +1,8 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +using System.Diagnostics; + namespace OpenTelemetry.Logs; internal sealed class LogRecordThreadStaticPool : ILogRecordPool @@ -19,15 +21,20 @@ public LogRecord Rent() var logRecord = Storage; if (logRecord != null) { + Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromThreadStaticPool, "logRecord.Source was not FromThreadStaticPool"); Storage = null; return logRecord; } - return new(); + return new() + { + Source = LogRecord.LogRecordSource.FromThreadStaticPool, + }; } public void Return(LogRecord logRecord) { + Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromThreadStaticPool, "logRecord.Source was not FromThreadStaticPool"); if (Storage == null) { LogRecordPoolHelper.Clear(logRecord); diff --git a/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs b/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs index c19997c5761..d285302fcfd 100644 --- a/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs +++ b/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs @@ -23,7 +23,9 @@ public void StateValuesAndScopeBufferingTest() using var scope = scopeProvider.Push(exportedItems); - var logRecord = new LogRecord(); + var pool = LogRecordSharedPool.Current; + + var logRecord = pool.Rent(); var state = new LogRecordTest.DisposingState("Hello world"); @@ -60,6 +62,7 @@ public void StateValuesAndScopeBufferingTest() processor.Shutdown(); Assert.Single(exportedItems); + Assert.Same(logRecord, exportedItems[0]); } [Fact] @@ -74,7 +77,9 @@ public void StateBufferingTest() using var processor = new BatchLogRecordExportProcessor( new InMemoryExporter(exportedItems)); - var logRecord = new LogRecord(); + var pool = LogRecordSharedPool.Current; + + var logRecord = pool.Rent(); var state = new LogRecordTest.DisposingState("Hello world"); logRecord.State = state; @@ -82,6 +87,9 @@ public void StateBufferingTest() processor.OnEnd(logRecord); processor.Shutdown(); + Assert.Single(exportedItems); + Assert.Same(logRecord, exportedItems[0]); + state.Dispose(); Assert.Throws(() => @@ -93,5 +101,41 @@ public void StateBufferingTest() } }); } + + [Fact] + public void CopyMadeWhenLogRecordIsFromThreadStaticPoolTest() + { + List exportedItems = new(); + + using var processor = new BatchLogRecordExportProcessor( + new InMemoryExporter(exportedItems)); + + var pool = LogRecordThreadStaticPool.Instance; + + var logRecord = pool.Rent(); + + processor.OnEnd(logRecord); + processor.Shutdown(); + + Assert.Single(exportedItems); + Assert.NotSame(logRecord, exportedItems[0]); + } + + [Fact] + public void LogRecordAddedToBatchIfNotFromAnyPoolTest() + { + List exportedItems = new(); + + using var processor = new BatchLogRecordExportProcessor( + new InMemoryExporter(exportedItems)); + + var logRecord = new LogRecord(); + + processor.OnEnd(logRecord); + processor.Shutdown(); + + Assert.Single(exportedItems); + Assert.Same(logRecord, exportedItems[0]); + } } #endif diff --git a/test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs b/test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs index 69aa4a958be..69c77ce3157 100644 --- a/test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs +++ b/test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs @@ -45,7 +45,7 @@ public void RentReturnTests() Assert.Equal(1, pool.Count); // Note: This is ignored because logRecord manually created has PoolReferenceCount = int.MaxValue. - LogRecord manualRecord = new(); + LogRecord manualRecord = new() { Source = LogRecord.LogRecordSource.FromSharedPool }; Assert.Equal(int.MaxValue, manualRecord.PoolReferenceCount); pool.Return(manualRecord); @@ -163,7 +163,7 @@ public async Task ExportTest(bool warmup) { for (int i = 0; i < LogRecordSharedPool.DefaultMaxPoolSize; i++) { - pool.Return(new LogRecord { PoolReferenceCount = 1 }); + pool.Return(new LogRecord { Source = LogRecord.LogRecordSource.FromSharedPool, PoolReferenceCount = 1 }); } } diff --git a/test/OpenTelemetry.Tests/Logs/LogRecordThreadStaticPoolTests.cs b/test/OpenTelemetry.Tests/Logs/LogRecordThreadStaticPoolTests.cs index ff337bccd4f..8478aee7ea0 100644 --- a/test/OpenTelemetry.Tests/Logs/LogRecordThreadStaticPoolTests.cs +++ b/test/OpenTelemetry.Tests/Logs/LogRecordThreadStaticPoolTests.cs @@ -22,13 +22,13 @@ public void RentReturnTests() Assert.NotNull(LogRecordThreadStaticPool.Storage); Assert.Equal(logRecord, LogRecordThreadStaticPool.Storage); - LogRecordThreadStaticPool.Instance.Return(new()); + LogRecordThreadStaticPool.Instance.Return(new() { Source = LogRecord.LogRecordSource.FromThreadStaticPool }); Assert.NotNull(LogRecordThreadStaticPool.Storage); Assert.Equal(logRecord, LogRecordThreadStaticPool.Storage); LogRecordThreadStaticPool.Storage = null; - var manual = new LogRecord(); + var manual = new LogRecord() { Source = LogRecord.LogRecordSource.FromThreadStaticPool }; LogRecordThreadStaticPool.Instance.Return(manual); Assert.NotNull(LogRecordThreadStaticPool.Storage); Assert.Equal(manual, LogRecordThreadStaticPool.Storage); From 1fadd16cd4f9dc5391f0773ad6b57a005bacae1f Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Wed, 24 Jan 2024 15:14:58 -0800 Subject: [PATCH 2/7] CHANGELOG patch. --- src/OpenTelemetry/CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/OpenTelemetry/CHANGELOG.md b/src/OpenTelemetry/CHANGELOG.md index 3809ccee889..4f153b0b6fa 100644 --- a/src/OpenTelemetry/CHANGELOG.md +++ b/src/OpenTelemetry/CHANGELOG.md @@ -13,6 +13,11 @@ state for cumulative temporality. [#5230](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5230) +* Fixed an issue causing `LogRecord`s to be incorrectly reused when wrapping an + instance of `BatchLogRecordExportProcessor` inside another + `BaseProcessor`. + [#5255](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5255) + ## 1.7.0 Released 2023-Dec-08 From 21407581d084f44a3244ffb78206bdded5eb80b7 Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Thu, 25 Jan 2024 10:23:22 -0800 Subject: [PATCH 3/7] Warning fix. --- src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs b/src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs index 7b930197d9a..c1e341585a8 100644 --- a/src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs +++ b/src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs @@ -42,7 +42,7 @@ public override void OnEnd(LogRecord data) // happen here. Debug.Assert(data != null, "LogRecord was null."); - switch (data.Source) + switch (data!.Source) { case LogRecord.LogRecordSource.FromSharedPool: data.Buffer(); From 6f83ba9a7b1f134a9a2ac3f5262a1c039035de74 Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Thu, 25 Jan 2024 10:33:09 -0800 Subject: [PATCH 4/7] Code review. --- .../Logs/Pool/LogRecordThreadStaticPool.cs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs b/src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs index 8d752967d04..8763cf8679d 100644 --- a/src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs +++ b/src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs @@ -23,13 +23,16 @@ public LogRecord Rent() { Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromThreadStaticPool, "logRecord.Source was not FromThreadStaticPool"); Storage = null; - return logRecord; } - - return new() + else { - Source = LogRecord.LogRecordSource.FromThreadStaticPool, - }; + logRecord = new() + { + Source = LogRecord.LogRecordSource.FromThreadStaticPool, + }; + } + + return logRecord; } public void Return(LogRecord logRecord) From 7154dd6bf337cd981b7b40058fefbdfb486b11f2 Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Thu, 25 Jan 2024 12:59:13 -0800 Subject: [PATCH 5/7] Document enum members. --- src/OpenTelemetry/Logs/LogRecord.cs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/OpenTelemetry/Logs/LogRecord.cs b/src/OpenTelemetry/Logs/LogRecord.cs index 603fe452f27..f98363ab766 100644 --- a/src/OpenTelemetry/Logs/LogRecord.cs +++ b/src/OpenTelemetry/Logs/LogRecord.cs @@ -83,11 +83,20 @@ internal LogRecord( internal enum LogRecordSource { -#pragma warning disable SA1602 // Enumeration items should be documented + /// + /// A created manually. + /// CreatedManually, + + /// + /// A rented from the . + /// FromThreadStaticPool, + + /// + /// A rented from the . + /// FromSharedPool, -#pragma warning restore SA1602 // Enumeration items should be documented } /// From e7a0f7efa74883399345ff4e394ed9d43ad5f82f Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Fri, 26 Jan 2024 16:08:02 -0800 Subject: [PATCH 6/7] CHANGELOG tweak. --- src/OpenTelemetry/CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/OpenTelemetry/CHANGELOG.md b/src/OpenTelemetry/CHANGELOG.md index 4f153b0b6fa..4a284473573 100644 --- a/src/OpenTelemetry/CHANGELOG.md +++ b/src/OpenTelemetry/CHANGELOG.md @@ -15,7 +15,8 @@ * Fixed an issue causing `LogRecord`s to be incorrectly reused when wrapping an instance of `BatchLogRecordExportProcessor` inside another - `BaseProcessor`. + `BaseProcessor` which leads to missing or incorrect data during + export. [#5255](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5255) ## 1.7.0 From 2fe047f0479e7c618869001e0622973e289e059c Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Mon, 29 Jan 2024 09:53:25 -0800 Subject: [PATCH 7/7] Test tweaks. --- .../Logs/LogRecordSharedPoolTests.cs | 19 +++++++++++-------- .../Logs/LogRecordThreadStaticPoolTests.cs | 7 ++++--- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs b/test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs index 69c77ce3157..d4be69397b1 100644 --- a/test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs +++ b/test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs @@ -44,20 +44,23 @@ public void RentReturnTests() Assert.Equal(1, pool.Count); - // Note: This is ignored because logRecord manually created has PoolReferenceCount = int.MaxValue. - LogRecord manualRecord = new() { Source = LogRecord.LogRecordSource.FromSharedPool }; - Assert.Equal(int.MaxValue, manualRecord.PoolReferenceCount); - pool.Return(manualRecord); + var logRecordWithReferencesAdded = pool.Rent(); - Assert.Equal(1, pool.Count); + // Note: This record won't be returned to the pool because we add a reference to it. + logRecordWithReferencesAdded.AddReference(); + + Assert.Equal(2, logRecordWithReferencesAdded.PoolReferenceCount); + pool.Return(logRecordWithReferencesAdded); + + Assert.Equal(0, pool.Count); pool.Return(logRecord2); - Assert.Equal(2, pool.Count); + Assert.Equal(1, pool.Count); logRecord1 = pool.Rent(); Assert.NotNull(logRecord1); - Assert.Equal(1, pool.Count); + Assert.Equal(0, pool.Count); logRecord2 = pool.Rent(); Assert.NotNull(logRecord2); @@ -70,7 +73,7 @@ public void RentReturnTests() pool.Return(logRecord1); pool.Return(logRecord2); - pool.Return(logRecord3); + pool.Return(logRecord3); // <- Discarded due to pool size of 2 pool.Return(logRecord4); // <- Discarded due to pool size of 2 Assert.Equal(2, pool.Count); diff --git a/test/OpenTelemetry.Tests/Logs/LogRecordThreadStaticPoolTests.cs b/test/OpenTelemetry.Tests/Logs/LogRecordThreadStaticPoolTests.cs index 8478aee7ea0..59c0b53454e 100644 --- a/test/OpenTelemetry.Tests/Logs/LogRecordThreadStaticPoolTests.cs +++ b/test/OpenTelemetry.Tests/Logs/LogRecordThreadStaticPoolTests.cs @@ -22,16 +22,17 @@ public void RentReturnTests() Assert.NotNull(LogRecordThreadStaticPool.Storage); Assert.Equal(logRecord, LogRecordThreadStaticPool.Storage); + // Note: This record will be ignored because there is already something in the ThreadStatic storage. LogRecordThreadStaticPool.Instance.Return(new() { Source = LogRecord.LogRecordSource.FromThreadStaticPool }); Assert.NotNull(LogRecordThreadStaticPool.Storage); Assert.Equal(logRecord, LogRecordThreadStaticPool.Storage); LogRecordThreadStaticPool.Storage = null; - var manual = new LogRecord() { Source = LogRecord.LogRecordSource.FromThreadStaticPool }; - LogRecordThreadStaticPool.Instance.Return(manual); + var newLogRecord = new LogRecord() { Source = LogRecord.LogRecordSource.FromThreadStaticPool }; + LogRecordThreadStaticPool.Instance.Return(newLogRecord); Assert.NotNull(LogRecordThreadStaticPool.Storage); - Assert.Equal(manual, LogRecordThreadStaticPool.Storage); + Assert.Equal(newLogRecord, LogRecordThreadStaticPool.Storage); } [Fact]