From 56f315d8a9aa8a9c086f7ee35f24e49f86c7d191 Mon Sep 17 00:00:00 2001 From: Prashant Varanasi Date: Wed, 27 Jul 2016 07:54:50 -0700 Subject: [PATCH 1/2] Support concurrent logging to a single WriteSyncer Currently, the WriteSyncer assumes the underlying Writer will support concurrent Writes correctly. However, this is not true for common io.Writer implementations: - *os.File may call the write syscall multiple times - *bytes.Buffer does not support concurrent writes at all Instead, we will wrap all passed in WriteSyncers in a lockedWriteSyncer which will use a mutex to serialize calls to the underlying WriteSyncer. In future, we may want to expose a way of setting a WriteSyncer that already supports concurrent calls, but since most common implementations cannot handle concurrent writes correctly, locking the writer seems like a simpler default. --- logger_test.go | 39 +++++++++++++++++++++++++++++++++++++++ meta.go | 4 ++-- options.go | 4 ++-- writer.go | 24 ++++++++++++++++++++++++ 4 files changed, 67 insertions(+), 4 deletions(-) diff --git a/logger_test.go b/logger_test.go index 1b21127b9..05110c878 100644 --- a/logger_test.go +++ b/logger_test.go @@ -26,6 +26,7 @@ import ( "io/ioutil" "os" "strings" + "sync" "testing" "github.com/uber-go/zap/spywrite" @@ -274,3 +275,41 @@ func TestJSONLoggerSyncsOutput(t *testing.T) { assert.Panics(t, func() { logger.Panic("foo") }, "Expected panic when logging at Panic level.") assert.True(t, sink.Called(), "Expected logging at panic level to Sync underlying WriteSyncer.") } + +func TestLoggerConcurrent(t *testing.T) { + buf := &bytes.Buffer{} + withJSONLogger(t, []Option{Output(AddSync(buf))}, func(jl *jsonLogger, output func() []string) { + jl.StubTime() + jl2 := jl.With(String("foo", "bar")) + + wg := &sync.WaitGroup{} + runNTimes(5, wg, func() { + for i := 0; i < 10; i++ { + jl.Info("info", String("foo", "bar")) + jl.Info("info", String("foo", "bar")) + } + }) + runNTimes(5, wg, func() { + for i := 0; i < 10; i++ { + jl2.Info("info") + jl2.Info("info") + } + }) + + wg.Wait() + + // Make sure the output doesn't contain interspersed entries. + expected := `{"msg":"info","level":"info","ts":0,"fields":{"foo":"bar"}}` + "\n" + assert.Equal(t, strings.Repeat(expected, 200), buf.String()) + }) +} + +func runNTimes(n int, wg *sync.WaitGroup, f func()) { + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + f() + }() + } +} diff --git a/meta.go b/meta.go index c048dacaa..c8fe3dd52 100644 --- a/meta.go +++ b/meta.go @@ -50,8 +50,8 @@ func NewMeta() *Meta { return &Meta{ lvl: atomic.NewInt32(int32(InfoLevel)), Encoder: newJSONEncoder(), - Output: os.Stdout, - ErrorOutput: os.Stderr, + Output: newLockedWriteSyncer(os.Stdout), + ErrorOutput: newLockedWriteSyncer(os.Stderr), } } diff --git a/options.go b/options.go index cfb35cae9..a9ed1372e 100644 --- a/options.go +++ b/options.go @@ -47,14 +47,14 @@ func Fields(fields ...Field) Option { // Output sets the destination for the logger's output. func Output(w WriteSyncer) Option { return optionFunc(func(m *Meta) { - m.Output = w + m.Output = newLockedWriteSyncer(w) }) } // ErrorOutput sets the destination for errors generated by the logger. func ErrorOutput(w WriteSyncer) Option { return optionFunc(func(m *Meta) { - m.ErrorOutput = w + m.ErrorOutput = newLockedWriteSyncer(w) }) } diff --git a/writer.go b/writer.go index 537855b3a..51735e91d 100644 --- a/writer.go +++ b/writer.go @@ -23,6 +23,7 @@ package zap import ( "io" "io/ioutil" + "sync" ) // Discard is a convenience wrapper around ioutil.Discard. @@ -56,6 +57,29 @@ func AddSync(w io.Writer) WriteSyncer { } } +type lockedWriteSyncer struct { + sync.Mutex + ws WriteSyncer +} + +func newLockedWriteSyncer(ws WriteSyncer) WriteSyncer { + return &lockedWriteSyncer{ws: ws} +} + +func (s *lockedWriteSyncer) Write(bs []byte) (int, error) { + s.Lock() + n, err := s.ws.Write(bs) + s.Unlock() + return n, err +} + +func (s *lockedWriteSyncer) Sync() error { + s.Lock() + err := s.ws.Sync() + s.Unlock() + return err +} + type writerWrapper struct { io.Writer } From 9d327a58af3be4acf62497e4e228b83945a9052e Mon Sep 17 00:00:00 2001 From: Prashant Varanasi Date: Wed, 27 Jul 2016 10:43:01 -0700 Subject: [PATCH 2/2] runNTimes will do iterations and goroutines --- logger_test.go | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/logger_test.go b/logger_test.go index 05110c878..67a11e12b 100644 --- a/logger_test.go +++ b/logger_test.go @@ -283,33 +283,29 @@ func TestLoggerConcurrent(t *testing.T) { jl2 := jl.With(String("foo", "bar")) wg := &sync.WaitGroup{} - runNTimes(5, wg, func() { - for i := 0; i < 10; i++ { - jl.Info("info", String("foo", "bar")) - jl.Info("info", String("foo", "bar")) - } + runNTimes(5 /* goroutines */, 10 /* iterations */, wg, func() { + jl.Info("info", String("foo", "bar")) }) - runNTimes(5, wg, func() { - for i := 0; i < 10; i++ { - jl2.Info("info") - jl2.Info("info") - } + runNTimes(5 /* goroutines */, 10 /* iterations */, wg, func() { + jl2.Info("info") }) wg.Wait() // Make sure the output doesn't contain interspersed entries. expected := `{"msg":"info","level":"info","ts":0,"fields":{"foo":"bar"}}` + "\n" - assert.Equal(t, strings.Repeat(expected, 200), buf.String()) + assert.Equal(t, strings.Repeat(expected, 100), buf.String()) }) } -func runNTimes(n int, wg *sync.WaitGroup, f func()) { - wg.Add(n) - for i := 0; i < n; i++ { +func runNTimes(goroutines, iterations int, wg *sync.WaitGroup, f func()) { + wg.Add(goroutines) + for g := 0; g < goroutines; g++ { go func() { defer wg.Done() - f() + for i := 0; i < iterations; i++ { + f() + } }() } }