Skip to content

Commit

Permalink
feat: adding server
Browse files Browse the repository at this point in the history
  • Loading branch information
katallaxie authored Jul 30, 2024
1 parent b32fa80 commit e9d907a
Show file tree
Hide file tree
Showing 4 changed files with 405 additions and 0 deletions.
243 changes: 243 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package server

import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
)

// ErrUnimplemented is returned when a listener is not implemented.
var ErrUnimplemented = errors.New("server: unimplemented")

type token struct{}

// ReadyFunc is the function that is called by Listener to signal
// that it is ready and the next Listener can be called.
type ReadyFunc func()

// RunFunc is a function that is called to attach more routines
// to the server.
type RunFunc func(func() error)

// ServeError ...
type ServeError struct {
Err error
}

// Error implements the error interface.
func (s *ServeError) Error() string { return fmt.Sprintf("server: %s", s.Err) }

// Unwrap ...
func (s *ServeError) Unwrap() error { return s.Err }

// NewServer returns a new error.
func NewServer(err error) *ServeError {
return &ServeError{Err: err}
}

// Server is the interface to be implemented
// to run the server.
//
// s, ctx := WithContext(context.Background())
// s.Listen(listener, false)
//
// if err := s.Wait(); err != nil {
// panic(err)
// }
type Server interface {
// Run is running a new go routine
Listen(listener Listener, ready bool)

// Waits for the server to fail,
// or gracefully shutdown if context is canceled
Wait() error

// SetLimit ...
SetLimit(n int)
}

// Unimplemented is the default implementation.
type Unimplemented struct{}

// Start ...
func (s *Unimplemented) Start(context.Context, ReadyFunc, RunFunc) func() error {
return func() error {
return ErrUnimplemented
}
}

// Listener is the interface to a listener,
// so starting and shutdown of a listener,
// or any routine.
type Listener interface {
// Start is being called on the listener
Start(context.Context, ReadyFunc, RunFunc) func() error
}

type listeners map[Listener]bool

// server holds the instance info of the server
type server struct {
ctx context.Context
cancel context.CancelFunc

wg sync.WaitGroup
errOnce sync.Once
err error

sem chan token

listeners map[Listener]bool

ready chan bool
sys chan os.Signal
}

// WithContext is creating a new server with a context.
func WithContext(ctx context.Context) (*server, context.Context) {
ctx, cancel := context.WithCancel(ctx)

// new server
s := newServer(ctx)
s.cancel = cancel
s.ctx = ctx

return s, ctx
}

func newServer(ctx context.Context) *server {
s := new(server)

ctx, cancel := context.WithCancel(ctx)
s.cancel = cancel
s.ctx = ctx

s.listeners = make(listeners)
s.ready = make(chan bool, 1)
s.sys = make(chan os.Signal, 1)

return s
}

// Listen is adding a listener to the server.
func (s *server) Listen(listener Listener, ready bool) {
s.listeners[listener] = ready
}

// Wait is waiting for the server to shutdown or fail.
// The returned error is the first error that occurred from the listeners.
//
//nolint:gocyclo
func (s *server) Wait() error {
// create ticker for interrupt signals
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

signal.Notify(s.sys, syscall.SIGTERM, syscall.SIGINT)
defer signal.Reset(syscall.SIGINT, syscall.SIGTERM)

OUTTER:
// start all listeners in order
for l, ready := range s.listeners {
readyFunc := func() {
r := ready

var readyOnce sync.Once
readyOnce.Do(func() {
if r {
s.ready <- true
}
})
}

goFn := func(f func() error) { _ = s.run(f) }

// schedule to routines
err := s.run(l.Start(s.ctx, readyFunc, goFn))
if err != nil {
return err
}

// this blocks until ready is called
if ready {
select {
case <-s.ready:
continue OUTTER
case <-s.sys:
s.cancel()
break OUTTER
case <-s.ctx.Done():
break OUTTER
}
}
}

// this is the main loop
for {
select {
case <-ticker.C:
case <-s.sys:
// if there is sys interrupt
// cancel the context of the routines
s.cancel()
case <-s.ctx.Done():
if err := s.ctx.Err(); err != nil {
return NewServer(s.err)
}

return nil
}
}
}

