Skip to content

Commit

Permalink
Add ClientFactory to TCP input source to add SplitFunc/NetworkF… (#8543)
Browse files Browse the repository at this point in the history
This PR changes the way TCP input source is created to a ConnectionFactory based model which takes a SplitFunc, NetworkFunc and two ClientCallback funcs that can be called during a client connect and disconnect.

These hooks allow the TCP implementer to do better logging, stateful processing and also spin up splitters per client.
  • Loading branch information
vjsamuel authored and Steffen Siering committed Jul 17, 2019
1 parent 34dcc89 commit e5c955f
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Use the go-lookslike library for testing in heartbeat. Eventually the mapval package will be replaced with it. {pull}12540[12540]
- New ReporterV2 interfaces that can receive a context on `Fetch(ctx, reporter)`, or `Run(ctx, reporter)`. {pull}11981[11981]
- Generate configuration from `mage` for all Beats. {pull}12618[12618]
- Add ClientFactory to TCP input source to add SplitFunc/NetworkFuncs per client. {pull}8543[8543]
8 changes: 5 additions & 3 deletions filebeat/input/syslog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var defaultUDP = udp.Config{
}

func factory(
cb inputsource.NetworkFunc,
nf inputsource.NetworkFunc,
config common.ConfigNamespace,
) (inputsource.Network, error) {
n, cfg := config.Name(), config.Config()
Expand All @@ -77,13 +77,15 @@ func factory(
return nil, fmt.Errorf("error creating splitFunc from delimiter %s", config.LineDelimiter)
}

return tcp.New(&config.Config, splitFunc, cb)
factory := tcp.SplitHandlerFactory(nf, splitFunc)

return tcp.New(&config.Config, factory)
case udp.Name:
config := defaultUDP
if err := cfg.Unpack(&config); err != nil {
return nil, err
}
return udp.New(&config, cb), nil
return udp.New(&config, nf), nil
default:
return nil, fmt.Errorf("you must choose between TCP or UDP")
}
Expand Down
4 changes: 3 additions & 1 deletion filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ func NewInput(
return nil, fmt.Errorf("unable to create splitFunc for delimiter %s", config.LineDelimiter)
}

server, err := tcp.New(&config.Config, splitFunc, cb)
factory := tcp.SplitHandlerFactory(cb, splitFunc)

server, err := tcp.New(&config.Config, factory)
if err != nil {
return nil, err
}
Expand Down
61 changes: 39 additions & 22 deletions filebeat/inputsource/tcp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ import (
"github.com/elastic/beats/libbeat/logp"
)

// Client is a remote client.
type client struct {
// splitHandler is a TCP client that has splitting capabilities.
type splitHandler struct {
conn net.Conn
log *logp.Logger
callback inputsource.NetworkFunc
done chan struct{}
metadata inputsource.NetworkMetadata
Expand All @@ -43,33 +42,50 @@ type client struct {
timeout time.Duration
}

func newClient(
conn net.Conn,
log *logp.Logger,
// ClientFactory returns a ConnectionHandler func
type ClientFactory func(config Config) ConnectionHandler

// ConnectionHandler interface provides mechanisms for handling of incoming TCP connections
type ConnectionHandler interface {
Handle(conn net.Conn) error
Close()
}

// SplitHandlerFactory allows creation of a ConnectionHandler that can do splitting of messages received on a TCP connection.
func SplitHandlerFactory(callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc) ClientFactory {
return func(config Config) ConnectionHandler {
return newSplitHandler(callback, splitFunc, uint64(config.MaxMessageSize), config.Timeout)
}
}

// newSplitHandler allows creation of a TCP client that has splitting capabilities.
func newSplitHandler(
callback inputsource.NetworkFunc,
splitFunc bufio.SplitFunc,
maxReadMessage uint64,
timeout time.Duration,
) *client {
client := &client{
conn: conn,
log: log.With("remote_address", conn.RemoteAddr()),
) ConnectionHandler {
client := &splitHandler{
callback: callback,
done: make(chan struct{}),
splitFunc: splitFunc,
maxMessageSize: maxReadMessage,
timeout: timeout,
metadata: inputsource.NetworkMetadata{
RemoteAddr: conn.RemoteAddr(),
TLS: extractSSLInformation(conn),
},
}
extractSSLInformation(conn)
return client
}

func (c *client) handle() error {
r := NewResetableLimitedReader(NewDeadlineReader(c.conn, c.timeout), c.maxMessageSize)
// Handle takes a connection as input and processes data received on it.
func (c *splitHandler) Handle(conn net.Conn) error {
c.conn = conn
c.metadata = inputsource.NetworkMetadata{
RemoteAddr: conn.RemoteAddr(),
TLS: extractSSLInformation(conn),
}

log := logp.NewLogger("split_client").With("remote_addr", conn.RemoteAddr().String())

r := NewResetableLimitedReader(NewDeadlineReader(conn, c.timeout), c.maxMessageSize)
buf := bufio.NewReader(r)
scanner := bufio.NewScanner(buf)
scanner.Split(c.splitFunc)
Expand All @@ -79,24 +95,24 @@ func (c *client) handle() error {
for scanner.Scan() {
err := scanner.Err()
if err != nil {
// we are forcing a close on the socket, lets ignore any error that could happen.
// we are forcing a Close on the socket, lets ignore any error that could happen.
select {
case <-c.done:
break
default:
}
// This is a user defined limit and we should notify the user.
if IsMaxReadBufferErr(err) {
c.log.Errorw("client error", "error", err)
log.Errorw("split_client error", "error", err)
}
return errors.Wrap(err, "tcp client error")
return errors.Wrap(err, "tcp split_client error")
}
r.Reset()
c.callback(scanner.Bytes(), c.metadata)
}

// We are out of the scanner, either we reached EOF or another fatal error occurred.
// like we failed to complete the TLS handshake or we are missing the client certificate when
// like we failed to complete the TLS handshake or we are missing the splitHandler certificate when
// mutual auth is on, which is the default.
if err := scanner.Err(); err != nil {
return err
Expand All @@ -105,7 +121,8 @@ func (c *client) handle() error {
return nil
}

func (c *client) close() {
// Close is used to perform clean up before the client is released.
func (c *splitHandler) Close() {
close(c.done)
c.conn.Close()
}
Expand Down
51 changes: 22 additions & 29 deletions filebeat/inputsource/tcp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"golang.org/x/net/netutil"

"github.com/elastic/beats/filebeat/inputsource"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/transport"
Expand All @@ -36,38 +35,35 @@ import (
// Server represent a TCP server
type Server struct {
sync.RWMutex
callback inputsource.NetworkFunc
config *Config
Listener net.Listener
clients map[*client]struct{}
clients map[ConnectionHandler]struct{}
wg sync.WaitGroup
done chan struct{}
splitFunc bufio.SplitFunc
factory ClientFactory
log *logp.Logger
tlsConfig *transport.TLSConfig
}

// New creates a new tcp server
func New(
config *Config,
splitFunc bufio.SplitFunc,
callback inputsource.NetworkFunc,
factory ClientFactory,
) (*Server, error) {
tlsConfig, err := tlscommon.LoadTLSServerConfig(config.TLS)
if err != nil {
return nil, err
}

if splitFunc == nil {
return nil, fmt.Errorf("SplitFunc can't be empty")
if factory == nil {
return nil, fmt.Errorf("ClientFactory can't be empty")
}

return &Server{
config: config,
callback: callback,
clients: make(map[*client]struct{}, 0),
clients: make(map[ConnectionHandler]struct{}, 0),
done: make(chan struct{}),
splitFunc: splitFunc,
factory: factory,
log: logp.NewLogger("tcp").With("address", config.Host),
tlsConfig: tlsConfig,
}, nil
Expand All @@ -91,7 +87,11 @@ func (s *Server) Start() error {
return nil
}

// Run start and run a new TCP listener to receive new data
// Run start and run a new TCP listener to receive new data. When a new connection is accepted, the factory is used
// to create a ConnectionHandler. The ConnectionHandler takes the connection as input and handles the data that is
// being received via tha io.Reader. Most clients use the splitHandler which can take a bufio.SplitFunc and parse
// out each message into an appropriate event. The Close() of the ConnectionHandler can be used to clean up the
// connection either by client or server based on need.
func (s *Server) run() {
for {
conn, err := s.Listener.Accept()
Expand All @@ -105,14 +105,7 @@ func (s *Server) run() {
}
}

client := newClient(
conn,
s.log,
s.callback,
s.splitFunc,
uint64(s.config.MaxMessageSize),
s.config.Timeout,
)
client := s.factory(*s.config)

s.wg.Add(1)
go func() {
Expand All @@ -124,13 +117,13 @@ func (s *Server) run() {
defer s.unregisterClient(client)
s.log.Debugw("New client", "remote_address", conn.RemoteAddr(), "total", s.clientsCount())

err := client.handle()
err := client.Handle(conn)
if err != nil {
s.log.Debugw("Client error", "error", err)
s.log.Debugw("client error", "error", err)
}

defer s.log.Debugw(
"Client disconnected",
"client disconnected",
"remote_address",
conn.RemoteAddr(),
"total",
Expand All @@ -140,34 +133,34 @@ func (s *Server) run() {
}
}

// Stop stops accepting new incoming TCP connection and close any active clients
// Stop stops accepting new incoming TCP connection and Close any active clients
func (s *Server) Stop() {
s.log.Info("Stopping TCP server")
close(s.done)
s.Listener.Close()
for _, client := range s.allClients() {
client.close()
client.Close()
}
s.wg.Wait()
s.log.Info("TCP server stopped")
}

func (s *Server) registerClient(client *client) {
func (s *Server) registerClient(client ConnectionHandler) {
s.Lock()
defer s.Unlock()
s.clients[client] = struct{}{}
}

func (s *Server) unregisterClient(client *client) {
func (s *Server) unregisterClient(client ConnectionHandler) {
s.Lock()
defer s.Unlock()
delete(s.clients, client)
}

func (s *Server) allClients() []*client {
func (s *Server) allClients() []ConnectionHandler {
s.RLock()
defer s.RUnlock()
currentClients := make([]*client, len(s.clients))
currentClients := make([]ConnectionHandler, len(s.clients))
idx := 0
for client := range s.clients {
currentClients[idx] = client
Expand Down
9 changes: 7 additions & 2 deletions filebeat/inputsource/tcp/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ func TestReceiveEventsAndMetadata(t *testing.T) {
if !assert.NoError(t, err) {
return
}
server, err := New(&config, test.splitFunc, to)

factory := SplitHandlerFactory(to, test.splitFunc)
server, err := New(&config, factory)
if !assert.NoError(t, err) {
return
}
Expand Down Expand Up @@ -217,7 +219,10 @@ func TestReceiveNewEventsConcurrently(t *testing.T) {
if !assert.NoError(t, err) {
return
}
server, err := New(&config, bufio.ScanLines, to)

factory := SplitHandlerFactory(to, bufio.ScanLines)

server, err := New(&config, factory)
if !assert.NoError(t, err) {
return
}
Expand Down

0 comments on commit e5c955f

Please sign in to comment.