diff --git a/.chloggen/add-udp-tokenize-flag.yaml b/.chloggen/add-udp-tokenize-flag.yaml new file mode 100644 index 000000000000..e0bdbe646d8a --- /dev/null +++ b/.chloggen/add-udp-tokenize-flag.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add option to skip log tokenization for both tcp and udp receivers. use the 'one_log_per_packet' setting to skip log tokenization if multiline is not used. + +# One or more tracking issues related to the change +issues: [23440] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: \ No newline at end of file diff --git a/pkg/stanza/docs/operators/tcp_input.md b/pkg/stanza/docs/operators/tcp_input.md index acaaedb4ef1a..ed690a432bcc 100644 --- a/pkg/stanza/docs/operators/tcp_input.md +++ b/pkg/stanza/docs/operators/tcp_input.md @@ -4,20 +4,21 @@ The `tcp_input` operator listens for logs on one or more TCP connections. The op ### Configuration Fields -| Field | Default | Description | -| --- | --- | --- | -| `id` | `tcp_input` | A unique identifier for the operator. | -| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. | -| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory. | -| `listen_address` | required | A listen address of the form `:`. | -| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section). | -| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. | -| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. | -| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes]. | +| Field | Default | Description | +| --- | --- | --- | +| `id` | `tcp_input` | A unique identifier for the operator. | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. | +| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory. | +| `listen_address` | required | A listen address of the form `:`. | +| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section). | +| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. | +| `one_log_per_packet` | false | Skip log tokenization, set to true if logs contains one log per record and multiline is not used. This will improve performance. | +| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. | +| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes]. | | `multiline` | | A `multiline` configuration block. See below for details. | -| `preserve_leading_whitespaces` | false | Whether to preserve leading whitespaces. | -| `preserve_trailing_whitespaces` | false | Whether to preserve trailing whitespaces. | -| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. | +| `preserve_leading_whitespaces` | false | Whether to preserve leading whitespaces. | +| `preserve_trailing_whitespaces` | false | Whether to preserve trailing whitespaces. | +| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. | #### TLS Configuration diff --git a/pkg/stanza/docs/operators/udp_input.md b/pkg/stanza/docs/operators/udp_input.md index 6a9584413b56..8697b3f49427 100644 --- a/pkg/stanza/docs/operators/udp_input.md +++ b/pkg/stanza/docs/operators/udp_input.md @@ -4,18 +4,19 @@ The `udp_input` operator listens for logs from UDP packets. ### Configuration Fields -| Field | Default | Description | -| --- | --- | --- | -| `id` | `udp_input` | A unique identifier for the operator. | -| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. | -| `listen_address` | required | A listen address of the form `:`. | -| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. | -| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. | -| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes]. | +| Field | Default | Description | +| --- | --- | --- | +| `id` | `udp_input` | A unique identifier for the operator. | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. | +| `listen_address` | required | A listen address of the form `:`. | +| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. | +| `one_log_per_packet` | false | Skip log tokenization, set to true if logs contains one log per record and multiline is not used. This will improve performance. | +| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. | +| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes]. | | `multiline` | | A `multiline` configuration block. See below for details. | -| `preserve_leading_whitespaces` | false | Whether to preserve leading whitespaces. | -| `preserve_trailing_whitespaces` | false | Whether to preserve trailing whitespaces. | -| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. | +| `preserve_leading_whitespaces` | false | Whether to preserve leading whitespaces. | +| `preserve_trailing_whitespaces` | false | Whether to preserve trailing whitespaces. | +| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. | #### `multiline` configuration diff --git a/pkg/stanza/operator/input/tcp/tcp.go b/pkg/stanza/operator/input/tcp/tcp.go index 579c8ad051bc..806e53fbf260 100644 --- a/pkg/stanza/operator/input/tcp/tcp.go +++ b/pkg/stanza/operator/input/tcp/tcp.go @@ -5,10 +5,12 @@ package tcp // import "github.com/open-telemetry/opentelemetry-collector-contrib import ( "bufio" + "bytes" "context" "crypto/rand" "crypto/tls" "fmt" + "io" "net" "strconv" "sync" @@ -48,8 +50,9 @@ func NewConfigWithID(operatorID string) *Config { return &Config{ InputConfig: helper.NewInputConfig(operatorID, operatorType), BaseConfig: BaseConfig{ - Multiline: helper.NewMultilineConfig(), - Encoding: helper.NewEncodingConfig(), + OneLogPerPacket: false, + Multiline: helper.NewMultilineConfig(), + Encoding: helper.NewEncodingConfig(), }, } } @@ -66,6 +69,7 @@ type BaseConfig struct { ListenAddress string `mapstructure:"listen_address,omitempty"` TLS *configtls.TLSServerSetting `mapstructure:"tls,omitempty"` AddAttributes bool `mapstructure:"add_attributes,omitempty"` + OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"` Encoding helper.EncodingConfig `mapstructure:",squash,omitempty"` Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty"` PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"` @@ -114,12 +118,13 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { } tcpInput := &Input{ - InputOperator: inputOperator, - address: c.ListenAddress, - MaxLogSize: int(c.MaxLogSize), - addAttributes: c.AddAttributes, - encoding: encoding, - splitFunc: splitFunc, + InputOperator: inputOperator, + address: c.ListenAddress, + MaxLogSize: int(c.MaxLogSize), + addAttributes: c.AddAttributes, + OneLogPerPacket: c.OneLogPerPacket, + encoding: encoding, + splitFunc: splitFunc, backoff: backoff.Backoff{ Max: 3 * time.Second, }, @@ -139,9 +144,10 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { // Input is an operator that listens for log entries over tcp. type Input struct { helper.InputOperator - address string - MaxLogSize int - addAttributes bool + address string + MaxLogSize int + addAttributes bool + OneLogPerPacket bool listener net.Listener cancel context.CancelFunc @@ -239,48 +245,77 @@ func (t *Input) goHandleMessages(ctx context.Context, conn net.Conn, cancel cont defer t.wg.Done() defer cancel() + if t.OneLogPerPacket { + var buf bytes.Buffer + _, err := io.Copy(&buf, conn) + if err != nil { + t.Errorw("IO copy net connection buffer error", zap.Error(err)) + } + log := truncateMaxLog(buf.Bytes(), t.MaxLogSize) + t.handleMessage(ctx, conn, log) + return + } + buf := make([]byte, 0, t.MaxLogSize) + scanner := bufio.NewScanner(conn) scanner.Buffer(buf, t.MaxLogSize) scanner.Split(t.splitFunc) for scanner.Scan() { - decoded, err := t.encoding.Decode(scanner.Bytes()) - if err != nil { - t.Errorw("Failed to decode data", zap.Error(err)) - continue - } + t.handleMessage(ctx, conn, scanner.Bytes()) + } - entry, err := t.NewEntry(string(decoded)) - if err != nil { - t.Errorw("Failed to create entry", zap.Error(err)) - continue - } + if err := scanner.Err(); err != nil { + t.Errorw("Scanner error", zap.Error(err)) + } + }() +} - if t.addAttributes { - entry.AddAttribute("net.transport", "IP.TCP") - if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok { - ip := addr.IP.String() - entry.AddAttribute("net.peer.ip", ip) - entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10)) - entry.AddAttribute("net.peer.name", t.resolver.GetHostFromIP(ip)) - } +func (t *Input) handleMessage(ctx context.Context, conn net.Conn, log []byte) { + decoded, err := t.encoding.Decode(log) + if err != nil { + t.Errorw("Failed to decode data", zap.Error(err)) + return + } - if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok { - ip := addr.IP.String() - entry.AddAttribute("net.host.ip", addr.IP.String()) - entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10)) - entry.AddAttribute("net.host.name", t.resolver.GetHostFromIP(ip)) - } - } + entry, err := t.NewEntry(string(decoded)) + if err != nil { + t.Errorw("Failed to create entry", zap.Error(err)) + return + } - t.Write(ctx, entry) + if t.addAttributes { + entry.AddAttribute("net.transport", "IP.TCP") + if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok { + ip := addr.IP.String() + entry.AddAttribute("net.peer.ip", ip) + entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10)) + entry.AddAttribute("net.peer.name", t.resolver.GetHostFromIP(ip)) } - if err := scanner.Err(); err != nil { - t.Errorw("Scanner error", zap.Error(err)) + + if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok { + ip := addr.IP.String() + entry.AddAttribute("net.host.ip", addr.IP.String()) + entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10)) + entry.AddAttribute("net.host.name", t.resolver.GetHostFromIP(ip)) } - }() + } + + t.Write(ctx, entry) +} + +func truncateMaxLog(data []byte, maxLogSize int) (token []byte) { + if len(data) >= maxLogSize { + return data[:maxLogSize] + } + + if len(data) == 0 { + return nil + } + + return data } // Stop will stop listening for log entries over TCP. diff --git a/pkg/stanza/operator/input/udp/udp.go b/pkg/stanza/operator/input/udp/udp.go index fe5a8a9c6221..8566d1d164a4 100644 --- a/pkg/stanza/operator/input/udp/udp.go +++ b/pkg/stanza/operator/input/udp/udp.go @@ -39,7 +39,8 @@ func NewConfigWithID(operatorID string) *Config { return &Config{ InputConfig: helper.NewInputConfig(operatorID, operatorType), BaseConfig: BaseConfig{ - Encoding: helper.NewEncodingConfig(), + Encoding: helper.NewEncodingConfig(), + OneLogPerPacket: false, Multiline: helper.MultilineConfig{ LineStartPattern: "", LineEndPattern: ".^", // Use never matching regex to not split data by default @@ -57,6 +58,7 @@ type Config struct { // BaseConfig is the details configuration of a udp input operator. type BaseConfig struct { ListenAddress string `mapstructure:"listen_address,omitempty"` + OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"` AddAttributes bool `mapstructure:"add_attributes,omitempty"` Encoding helper.EncodingConfig `mapstructure:",squash,omitempty"` Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty"` @@ -97,13 +99,14 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { } udpInput := &Input{ - InputOperator: inputOperator, - address: address, - buffer: make([]byte, MaxUDPSize), - addAttributes: c.AddAttributes, - encoding: encoding, - splitFunc: splitFunc, - resolver: resolver, + InputOperator: inputOperator, + address: address, + buffer: make([]byte, MaxUDPSize), + addAttributes: c.AddAttributes, + encoding: encoding, + splitFunc: splitFunc, + resolver: resolver, + OneLogPerPacket: c.OneLogPerPacket, } return udpInput, nil } @@ -112,8 +115,9 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { type Input struct { buffer []byte helper.InputOperator - address *net.UDPAddr - addAttributes bool + address *net.UDPAddr + addAttributes bool + OneLogPerPacket bool connection net.PacketConn cancel context.CancelFunc @@ -159,42 +163,19 @@ func (u *Input) goHandleMessages(ctx context.Context) { break } + if u.OneLogPerPacket { + log := truncateMaxLog(message) + u.handleMessage(ctx, remoteAddr, log) + continue + } + scanner := bufio.NewScanner(bytes.NewReader(message)) scanner.Buffer(buf, MaxUDPSize) scanner.Split(u.splitFunc) for scanner.Scan() { - decoded, err := u.encoding.Decode(scanner.Bytes()) - if err != nil { - u.Errorw("Failed to decode data", zap.Error(err)) - continue - } - - entry, err := u.NewEntry(string(decoded)) - if err != nil { - u.Errorw("Failed to create entry", zap.Error(err)) - continue - } - - if u.addAttributes { - entry.AddAttribute("net.transport", "IP.UDP") - if addr, ok := u.connection.LocalAddr().(*net.UDPAddr); ok { - ip := addr.IP.String() - entry.AddAttribute("net.host.ip", addr.IP.String()) - entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10)) - entry.AddAttribute("net.host.name", u.resolver.GetHostFromIP(ip)) - } - - if addr, ok := remoteAddr.(*net.UDPAddr); ok { - ip := addr.IP.String() - entry.AddAttribute("net.peer.ip", ip) - entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10)) - entry.AddAttribute("net.peer.name", u.resolver.GetHostFromIP(ip)) - } - } - - u.Write(ctx, entry) + u.handleMessage(ctx, remoteAddr, scanner.Bytes()) } if err := scanner.Err(); err != nil { u.Errorw("Scanner error", zap.Error(err)) @@ -203,6 +184,51 @@ func (u *Input) goHandleMessages(ctx context.Context) { }() } +func truncateMaxLog(data []byte) (token []byte) { + if len(data) >= MaxUDPSize { + return data[:MaxUDPSize] + } + + if len(data) == 0 { + return nil + } + + return data +} + +func (u *Input) handleMessage(ctx context.Context, remoteAddr net.Addr, log []byte) { + decoded, err := u.encoding.Decode(log) + if err != nil { + u.Errorw("Failed to decode data", zap.Error(err)) + return + } + + entry, err := u.NewEntry(string(decoded)) + if err != nil { + u.Errorw("Failed to create entry", zap.Error(err)) + return + } + + if u.addAttributes { + entry.AddAttribute("net.transport", "IP.UDP") + if addr, ok := u.connection.LocalAddr().(*net.UDPAddr); ok { + ip := addr.IP.String() + entry.AddAttribute("net.host.ip", addr.IP.String()) + entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10)) + entry.AddAttribute("net.host.name", u.resolver.GetHostFromIP(ip)) + } + + if addr, ok := remoteAddr.(*net.UDPAddr); ok { + ip := addr.IP.String() + entry.AddAttribute("net.peer.ip", ip) + entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10)) + entry.AddAttribute("net.peer.name", u.resolver.GetHostFromIP(ip)) + } + } + + u.Write(ctx, entry) +} + // readMessage will read log messages from the connection. func (u *Input) readMessage() ([]byte, net.Addr, error) { n, addr, err := u.connection.ReadFrom(u.buffer) diff --git a/receiver/tcplogreceiver/README.md b/receiver/tcplogreceiver/README.md index a4c86f094f9b..185bfaaa1156 100644 --- a/receiver/tcplogreceiver/README.md +++ b/receiver/tcplogreceiver/README.md @@ -17,17 +17,18 @@ Receives logs over TCP. ## Configuration -| Field | Default | Description | -| --- | --- | --- | -| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory | -| `listen_address` | required | A listen address of the form `:` | -| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section) | -| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | -| `resource` | {} | A map of `key: value` pairs to add to the entry's resource | -| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes] | -| `multiline` | | A `multiline` configuration block. See below for details | -| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options | -| `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details | +| Field | Default | Description | +| --- | --- | --- | +| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory | +| `listen_address` | required | A listen address of the form `:` | +| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section) | +| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | +| `one_log_per_packet` | false | Skip log tokenization, set to true if logs contains one log per record and multiline is not used. This will improve performance. | +| `resource` | {} | A map of `key: value` pairs to add to the entry's resource | +| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes] | +| `multiline` | | A `multiline` configuration block. See below for details | +| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options | +| `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details | ### TLS Configuration diff --git a/receiver/udplogreceiver/README.md b/receiver/udplogreceiver/README.md index e4cf9943591b..1e0d4ca3b383 100644 --- a/receiver/udplogreceiver/README.md +++ b/receiver/udplogreceiver/README.md @@ -16,15 +16,16 @@ Receives logs over UDP. ## Configuration Fields -| Field | Default | Description | -| --- | --- | --- | -| `listen_address` | required | A listen address of the form `:` | -| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | -| `resource` | {} | A map of `key: value` pairs to add to the entry's resource | -| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes] | -| `multiline` | | A `multiline` configuration block. See below for details | -| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options | -| `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details | +| Field | Default | Description | +| --- | --- | --- | +| `listen_address` | required | A listen address of the form `:` | +| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | +| `one_log_per_packet` | false | Skip log tokenization, set to true if logs contains one log per record and multiline is not used. This will improve performance. | +| `resource` | {} | A map of `key: value` pairs to add to the entry's resource | +| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes] | +| `multiline` | | A `multiline` configuration block. See below for details | +| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options | +| `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details | ### Operators