// SetLimit limits the number of active listeners in this server
func (s *server) SetLimit(n int) {
if n < 0 {
s.sem = nil
return
}

if len(s.sem) != 0 {
panic(fmt.Errorf("server: modify limit while %v listeners run", len(s.sem)))
}

s.sem = make(chan token, n)
}

func (s *server) run(f func() error) error {
if s.sem != nil {
s.sem <- token{}
}

s.wg.Add(1)

fn := func() {
defer s.done()

if err := f(); err != nil {
s.errOnce.Do(func() {
s.err = err
if s.cancel != nil {
s.cancel()
}
})
}
}

go fn()

return nil
}

func (s *server) done() {
if s.sem != nil {
<-s.sem
}

s.wg.Done()
}
69 changes: 69 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package server

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestWithContext(t *testing.T) {
srv, ctx := WithContext(context.Background())
assert.Implements(t, (*Server)(nil), srv)
assert.NotNil(t, ctx)
assert.NotNil(t, srv)
assert.Nil(t, srv.sem)
}

func TestSetLimit(t *testing.T) {
srv, ctx := WithContext(context.Background())
assert.Implements(t, (*Server)(nil), srv)
assert.NotNil(t, srv)
assert.NotNil(t, ctx)

srv.SetLimit(1)
assert.NotNil(t, srv.sem)
}

func TestSetLimitZero(t *testing.T) {
srv, ctx := WithContext(context.Background())
assert.Implements(t, (*Server)(nil), srv)
assert.NotNil(t, srv)
assert.NotNil(t, ctx)

srv.SetLimit(0)
assert.NotNil(t, srv.sem)
}

func TestSetLimitNegative(t *testing.T) {
srv, ctx := WithContext(context.Background())
assert.Implements(t, (*Server)(nil), srv)
assert.NotNil(t, srv)
assert.NotNil(t, ctx)

srv.SetLimit(-1)
assert.Nil(t, srv.sem)
}

func TestUnimplemented(t *testing.T) {
srv, ctx := WithContext(context.Background())
assert.Implements(t, (*Server)(nil), srv)
assert.NotNil(t, srv)
assert.NotNil(t, ctx)

l := &Unimplemented{}
assert.Implements(t, (*Listener)(nil), l)

srv.Listen(l, false)
err := srv.Wait()
require.Error(t, err)
require.ErrorIs(t, err, ErrUnimplemented)
}

func TestNewError(t *testing.T) {
err := NewServer(ErrUnimplemented)
assert.Implements(t, (*error)(nil), err)
require.Error(t, err)
require.ErrorIs(t, err, ErrUnimplemented)
}
63 changes: 63 additions & 0 deletions server/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package server

import (
"os"
"path"
"sync"
)

// ServiceEnv is a list of environment variables to lookup the service name.
type ServiceEnv []Name

// Name is used to return the service name.
type Name string

// String returns the name as a string.
func (n Name) String() string {
return string(n)
}

// DefaultEnv is the default environment variables to lookup the service name.
var DefaultEnv = ServiceEnv{Name("SERVICE_NAME")}

func init() {
Service.lookup(DefaultEnv...)
}

// Service is used to configure the
type service struct {
name string

sync.Once
}

// Service is used to configure the service.
var Service = &service{}

// Name returns the service name.
func (s *service) Name() string {
return s.name
}

// Loopkup is used to lookup the service name.
func (s *service) Lookup(names ...Name) string {
s.Do(func() {
s.lookup(names...)
})

return s.Name()
}

func (s *service) lookup(names ...Name) {
for _, name := range names {
v, ok := os.LookupEnv(name.String())
if ok {
s.name = v
break
}
}

if s.name == "" {
s.name = path.Base(os.Args[0])
}
}
Loading

0 comments on commit e9d907a

Please sign in to comment.