diff --git a/pkg/stanza/fileconsumer/benchmark_test.go b/pkg/stanza/fileconsumer/benchmark_test.go index c998d8d977d1..e9bcfd51d88e 100644 --- a/pkg/stanza/fileconsumer/benchmark_test.go +++ b/pkg/stanza/fileconsumer/benchmark_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" ) @@ -150,7 +151,7 @@ func BenchmarkFileInput(b *testing.B) { var files []*benchFile for _, path := range bench.paths { - file := openFile(b, filepath.Join(rootDir, path)) + file := filetest.OpenFile(b, filepath.Join(rootDir, path)) files = append(files, simpleTextFile(b, file)) } diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 7f14f932e872..b23415dd7741 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -20,6 +20,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" @@ -38,9 +39,9 @@ func TestDefaultBehaviors(t *testing.T) { cfg := NewConfig().includeDir(tempDir) operator, sink := testManager(t, cfg) - temp := openTemp(t, tempDir) + temp := filetest.OpenTemp(t, tempDir) tempName := filepath.Base(temp.Name()) - writeString(t, temp, " testlog1 \n") + filetest.WriteString(t, temp, " testlog1 \n") require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { @@ -51,14 +52,14 @@ func TestDefaultBehaviors(t *testing.T) { sink.ExpectNoCallsUntil(t, defaultFlushPeriod) // Complete token should be emitted quickly - writeString(t, temp, " testlog2 \n") + filetest.WriteString(t, temp, " testlog2 \n") token, attributes := sink.NextCall(t) assert.Equal(t, []byte("testlog2"), token) assert.Len(t, attributes, 1) assert.Equal(t, tempName, attributes[attrs.LogFileName]) // Incomplete token should not be emitted until after flush period - writeString(t, temp, " testlog3 ") + filetest.WriteString(t, temp, " testlog3 ") sink.ExpectNoCallsUntil(t, defaultFlushPeriod/2) time.Sleep(defaultFlushPeriod) @@ -79,7 +80,7 @@ See this issue for details: https://github.com/census-instrumentation/opencensus cfg.StartAt = "beginning" operator, _ := testManager(t, cfg) - _ = openTemp(t, tempDir) + _ = filetest.OpenTemp(t, tempDir) require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) require.NoError(t, operator.Stop()) } @@ -99,8 +100,8 @@ func TestAddFileFields(t *testing.T) { operator, sink := testManager(t, cfg) // Create a file, then start - temp := openTemp(t, tempDir) - writeString(t, temp, "testlog\n") + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, "testlog\n") require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { @@ -146,7 +147,7 @@ func TestAddFileResolvedFields(t *testing.T) { require.NoError(t, err) // Populate data - writeString(t, file, "testlog\n") + filetest.WriteString(t, file, "testlog\n") // Resolve path realPath, err := filepath.EvalSymlinks(file.Name()) @@ -222,7 +223,7 @@ func TestAddFileResolvedFieldsWithChangeOfSymlinkTarget(t *testing.T) { require.NoError(t, err) // Populate data - writeString(t, file1, "testlog\n") + filetest.WriteString(t, file1, "testlog\n") require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { @@ -242,7 +243,7 @@ func TestAddFileResolvedFieldsWithChangeOfSymlinkTarget(t *testing.T) { require.NoError(t, err) // Populate data (different content due to fingerprint) - writeString(t, file2, "testlog2\n") + filetest.WriteString(t, file2, "testlog2\n") _, attributes = sink.NextCall(t) require.Equal(t, filepath.Base(symLinkPath), attributes[attrs.LogFileName]) @@ -264,7 +265,7 @@ func TestFileFieldsUpdatedAfterRestart(t *testing.T) { // Create a file, then start temp, err := os.CreateTemp(tempDir, "") require.NoError(t, err) - writeString(t, temp, "testlog1\n") + filetest.WriteString(t, temp, "testlog1\n") persister := testutil.NewUnscopedMockPersister() require.NoError(t, op1.Start(persister)) @@ -282,8 +283,8 @@ func TestFileFieldsUpdatedAfterRestart(t *testing.T) { newPath := temp.Name() + "_renamed" require.NoError(t, os.Rename(temp.Name(), newPath)) - temp = openFile(t, newPath) - writeString(t, temp, "testlog2\n") + temp = filetest.OpenFile(t, newPath) + filetest.WriteString(t, temp, "testlog2\n") op2, sink2 := testManager(t, cfg) @@ -310,8 +311,8 @@ func TestReadExistingLogs(t *testing.T) { operator, sink := testManager(t, cfg) // Create a file, then start - temp := openTemp(t, tempDir) - writeString(t, temp, "testlog1\ntestlog2\n") + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, "testlog1\ntestlog2\n") require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { @@ -378,7 +379,7 @@ func TestReadUsingNopEncoding(t *testing.T) { operator, sink := testManager(t, cfg) // Create a file, then start - temp := openTemp(t, tempDir) + temp := filetest.OpenTemp(t, tempDir) bytesWritten, err := temp.Write(tc.input) require.Greater(t, bytesWritten, 0) require.NoError(t, err) @@ -462,7 +463,7 @@ func TestNopEncodingDifferentLogSizes(t *testing.T) { operator, sink := testManager(t, cfg) // Create a file, then start - temp := openTemp(t, tempDir) + temp := filetest.OpenTemp(t, tempDir) bytesWritten, err := temp.Write(tc.input) require.Greater(t, bytesWritten, 0) require.NoError(t, err) @@ -491,8 +492,8 @@ func TestReadNewLogs(t *testing.T) { operator.poll(context.Background()) // Create a new file - temp := openTemp(t, tempDir) - writeString(t, temp, "testlog\n") + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, "testlog\n") // Poll a second time after the file has been created operator.poll(context.Background()) @@ -514,14 +515,14 @@ func TestReadExistingAndNewLogs(t *testing.T) { // Start with a file with an entry in it, and expect that entry // to come through when we poll for the first time - temp := openTemp(t, tempDir) - writeString(t, temp, "testlog1\n") + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, "testlog1\n") operator.poll(context.Background()) sink.ExpectToken(t, []byte("testlog1")) // Write a second entry, and expect that entry to come through // as well - writeString(t, temp, "testlog2\n") + filetest.WriteString(t, temp, "testlog2\n") operator.poll(context.Background()) sink.ExpectToken(t, []byte("testlog2")) } @@ -536,15 +537,15 @@ func TestStartAtEnd(t *testing.T) { operator, sink := testManager(t, cfg) operator.persister = testutil.NewUnscopedMockPersister() - temp := openTemp(t, tempDir) - writeString(t, temp, "testlog1\n") + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, "testlog1\n") // Expect no entries on the first poll operator.poll(context.Background()) sink.ExpectNoCalls(t) // Expect any new entries after the first poll - writeString(t, temp, "testlog2\n") + filetest.WriteString(t, temp, "testlog2\n") operator.poll(context.Background()) sink.ExpectToken(t, []byte("testlog2")) } @@ -565,8 +566,8 @@ func TestStartAtEndNewFile(t *testing.T) { operator.persister = testutil.NewUnscopedMockPersister() operator.poll(context.Background()) - temp := openTemp(t, tempDir) - writeString(t, temp, "testlog1\ntestlog2\n") + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, "testlog1\ntestlog2\n") operator.poll(context.Background()) sink.ExpectToken(t, []byte("testlog1")) @@ -584,8 +585,8 @@ func TestNoNewline(t *testing.T) { cfg.FlushPeriod = time.Nanosecond operator, sink := testManager(t, cfg) - temp := openTemp(t, tempDir) - writeString(t, temp, "testlog1\ntestlog2") + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, "testlog1\ntestlog2") require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { @@ -605,8 +606,8 @@ func TestEmptyLine(t *testing.T) { cfg.StartAt = "beginning" operator, sink := testManager(t, cfg) - temp := openTemp(t, tempDir) - writeString(t, temp, "testlog1\n\ntestlog2\n") + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, "testlog1\n\ntestlog2\n") require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { @@ -628,8 +629,8 @@ func TestMultipleEmpty(t *testing.T) { cfg.StartAt = "beginning" operator, sink := testManager(t, cfg) - temp := openTemp(t, tempDir) - writeString(t, temp, "\n\ntestlog1\n\n\ntestlog2\n") + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, "\n\ntestlog1\n\n\ntestlog2\n") require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { @@ -655,8 +656,8 @@ func TestLeadingEmpty(t *testing.T) { cfg.StartAt = "beginning" operator, sink := testManager(t, cfg) - temp := openTemp(t, tempDir) - writeString(t, temp, "\ntestlog1\ntestlog2\n") + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, "\ntestlog1\ntestlog2\n") require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { @@ -680,12 +681,12 @@ func TestSplitWrite(t *testing.T) { operator, sink := testManager(t, cfg) operator.persister = testutil.NewUnscopedMockPersister() - temp := openTemp(t, tempDir) - writeString(t, temp, "testlog1") + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, "testlog1") operator.poll(context.Background()) - writeString(t, temp, "testlog2\n") + filetest.WriteString(t, temp, "testlog2\n") operator.poll(context.Background()) sink.ExpectToken(t, []byte("testlog1testlog2")) @@ -700,19 +701,19 @@ func TestIgnoreEmptyFiles(t *testing.T) { operator, sink := testManager(t, cfg) operator.persister = testutil.NewUnscopedMockPersister() - temp := openTemp(t, tempDir) - temp2 := openTemp(t, tempDir) - temp3 := openTemp(t, tempDir) - temp4 := openTemp(t, tempDir) + temp := filetest.OpenTemp(t, tempDir) + temp2 := filetest.OpenTemp(t, tempDir) + temp3 := filetest.OpenTemp(t, tempDir) + temp4 := filetest.OpenTemp(t, tempDir) - writeString(t, temp, "testlog1\n") - writeString(t, temp3, "testlog2\n") + filetest.WriteString(t, temp, "testlog1\n") + filetest.WriteString(t, temp3, "testlog2\n") operator.poll(context.Background()) sink.ExpectTokens(t, []byte("testlog1"), []byte("testlog2")) - writeString(t, temp2, "testlog3\n") - writeString(t, temp4, "testlog4\n") + filetest.WriteString(t, temp2, "testlog3\n") + filetest.WriteString(t, temp4, "testlog4\n") operator.poll(context.Background()) sink.ExpectTokens(t, []byte("testlog3"), []byte("testlog4")) @@ -731,9 +732,9 @@ func TestDecodeBufferIsResized(t *testing.T) { require.NoError(t, operator.Stop()) }() - temp := openTemp(t, tempDir) + temp := filetest.OpenTemp(t, tempDir) expected := tokenWithLength(1<<12 + 1) - writeString(t, temp, string(expected)+"\n") + filetest.WriteString(t, temp, string(expected)+"\n") sink.ExpectToken(t, expected) } @@ -746,11 +747,11 @@ func TestMultiFileSimple(t *testing.T) { cfg.StartAt = "beginning" operator, sink := testManager(t, cfg) - temp1 := openTemp(t, tempDir) - temp2 := openTemp(t, tempDir) + temp1 := filetest.OpenTemp(t, tempDir) + temp2 := filetest.OpenTemp(t, tempDir) - writeString(t, temp1, "testlog1\n") - writeString(t, temp2, "testlog2\n") + filetest.WriteString(t, temp1, "testlog1\n") + filetest.WriteString(t, temp2, "testlog2\n") require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { @@ -778,11 +779,11 @@ func TestMultiFileSort(t *testing.T) { operator, sink := testManager(t, cfg) - temp1 := openTempWithPattern(t, tempDir, ".*log1") - temp2 := openTempWithPattern(t, tempDir, ".*log2") + temp1 := filetest.OpenTempWithPattern(t, tempDir, ".*log1") + temp2 := filetest.OpenTempWithPattern(t, tempDir, ".*log2") - writeString(t, temp1, "testlog1\n") - writeString(t, temp2, "testlog2\n") + filetest.WriteString(t, temp1, "testlog1\n") + filetest.WriteString(t, temp2, "testlog2\n") require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { @@ -812,11 +813,11 @@ func TestMultiFileSortTimestamp(t *testing.T) { operator, sink := testManager(t, cfg) - temp1 := openTempWithPattern(t, tempDir, ".*2023020602.log") - temp2 := openTempWithPattern(t, tempDir, ".*2023020603.log") + temp1 := filetest.OpenTempWithPattern(t, tempDir, ".*2023020602.log") + temp2 := filetest.OpenTempWithPattern(t, tempDir, ".*2023020603.log") - writeString(t, temp1, "testlog1\n") - writeString(t, temp2, "testlog2\n") + filetest.WriteString(t, temp1, "testlog1\n") + filetest.WriteString(t, temp2, "testlog2\n") require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { @@ -849,12 +850,12 @@ func TestMultiFileParallel_PreloadedFiles(t *testing.T) { var wg sync.WaitGroup for i := 0; i < numFiles; i++ { - temp := openTemp(t, tempDir) + temp := filetest.OpenTemp(t, tempDir) wg.Add(1) go func(tf *os.File, f int) { defer wg.Done() for j := 0; j < numMessages; j++ { - writeString(t, tf, getMessage(f, j)+"\n") + filetest.WriteString(t, tf, getMessage(f, j)+"\n") } }(temp, i) } @@ -895,7 +896,7 @@ func TestMultiFileParallel_LiveFiles(t *testing.T) { temps := make([]*os.File, 0, numFiles) for i := 0; i < numFiles; i++ { - temps = append(temps, openTemp(t, tempDir)) + temps = append(temps, filetest.OpenTemp(t, tempDir)) } var wg sync.WaitGroup @@ -904,7 +905,7 @@ func TestMultiFileParallel_LiveFiles(t *testing.T) { go func(tf *os.File, f int) { defer wg.Done() for j := 0; j < numMessages; j++ { - writeString(t, tf, getMessage(f, j)+"\n") + filetest.WriteString(t, tf, getMessage(f, j)+"\n") } }(temp, i) } @@ -935,7 +936,7 @@ func TestRestartOffsets(t *testing.T) { persister := testutil.NewUnscopedMockPersister() - logFile := openTemp(t, tempDir) + logFile := filetest.OpenTemp(t, tempDir) before1stRun := tokenWithLength(tc.lineLength) during1stRun := tokenWithLength(tc.lineLength) @@ -943,23 +944,23 @@ func TestRestartOffsets(t *testing.T) { during2ndRun := tokenWithLength(tc.lineLength) operatorOne, sink1 := testManager(t, cfg) - writeString(t, logFile, string(before1stRun)+"\n") + filetest.WriteString(t, logFile, string(before1stRun)+"\n") require.NoError(t, operatorOne.Start(persister)) if tc.startAt == "beginning" { sink1.ExpectToken(t, before1stRun) } else { sink1.ExpectNoCallsUntil(t, 500*time.Millisecond) } - writeString(t, logFile, string(during1stRun)+"\n") + filetest.WriteString(t, logFile, string(during1stRun)+"\n") sink1.ExpectToken(t, during1stRun) require.NoError(t, operatorOne.Stop()) - writeString(t, logFile, string(duringRestart)+"\n") + filetest.WriteString(t, logFile, string(duringRestart)+"\n") operatorTwo, sink2 := testManager(t, cfg) require.NoError(t, operatorTwo.Start(persister)) sink2.ExpectToken(t, duringRestart) - writeString(t, logFile, string(during2ndRun)+"\n") + filetest.WriteString(t, logFile, string(during2ndRun)+"\n") sink2.ExpectToken(t, during2ndRun) require.NoError(t, operatorTwo.Stop()) }) @@ -987,7 +988,7 @@ func TestManyLogsDelivered(t *testing.T) { }() // Write lots of logs - temp := openTemp(t, tempDir) + temp := filetest.OpenTemp(t, tempDir) for _, message := range expectedTokens { _, err := temp.WriteString(message + "\n") require.NoError(t, err) @@ -1020,7 +1021,7 @@ func TestFileBatching(t *testing.T) { temps := make([]*os.File, 0, files) for i := 0; i < files; i++ { - temps = append(temps, openTemp(t, tempDir)) + temps = append(temps, filetest.OpenTemp(t, tempDir)) } // Write logs to each file @@ -1076,7 +1077,7 @@ func TestFileBatchingRespectsStartAtEnd(t *testing.T) { temps := make([]*os.File, 0, initFiles+moreFiles) for i := 0; i < initFiles; i++ { - temps = append(temps, openTemp(t, tempDir)) + temps = append(temps, filetest.OpenTemp(t, tempDir)) } // Write one log to each file @@ -1092,7 +1093,7 @@ func TestFileBatchingRespectsStartAtEnd(t *testing.T) { // Create some more files for i := 0; i < moreFiles; i++ { - temps = append(temps, openTemp(t, tempDir)) + temps = append(temps, filetest.OpenTemp(t, tempDir)) } // Write a log to each file @@ -1117,8 +1118,8 @@ func TestFileReader_FingerprintUpdated(t *testing.T) { cfg.StartAt = "beginning" operator, sink := testManager(t, cfg) - temp := openTemp(t, tempDir) - tempCopy := openFile(t, temp.Name()) + temp := filetest.OpenTemp(t, tempDir) + tempCopy := filetest.OpenFile(t, temp.Name()) fp, err := operator.readerFactory.NewFingerprint(temp) require.NoError(t, err) @@ -1126,7 +1127,7 @@ func TestFileReader_FingerprintUpdated(t *testing.T) { require.NoError(t, err) defer reader.Close() - writeString(t, temp, "testlog1\n") + filetest.WriteString(t, temp, "testlog1\n") reader.ReadToEnd(context.Background()) sink.ExpectToken(t, []byte("testlog1")) require.Equal(t, []byte("testlog1\n"), reader.Fingerprint.FirstBytes) @@ -1159,8 +1160,8 @@ func TestFingerprintGrowsAndStops(t *testing.T) { cfg.FingerprintSize = helper.ByteSize(maxFP) operator, _ := testManager(t, cfg) - temp := openTemp(t, tempDir) - tempCopy := openFile(t, temp.Name()) + temp := filetest.OpenTemp(t, tempDir) + tempCopy := filetest.OpenFile(t, temp.Name()) fp, err := operator.readerFactory.NewFingerprint(temp) require.NoError(t, err) require.Equal(t, []byte(""), fp.FirstBytes) @@ -1185,7 +1186,7 @@ func TestFingerprintGrowsAndStops(t *testing.T) { line := string(tokenWithLength(lineLen-1)) + "\n" fileContent = append(fileContent, []byte(line)...) - writeString(t, temp, line) + filetest.WriteString(t, temp, line) reader.ReadToEnd(context.Background()) require.Equal(t, fileContent[:expectedFP], reader.Fingerprint.FirstBytes) } @@ -1222,8 +1223,8 @@ func TestFingerprintChangeSize(t *testing.T) { cfg.FingerprintSize = helper.ByteSize(maxFP) operator, _ := testManager(t, cfg) - temp := openTemp(t, tempDir) - tempCopy := openFile(t, temp.Name()) + temp := filetest.OpenTemp(t, tempDir) + tempCopy := filetest.OpenFile(t, temp.Name()) fp, err := operator.readerFactory.NewFingerprint(temp) require.NoError(t, err) require.Equal(t, []byte(""), fp.FirstBytes) @@ -1248,7 +1249,7 @@ func TestFingerprintChangeSize(t *testing.T) { line := string(tokenWithLength(lineLen-1)) + "\n" fileContent = append(fileContent, []byte(line)...) - writeString(t, temp, line) + filetest.WriteString(t, temp, line) reader.ReadToEnd(context.Background()) require.Equal(t, fileContent[:expectedFP], reader.Fingerprint.FirstBytes) } @@ -1261,7 +1262,7 @@ func TestFingerprintChangeSize(t *testing.T) { line := string(tokenWithLength(lineLen-1)) + "\n" fileContent = append(fileContent, []byte(line)...) - writeString(t, temp, line) + filetest.WriteString(t, temp, line) reader.ReadToEnd(context.Background()) require.Equal(t, fileContent[:expectedFP], reader.Fingerprint.FirstBytes) @@ -1269,7 +1270,7 @@ func TestFingerprintChangeSize(t *testing.T) { line = string(tokenWithLength(lineLen-1)) + "\n" fileContent = append(fileContent, []byte(line)...) - writeString(t, temp, line) + filetest.WriteString(t, temp, line) reader.ReadToEnd(context.Background()) require.Equal(t, fileContent[:expectedFP], reader.Fingerprint.FirstBytes) }) @@ -1345,7 +1346,7 @@ func TestEncodings(t *testing.T) { operator, sink := testManager(t, cfg) // Populate the file - temp := openTemp(t, tempDir) + temp := filetest.OpenTemp(t, tempDir) _, err := temp.Write(tc.contents) require.NoError(t, err) @@ -1372,7 +1373,7 @@ func TestDeleteAfterRead(t *testing.T) { tempDir := t.TempDir() temps := make([]*os.File, 0, files) for i := 0; i < files; i++ { - temps = append(temps, openTemp(t, tempDir)) + temps = append(temps, filetest.OpenTemp(t, tempDir)) } // Write logs to each file @@ -1428,7 +1429,7 @@ func TestMaxBatching(t *testing.T) { temps := make([]*os.File, 0, files) for i := 0; i < files; i++ { - temps = append(temps, openTemp(t, tempDir)) + temps = append(temps, filetest.OpenTemp(t, tempDir)) } // Write logs to each file @@ -1479,8 +1480,8 @@ func TestReadExistingLogsWithHeader(t *testing.T) { operator, sink := testManager(t, cfg) // Create a file, then start - temp := openTemp(t, tempDir) - writeString(t, temp, "#headerField: headerValue\ntestlog\n") + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, "#headerField: headerValue\ntestlog\n") require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { @@ -1511,12 +1512,12 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) { operator := testManagerWithSink(t, cfg, sink) operator.persister = testutil.NewUnscopedMockPersister() - shortFile := openTemp(t, tempDir) + shortFile := filetest.OpenTemp(t, tempDir) _, err := shortFile.WriteString(shortFileLine + "\n") require.NoError(t, err) require.NoError(t, shortFile.Close()) - longFile := openTemp(t, tempDir) + longFile := filetest.OpenTemp(t, tempDir) for line := 0; line < longFileLines; line++ { _, err := longFile.WriteString(string(tokenWithLength(100)) + "\n") require.NoError(t, err) @@ -1575,8 +1576,8 @@ func TestHeaderPersistance(t *testing.T) { op1, sink1 := testManager(t, cfg) // Create a file, then start - temp := openTemp(t, tempDir) - writeString(t, temp, "#headerField: headerValue\nlog line\n") + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, "#headerField: headerValue\nlog line\n") persister := testutil.NewUnscopedMockPersister() @@ -1588,7 +1589,7 @@ func TestHeaderPersistance(t *testing.T) { }) require.NoError(t, op1.Stop()) - writeString(t, temp, "log line 2\n") + filetest.WriteString(t, temp, "log line 2\n") op2, sink2 := testManager(t, cfg) @@ -1615,8 +1616,8 @@ func TestHeaderPersistanceInHeader(t *testing.T) { op1, _ := testManager(t, cfg1) // Create a file, then start - temp := openTemp(t, tempDir) - writeString(t, temp, "|headerField1: headerValue1\n") + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, "|headerField1: headerValue1\n") persister := testutil.NewUnscopedMockPersister() @@ -1625,7 +1626,7 @@ func TestHeaderPersistanceInHeader(t *testing.T) { time.Sleep(2 * cfg1.PollInterval) require.NoError(t, op1.Stop()) - writeString(t, temp, "|headerField2: headerValue2\nlog line\n") + filetest.WriteString(t, temp, "|headerField2: headerValue2\nlog line\n") cfg2 := NewConfig().includeDir(tempDir) cfg2.StartAt = "beginning" @@ -1652,13 +1653,13 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) { operator.persister = testutil.NewUnscopedMockPersister() // Both of they will be include - file1 := openTempWithPattern(t, tempDir, "*.log1") - file2 := openTempWithPattern(t, tempDir, "*.log2") + file1 := filetest.OpenTempWithPattern(t, tempDir, "*.log1") + file2 := filetest.OpenTempWithPattern(t, tempDir, "*.log2") // Two same fingerprint file , and smaller than config size content := "aaaaaaaaaaa" - writeString(t, file1, content+"\n") - writeString(t, file2, content+"\n") + filetest.WriteString(t, file1, content+"\n") + filetest.WriteString(t, file2, content+"\n") operator.poll(context.Background()) // one file will be exclude, ingest only one content sink.ExpectToken(t, []byte(content)) @@ -1672,8 +1673,8 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) { // keep append data to file1 and file2 newContent := "bbbbbbbbbbbb" newContent1 := "ddd" - writeString(t, file1, newContent1+"\n") - writeString(t, file2, newContent+"\n") + filetest.WriteString(t, file1, newContent1+"\n") + filetest.WriteString(t, file2, newContent+"\n") operator.poll(context.Background()) // We should have updated the offset for one of the files, so the second file should now // be ingested from the beginning @@ -1689,8 +1690,8 @@ func TestWindowsFilesClosedImmediately(t *testing.T) { cfg.StartAt = "beginning" operator, sink := testManager(t, cfg) - temp := openTemp(t, tempDir) - writeString(t, temp, "testlog\n") + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, "testlog\n") require.NoError(t, temp.Close()) operator.poll(context.Background()) diff --git a/pkg/stanza/fileconsumer/internal/filetest/filetest.go b/pkg/stanza/fileconsumer/internal/filetest/filetest.go new file mode 100644 index 000000000000..c9ea3b209493 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/filetest/filetest.go @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package filetest // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +func OpenFile(tb testing.TB, path string) *os.File { + file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0600) + require.NoError(tb, err) + tb.Cleanup(func() { _ = file.Close() }) + return file +} + +func OpenTemp(t testing.TB, tempDir string) *os.File { + return OpenTempWithPattern(t, tempDir, "") +} + +func ReopenTemp(t testing.TB, name string) *os.File { + return OpenTempWithPattern(t, filepath.Dir(name), filepath.Base(name)) +} + +func OpenTempWithPattern(t testing.TB, tempDir, pattern string) *os.File { + file, err := os.CreateTemp(tempDir, pattern) + require.NoError(t, err) + t.Cleanup(func() { _ = file.Close() }) + return file +} + +func WriteString(t testing.TB, file *os.File, s string) { + _, err := file.WriteString(s) + require.NoError(t, err) +} diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go index 07a9e17e6e84..7b28b67f910e 100644 --- a/pkg/stanza/fileconsumer/reader_test.go +++ b/pkg/stanza/fileconsumer/reader_test.go @@ -13,6 +13,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" @@ -27,7 +28,7 @@ func TestPersistFlusher(t *testing.T) { flushPeriod := 100 * time.Millisecond f, sink := testReaderFactory(t, split.Config{}, defaultMaxLogSize, flushPeriod) - temp := openTemp(t, t.TempDir()) + temp := filetest.OpenTemp(t, t.TempDir()) fp, err := f.NewFingerprint(temp) require.NoError(t, err) @@ -113,7 +114,7 @@ func TestTokenization(t *testing.T) { t.Run(tc.testName, func(t *testing.T) { f, sink := testReaderFactory(t, split.Config{}, defaultMaxLogSize, defaultFlushPeriod) - temp := openTemp(t, t.TempDir()) + temp := filetest.OpenTemp(t, t.TempDir()) _, err := temp.Write(tc.fileContent) require.NoError(t, err) @@ -143,7 +144,7 @@ func TestTokenizationTooLong(t *testing.T) { f, sink := testReaderFactory(t, split.Config{}, 10, defaultFlushPeriod) - temp := openTemp(t, t.TempDir()) + temp := filetest.OpenTemp(t, t.TempDir()) _, err := temp.Write(fileContent) require.NoError(t, err) @@ -175,7 +176,7 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) { sCfg.LineStartPattern = `\d+-\d+-\d+` f, sink := testReaderFactory(t, sCfg, 15, defaultFlushPeriod) - temp := openTemp(t, t.TempDir()) + temp := filetest.OpenTemp(t, t.TempDir()) _, err := temp.Write(fileContent) require.NoError(t, err) @@ -207,7 +208,7 @@ func TestHeaderFingerprintIncluded(t *testing.T) { require.NoError(t, err) f.HeaderConfig = h - temp := openTemp(t, t.TempDir()) + temp := filetest.OpenTemp(t, t.TempDir()) fp, err := f.NewFingerprint(temp) require.NoError(t, err) diff --git a/pkg/stanza/fileconsumer/rotation_test.go b/pkg/stanza/fileconsumer/rotation_test.go index 4946be837c35..61b9b0883f93 100644 --- a/pkg/stanza/fileconsumer/rotation_test.go +++ b/pkg/stanza/fileconsumer/rotation_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" ) @@ -58,7 +59,7 @@ func TestMultiFileRotate(t *testing.T) { temps := make([]*os.File, 0, numFiles) for i := 0; i < numFiles; i++ { - temps = append(temps, openTemp(t, tempDir)) + temps = append(temps, filetest.OpenTemp(t, tempDir)) } var wg sync.WaitGroup @@ -68,12 +69,12 @@ func TestMultiFileRotate(t *testing.T) { defer wg.Done() for k := 0; k < numRotations; k++ { for j := 0; j < numMessages; j++ { - writeString(t, tf, getMessage(f, k, j)+"\n") + filetest.WriteString(t, tf, getMessage(f, k, j)+"\n") } require.NoError(t, tf.Close()) require.NoError(t, os.Rename(tf.Name(), fmt.Sprintf("%s.%d", tf.Name(), k))) - tf = reopenTemp(t, tf.Name()) + tf = filetest.ReopenTemp(t, tf.Name()) } }(temp, i) } @@ -124,9 +125,9 @@ func TestMultiFileRotateSlow(t *testing.T) { defer wg.Done() for rotationNum := 0; rotationNum < numRotations; rotationNum++ { - file := openFile(t, baseFileName(fn)) + file := filetest.OpenFile(t, baseFileName(fn)) for messageNum := 0; messageNum < numMessages; messageNum++ { - writeString(t, file, getMessage(fn, rotationNum, messageNum)+"\n") + filetest.WriteString(t, file, getMessage(fn, rotationNum, messageNum)+"\n") time.Sleep(5 * time.Millisecond) } @@ -178,15 +179,15 @@ func TestMultiCopyTruncateSlow(t *testing.T) { defer wg.Done() for rotationNum := 0; rotationNum < numRotations; rotationNum++ { - file := openFile(t, baseFileName(fn)) + file := filetest.OpenFile(t, baseFileName(fn)) for messageNum := 0; messageNum < numMessages; messageNum++ { - writeString(t, file, getMessage(fn, rotationNum, messageNum)+"\n") + filetest.WriteString(t, file, getMessage(fn, rotationNum, messageNum)+"\n") time.Sleep(5 * time.Millisecond) } _, err := file.Seek(0, 0) require.NoError(t, err) - dst := openFile(t, fileName(fn, rotationNum)) + dst := filetest.OpenFile(t, fileName(fn, rotationNum)) _, err = io.Copy(dst, file) require.NoError(t, err) require.NoError(t, dst.Close()) @@ -370,8 +371,8 @@ func TestMoveFile(t *testing.T) { operator, sink := testManager(t, cfg) operator.persister = testutil.NewUnscopedMockPersister() - temp1 := openTemp(t, tempDir) - writeString(t, temp1, "testlog1\n") + temp1 := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp1, "testlog1\n") temp1.Close() operator.poll(context.Background()) @@ -398,8 +399,8 @@ func TestTrackMovedAwayFiles(t *testing.T) { operator, sink := testManager(t, cfg) operator.persister = testutil.NewUnscopedMockPersister() - temp1 := openTemp(t, tempDir) - writeString(t, temp1, "testlog1\n") + temp1 := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp1, "testlog1\n") temp1.Close() operator.poll(context.Background()) @@ -418,7 +419,7 @@ func TestTrackMovedAwayFiles(t *testing.T) { movedFile, err := os.OpenFile(newFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) require.NoError(t, err) - writeString(t, movedFile, "testlog2\n") + filetest.WriteString(t, movedFile, "testlog2\n") operator.poll(context.Background()) sink.ExpectToken(t, []byte("testlog2")) @@ -437,9 +438,9 @@ func TestTrackRotatedFilesLogOrder(t *testing.T) { cfg.StartAt = "beginning" operator, sink := testManager(t, cfg) - originalFile := openTemp(t, tempDir) + originalFile := filetest.OpenTemp(t, tempDir) orginalName := originalFile.Name() - writeString(t, originalFile, "testlog1\n") + filetest.WriteString(t, originalFile, "testlog1\n") require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) defer func() { @@ -447,7 +448,7 @@ func TestTrackRotatedFilesLogOrder(t *testing.T) { }() sink.ExpectToken(t, []byte("testlog1")) - writeString(t, originalFile, "testlog2\n") + filetest.WriteString(t, originalFile, "testlog2\n") originalFile.Close() newDir := fmt.Sprintf("%s%s", tempDir[:len(tempDir)-1], "_new/") @@ -458,7 +459,7 @@ func TestTrackRotatedFilesLogOrder(t *testing.T) { newFile, err := os.OpenFile(orginalName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) require.NoError(t, err) - writeString(t, newFile, "testlog3\n") + filetest.WriteString(t, newFile, "testlog3\n") sink.ExpectTokens(t, []byte("testlog2"), []byte("testlog3")) } @@ -478,21 +479,21 @@ func TestRotatedOutOfPatternMoveCreate(t *testing.T) { operator, sink := testManager(t, cfg) operator.persister = testutil.NewUnscopedMockPersister() - originalFile := openTempWithPattern(t, tempDir, "*.log1") + originalFile := filetest.OpenTempWithPattern(t, tempDir, "*.log1") originalFileName := originalFile.Name() - writeString(t, originalFile, "testlog1\n") + filetest.WriteString(t, originalFile, "testlog1\n") operator.poll(context.Background()) sink.ExpectToken(t, []byte("testlog1")) // write more log, before next poll() begins - writeString(t, originalFile, "testlog2\n") + filetest.WriteString(t, originalFile, "testlog2\n") // move the file so it no longer matches require.NoError(t, originalFile.Close()) require.NoError(t, os.Rename(originalFileName, originalFileName+".old")) - newFile := openFile(t, originalFileName) + newFile := filetest.OpenFile(t, originalFileName) _, err := newFile.Write([]byte("testlog4\ntestlog5\n")) require.NoError(t, err) @@ -515,15 +516,15 @@ func TestRotatedOutOfPatternCopyTruncate(t *testing.T) { operator, sink := testManager(t, cfg) operator.persister = testutil.NewUnscopedMockPersister() - originalFile := openTempWithPattern(t, tempDir, "*.log1") - writeString(t, originalFile, "testlog1\n") + originalFile := filetest.OpenTempWithPattern(t, tempDir, "*.log1") + filetest.WriteString(t, originalFile, "testlog1\n") operator.poll(context.Background()) sink.ExpectToken(t, []byte("testlog1")) // write more log, before next poll() begins - writeString(t, originalFile, "testlog2\n") + filetest.WriteString(t, originalFile, "testlog2\n") // copy the file to another file i.e. rotate, out of pattern - newFile := openTempWithPattern(t, tempDir, "*.log2") + newFile := filetest.OpenTempWithPattern(t, tempDir, "*.log2") _, err := originalFile.Seek(0, 0) require.NoError(t, err) _, err = io.Copy(newFile, originalFile) @@ -555,8 +556,8 @@ func TestTruncateThenWrite(t *testing.T) { operator, sink := testManager(t, cfg) operator.persister = testutil.NewUnscopedMockPersister() - temp1 := openTemp(t, tempDir) - writeString(t, temp1, "testlog1\ntestlog2\n") + temp1 := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp1, "testlog1\ntestlog2\n") operator.poll(context.Background()) sink.ExpectTokens(t, []byte("testlog1"), []byte("testlog2")) @@ -565,7 +566,7 @@ func TestTruncateThenWrite(t *testing.T) { _, err := temp1.Seek(0, 0) require.NoError(t, err) - writeString(t, temp1, "testlog3\n") + filetest.WriteString(t, temp1, "testlog3\n") operator.poll(context.Background()) sink.ExpectToken(t, []byte("testlog3")) sink.ExpectNoCalls(t) @@ -587,15 +588,15 @@ func TestCopyTruncateWriteBoth(t *testing.T) { operator, sink := testManager(t, cfg) operator.persister = testutil.NewUnscopedMockPersister() - temp1 := openTemp(t, tempDir) - writeString(t, temp1, "testlog1\ntestlog2\n") + temp1 := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp1, "testlog1\ntestlog2\n") operator.poll(context.Background()) sink.ExpectTokens(t, []byte("testlog1"), []byte("testlog2")) operator.wg.Wait() // wait for all goroutines to finish // Copy the first file to a new file, and add another log - temp2 := openTemp(t, tempDir) + temp2 := filetest.OpenTemp(t, tempDir) _, err := io.Copy(temp2, temp1) require.NoError(t, err) @@ -605,8 +606,8 @@ func TestCopyTruncateWriteBoth(t *testing.T) { require.NoError(t, err) // Write to original and new file - writeString(t, temp2, "testlog3\n") - writeString(t, temp1, "testlog4\n") + filetest.WriteString(t, temp2, "testlog3\n") + filetest.WriteString(t, temp1, "testlog4\n") // Expect both messages to come through operator.poll(context.Background()) @@ -629,9 +630,9 @@ func TestFileMovedWhileOff_BigFiles(t *testing.T) { log2 := tokenWithLength(1002) log3 := tokenWithLength(1003) - temp := openTemp(t, tempDir) + temp := filetest.OpenTemp(t, tempDir) tempName := temp.Name() - writeString(t, temp, string(log1)+"\n") + filetest.WriteString(t, temp, string(log1)+"\n") // Run the operator to read the first log require.NoError(t, operator.Start(persister)) @@ -639,15 +640,15 @@ func TestFileMovedWhileOff_BigFiles(t *testing.T) { require.NoError(t, operator.Stop()) // Write one more log to the original file - writeString(t, temp, string(log2)+"\n") + filetest.WriteString(t, temp, string(log2)+"\n") require.NoError(t, temp.Close()) // Rename the file and open another file in the same location require.NoError(t, os.Rename(tempName, fmt.Sprintf("%s2", tempName))) // Write a different log to the new file - temp2 := reopenTemp(t, tempName) - writeString(t, temp2, string(log3)+"\n") + temp2 := filetest.ReopenTemp(t, tempName) + filetest.WriteString(t, temp2, string(log3)+"\n") // Expect the message written to the new log to come through operator2, sink2 := testManager(t, cfg) diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index 171a96a4cee9..23dab8601fcc 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -5,8 +5,6 @@ package fileconsumer import ( "math/rand" - "os" - "path/filepath" "testing" "github.com/stretchr/testify/require" @@ -27,33 +25,6 @@ func testManagerWithSink(t *testing.T, cfg *Config, sink *emittest.Sink) *Manage return input } -func openFile(tb testing.TB, path string) *os.File { - file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0600) - require.NoError(tb, err) - tb.Cleanup(func() { _ = file.Close() }) - return file -} - -func openTemp(t testing.TB, tempDir string) *os.File { - return openTempWithPattern(t, tempDir, "") -} - -func reopenTemp(t testing.TB, name string) *os.File { - return openTempWithPattern(t, filepath.Dir(name), filepath.Base(name)) -} - -func openTempWithPattern(t testing.TB, tempDir, pattern string) *os.File { - file, err := os.CreateTemp(tempDir, pattern) - require.NoError(t, err) - t.Cleanup(func() { _ = file.Close() }) - return file -} - -func writeString(t testing.TB, file *os.File, s string) { - _, err := file.WriteString(s) - require.NoError(t, err) -} - func tokenWithLength(length int) []byte { charset := "abcdefghijklmnopqrstuvwxyz" b := make([]byte, length)