Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add option to skip log tokenization #23440

Merged
89 changes: 58 additions & 31 deletions pkg/stanza/operator/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func NewConfigWithID(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, operatorType),
BaseConfig: BaseConfig{
Encoding: helper.NewEncodingConfig(),
Encoding: helper.NewEncodingConfig(),
TokenizeLog: true,
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
Multiline: helper.MultilineConfig{
LineStartPattern: "",
LineEndPattern: ".^", // Use never matching regex to not split data by default
Expand All @@ -57,6 +58,7 @@ type Config struct {
// BaseConfig is the details configuration of a udp input operator.
type BaseConfig struct {
ListenAddress string `mapstructure:"listen_address,omitempty"`
TokenizeLog bool `mapstructure:"tokenize_log,omitempty"`
AddAttributes bool `mapstructure:"add_attributes,omitempty"`
Encoding helper.EncodingConfig `mapstructure:",squash,omitempty"`
Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty"`
Expand Down Expand Up @@ -104,6 +106,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
encoding: encoding,
splitFunc: splitFunc,
resolver: resolver,
tokenizeLog: c.TokenizeLog,
}
return udpInput, nil
}
Expand All @@ -114,6 +117,7 @@ type Input struct {
helper.InputOperator
address *net.UDPAddr
addAttributes bool
tokenizeLog bool

connection net.PacketConn
cancel context.CancelFunc
Expand Down Expand Up @@ -159,42 +163,19 @@ func (u *Input) goHandleMessages(ctx context.Context) {
break
}

if !u.tokenizeLog {
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
log := VerifyLog(message)
handleMessage(u, ctx, remoteAddr, log)
continue
}

scanner := bufio.NewScanner(bytes.NewReader(message))
scanner.Buffer(buf, MaxUDPSize)

scanner.Split(u.splitFunc)

for scanner.Scan() {
decoded, err := u.encoding.Decode(scanner.Bytes())
if err != nil {
u.Errorw("Failed to decode data", zap.Error(err))
continue
}

entry, err := u.NewEntry(string(decoded))
if err != nil {
u.Errorw("Failed to create entry", zap.Error(err))
continue
}

if u.addAttributes {
entry.AddAttribute("net.transport", "IP.UDP")
if addr, ok := u.connection.LocalAddr().(*net.UDPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", u.resolver.GetHostFromIP(ip))
}

if addr, ok := remoteAddr.(*net.UDPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", u.resolver.GetHostFromIP(ip))
}
}

u.Write(ctx, entry)
handleMessage(u, ctx, remoteAddr, scanner.Bytes())
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
}
if err := scanner.Err(); err != nil {
u.Errorw("Scanner error", zap.Error(err))
Expand All @@ -203,6 +184,52 @@ func (u *Input) goHandleMessages(ctx context.Context) {
}()
}

func VerifyLog(data []byte) (token []byte) {
haimrubinstein marked this conversation as resolved.
Show resolved Hide resolved
dataLength := len(data)
if dataLength >= MaxUDPSize {
return data[:MaxUDPSize]
}

if dataLength == 0 {
haimrubinstein marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

return data
}

func handleMessage(u *Input, ctx context.Context, remoteAddr net.Addr, log []byte) {
decoded, err := u.encoding.Decode(log)
if err != nil {
u.Errorw("Failed to decode data", zap.Error(err))
return
}

entry, err := u.NewEntry(string(decoded))
if err != nil {
u.Errorw("Failed to create entry", zap.Error(err))
return
}

if u.addAttributes {
entry.AddAttribute("net.transport", "IP.UDP")
if addr, ok := u.connection.LocalAddr().(*net.UDPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", u.resolver.GetHostFromIP(ip))
}

if addr, ok := remoteAddr.(*net.UDPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", u.resolver.GetHostFromIP(ip))
}
}

u.Write(ctx, entry)
}

// readMessage will read log messages from the connection.
func (u *Input) readMessage() ([]byte, net.Addr, error) {
n, addr, err := u.connection.ReadFrom(u.buffer)
Expand Down