Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

export more params for logs collecting #728

Merged
merged 1 commit into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/logs_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewLogsAgent() AgentModule {
diagnosticMessageReceiver := diagnostic.NewBufferedMessageReceiver()

// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewProvider(logsconfig.NumberOfPipelines, auditor, diagnosticMessageReceiver, processingRules, endpoints, destinationsCtx)
pipelineProvider := pipeline.NewProvider(coreconfig.NumberOfPipelines(), auditor, diagnosticMessageReceiver, processingRules, endpoints, destinationsCtx)

validatePodContainerID := coreconfig.ValidatePodContainerID()
//
Expand Down
7 changes: 3 additions & 4 deletions agent/logs_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,9 @@ func BuildHTTPEndpointsWithConfig(endpointPrefix string, intakeTrackType logscon
}

batchWait := time.Duration(logsConfig.BatchWait) * time.Second
// TODO support custom param
batchMaxConcurrentSend := 0
batchMaxSize := 100
batchMaxContentSize := 1000000
batchMaxConcurrentSend := coreconfig.BatchConcurrence()
batchMaxSize := coreconfig.BatchMaxSize()
batchMaxContentSize := coreconfig.BatchMaxContentSize()

return NewEndpointsWithBatchSettings(main, false, "http", batchWait, batchMaxConcurrentSend, batchMaxSize, batchMaxContentSize), nil
}
Expand Down
18 changes: 18 additions & 0 deletions conf/logs.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,24 @@ scan_period = 10
## read buffer of udp
frame_size = 9000

## channel size, default 100
## 读取日志缓冲区,行数
chan_size = 1000
## pipeline num, default 4
## 有多少线程处理日志
pipeline = 4
## configuration for kafka
## 指定kafka版本
kafka_version="3.3.2"
# 默认0 表示串行,如果对日志顺序有要求,保持默认配置
batch_max_concurrence = 0
# 每批次最多的日志条数, 默认100
batch_max_size = 100
# 每次最大发送的内容上限 默认1000000
batch_max_content_size = 1000000
# client timeout in seconds
producer_timeout= 10

# 是否开启sasl模式
sasl_enable = false
sasl_user = "admin"
Expand Down
47 changes: 47 additions & 0 deletions config/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ type (
Accuracy string `toml:"accuracy" json:"accuracy"`
KafkaConfig
KubeConfig

ChanSize int `toml:"chan_size" json:"chan_size"`
Pipeline int `toml:"pipeline" json:"pipeline"`
BatchMaxSize int `toml:"batch_max_size" json:"batch_max_size"`
BatchMaxContentSize int `toml:"batch_max_content_size" json:"batch_max_content_size"`
BatchConcurrence int `toml:"batch_max_concurrence" json:"batch_max_concurrence"`
ProducerTimeout int `toml:"producer_timeout" json:"producer_timeout"`

EnableCollectContainer bool `json:"enable_collect_container" toml:"enable_collect_container"`
}
KafkaConfig struct {
Topic string `json:"topic" toml:"topic"`
Expand Down Expand Up @@ -93,6 +102,44 @@ func LogFrameSize() int {
}
return Config.Logs.FrameSize
}
// NumberOfPipelines reports how many log-processing pipelines to run.
// It falls back to 4 when the `pipeline` setting is unset (zero) and
// writes that fallback into Config so later readers see the same value.
func NumberOfPipelines() int {
	if n := Config.Logs.Pipeline; n != 0 {
		return n
	}
	const defaultPipelines = 4
	Config.Logs.Pipeline = defaultPipelines
	return defaultPipelines
}

// ChanSize reports the buffer capacity used for the log message
// channels (the `chan_size` setting), defaulting to 100 when unset.
func ChanSize() int {
	size := Config.Logs.ChanSize
	if size == 0 {
		size = 100
		// Persist the fallback so every caller observes one value.
		Config.Logs.ChanSize = size
	}
	return size
}

// BatchMaxSize reports the maximum number of log entries per batch
// (the `batch_max_size` setting), defaulting to 100 when unset.
func BatchMaxSize() int {
	if v := Config.Logs.BatchMaxSize; v != 0 {
		return v
	}
	const defaultBatchMaxSize = 100
	Config.Logs.BatchMaxSize = defaultBatchMaxSize
	return defaultBatchMaxSize
}

// BatchMaxContentSize reports the maximum payload size of one batch
// (the `batch_max_content_size` setting), defaulting to 1000000 when
// unset; the fallback is written back into Config.
func BatchMaxContentSize() int {
	size := Config.Logs.BatchMaxContentSize
	if size != 0 {
		return size
	}
	const defaultBatchMaxContentSize = 1000000
	Config.Logs.BatchMaxContentSize = defaultBatchMaxContentSize
	return defaultBatchMaxContentSize
}

