Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add option to skip log tokenization #23440

Merged
16 changes: 16 additions & 0 deletions .chloggen/add-udp-tokenize-flag.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add option to skip log tokenization. Use the 'one_log_per_packet' setting to skip log tokenization when multiline is not used.

# One or more tracking issues related to the change
issues: []
djaglowski marked this conversation as resolved.
Show resolved Hide resolved

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
107 changes: 67 additions & 40 deletions pkg/stanza/operator/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func NewConfigWithID(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, operatorType),
BaseConfig: BaseConfig{
Encoding: helper.NewEncodingConfig(),
Encoding: helper.NewEncodingConfig(),
OneLogPerPacket: false,
Multiline: helper.MultilineConfig{
LineStartPattern: "",
LineEndPattern: ".^", // Use never matching regex to not split data by default
Expand All @@ -57,6 +58,7 @@ type Config struct {
// BaseConfig is the details configuration of a udp input operator.
type BaseConfig struct {
ListenAddress string `mapstructure:"listen_address,omitempty"`
OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"`
AddAttributes bool `mapstructure:"add_attributes,omitempty"`
Encoding helper.EncodingConfig `mapstructure:",squash,omitempty"`
Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty"`
Expand Down Expand Up @@ -97,13 +99,14 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}

udpInput := &Input{
InputOperator: inputOperator,
address: address,
buffer: make([]byte, MaxUDPSize),
addAttributes: c.AddAttributes,
encoding: encoding,
splitFunc: splitFunc,
resolver: resolver,
InputOperator: inputOperator,
address: address,
buffer: make([]byte, MaxUDPSize),
addAttributes: c.AddAttributes,
encoding: encoding,
splitFunc: splitFunc,
resolver: resolver,
OneLogPerPacket: c.OneLogPerPacket,
}
return udpInput, nil
}
Expand All @@ -112,8 +115,9 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
type Input struct {
buffer []byte
helper.InputOperator
address *net.UDPAddr
addAttributes bool
address *net.UDPAddr
addAttributes bool
OneLogPerPacket bool

connection net.PacketConn
cancel context.CancelFunc
Expand Down Expand Up @@ -159,42 +163,19 @@ func (u *Input) goHandleMessages(ctx context.Context) {
break
}

if u.OneLogPerPacket {
log := truncateMaxLog(message)
handleMessage(u, ctx, remoteAddr, log)
continue
}

scanner := bufio.NewScanner(bytes.NewReader(message))
scanner.Buffer(buf, MaxUDPSize)

scanner.Split(u.splitFunc)

for scanner.Scan() {
decoded, err := u.encoding.Decode(scanner.Bytes())
if err != nil {
u.Errorw("Failed to decode data", zap.Error(err))
continue
}

entry, err := u.NewEntry(string(decoded))
if err != nil {
u.Errorw("Failed to create entry", zap.Error(err))
continue
}

if u.addAttributes {
entry.AddAttribute("net.transport", "IP.UDP")
if addr, ok := u.connection.LocalAddr().(*net.UDPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", u.resolver.GetHostFromIP(ip))
}

if addr, ok := remoteAddr.(*net.UDPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", u.resolver.GetHostFromIP(ip))
}
}

u.Write(ctx, entry)
handleMessage(u, ctx, remoteAddr, scanner.Bytes())
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
}
if err := scanner.Err(); err != nil {
u.Errorw("Scanner error", zap.Error(err))
Expand All @@ -203,6 +184,52 @@ func (u *Input) goHandleMessages(ctx context.Context) {
}()
}

func truncateMaxLog(data []byte) (token []byte) {
dataLength := len(data)
if dataLength >= MaxUDPSize {
return data[:MaxUDPSize]
}

if dataLength == 0 {
haimrubinstein marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

return data
}

// handleMessage turns one raw log payload into an entry and hands it to u.Write.
//
// The payload is decoded with u.encoding and wrapped via u.NewEntry; if either
// step fails the error is logged and the payload is dropped. When
// u.addAttributes is set, the entry is annotated with the transport and, for
// UDP addresses, the local (net.host.*) and remote (net.peer.*) ip, port and
// resolved name before it is written.
func handleMessage(u *Input, ctx context.Context, remoteAddr net.Addr, log []byte) {
	body, decodeErr := u.encoding.Decode(log)
	if decodeErr != nil {
		u.Errorw("Failed to decode data", zap.Error(decodeErr))
		return
	}

	e, entryErr := u.NewEntry(string(body))
	if entryErr != nil {
		u.Errorw("Failed to create entry", zap.Error(entryErr))
		return
	}

	// Nothing to annotate: emit the entry as-is.
	if !u.addAttributes {
		u.Write(ctx, e)
		return
	}

	e.AddAttribute("net.transport", "IP.UDP")

	// Local side of the socket.
	if local, isUDP := u.connection.LocalAddr().(*net.UDPAddr); isUDP {
		hostIP := local.IP.String()
		hostPort := strconv.FormatInt(int64(local.Port), 10)
		e.AddAttribute("net.host.ip", hostIP)
		e.AddAttribute("net.host.port", hostPort)
		e.AddAttribute("net.host.name", u.resolver.GetHostFromIP(hostIP))
	}

	// Sender of the packet.
	if peer, isUDP := remoteAddr.(*net.UDPAddr); isUDP {
		peerIP := peer.IP.String()
		peerPort := strconv.FormatInt(int64(peer.Port), 10)
		e.AddAttribute("net.peer.ip", peerIP)
		e.AddAttribute("net.peer.port", peerPort)
		e.AddAttribute("net.peer.name", u.resolver.GetHostFromIP(peerIP))
	}

	u.Write(ctx, e)
}

// readMessage will read log messages from the connection.
func (u *Input) readMessage() ([]byte, net.Addr, error) {
n, addr, err := u.connection.ReadFrom(u.buffer)
Expand Down