From 929cbcf937b187b42b9651cf43cd5dcb55b96f3e Mon Sep 17 00:00:00 2001 From: DarthPestilane Date: Wed, 8 Sep 2021 17:41:35 +0800 Subject: [PATCH] minor improvement --- benchmarks_test.go | 30 +++++++++++++++++++----------- router.go | 2 +- session.go | 37 +++++++++++++++++++++++++++---------- 3 files changed, 47 insertions(+), 22 deletions(-) diff --git a/benchmarks_test.go b/benchmarks_test.go index 292b2d3..6b6d92c 100644 --- a/benchmarks_test.go +++ b/benchmarks_test.go @@ -6,7 +6,6 @@ import ( "github.com/DarthPestilane/easytcp/message" "net" "testing" - "time" ) // go test -bench="^BenchmarkTCPServer_\w+$" -run=none -benchmem -benchtime=250000x @@ -21,6 +20,7 @@ func Benchmark_NoRoute(b *testing.B) { DoNotPrintRoutes: true, }) go s.Serve(":0") // nolint + defer s.Stop() // nolint <-s.accepting @@ -29,8 +29,8 @@ func Benchmark_NoRoute(b *testing.B) { if err != nil { panic(err) } - time.Sleep(time.Millisecond * 5) - // defer client.Close() // nolint + defer client.Close() // nolint + packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: []byte("ping")}) beforeBench(b) for i := 0; i < b.N; i++ { @@ -46,6 +46,7 @@ func Benchmark_NotFoundHandler(b *testing.B) { return ctx.Response(0, []byte("not found")) }) go s.Serve(":0") // nolint + defer s.Stop() // nolint <-s.accepting @@ -54,7 +55,7 @@ func Benchmark_NotFoundHandler(b *testing.B) { if err != nil { panic(err) } - // defer client.Close() // nolint + defer client.Close() // nolint packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: []byte("ping")}) beforeBench(b) @@ -71,6 +72,7 @@ func Benchmark_OneHandler(b *testing.B) { return ctx.Response(2, []byte("pong")) }) go s.Serve(":0") // nolint + defer s.Stop() // nolint <-s.accepting @@ -79,7 +81,7 @@ func Benchmark_OneHandler(b *testing.B) { if err != nil { panic(err) } - // defer client.Close() // nolint + defer client.Close() // nolint packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: []byte("ping")}) beforeBench(b) @@ -104,6 +106,7 @@ func Benchmark_ManyHandlers(b *testing.B) { }, m, m) go s.Serve(":0") // nolint + defer s.Stop() // nolint <-s.accepting @@ -112,7 +115,7 @@ func Benchmark_ManyHandlers(b *testing.B) { if err != nil { panic(err) } - // defer client.Close() // nolint + defer client.Close() // nolint packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: []byte("ping")}) beforeBench(b) @@ -131,6 +134,7 @@ func Benchmark_OneRouteCtxGetSet(b *testing.B) { return ctx.Response(2, []byte(v)) }) go s.Serve(":0") // nolint + defer s.Stop() // nolint <-s.accepting @@ -139,7 +143,7 @@ func Benchmark_OneRouteCtxGetSet(b *testing.B) { if err != nil { panic(err) } - // defer client.Close() // nolint + defer client.Close() // nolint packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: []byte("ping")}) beforeBench(b) @@ -158,6 +162,7 @@ func Benchmark_OneRouteMessageGetSet(b *testing.B) { return ctx.Response(2, v) }) go s.Serve(":0") // nolint + defer s.Stop() // nolint <-s.accepting @@ -166,7 +171,7 @@ func Benchmark_OneRouteMessageGetSet(b *testing.B) { if err != nil { panic(err) } - // defer client.Close() // nolint + defer client.Close() // nolint packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: []byte("ping")}) beforeBench(b) @@ -186,6 +191,7 @@ func Benchmark_OneRouteJsonCodec(b *testing.B) { return ctx.Response(2, map[string]string{"data": "pong"}) }) go s.Serve(":0") // nolint + defer s.Stop() // nolint <-s.accepting @@ -194,7 +200,7 @@ func Benchmark_OneRouteJsonCodec(b *testing.B) { if err != nil { panic(err) } - // defer client.Close() // nolint + defer client.Close() // nolint packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: []byte(`{"data": "ping"}`)}) beforeBench(b) @@ -214,6 +220,7 @@ func Benchmark_OneRouteProtobufCodec(b *testing.B) { return ctx.Response(2, &pb.Sample{Foo: "test-resp", Bar: req.Bar + 1}) }) go s.Serve(":0") // nolint + defer s.Stop() // nolint <-s.accepting @@ -222,7 +229,7 @@ func Benchmark_OneRouteProtobufCodec(b *testing.B) { if err != nil { panic(err) } - // defer client.Close() // nolint + defer client.Close() // nolint data, _ := s.Codec.Encode(&pb.Sample{Foo: "test", Bar: 1}) packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: data}) @@ -243,6 +250,7 @@ func Benchmark_OneRouteMsgpackCodec(b *testing.B) { return ctx.Response(2, &msgpack.Sample{Foo: "test-resp", Bar: req.Bar + 1}) }) go s.Serve(":0") // nolint + defer s.Stop() // nolint <-s.accepting @@ -251,7 +259,7 @@ func Benchmark_OneRouteMsgpackCodec(b *testing.B) { if err != nil { panic(err) } - // defer client.Close() // nolint + defer client.Close() // nolint data, _ := s.Codec.Encode(&msgpack.Sample{Foo: "test", Bar: 1}) packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: data}) diff --git a/router.go b/router.go index 0f66fdf..3986040 100644 --- a/router.go +++ b/router.go @@ -67,7 +67,7 @@ func (r *Router) stop() { // consumeRequest fetches context from reqQueue, and handle it. func (r *Router) consumeRequest() { - defer Log.Tracef("router stopped") + defer func() { Log.Tracef("router stopped") }() for { select { case <-r.stopped: diff --git a/session.go b/session.go index 4d21abe..88d5797 100644 --- a/session.go +++ b/session.go @@ -95,9 +95,7 @@ func (s *Session) readInbound(reqQueue chan<- *Context, timeout time.Duration) { ctx := s.ctxPool.Get().(*Context) ctx.reset(s, reqEntry) - select { - case reqQueue <- ctx: - case <-s.closed: + if !s.sendReq(ctx, reqQueue) { Log.Tracef("session %s readInbound exit because session is closed", s.id) return } @@ -106,6 +104,22 @@ func (s *Session) readInbound(reqQueue chan<- *Context, timeout time.Duration) { s.close() } +func (s *Session) sendReq(ctx *Context, reqQueue chan<- *Context) (ok bool) { + defer func() { + if r := recover(); r != nil { + ok = false + } + }() + select { + case reqQueue <- ctx: + ok = true + case <-s.closed: + Log.Tracef("session %s readInbound exit because session is closed", s.id) + ok = false + } + return +} + // writeOutbound fetches message from respQueue channel and writes to TCP connection in a loop. // Parameter writeTimeout specified the connection writing timeout. // The loop breaks if errors occurred, or the session is closed. @@ -122,13 +136,7 @@ FOR: return } - s.ctxPool.Put(ctx) - - if ctx.respEntry == nil { - continue - } - // pack message - outboundMsg, err := s.packer.Pack(ctx.respEntry) + outboundMsg, err := s.pack(ctx) if err != nil { Log.Errorf("session %s pack outbound message err: %s", s.id, err) continue @@ -136,6 +144,7 @@ FOR: if outboundMsg == nil { continue } + if writeTimeout > 0 { if err := s.conn.SetWriteDeadline(time.Now().Add(writeTimeout)); err != nil { Log.Errorf("session %s set write deadline err: %s", s.id, err) @@ -151,3 +160,11 @@ FOR: s.close() Log.Tracef("session %s writeOutbound exit because of error", s.id) } + +func (s *Session) pack(ctx *Context) ([]byte, error) { + defer s.ctxPool.Put(ctx) + if ctx.respEntry == nil { + return nil, nil + } + return s.packer.Pack(ctx.respEntry) +}