Skip to content

Commit

Permalink
feat: add server events (#86)
Browse files Browse the repository at this point in the history
* feat: add server events

* fix: race condition
  • Loading branch information
Reasno authored Mar 15, 2021
1 parent 96ea4bf commit acd9797
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 13 deletions.
38 changes: 37 additions & 1 deletion c_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package core

import (
"context"
"github.com/DoNewsCode/core/contract"
"github.com/DoNewsCode/core/events"
"io/ioutil"
"os"
"sync/atomic"
"testing"
"time"

Expand All @@ -14,12 +17,45 @@ import (
)

func TestC_Serve(t *testing.T) {
c := New(WithInline("http.addr", ":19998"), WithInline("grpc.addr", ":19999"))
var called int32
c := New(
WithInline("http.addr", ":19998"),
WithInline("grpc.addr", ":19999"),
)
c.ProvideEssentials()
c.Invoke(func(dispatcher contract.Dispatcher) {
dispatcher.Subscribe(events.Listen(events.From(OnHTTPServerStart{}), func(ctx context.Context, start contract.Event) error {
atomic.AddInt32(&called, 1)
assert.Equal(t, "[::]:19998", start.Data().(OnHTTPServerStart).Listener.Addr().String())
return nil
}))
})
c.Invoke(func(dispatcher contract.Dispatcher) {
dispatcher.Subscribe(events.Listen(events.From(OnHTTPServerShutdown{}), func(ctx context.Context, shutdown contract.Event) error {
atomic.AddInt32(&called, 1)
assert.Equal(t, "[::]:19998", shutdown.Data().(OnHTTPServerShutdown).Listener.Addr().String())
return nil
}))
})
c.Invoke(func(dispatcher contract.Dispatcher) {
dispatcher.Subscribe(events.Listen(events.From(OnGRPCServerStart{}), func(ctx context.Context, start contract.Event) error {
atomic.AddInt32(&called, 1)
assert.Equal(t, "[::]:19999", start.Data().(OnGRPCServerStart).Listener.Addr().String())
return nil
}))
})
c.Invoke(func(dispatcher contract.Dispatcher) {
dispatcher.Subscribe(events.Listen(events.From(OnGRPCServerShutdown{}), func(ctx context.Context, shutdown contract.Event) error {
atomic.AddInt32(&called, 1)
assert.Equal(t, "[::]:19999", shutdown.Data().(OnGRPCServerShutdown).Listener.Addr().String())
return nil
}))
})
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
e := c.Serve(ctx)
assert.NoError(t, e)
assert.Equal(t, int32(4), atomic.LoadInt32(&called))
}

func TestC_Default(t *testing.T) {
Expand Down
41 changes: 41 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package core

import (
"google.golang.org/grpc"
"net"
"net/http"
)

// OnHTTPServerStart is an event triggered when the http server is ready to serve
// traffic. At this point the module is already wired up. This event is useful to
// register service to service discovery.
type OnHTTPServerStart struct {
HTTPServer *http.Server
Listener net.Listener
}

// OnHTTPServerShutdown is an event triggered when the http server is shutting down.
// traffic. At this point The traffic can no longer reach the server, but the
// database and other infrastructures are not closed yet. This event is useful
// to unregister service to service discovery.
type OnHTTPServerShutdown struct {
HTTPServer *http.Server
Listener net.Listener
}

// OnGRPCServerStart is an event triggered when the grpc server is ready to serve
// traffic. At this point the module is already wired up. This event is useful to
// register service to service discovery.
type OnGRPCServerStart struct {
GRPCServer *grpc.Server
Listener net.Listener
}

// OnGRPCServerShutdown is an event triggered when the http server is shutting down.
// traffic. At this point The traffic can no longer reach the server, but the
// database and other infrastructures are not closed yet. This event is useful
// to unregister service to service discovery.
type OnGRPCServerShutdown struct {
GRPCServer *grpc.Server
Listener net.Listener
}
42 changes: 30 additions & 12 deletions serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"context"
"github.com/DoNewsCode/core/events"
"net"
"net/http"
"os"
Expand All @@ -24,11 +25,12 @@ import (
type serveIn struct {
di.In

Dispatcher contract.Dispatcher
Config contract.ConfigAccessor
Logger log.Logger
Container contract.Container
HttpServer *http.Server `optional:"true"`
GrpcServer *grpc.Server `optional:"true"`
HTTPServer *http.Server `optional:"true"`
GRPCServer *grpc.Server `optional:"true"`
Cron *cron.Cron `optional:"true"`
}

Expand Down Expand Up @@ -67,17 +69,25 @@ func newServeCmd(p serveIn) *cobra.Command {
if err != nil {
return errors.Wrap(err, "failed start http server")
}
if p.HttpServer == nil {
p.HttpServer = &http.Server{}
if p.HTTPServer == nil {
p.HTTPServer = &http.Server{}
}
router := mux.NewRouter()
p.Container.ApplyRouter(router)
p.HttpServer.Handler = router
p.HTTPServer.Handler = router
g.Add(func() error {
l.Infof("http service is listening at %s", ln.Addr())
return p.HttpServer.Serve(ln)
p.Dispatcher.Dispatch(
cmd.Context(),
events.Of(OnHTTPServerStart{p.HTTPServer, ln}),
)
defer p.Dispatcher.Dispatch(
cmd.Context(),
events.Of(OnHTTPServerShutdown{p.HTTPServer, ln}),
)
return p.HTTPServer.Serve(ln)
}, func(err error) {
_ = p.HttpServer.Shutdown(context.Background())
_ = p.HTTPServer.Shutdown(context.Background())
_ = ln.Close()
})
}
Expand All @@ -89,15 +99,23 @@ func newServeCmd(p serveIn) *cobra.Command {
if err != nil {
return errors.Wrap(err, "failed start grpc server")
}
if p.GrpcServer == nil {
p.GrpcServer = grpc.NewServer()
if p.GRPCServer == nil {
p.GRPCServer = grpc.NewServer()
}
p.Container.ApplyGRPCServer(p.GrpcServer)
p.Container.ApplyGRPCServer(p.GRPCServer)
g.Add(func() error {
l.Infof("gRPC service is listening at %s", ln.Addr())
return p.GrpcServer.Serve(ln)
p.Dispatcher.Dispatch(
cmd.Context(),
events.Of(OnGRPCServerStart{p.GRPCServer, ln}),
)
defer p.Dispatcher.Dispatch(
cmd.Context(),
events.Of(OnGRPCServerShutdown{p.GRPCServer, ln}),
)
return p.GRPCServer.Serve(ln)
}, func(err error) {
p.GrpcServer.GracefulStop()
p.GRPCServer.GracefulStop()
_ = ln.Close()
})
}
Expand Down

0 comments on commit acd9797

Please sign in to comment.