Skip to content

Commit

Permalink
minor improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
DarthPestilane committed Sep 8, 2021
1 parent aa3d72b commit 929cbcf
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 22 deletions.
30 changes: 19 additions & 11 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/DarthPestilane/easytcp/message"
"net"
"testing"
"time"
)

// go test -bench="^BenchmarkTCPServer_\w+$" -run=none -benchmem -benchtime=250000x
Expand All @@ -21,6 +20,7 @@ func Benchmark_NoRoute(b *testing.B) {
DoNotPrintRoutes: true,
})
go s.Serve(":0") // nolint
defer s.Stop() // nolint

<-s.accepting

Expand All @@ -29,8 +29,8 @@ func Benchmark_NoRoute(b *testing.B) {
if err != nil {
panic(err)
}
time.Sleep(time.Millisecond * 5)
// defer client.Close() // nolint
defer client.Close() // nolint

packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: []byte("ping")})
beforeBench(b)
for i := 0; i < b.N; i++ {
Expand All @@ -46,6 +46,7 @@ func Benchmark_NotFoundHandler(b *testing.B) {
return ctx.Response(0, []byte("not found"))
})
go s.Serve(":0") // nolint
defer s.Stop() // nolint

<-s.accepting

Expand All @@ -54,7 +55,7 @@ func Benchmark_NotFoundHandler(b *testing.B) {
if err != nil {
panic(err)
}
// defer client.Close() // nolint
defer client.Close() // nolint

packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: []byte("ping")})
beforeBench(b)
Expand All @@ -71,6 +72,7 @@ func Benchmark_OneHandler(b *testing.B) {
return ctx.Response(2, []byte("pong"))
})
go s.Serve(":0") // nolint
defer s.Stop() // nolint

<-s.accepting

Expand All @@ -79,7 +81,7 @@ func Benchmark_OneHandler(b *testing.B) {
if err != nil {
panic(err)
}
// defer client.Close() // nolint
defer client.Close() // nolint

packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: []byte("ping")})
beforeBench(b)
Expand All @@ -104,6 +106,7 @@ func Benchmark_ManyHandlers(b *testing.B) {
}, m, m)

go s.Serve(":0") // nolint
defer s.Stop() // nolint

<-s.accepting

Expand All @@ -112,7 +115,7 @@ func Benchmark_ManyHandlers(b *testing.B) {
if err != nil {
panic(err)
}
// defer client.Close() // nolint
defer client.Close() // nolint

packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: []byte("ping")})
beforeBench(b)
Expand All @@ -131,6 +134,7 @@ func Benchmark_OneRouteCtxGetSet(b *testing.B) {
return ctx.Response(2, []byte(v))
})
go s.Serve(":0") // nolint
defer s.Stop() // nolint

<-s.accepting

Expand All @@ -139,7 +143,7 @@ func Benchmark_OneRouteCtxGetSet(b *testing.B) {
if err != nil {
panic(err)
}
// defer client.Close() // nolint
defer client.Close() // nolint

packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: []byte("ping")})
beforeBench(b)
Expand All @@ -158,6 +162,7 @@ func Benchmark_OneRouteMessageGetSet(b *testing.B) {
return ctx.Response(2, v)
})
go s.Serve(":0") // nolint
defer s.Stop() // nolint

<-s.accepting

Expand All @@ -166,7 +171,7 @@ func Benchmark_OneRouteMessageGetSet(b *testing.B) {
if err != nil {
panic(err)
}
// defer client.Close() // nolint
defer client.Close() // nolint

packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: []byte("ping")})
beforeBench(b)
Expand All @@ -186,6 +191,7 @@ func Benchmark_OneRouteJsonCodec(b *testing.B) {
return ctx.Response(2, map[string]string{"data": "pong"})
})
go s.Serve(":0") // nolint
defer s.Stop() // nolint

<-s.accepting

