Skip to content

Commit

Permalink
Move fluentbit and otel config generation under confgenerator.
Browse files Browse the repository at this point in the history
Rename fluentbit package to "fluentbit".
  • Loading branch information
igorpeshansky committed Aug 4, 2021
1 parent f503293 commit 5f31d91
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 48 deletions.
90 changes: 45 additions & 45 deletions confgenerator/confgenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
"strings"
"text/template"

"github.com/GoogleCloudPlatform/ops-agent/fluentbit/conf"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
"github.com/GoogleCloudPlatform/ops-agent/internal/version"
"github.com/GoogleCloudPlatform/ops-agent/otel"
"github.com/shirou/gopsutil/host"
)

Expand Down Expand Up @@ -137,14 +137,14 @@ func generateOtelServices(receiverNameMap map[string]string, exporterNameMap map
}

// defaultTails returns the default Tail sections for the agents' own logs.
func defaultTails(logsDir string, stateDir string, hostInfo *host.InfoStat) (tails []*conf.Tail) {
tails = []*conf.Tail{}
tailFluentbit := conf.Tail{
func defaultTails(logsDir string, stateDir string, hostInfo *host.InfoStat) (tails []*fluentbit.Tail) {
tails = []*fluentbit.Tail{}
tailFluentbit := fluentbit.Tail{
Tag: "ops-agent-fluent-bit",
DB: filepathJoin(hostInfo.OS, stateDir, "buffers", "ops-agent-fluent-bit"),
Path: filepathJoin(hostInfo.OS, logsDir, "logging-module.log"),
}
tailCollectd := conf.Tail{
tailCollectd := fluentbit.Tail{
Tag: "ops-agent-collectd",
DB: filepathJoin(hostInfo.OS, stateDir, "buffers", "ops-agent-collectd"),
Path: filepathJoin(hostInfo.OS, logsDir, "metrics-module.log"),
Expand All @@ -158,8 +158,8 @@ func defaultTails(logsDir string, stateDir string, hostInfo *host.InfoStat) (tai
}

// defaultStackdriverOutputs returns the default Stackdriver sections for the agents' own logs.
func defaultStackdriverOutputs(hostInfo *host.InfoStat) (stackdrivers []*conf.Stackdriver) {
return []*conf.Stackdriver{
func defaultStackdriverOutputs(hostInfo *host.InfoStat) (stackdrivers []*fluentbit.Stackdriver) {
return []*fluentbit.Stackdriver{
{
Match: "ops-agent-fluent-bit|ops-agent-collectd",
Workers: getWorkers(hostInfo),
Expand Down Expand Up @@ -216,14 +216,14 @@ func (uc *UnifiedConfig) GenerateFluentBitConfigs(logsDir string, stateDir strin
fbTails := defaultTails(logsDir, stateDir, hostInfo)
userAgent, _ := getUserAgent("Google-Cloud-Ops-Agent-Logging", hostInfo)
fbStackdrivers := defaultStackdriverOutputs(hostInfo)
fbSyslogs := []*conf.Syslog{}
fbWinEventlogs := []*conf.WindowsEventlog{}
fbFilterParserGroups := []conf.FilterParserGroup{}
fbFilterAddLogNames := []*conf.FilterModifyAddLogName{}
fbFilterRewriteTags := []*conf.FilterRewriteTag{}
fbFilterRemoveLogNames := []*conf.FilterModifyRemoveLogName{}
jsonParsers := []*conf.ParserJSON{}
regexParsers := []*conf.ParserRegex{}
fbSyslogs := []*fluentbit.Syslog{}
fbWinEventlogs := []*fluentbit.WindowsEventlog{}
fbFilterParserGroups := []fluentbit.FilterParserGroup{}
fbFilterAddLogNames := []*fluentbit.FilterModifyAddLogName{}
fbFilterRewriteTags := []*fluentbit.FilterRewriteTag{}
fbFilterRemoveLogNames := []*fluentbit.FilterModifyRemoveLogName{}
jsonParsers := []*fluentbit.ParserJSON{}
regexParsers := []*fluentbit.ParserRegex{}

if logging != nil && logging.Service != nil {
// Override any user-specified exporters
Expand All @@ -237,7 +237,7 @@ func (uc *UnifiedConfig) GenerateFluentBitConfigs(logsDir string, stateDir strin
p.ExporterIDs = []string{"google"}
}

extractedTails := []*conf.Tail{}
extractedTails := []*fluentbit.Tail{}
var err error
extractedTails, fbSyslogs, fbWinEventlogs, err = generateFluentBitInputs(logging.Receivers, logging.Service.Pipelines, stateDir, hostInfo)
if err != nil {
Expand All @@ -248,7 +248,7 @@ func (uc *UnifiedConfig) GenerateFluentBitConfigs(logsDir string, stateDir strin
if err != nil {
return "", "", err
}
extractedStackdrivers := []*conf.Stackdriver{}
extractedStackdrivers := []*fluentbit.Stackdriver{}
fbFilterAddLogNames, fbFilterRewriteTags, fbFilterRemoveLogNames, extractedStackdrivers, err = extractExporterPlugins(logging.Exporters, logging.Service.Pipelines, hostInfo)
if err != nil {
return "", "", err
Expand All @@ -259,11 +259,11 @@ func (uc *UnifiedConfig) GenerateFluentBitConfigs(logsDir string, stateDir strin
return "", "", err
}
}
mainConfig, err := conf.GenerateFluentBitMainConfig(fbTails, fbSyslogs, fbWinEventlogs, fbFilterParserGroups, fbFilterAddLogNames, fbFilterRewriteTags, fbFilterRemoveLogNames, fbStackdrivers, userAgent)
mainConfig, err := fluentbit.GenerateFluentBitMainConfig(fbTails, fbSyslogs, fbWinEventlogs, fbFilterParserGroups, fbFilterAddLogNames, fbFilterRewriteTags, fbFilterRemoveLogNames, fbStackdrivers, userAgent)
if err != nil {
return "", "", err
}
parserConfig, err := conf.GenerateFluentBitParserConfig(jsonParsers, regexParsers)
parserConfig, err := fluentbit.GenerateFluentBitParserConfig(jsonParsers, regexParsers)
if err != nil {
return "", "", err
}
Expand Down Expand Up @@ -468,10 +468,10 @@ func generateOtelProcessors(processors map[string]*MetricsProcessor, pipelines m
return excludeMetricsList, processorNameMap, nil
}

func generateFluentBitInputs(receivers map[string]*LoggingReceiver, pipelines map[string]*LoggingPipeline, stateDir string, hostInfo *host.InfoStat) ([]*conf.Tail, []*conf.Syslog, []*conf.WindowsEventlog, error) {
fbTails := []*conf.Tail{}
fbSyslogs := []*conf.Syslog{}
fbWinEventlogs := []*conf.WindowsEventlog{}
func generateFluentBitInputs(receivers map[string]*LoggingReceiver, pipelines map[string]*LoggingPipeline, stateDir string, hostInfo *host.InfoStat) ([]*fluentbit.Tail, []*fluentbit.Syslog, []*fluentbit.WindowsEventlog, error) {
fbTails := []*fluentbit.Tail{}
fbSyslogs := []*fluentbit.Syslog{}
fbWinEventlogs := []*fluentbit.WindowsEventlog{}
fileReceiverFactories, syslogReceiverFactories, wineventlogReceiverFactories, err := extractReceiverFactories(receivers)
if err != nil {
return nil, nil, nil, err
Expand All @@ -480,7 +480,7 @@ func generateFluentBitInputs(receivers map[string]*LoggingReceiver, pipelines ma
p := pipelines[pID]
for _, rID := range p.ReceiverIDs {
if f, ok := fileReceiverFactories[rID]; ok {
fbTail := conf.Tail{
fbTail := fluentbit.Tail{
Tag: fmt.Sprintf("%s.%s", pID, rID),
DB: filepathJoin(hostInfo.OS, stateDir, "buffers", pID+"_"+rID),
Path: strings.Join(f.IncludePaths, ","),
Expand All @@ -492,7 +492,7 @@ func generateFluentBitInputs(receivers map[string]*LoggingReceiver, pipelines ma
continue
}
if f, ok := syslogReceiverFactories[rID]; ok {
fbSyslog := conf.Syslog{
fbSyslog := fluentbit.Syslog{
Tag: fmt.Sprintf("%s.%s", pID, rID),
Listen: f.ListenHost,
Mode: f.TransportProtocol,
Expand All @@ -502,7 +502,7 @@ func generateFluentBitInputs(receivers map[string]*LoggingReceiver, pipelines ma
continue
}
if f, ok := wineventlogReceiverFactories[rID]; ok {
fbWinlog := conf.WindowsEventlog{
fbWinlog := fluentbit.WindowsEventlog{
Tag: fmt.Sprintf("%s.%s", pID, rID),
Channels: strings.Join(f.Channels, ","),
Interval_Sec: "1",
Expand All @@ -516,17 +516,17 @@ func generateFluentBitInputs(receivers map[string]*LoggingReceiver, pipelines ma
return fbTails, fbSyslogs, fbWinEventlogs, nil
}

func generateFluentBitFilters(processors map[string]*LoggingProcessor, pipelines map[string]*LoggingPipeline) ([]conf.FilterParserGroup, error) {
func generateFluentBitFilters(processors map[string]*LoggingProcessor, pipelines map[string]*LoggingPipeline) ([]fluentbit.FilterParserGroup, error) {
// Note: Keep each pipeline's filters in a separate group, because
// the order within that group is important, even though the order
// of the groups themselves does not matter.
groups := []conf.FilterParserGroup{}
groups := []fluentbit.FilterParserGroup{}
for _, pID := range sortedKeys(pipelines) {
fbFilterParsers := []*conf.FilterParser{}
fbFilterParsers := []*fluentbit.FilterParser{}
pipeline := pipelines[pID]
for _, processorID := range pipeline.ProcessorIDs {
p, ok := processors[processorID]
fbFilterParser := conf.FilterParser{
fbFilterParser := fluentbit.FilterParser{
Match: fmt.Sprintf("%s.*", pID),
Parser: processorID,
KeyName: "message",
Expand All @@ -544,56 +544,56 @@ func generateFluentBitFilters(processors map[string]*LoggingProcessor, pipelines
}

func extractExporterPlugins(exporters map[string]*LoggingExporter, pipelines map[string]*LoggingPipeline, hostInfo *host.InfoStat) (
[]*conf.FilterModifyAddLogName, []*conf.FilterRewriteTag, []*conf.FilterModifyRemoveLogName, []*conf.Stackdriver, error) {
fbFilterModifyAddLogNames := []*conf.FilterModifyAddLogName{}
fbFilterRewriteTags := []*conf.FilterRewriteTag{}
fbFilterModifyRemoveLogNames := []*conf.FilterModifyRemoveLogName{}
fbStackdrivers := []*conf.Stackdriver{}
[]*fluentbit.FilterModifyAddLogName, []*fluentbit.FilterRewriteTag, []*fluentbit.FilterModifyRemoveLogName, []*fluentbit.Stackdriver, error) {
fbFilterModifyAddLogNames := []*fluentbit.FilterModifyAddLogName{}
fbFilterRewriteTags := []*fluentbit.FilterRewriteTag{}
fbFilterModifyRemoveLogNames := []*fluentbit.FilterModifyRemoveLogName{}
fbStackdrivers := []*fluentbit.Stackdriver{}
stackdriverExporters := make(map[string][]string)
for _, pID := range sortedKeys(pipelines) {
pipeline := pipelines[pID]
for _, exporterID := range pipeline.ExporterIDs {
// for each receiver, generate a output plugin with the specified receiver id
for _, rID := range pipeline.ReceiverIDs {
fbFilterModifyAddLogNames = append(fbFilterModifyAddLogNames, &conf.FilterModifyAddLogName{
fbFilterModifyAddLogNames = append(fbFilterModifyAddLogNames, &fluentbit.FilterModifyAddLogName{
Match: fmt.Sprintf("%s.%s", pID, rID),
LogName: rID,
})
// generate single rewriteTag for this pipeline
fbFilterRewriteTags = append(fbFilterRewriteTags, &conf.FilterRewriteTag{
fbFilterRewriteTags = append(fbFilterRewriteTags, &fluentbit.FilterRewriteTag{
Match: fmt.Sprintf("%s.%s", pID, rID),
})
fbFilterModifyRemoveLogNames = append(fbFilterModifyRemoveLogNames, &conf.FilterModifyRemoveLogName{
fbFilterModifyRemoveLogNames = append(fbFilterModifyRemoveLogNames, &fluentbit.FilterModifyRemoveLogName{
Match: rID,
})
stackdriverExporters[exporterID] = append(stackdriverExporters[exporterID], rID)
}
}
}
for _, tags := range stackdriverExporters {
fbStackdrivers = append(fbStackdrivers, &conf.Stackdriver{
fbStackdrivers = append(fbStackdrivers, &fluentbit.Stackdriver{
Match: strings.Join(tags, "|"),
Workers: getWorkers(hostInfo),
})
}
return fbFilterModifyAddLogNames, fbFilterRewriteTags, fbFilterModifyRemoveLogNames, fbStackdrivers, nil
}

func extractFluentBitParsers(processors map[string]*LoggingProcessor) ([]*conf.ParserJSON, []*conf.ParserRegex, error) {
fbJSONParsers := []*conf.ParserJSON{}
fbRegexParsers := []*conf.ParserRegex{}
func extractFluentBitParsers(processors map[string]*LoggingProcessor) ([]*fluentbit.ParserJSON, []*fluentbit.ParserRegex, error) {
fbJSONParsers := []*fluentbit.ParserJSON{}
fbRegexParsers := []*fluentbit.ParserRegex{}
for _, name := range sortedKeys(processors) {
p := processors[name]
switch t := p.Type; t {
case "parse_json":
fbJSONParser := conf.ParserJSON{
fbJSONParser := fluentbit.ParserJSON{
Name: name,
TimeKey: p.TimeKey,
TimeFormat: p.TimeFormat,
}
fbJSONParsers = append(fbJSONParsers, &fbJSONParser)
case "parse_regex":
fbRegexParser := conf.ParserRegex{
fbRegexParser := fluentbit.ParserRegex{
Name: name,
Regex: p.Regex,
TimeKey: p.TimeKey,
Expand Down
4 changes: 2 additions & 2 deletions fluentbit/conf/conf.go → confgenerator/fluentbit/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package conf provides data structures to represent and generate fluentBit configuration.
package conf
// Package fluentbit provides data structures to represent and generate fluentBit configuration.
package fluentbit

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package conf
package fluentbit

import (
"testing"
Expand Down
File renamed without changes.
File renamed without changes.

0 comments on commit 5f31d91

Please sign in to comment.