// BatchConcurrence returns the `batch_max_concurrence` setting: the
// maximum number of concurrent background sends. Unlike the other
// getters in this file it applies no fallback — per conf/logs.toml,
// the zero value is meaningful (payloads are sent serially, which
// preserves log ordering).
func BatchConcurrence() int {
	return Config.Logs.BatchConcurrence
}

// ClientTimeout reports the producer timeout in seconds (the
// `producer_timeout` setting), defaulting to 10 when unset; the
// fallback is persisted into Config.
func ClientTimeout() int {
	if t := Config.Logs.ProducerTimeout; t != 0 {
		return t
	}
	const defaultProducerTimeout = 10
	Config.Logs.ProducerTimeout = defaultProducerTimeout
	return defaultProducerTimeout
}

func ValidatePodContainerID() bool {
return false
Expand Down
6 changes: 0 additions & 6 deletions config/logs/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,6 @@

package logs

// Pipeline constraints
const (
ChanSize = 100
NumberOfPipelines = 4
)

const (
// DateFormat is the default date format.
DateFormat = "2006-01-02T15:04:05.000000000Z"
Expand Down
4 changes: 2 additions & 2 deletions logs/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"sync"
"time"

config "flashcat.cloud/categraf/config/logs"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/logs/message"
)

Expand Down Expand Up @@ -96,7 +96,7 @@ func (a *RegistryAuditor) Stop() {
// createChannels (re)creates the auditor's input and done channels.
// It holds chansMutex so the swap cannot race with concurrent readers
// of these fields.
func (a *RegistryAuditor) createChannels() {
	a.chansMutex.Lock()
	defer a.chansMutex.Unlock()
	// Buffer capacity comes from the chan_size setting (default 100).
	a.inputChan = make(chan *message.Message, config.ChanSize())
	a.done = make(chan struct{})
}
}

Expand Down
3 changes: 2 additions & 1 deletion logs/client/http/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync"
"time"

coreconfig "flashcat.cloud/categraf/config"
logsconfig "flashcat.cloud/categraf/config/logs"
"flashcat.cloud/categraf/logs/client"
"flashcat.cloud/categraf/pkg/backoff"
Expand Down Expand Up @@ -185,7 +186,7 @@ func (d *Destination) unconditionalSend(payload []byte) (err error) {
// SendAsync sends a payload in background.
func (d *Destination) SendAsync(payload []byte) {
d.once.Do(func() {
payloadChan := make(chan []byte, logsconfig.ChanSize)
payloadChan := make(chan []byte, coreconfig.ChanSize())
d.sendInBackground(payloadChan)
d.payloadChan = payloadChan
})
Expand Down
15 changes: 7 additions & 8 deletions logs/client/kafka/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,13 @@ const (
JSONContentType = "application/json"
)

// HTTP errors.
// errors.
var (
errClient = errors.New("client error")
errServer = errors.New("server error")
)

// emptyPayload is an empty payload used to check HTTP connectivity without sending logs.
var emptyPayload []byte

// Destination sends a payload over HTTP.
// Destination sends a payload over Kafka.
type Destination struct {
topic string
brokers []string
Expand All @@ -59,7 +56,7 @@ type Destination struct {
// there is no concurrency and the background sending pipeline will block while sending each payload.
// TODO: add support for SOCKS5
func NewDestination(endpoint logsconfig.Endpoint, contentType string, destinationsContext *client.DestinationsContext, maxConcurrentBackgroundSends int) *Destination {
return newDestination(endpoint, contentType, destinationsContext, time.Second*10, maxConcurrentBackgroundSends)
return newDestination(endpoint, contentType, destinationsContext, time.Duration(coreconfig.ClientTimeout())*time.Second, maxConcurrentBackgroundSends)
}

func newDestination(endpoint logsconfig.Endpoint, contentType string, destinationsContext *client.DestinationsContext, timeout time.Duration, maxConcurrentBackgroundSends int) *Destination {
Expand Down Expand Up @@ -91,6 +88,8 @@ func newDestination(endpoint logsconfig.Endpoint, contentType string, destinatio
coreconfig.Config.Logs.Producer.Return.Successes = true
}

coreconfig.Config.Logs.Config.Producer.Timeout = timeout

if coreconfig.Config.Logs.SendWithTLS && coreconfig.Config.Logs.SendType == "kafka" {
coreconfig.Config.Logs.Config.Net.TLS.Enable = true
coreconfig.Config.Logs.UseTLS = true
Expand Down Expand Up @@ -152,7 +151,7 @@ func errorToTag(err error) string {
}
}

// Send sends a payload over HTTP,
// Send sends a payload over Kafka,
// the error returned can be retryable and it is the responsibility of the callee to retry.
func (d *Destination) Send(payload []byte) error {
if d.blockedUntil.After(time.Now()) {
Expand Down Expand Up @@ -208,7 +207,7 @@ func (d *Destination) unconditionalSend(payload []byte) (err error) {
// SendAsync sends a payload in background.
func (d *Destination) SendAsync(payload []byte) {
d.once.Do(func() {
payloadChan := make(chan []byte, logsconfig.ChanSize)
payloadChan := make(chan []byte, coreconfig.ChanSize())
d.sendInBackground(payloadChan)
d.payloadChan = payloadChan
})
Expand Down
3 changes: 2 additions & 1 deletion logs/client/tcp/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"time"

coreconfig "flashcat.cloud/categraf/config"
logsconfig "flashcat.cloud/categraf/config/logs"
"flashcat.cloud/categraf/logs/client"
)
Expand Down Expand Up @@ -86,7 +87,7 @@ func (d *Destination) Send(payload []byte) error {
func (d *Destination) SendAsync(payload []byte) {
// host := d.connManager.endpoint.Host
d.once.Do(func() {
inputChan := make(chan []byte, logsconfig.ChanSize)
inputChan := make(chan []byte, coreconfig.ChanSize())
d.inputChan = inputChan
go d.runAsync()
})
Expand Down
7 changes: 3 additions & 4 deletions logs/diagnostic/message_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"time"

coreconfig "flashcat.cloud/categraf/config"
logsconfig "flashcat.cloud/categraf/config/logs"
"flashcat.cloud/categraf/logs/message"
)

Expand Down Expand Up @@ -44,13 +43,13 @@ type Filters struct {
// NewBufferedMessageReceiver creates a new MessageReceiver
func NewBufferedMessageReceiver() *BufferedMessageReceiver {
return &BufferedMessageReceiver{
inputChan: make(chan messagePair, logsconfig.ChanSize),
inputChan: make(chan messagePair, coreconfig.ChanSize()),
}
}

// Start opens new input channel
func (b *BufferedMessageReceiver) Start() {
b.inputChan = make(chan messagePair, logsconfig.ChanSize)
b.inputChan = make(chan messagePair, coreconfig.ChanSize())
}

// Stop closes the input channel
Expand Down Expand Up @@ -99,7 +98,7 @@ func (b *BufferedMessageReceiver) HandleMessage(m message.Message, redactedMsg [

// Filter writes the buffered events from the input channel formatted as a string to the output channel
func (b *BufferedMessageReceiver) Filter(filters *Filters, done <-chan struct{}) <-chan string {
out := make(chan string, logsconfig.ChanSize)
out := make(chan string, coreconfig.ChanSize())
go func() {
defer close(out)
for {
Expand Down
5 changes: 3 additions & 2 deletions logs/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package pipeline
import (
"context"

coreconfig "flashcat.cloud/categraf/config"
logsconfig "flashcat.cloud/categraf/config/logs"
"flashcat.cloud/categraf/logs/client"
"flashcat.cloud/categraf/logs/client/http"
Expand Down Expand Up @@ -65,14 +66,14 @@ func NewPipeline(outputChan chan *message.Message, processingRules []*logsconfig
encoder = processor.RawEncoder
}

senderChan := make(chan *message.Message, logsconfig.ChanSize)
senderChan := make(chan *message.Message, coreconfig.ChanSize())
sender := sender.NewSender(senderChan, outputChan, destinations, strategy)

if endpoints.UseProto {
encoder = processor.ProtoEncoder
}

inputChan := make(chan *message.Message, logsconfig.ChanSize)
inputChan := make(chan *message.Message, coreconfig.ChanSize())
processor := processor.New(inputChan, senderChan, processingRules, encoder, diagnosticMessageReceiver)

return &Pipeline{
Expand Down
4 changes: 1 addition & 3 deletions logs/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ package processor

import (
"context"
"fmt"
"log"
"os"
"sync"

coreconfig "flashcat.cloud/categraf/config"
Expand Down Expand Up @@ -98,7 +96,7 @@ func (p *Processor) processMessage(msg *message.Message) {
return
}
if coreconfig.Config.DebugMode {
fmt.Fprintf(os.Stdout, "D! log item: %s", string(content))
log.Println("D! log item:", string(content))
}
msg.Content = content
p.outputChan <- msg
Expand Down