Expand All @@ -194,7 +200,7 @@ func Benchmark_OneRouteJsonCodec(b *testing.B) {
if err != nil {
panic(err)
}
// defer client.Close() // nolint
defer client.Close() // nolint

packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: []byte(`{"data": "ping"}`)})
beforeBench(b)
Expand All @@ -214,6 +220,7 @@ func Benchmark_OneRouteProtobufCodec(b *testing.B) {
return ctx.Response(2, &pb.Sample{Foo: "test-resp", Bar: req.Bar + 1})
})
go s.Serve(":0") // nolint
defer s.Stop() // nolint

<-s.accepting

Expand All @@ -222,7 +229,7 @@ func Benchmark_OneRouteProtobufCodec(b *testing.B) {
if err != nil {
panic(err)
}
// defer client.Close() // nolint
defer client.Close() // nolint

data, _ := s.Codec.Encode(&pb.Sample{Foo: "test", Bar: 1})
packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: data})
Expand All @@ -243,6 +250,7 @@ func Benchmark_OneRouteMsgpackCodec(b *testing.B) {
return ctx.Response(2, &msgpack.Sample{Foo: "test-resp", Bar: req.Bar + 1})
})
go s.Serve(":0") // nolint
defer s.Stop() // nolint

<-s.accepting

Expand All @@ -251,7 +259,7 @@ func Benchmark_OneRouteMsgpackCodec(b *testing.B) {
if err != nil {
panic(err)
}
// defer client.Close() // nolint
defer client.Close() // nolint

data, _ := s.Codec.Encode(&msgpack.Sample{Foo: "test", Bar: 1})
packedMsg, _ := s.Packer.Pack(&message.Entry{ID: 1, Data: data})
Expand Down
2 changes: 1 addition & 1 deletion router.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (r *Router) stop() {

// consumeRequest fetches context from reqQueue, and handle it.
func (r *Router) consumeRequest() {
defer Log.Tracef("router stopped")
defer func() { Log.Tracef("router stopped") }()
for {
select {
case <-r.stopped:
Expand Down
37 changes: 27 additions & 10 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,7 @@ func (s *Session) readInbound(reqQueue chan<- *Context, timeout time.Duration) {
ctx := s.ctxPool.Get().(*Context)
ctx.reset(s, reqEntry)

select {
case reqQueue <- ctx:
case <-s.closed:
if !s.sendReq(ctx, reqQueue) {
Log.Tracef("session %s readInbound exit because session is closed", s.id)
return
}
Expand All @@ -106,6 +104,22 @@ func (s *Session) readInbound(reqQueue chan<- *Context, timeout time.Duration) {
s.close()
}

func (s *Session) sendReq(ctx *Context, reqQueue chan<- *Context) (ok bool) {
defer func() {
if r := recover(); r != nil {
ok = false
}
}()
select {
case reqQueue <- ctx:
ok = true
case <-s.closed:
Log.Tracef("session %s readInbound exit because session is closed", s.id)
ok = false
}
return
}

// writeOutbound fetches message from respQueue channel and writes to TCP connection in a loop.
// Parameter writeTimeout specified the connection writing timeout.
// The loop breaks if errors occurred, or the session is closed.
Expand All @@ -122,20 +136,15 @@ FOR:
return
}

s.ctxPool.Put(ctx)

if ctx.respEntry == nil {
continue
}
// pack message
outboundMsg, err := s.packer.Pack(ctx.respEntry)
outboundMsg, err := s.pack(ctx)
if err != nil {
Log.Errorf("session %s pack outbound message err: %s", s.id, err)
continue
}
if outboundMsg == nil {
continue
}

if writeTimeout > 0 {
if err := s.conn.SetWriteDeadline(time.Now().Add(writeTimeout)); err != nil {
Log.Errorf("session %s set write deadline err: %s", s.id, err)
Expand All @@ -151,3 +160,11 @@ FOR:
s.close()
Log.Tracef("session %s writeOutbound exit because of error", s.id)
}

func (s *Session) pack(ctx *Context) ([]byte, error) {
defer s.ctxPool.Put(ctx)
if ctx.respEntry == nil {
return nil, nil
}
return s.packer.Pack(ctx.respEntry)
}

0 comments on commit 929cbcf

Please sign in to comment.