diff --git a/clients/pkg/promtail/scrapeconfig/scrapeconfig.go b/clients/pkg/promtail/scrapeconfig/scrapeconfig.go index 5491a9f56f2c4..aa1b6cb004ce3 100644 --- a/clients/pkg/promtail/scrapeconfig/scrapeconfig.go +++ b/clients/pkg/promtail/scrapeconfig/scrapeconfig.go @@ -348,7 +348,7 @@ type GcplogTargetConfig struct { // ProjectID is the Cloud project id ProjectID string `yaml:"project_id"` - // Subscription is the scription name we use to pull logs from a pubsub topic. + // Subscription is the subscription name we use to pull logs from a pubsub topic. Subscription string `yaml:"subscription"` // Labels are the additional labels to be added to log entry while pushing it to Loki server. @@ -358,6 +358,13 @@ type GcplogTargetConfig struct { // current timestamp at the time of processing. // Its default value(`false`) denotes, replace it with current timestamp at the time of processing. UseIncomingTimestamp bool `yaml:"use_incoming_timestamp"` + + // SubscriptionType decides if the target works with a `pull` or `push` subscription type. + // Defaults to `pull` for backwards compatibility reasons. + SubscriptionType string `yaml:"subscription_type"` + + // Server is the weaveworks server config for listening connections. Used just for `push` subscription type. + Server server.Config `yaml:"server"` } // HerokuDrainTargetConfig describes a scrape config to listen and consume heroku logs, in the HTTPS drain manner. diff --git a/clients/pkg/promtail/targets/gcplog/metrics.go b/clients/pkg/promtail/targets/gcplog/metrics.go index 9eb51a7855a12..9a99f566afb27 100644 --- a/clients/pkg/promtail/targets/gcplog/metrics.go +++ b/clients/pkg/promtail/targets/gcplog/metrics.go @@ -2,7 +2,7 @@ package gcplog import "github.com/prometheus/client_golang/prometheus" -// Metrics stores gcplog entry mertrics. +// Metrics stores gcplog entry metrics. type Metrics struct { // reg is the Registerer used to create this set of metrics. 
reg prometheus.Registerer @@ -10,6 +10,9 @@ type Metrics struct { gcplogEntries *prometheus.CounterVec gcplogErrors *prometheus.CounterVec gcplogTargetLastSuccessScrape *prometheus.GaugeVec + + gcpPushEntries *prometheus.CounterVec + gcpPushErrors *prometheus.CounterVec } // NewMetrics creates a new set of metrics. Metrics will be registered to reg. @@ -17,6 +20,7 @@ func NewMetrics(reg prometheus.Registerer) *Metrics { var m Metrics m.reg = reg + // Pull subscription metrics m.gcplogEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "promtail", Name: "gcplog_target_entries_total", @@ -35,6 +39,25 @@ func NewMetrics(reg prometheus.Registerer) *Metrics { Help: "Timestamp of the specific target's last successful poll", }, []string{"project", "target"}) - reg.MustRegister(m.gcplogEntries, m.gcplogErrors) + // Push subscription metrics + m.gcpPushEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "promtail", + Name: "gcp_push_target_entries_total", + Help: "Number of successful entries received by the GCP Push target", + }, []string{}) + + m.gcpPushErrors = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "promtail", + Name: "gcp_push_target_parsing_errors_total", + Help: "Number of parsing errors while receiving GCP Push messages", + }, []string{}) + + reg.MustRegister( + m.gcplogEntries, + m.gcplogErrors, + m.gcplogTargetLastSuccessScrape, + m.gcpPushEntries, + m.gcpPushErrors, + ) return &m } diff --git a/clients/pkg/promtail/targets/gcplog/pull_target.go b/clients/pkg/promtail/targets/gcplog/pull_target.go new file mode 100644 index 0000000000000..c6839525723c8 --- /dev/null +++ b/clients/pkg/promtail/targets/gcplog/pull_target.go @@ -0,0 +1,149 @@ +package gcplog + +import ( + "context" + "sync" + + "cloud.google.com/go/pubsub" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/relabel" + + 
"github.com/grafana/loki/clients/pkg/promtail/api" + "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" + "github.com/grafana/loki/clients/pkg/promtail/targets/target" +) + +// pullTarget represents the target specific to GCP project, with a pull subscription type. +// It collects logs from GCP and pushes them to Loki. +// nolint:revive +type pullTarget struct { + metrics *Metrics + logger log.Logger + handler api.EntryHandler + config *scrapeconfig.GcplogTargetConfig + relabelConfig []*relabel.Config + jobName string + + // lifecycle management + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + + // pubsub + ps *pubsub.Client + msgs chan *pubsub.Message +} + +// newPullTarget returns the new instance of pullTarget for +// the given `project-id`. It scrapes logs from the GCP project +// and pushes them to Loki via the given `api.EntryHandler`. +// It starts the `run` loop to consume log entries that can be +// stopped via `target.Stop()` +// nolint:revive,govet +func newPullTarget( + metrics *Metrics, + logger log.Logger, + handler api.EntryHandler, + relabel []*relabel.Config, + jobName string, + config *scrapeconfig.GcplogTargetConfig, +) (*pullTarget, error) { + ctx, cancel := context.WithCancel(context.Background()) + + ps, err := pubsub.NewClient(ctx, config.ProjectID) + if err != nil { + return nil, err + } + + target := &pullTarget{ + metrics: metrics, + logger: logger, + handler: handler, + relabelConfig: relabel, + config: config, + jobName: jobName, + ctx: ctx, + cancel: cancel, + ps: ps, + msgs: make(chan *pubsub.Message), + } + + go func() { + _ = target.run() + }() + + return target, nil +} + +func (t *pullTarget) run() error { + t.wg.Add(1) + defer t.wg.Done() + + send := t.handler.Chan() + + sub := t.ps.SubscriptionInProject(t.config.Subscription, t.config.ProjectID) + go func() { + // NOTE(kavi): `cancel` the context as exiting from this goroutine should stop main `run` loop + // It makes sense, as no more messages will be received.
+ defer t.cancel() + + err := sub.Receive(t.ctx, func(ctx context.Context, m *pubsub.Message) { + t.msgs <- m + }) + if err != nil { + level.Error(t.logger).Log("msg", "failed to receive pubsub messages", "error", err) + t.metrics.gcplogErrors.WithLabelValues(t.config.ProjectID).Inc() + t.metrics.gcplogTargetLastSuccessScrape.WithLabelValues(t.config.ProjectID, t.config.Subscription).SetToCurrentTime() + } + }() + + for { + select { + case <-t.ctx.Done(): + return t.ctx.Err() + case m := <-t.msgs: + entry, err := format(m, t.config.Labels, t.config.UseIncomingTimestamp, t.relabelConfig) + if err != nil { + level.Error(t.logger).Log("event", "error formating log entry", "cause", err) + m.Ack() + break + } + send <- entry + m.Ack() // Ack only after log is sent. + t.metrics.gcplogEntries.WithLabelValues(t.config.ProjectID).Inc() + } + } +} + +func (t *pullTarget) Type() target.TargetType { + return target.GcplogTargetType +} + +func (t *pullTarget) Ready() bool { + // Return true just like all other targets. + // Rationale is gcplog scraping shouldn't stop because of some transient timeout errors. + // This transient failure can cause promtail readiness probe to fail which may prevent pod from starting. + // We have metrics now to track if scraping failed (`gcplog_target_last_success_scrape`).
+ return true +} + +func (t *pullTarget) DiscoveredLabels() model.LabelSet { + return nil +} + +func (t *pullTarget) Labels() model.LabelSet { + return t.config.Labels +} + +func (t *pullTarget) Details() interface{} { + return nil +} + +func (t *pullTarget) Stop() error { + t.cancel() + t.wg.Wait() + t.handler.Stop() + return nil +} diff --git a/clients/pkg/promtail/targets/gcplog/pull_target_test.go b/clients/pkg/promtail/targets/gcplog/pull_target_test.go new file mode 100644 index 0000000000000..6c48893155728 --- /dev/null +++ b/clients/pkg/promtail/targets/gcplog/pull_target_test.go @@ -0,0 +1,239 @@ +package gcplog + +import ( + "context" + "sync" + "testing" + "time" + + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/pstest" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/grafana/loki/clients/pkg/promtail/api" + "github.com/grafana/loki/clients/pkg/promtail/client/fake" + "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" + "github.com/grafana/loki/clients/pkg/promtail/targets/target" +) + +func TestPullTarget_Run(t *testing.T) { + // Goal: Check message written to pubsub topic is received by the target. + tt, apiclient, pubsubClient, teardown := testPullTarget(t) + defer teardown() + + // seed pubsub + ctx := context.Background() + tp, err := pubsubClient.CreateTopic(ctx, topic) + require.NoError(t, err) + _, err = pubsubClient.CreateSubscription(ctx, subscription, pubsub.SubscriptionConfig{ + Topic: tp, + }) + require.NoError(t, err) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + _ = tt.run() + }() + + publishMessage(t, tp) + + // Wait till message is received by the run loop. + // NOTE(kavi): sleep is not ideal. 
but there is no other way to confirm if api.Handler received messages + time.Sleep(1 * time.Second) + + err = tt.Stop() + require.NoError(t, err) + + // wait till `run` stops. + wg.Wait() + + assert.Equal(t, 1, len(apiclient.Received())) +} + +func TestPullTarget_Stop(t *testing.T) { + // Goal: To test that `run()` stops when you invoke `target.Stop()` + + errs := make(chan error, 1) + + tt, _, _, teardown := testPullTarget(t) + defer teardown() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + errs <- tt.run() + }() + + // invoke stop + _ = tt.Stop() + + // wait till run returns + wg.Wait() + + // wouldn't block as 1 error is buffered into the channel. + err := <-errs + + // returned error should be cancelled context error + assert.Equal(t, tt.ctx.Err(), err) +} + +func TestPullTarget_Type(t *testing.T) { + tt, _, _, teardown := testPullTarget(t) + defer teardown() + + assert.Equal(t, target.TargetType("Gcplog"), tt.Type()) +} + +func TestPullTarget_Ready(t *testing.T) { + tt, _, _, teardown := testPullTarget(t) + defer teardown() + + assert.Equal(t, true, tt.Ready()) +} + +func TestPullTarget_Labels(t *testing.T) { + tt, _, _, teardown := testPullTarget(t) + defer teardown() + + assert.Equal(t, model.LabelSet{"job": "test-gcplogtarget"}, tt.Labels()) +} + +func testPullTarget(t *testing.T) (*pullTarget, *fake.Client, *pubsub.Client, func()) { + t.Helper() + + ctx, cancel := context.WithCancel(context.Background()) + + mockSvr := pstest.NewServer() + conn, err := grpc.Dial(mockSvr.Addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + + mockpubsubClient, err := pubsub.NewClient(ctx, testConfig.ProjectID, option.WithGRPCConn(conn)) + require.NoError(t, err) + + fakeClient := fake.New(func() {}) + + var handler api.EntryHandler = fakeClient + target := &pullTarget{ + metrics: NewMetrics(prometheus.NewRegistry()), + logger: log.NewNopLogger(), + handler: handler, + relabelConfig: nil, + config: testConfig, +
jobName: "job-test-gcplogtarget", + ctx: ctx, + cancel: cancel, + ps: mockpubsubClient, + msgs: make(chan *pubsub.Message), + } + + // cleanup + return target, fakeClient, mockpubsubClient, func() { + cancel() + conn.Close() + mockSvr.Close() + } +} + +func publishMessage(t *testing.T, topic *pubsub.Topic) { + t.Helper() + + ctx := context.Background() + res := topic.Publish(ctx, &pubsub.Message{Data: []byte(gcpLogEntry)}) + _, err := res.Get(ctx) // wait till message is actully published + require.NoError(t, err) +} + +const ( + project = "test-project" + topic = "test-topic" + subscription = "test-subscription" + + gcpLogEntry = ` +{ + "insertId": "ajv4d1f1ch8dr", + "logName": "projects/grafanalabs-dev/logs/cloudaudit.googleapis.com%2Fdata_access", + "protoPayload": { + "@type": "type.googleapis.com/google.cloud.audit.AuditLog", + "authenticationInfo": { + "principalEmail": "1040409107725-compute@developer.gserviceaccount.com", + "serviceAccountDelegationInfo": [ + { + "firstPartyPrincipal": { + "principalEmail": "service-1040409107725@compute-system.iam.gserviceaccount.com" + } + } + ] + }, + "authorizationInfo": [ + { + "granted": true, + "permission": "storage.objects.list", + "resource": "projects/_/buckets/dev-us-central1-cortex-tsdb-dev", + "resourceAttributes": { + } + }, + { + "permission": "storage.objects.get", + "resource": "projects/_/buckets/dev-us-central1-cortex-tsdb-dev/objects/load-generator-20/01EM34PFBC2SCV3ETBGRAQZ090/deletion-mark.json", + "resourceAttributes": { + } + } + ], + "methodName": "storage.objects.get", + "requestMetadata": { + "callerIp": "34.66.19.193", + "callerNetwork": "//compute.googleapis.com/projects/grafanalabs-dev/global/networks/__unknown__", + "callerSuppliedUserAgent": "thanos-store-gateway/1.5.0 (go1.14.9),gzip(gfe)", + "destinationAttributes": { + }, + "requestAttributes": { + "auth": { + }, + "time": "2021-01-01T02:17:10.661405637Z" + } + }, + "resourceLocation": { + "currentLocations": [ + "us-central1" + ] + }, + 
"resourceName": "projects/_/buckets/dev-us-central1-cortex-tsdb-dev/objects/load-generator-20/01EM34PFBC2SCV3ETBGRAQZ090/deletion-mark.json", + "serviceName": "storage.googleapis.com", + "status": { + } + }, + "receiveTimestamp": "2021-01-01T02:17:10.82013623Z", + "resource": { + "labels": { + "bucket_name": "dev-us-central1-cortex-tsdb-dev", + "location": "us-central1", + "project_id": "grafanalabs-dev" + }, + "type": "gcs_bucket" + }, + "severity": "INFO", + "timestamp": "2021-01-01T02:17:10.655982344Z" +} +` +) + +var testConfig = &scrapeconfig.GcplogTargetConfig{ + ProjectID: project, + Subscription: subscription, + Labels: model.LabelSet{ + "job": "test-gcplogtarget", + }, + SubscriptionType: "pull", +} diff --git a/clients/pkg/promtail/targets/gcplog/push_target.go b/clients/pkg/promtail/targets/gcplog/push_target.go new file mode 100644 index 0000000000000..cd6e51951c15e --- /dev/null +++ b/clients/pkg/promtail/targets/gcplog/push_target.go @@ -0,0 +1,158 @@ +package gcplog + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/relabel" + "github.com/weaveworks/common/logging" + "github.com/weaveworks/common/server" + + "github.com/grafana/loki/clients/pkg/promtail/api" + "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" + "github.com/grafana/loki/clients/pkg/promtail/targets/serverutils" + "github.com/grafana/loki/clients/pkg/promtail/targets/target" + + util_log "github.com/grafana/loki/pkg/util/log" +) + +type pushTarget struct { + logger log.Logger + handler api.EntryHandler + config *scrapeconfig.GcplogTargetConfig + jobName string + server *server.Server + metrics *Metrics + relabelConfigs []*relabel.Config +} + +// newPushTarget creates a brand new GCP Push target, capable of receiving message from a GCP PubSub push subscription. 
+func newPushTarget(metrics *Metrics, logger log.Logger, handler api.EntryHandler, jobName string, config *scrapeconfig.GcplogTargetConfig, relabel []*relabel.Config) (*pushTarget, error) { + wrappedLogger := log.With(logger, "component", "gcp_push") + + ht := &pushTarget{ + metrics: metrics, + logger: wrappedLogger, + handler: handler, + jobName: jobName, + config: config, + relabelConfigs: relabel, + } + + mergedServerConfigs, err := serverutils.MergeWithDefaults(config.Server) + if err != nil { + return nil, fmt.Errorf("failed to parse configs and override defaults when configuring gcp push target: %w", err) + } + config.Server = mergedServerConfigs + + err = ht.run() + if err != nil { + return nil, err + } + + return ht, nil +} + +func (h *pushTarget) run() error { + level.Info(h.logger).Log("msg", "starting gcp push target", "job", h.jobName) + + // To prevent metric collisions because all metrics are going to be registered in the global Prometheus registry. + + tentativeServerMetricNamespace := "promtail_gcp_push_target_" + h.jobName + if !model.IsValidMetricName(model.LabelValue(tentativeServerMetricNamespace)) { + return fmt.Errorf("invalid prometheus-compatible job name: %s", h.jobName) + } + h.config.Server.MetricsNamespace = tentativeServerMetricNamespace + + // We don't want the /debug and /metrics endpoints running, since this is not the main promtail HTTP server. + // We want this target to expose the least surface area possible, hence disabling WeaveWorks HTTP server metrics + // and debugging functionality. 
+ h.config.Server.RegisterInstrumentation = false + + // Wrapping util logger with component-specific key vals, and the expected GoKit logging interface + h.config.Server.Log = logging.GoKit(log.With(util_log.Logger, "component", "gcp_push")) + + srv, err := server.New(h.config.Server) + if err != nil { + return err + } + + h.server = srv + h.server.HTTP.Path("/gcp/api/v1/push").Methods("POST").Handler(http.HandlerFunc(h.push)) + + go func() { + err := srv.Run() + if err != nil { + level.Error(h.logger).Log("msg", "gcp push target shutdown with error", "err", err) + } + }() + + return nil +} + +func (h *pushTarget) push(w http.ResponseWriter, r *http.Request) { + entries := h.handler.Chan() + defer r.Body.Close() + + pushMessage := PushMessage{} + bs, err := io.ReadAll(r.Body) + if err != nil { + h.metrics.gcpPushErrors.WithLabelValues().Inc() + level.Warn(h.logger).Log("msg", "failed to read incoming gcp push request", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + err = json.Unmarshal(bs, &pushMessage) + if err != nil { + h.metrics.gcpPushErrors.WithLabelValues().Inc() + level.Warn(h.logger).Log("msg", "failed to unmarshall gcp push request", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + entry, err := translate(pushMessage, h.config.Labels, h.config.UseIncomingTimestamp, h.relabelConfigs, r.Header.Get("X-Scope-OrgID")) + if err != nil { + h.metrics.gcpPushErrors.WithLabelValues().Inc() + level.Warn(h.logger).Log("msg", "failed to translate gcp push request", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + level.Debug(h.logger).Log("msg", fmt.Sprintf("Received line: %s", entry.Line)) + + entries <- entry + h.metrics.gcpPushEntries.WithLabelValues().Inc() + w.WriteHeader(http.StatusNoContent) +} + +func (h *pushTarget) Type() target.TargetType { + return target.GcplogTargetType +} + +func (h *pushTarget) DiscoveredLabels() model.LabelSet { + 
return nil +} + +func (h *pushTarget) Labels() model.LabelSet { + return h.config.Labels +} + +func (h *pushTarget) Ready() bool { + return true +} + +func (h *pushTarget) Details() interface{} { + return map[string]string{} +} + +func (h *pushTarget) Stop() error { + level.Info(h.logger).Log("msg", "stopping gcp push target", "job", h.jobName) + h.server.Shutdown() + h.handler.Stop() + return nil +} diff --git a/clients/pkg/promtail/targets/gcplog/push_target_test.go b/clients/pkg/promtail/targets/gcplog/push_target_test.go new file mode 100644 index 0000000000000..0d5d7570a6ca4 --- /dev/null +++ b/clients/pkg/promtail/targets/gcplog/push_target_test.go @@ -0,0 +1,304 @@ +package gcplog_test + +import ( + "flag" + "fmt" + "net" + "net/http" + "os" + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/relabel" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/server" + + lokiClient "github.com/grafana/loki/clients/pkg/promtail/client" + "github.com/grafana/loki/clients/pkg/promtail/client/fake" + "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" + "github.com/grafana/loki/clients/pkg/promtail/targets/gcplog" +) + +const localhost = "127.0.0.1" + +const expectedMessageData = `{ + "severity": "DEBUG", + "textPayload": "Function execution took 1198 ms. 
Finished with status code: 200", + "trace": "projects/wired-height/traces/54aa8a20875253c6b47caa8bd28ad652" +}` +const testPayload = ` +{ + "message": { + "attributes": { + "logging.googleapis.com/timestamp": "2022-07-25T22:19:09.903683708Z" + }, + "data": "ewogICJzZXZlcml0eSI6ICJERUJVRyIsCiAgInRleHRQYXlsb2FkIjogIkZ1bmN0aW9uIGV4ZWN1dGlvbiB0b29rIDExOTggbXMuIEZpbmlzaGVkIHdpdGggc3RhdHVzIGNvZGU6IDIwMCIsCiAgInRyYWNlIjogInByb2plY3RzL3dpcmVkLWhlaWdodC90cmFjZXMvNTRhYThhMjA4NzUyNTNjNmI0N2NhYThiZDI4YWQ2NTIiCn0=", + "messageId": "5187581549398349", + "message_id": "5187581549398349", + "publishTime": "2022-07-25T22:19:15.56Z", + "publish_time": "2022-07-25T22:19:15.56Z" + }, + "subscription": "projects/wired-height-350515/subscriptions/test" +}` + +func makeGCPPushRequest(host string, body string) (*http.Request, error) { + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/gcp/api/v1/push", host), strings.NewReader(body)) + if err != nil { + return nil, err + } + return req, nil +} + +func TestPushTarget(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + + type expectedEntry struct { + labels model.LabelSet + line string + } + type args struct { + RequestBody string + RelabelConfigs []*relabel.Config + Labels model.LabelSet + } + + cases := map[string]struct { + args args + expectedEntries []expectedEntry + }{ + "simplified cloud functions log line": { + args: args{ + RequestBody: testPayload, + Labels: model.LabelSet{ + "job": "some_job_name", + }, + }, + expectedEntries: []expectedEntry{ + { + labels: model.LabelSet{ + "job": "some_job_name", + }, + line: expectedMessageData, + }, + }, + }, + "simplified cloud functions log line, with relabeling custom attribute and message id": { + args: args{ + RequestBody: testPayload, + Labels: model.LabelSet{ + "job": "some_job_name", + }, + RelabelConfigs: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"__gcp_attributes_logging_googleapis_com_timestamp"}, + Regex: 
relabel.MustNewRegexp("(.*)"), + Replacement: "$1", + TargetLabel: "google_timestamp", + Action: relabel.Replace, + }, + { + SourceLabels: model.LabelNames{"__gcp_message_id"}, + Regex: relabel.MustNewRegexp("(.*)"), + Replacement: "$1", + TargetLabel: "message_id", + Action: relabel.Replace, + }, + }, + }, + expectedEntries: []expectedEntry{ + { + labels: model.LabelSet{ + "job": "some_job_name", + "google_timestamp": "2022-07-25T22:19:09.903683708Z", + "message_id": "5187581549398349", + }, + line: expectedMessageData, + }, + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + // Create fake promtail client + eh := fake.New(func() {}) + defer eh.Stop() + + serverConfig, port, err := getServerConfigWithAvailablePort() + require.NoError(t, err, "error generating server config or finding open port") + config := &scrapeconfig.GcplogTargetConfig{ + Server: serverConfig, + Labels: tc.args.Labels, + UseIncomingTimestamp: false, + SubscriptionType: "push", + } + + prometheus.DefaultRegisterer = prometheus.NewRegistry() + metrics := gcplog.NewMetrics(prometheus.DefaultRegisterer) + pt, err := gcplog.NewGCPLogTarget(metrics, logger, eh, tc.args.RelabelConfigs, "test_job", config) + require.NoError(t, err) + defer func() { + _ = pt.Stop() + }() + + // Clear received lines after test case is ran + defer eh.Clear() + + // Send some logs + ts := time.Now() + + req, err := makeGCPPushRequest(fmt.Sprintf("http://%s:%d", localhost, port), tc.args.RequestBody) + require.NoError(t, err, "expected test drain request to be successfully created") + res, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code") + + waitForMessages(eh) + + // Make sure we didn't timeout + require.Equal(t, 1, len(eh.Received())) + + require.Equal(t, len(eh.Received()), len(tc.expectedEntries), "expected to receive equal amount of expected label sets") + for i, expectedEntry := range 
tc.expectedEntries { + // TODO: Add assertion over propagated timestamp + actualEntry := eh.Received()[i] + + require.Equal(t, expectedEntry.line, actualEntry.Line, "expected line to be equal for %d-th entry", i) + + expectedLS := expectedEntry.labels + actualLS := actualEntry.Labels + for label, value := range expectedLS { + require.Equal(t, expectedLS[label], actualLS[label], "expected label %s to be equal to %s in %d-th entry", label, value, i) + } + + // Timestamp is always set in the handler, we expect received timestamps to be slightly higher than the timestamp when we started sending logs. + require.GreaterOrEqual(t, actualEntry.Timestamp.Unix(), ts.Unix(), "expected %d-th entry to have a received timestamp greater than publish time", i) + } + }) + } +} + +func TestPushTarget_UseIncomingTimestamp(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + + // Create fake promtail client + eh := fake.New(func() {}) + defer eh.Stop() + + serverConfig, port, err := getServerConfigWithAvailablePort() + require.NoError(t, err, "error generating server config or finding open port") + config := &scrapeconfig.GcplogTargetConfig{ + Server: serverConfig, + Labels: nil, + UseIncomingTimestamp: true, + SubscriptionType: "push", + } + + prometheus.DefaultRegisterer = prometheus.NewRegistry() + metrics := gcplog.NewMetrics(prometheus.DefaultRegisterer) + pt, err := gcplog.NewGCPLogTarget(metrics, logger, eh, nil, "test_job", config) + require.NoError(t, err) + defer func() { + _ = pt.Stop() + }() + + // Clear received lines after test case is ran + defer eh.Clear() + + req, err := makeGCPPushRequest(fmt.Sprintf("http://%s:%d", localhost, port), testPayload) + require.NoError(t, err, "expected test drain request to be successfully created") + res, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code") + + waitForMessages(eh) + + // Make sure we 
didn't timeout + require.Equal(t, 1, len(eh.Received())) + + expectedTs, err := time.Parse(time.RFC3339Nano, "2022-07-25T22:19:15.56Z") + require.NoError(t, err, "expected expected timestamp to be parse correctly") + require.Equal(t, expectedTs, eh.Received()[0].Timestamp, "expected entry timestamp to be overridden by received one") +} + +func TestPushTarget_UseTenantIDHeaderIfPresent(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + + // Create fake promtail client + eh := fake.New(func() {}) + defer eh.Stop() + + serverConfig, port, err := getServerConfigWithAvailablePort() + require.NoError(t, err, "error generating server config or finding open port") + config := &scrapeconfig.GcplogTargetConfig{ + Server: serverConfig, + Labels: nil, + UseIncomingTimestamp: true, + SubscriptionType: "push", + } + + prometheus.DefaultRegisterer = prometheus.NewRegistry() + metrics := gcplog.NewMetrics(prometheus.DefaultRegisterer) + pt, err := gcplog.NewGCPLogTarget(metrics, logger, eh, nil, "test_job", config) + require.NoError(t, err) + defer func() { + _ = pt.Stop() + }() + + // Clear received lines after test case is ran + defer eh.Clear() + + req, err := makeGCPPushRequest(fmt.Sprintf("http://%s:%d", localhost, port), testPayload) + require.NoError(t, err, "expected test drain request to be successfully created") + req.Header.Set("X-Scope-OrgID", "42") + res, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code") + + waitForMessages(eh) + + // Make sure we didn't timeout + require.Equal(t, 1, len(eh.Received())) + + require.Equal(t, model.LabelValue("42"), eh.Received()[0].Labels[lokiClient.ReservedLabelTenantID]) +} + +func waitForMessages(eh *fake.Client) { + countdown := 1000 + for len(eh.Received()) != 1 && countdown > 0 { + time.Sleep(1 * time.Millisecond) + countdown-- + } +} + +func getServerConfigWithAvailablePort() (cfg 
server.Config, port int, err error) { + // Get a randomly available port by opening and closing a TCP socket + addr, err := net.ResolveTCPAddr("tcp", localhost+":0") + if err != nil { + return + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return + } + port = l.Addr().(*net.TCPAddr).Port + err = l.Close() + if err != nil { + return + } + + // Adjust some of the defaults + cfg.RegisterFlags(flag.NewFlagSet("empty", flag.ContinueOnError)) + cfg.HTTPListenAddress = localhost + cfg.HTTPListenPort = port + cfg.GRPCListenAddress = localhost + cfg.GRPCListenPort = 0 // Not testing GRPC, a random port will be assigned + + return +} diff --git a/clients/pkg/promtail/targets/gcplog/push_translation.go b/clients/pkg/promtail/targets/gcplog/push_translation.go new file mode 100644 index 0000000000000..ff29f6ce4cd22 --- /dev/null +++ b/clients/pkg/promtail/targets/gcplog/push_translation.go @@ -0,0 +1,117 @@ +package gcplog + +import ( + "encoding/base64" + "fmt" + "regexp" + "strings" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" + + "github.com/grafana/loki/clients/pkg/promtail/api" + lokiClient "github.com/grafana/loki/clients/pkg/promtail/client" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/util" +) + +// Configured as a global in this file to avoid recompiling this regex everywhere. +var labelToLokiCompatible *regexp.Regexp + +func init() { + // TODO: Maybe use a regexp negative filter and grab everything non-alphanumeric non-underscore? NOTE(review): inside a character class `.-/` is parsed as a range ('.' through '/'), so a literal '-' is NOT replaced by this pattern; confirm intent and use `[./-]` if hyphens should be covered. + labelToLokiCompatible = regexp.MustCompile("[.-/]") +} + +// PushMessage is the POST body format sent by GCP PubSub push subscriptions.
+type PushMessage struct { + Message struct { + Attributes map[string]string `json:"attributes"` + Data string `json:"data"` + ID string `json:"message_id"` + PublishTimestamp string `json:"publish_time"` + } `json:"message"` + Subscription string `json:"subscription"` +} + +// translate converts a GCP PushMessage into a loki api.Entry. It is responsible for decoding the log line (contained in the Message.Data) +// attribute, using the incoming timestamp if necessary, and formatting and passing down the incoming Message.Attributes +// if relabel is configured. +func translate(m PushMessage, other model.LabelSet, useIncomingTimestamp bool, relabelConfigs []*relabel.Config, xScopeOrgID string) (api.Entry, error) { + // mandatory label for gcplog + lbs := labels.NewBuilder(nil) + lbs.Set("__gcp_message_id", m.Message.ID) + + // labels from gcp log entry. Add it as internal labels + for k, v := range m.Message.Attributes { + lbs.Set(fmt.Sprintf("__gcp_attributes_%s", convertToLokiCompatibleLabel(k)), v) + } + + var processed labels.Labels + + // apply relabeling + if len(relabelConfigs) > 0 { + processed = relabel.Process(lbs.Labels(), relabelConfigs...) 
+ } else { + processed = lbs.Labels() + } + + // final labelset that will be sent to loki + labels := make(model.LabelSet) + for _, lbl := range processed { + // ignore internal labels + if strings.HasPrefix(lbl.Name, "__") { + continue + } + // ignore invalid labels + if !model.LabelName(lbl.Name).IsValid() || !model.LabelValue(lbl.Value).IsValid() { + continue + } + labels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) + } + + // add labels coming from scrapeconfig + labels = labels.Merge(other) + + ts := time.Now() + + decodedData, err := base64.StdEncoding.DecodeString(m.Message.Data) + if err != nil { + return api.Entry{}, fmt.Errorf("failed to decode data: %w", err) + } + line := string(decodedData) + + if useIncomingTimestamp { + var err error + ts, err = time.Parse(time.RFC3339, m.Message.PublishTimestamp) + if err != nil { + return api.Entry{}, fmt.Errorf("invalid publish timestamp format: %w", err) + } + } + + // If the incoming request carries the tenant id, inject it as the reserved label, so it's used by the + // remote write client. + if xScopeOrgID != "" { + labels[lokiClient.ReservedLabelTenantID] = model.LabelValue(xScopeOrgID) + } + + return api.Entry{ + Labels: labels, + Entry: logproto.Entry{ + Timestamp: ts, + Line: line, + }, + }, nil +} + +// convertToLokiCompatibleLabel converts an incoming GCP Push message label to a loki compatible format. There are labels +// such as `logging.googleapis.com/timestamp`, which contain characters outside the loki-compatible set (alphanumerics +// and _). The approach taken is to translate every non-alphanumeric separator character to an underscore. +func convertToLokiCompatibleLabel(label string) string { + // TODO: Since this is running for every incoming message, maybe it's more performant to do something + // like a loop over characters, and checking if it's not loki compatible, instead of a regexp.
+ return util.SnakeCase(labelToLokiCompatible.ReplaceAllString(label, "_")) +} diff --git a/clients/pkg/promtail/targets/gcplog/push_translation_test.go b/clients/pkg/promtail/targets/gcplog/push_translation_test.go new file mode 100644 index 0000000000000..4111acba93422 --- /dev/null +++ b/clients/pkg/promtail/targets/gcplog/push_translation_test.go @@ -0,0 +1,31 @@ +package gcplog + +import ( + "testing" +) + +func TestConvertToLokiCompatibleLabel(t *testing.T) { + type args struct { + label string + } + tests := []struct { + name string + args args + want string + }{ + { + name: "Google timestamp label attribute name", + args: args{ + label: "logging.googleapis.com/timestamp", + }, + want: "logging_googleapis_com_timestamp", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := convertToLokiCompatibleLabel(tt.args.label); got != tt.want { + t.Errorf("convertToLokiCompatibleLabel() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/clients/pkg/promtail/targets/gcplog/target.go b/clients/pkg/promtail/targets/gcplog/target.go index b515696413dba..0e4cbe3a097e2 100644 --- a/clients/pkg/promtail/targets/gcplog/target.go +++ b/clients/pkg/promtail/targets/gcplog/target.go @@ -1,13 +1,9 @@ package gcplog import ( - "context" - "sync" + "fmt" - "cloud.google.com/go/pubsub" "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/relabel" "github.com/grafana/loki/clients/pkg/promtail/api" @@ -15,151 +11,28 @@ import ( "github.com/grafana/loki/clients/pkg/promtail/targets/target" ) -// GcplogTarget represents the target specific to GCP project. -// It collects logs from GCP and push it to Loki. 
-// nolint:revive -type GcplogTarget struct { - metrics *Metrics - logger log.Logger - handler api.EntryHandler - config *scrapeconfig.GcplogTargetConfig - relabelConfig []*relabel.Config - jobName string - - // lifecycle management - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - - // pubsub - ps *pubsub.Client - msgs chan *pubsub.Message +// Target is a common interface implemented by both GCPLog targets. +type Target interface { + target.Target + Stop() error } -// NewGcplogTarget returns the new instannce of GcplogTarget for -// the given `project-id`. It scraps logs from the GCP project -// and push it Loki via given `api.EntryHandler.` -// It starts the `run` loop to consume log entries that can be -// stopped via `target.Stop()` -// nolint:revive,govet -func NewGcplogTarget( +// NewGCPLogTarget creates a GCPLog target either with the push or pull implementation, depending on the configured +// subscription type. +func NewGCPLogTarget( metrics *Metrics, logger log.Logger, handler api.EntryHandler, relabel []*relabel.Config, jobName string, config *scrapeconfig.GcplogTargetConfig, -) (*GcplogTarget, error) { - ctx, cancel := context.WithCancel(context.Background()) - - ps, err := pubsub.NewClient(ctx, config.ProjectID) - if err != nil { - return nil, err - } - - target := newGcplogTarget(metrics, logger, handler, relabel, jobName, config, ps, ctx, cancel) - - go func() { - _ = target.run() - }() - - return target, nil -} - -// nolint:revive -func newGcplogTarget( - metrics *Metrics, - logger log.Logger, - handler api.EntryHandler, - relabel []*relabel.Config, - jobName string, - config *scrapeconfig.GcplogTargetConfig, - pubsubClient *pubsub.Client, - ctx context.Context, - cancel func(), -) *GcplogTarget { - return &GcplogTarget{ - metrics: metrics, - logger: logger, - handler: handler, - relabelConfig: relabel, - config: config, - jobName: jobName, - ctx: ctx, - cancel: cancel, - ps: pubsubClient, - msgs: make(chan *pubsub.Message), - 
} -} - -func (t *GcplogTarget) run() error { - t.wg.Add(1) - defer t.wg.Done() - - send := t.handler.Chan() - - sub := t.ps.SubscriptionInProject(t.config.Subscription, t.config.ProjectID) - go func() { - // NOTE(kavi): `cancel` the context as exiting from this goroutine should stop main `run` loop - // It makesense as no more messages will be received. - defer t.cancel() - - err := sub.Receive(t.ctx, func(ctx context.Context, m *pubsub.Message) { - t.msgs <- m - }) - if err != nil { - level.Error(t.logger).Log("msg", "failed to receive pubsub messages", "error", err) - t.metrics.gcplogErrors.WithLabelValues(t.config.ProjectID).Inc() - t.metrics.gcplogTargetLastSuccessScrape.WithLabelValues(t.config.ProjectID, t.config.Subscription).SetToCurrentTime() - } - }() - - for { - select { - case <-t.ctx.Done(): - return t.ctx.Err() - case m := <-t.msgs: - entry, err := format(m, t.config.Labels, t.config.UseIncomingTimestamp, t.relabelConfig) - if err != nil { - level.Error(t.logger).Log("event", "error formating log entry", "cause", err) - m.Ack() - break - } - send <- entry - m.Ack() // Ack only after log is sent. - t.metrics.gcplogEntries.WithLabelValues(t.config.ProjectID).Inc() - } +) (Target, error) { + switch config.SubscriptionType { + case "pull", "": + return newPullTarget(metrics, logger, handler, relabel, jobName, config) + case "push": + return newPushTarget(metrics, logger, handler, jobName, config, relabel) + default: + return nil, fmt.Errorf("invalid subscription type: %s. valid options are 'push' and 'pull'", config.SubscriptionType) } } - -// sent implements target.Target. -func (t *GcplogTarget) Type() target.TargetType { - return target.GcplogTargetType -} - -func (t *GcplogTarget) Ready() bool { - // Return true just like all other targets. - // Rationale is gcplog scraping shouldn't stop because of some transient timeout errors. - // This transient failure can cause promtail readyness probe to fail which may prevent pod from starting. 
- // We have metrics now to track if scraping failed (`gcplog_target_last_success_scrape`). - return true -} - -func (t *GcplogTarget) DiscoveredLabels() model.LabelSet { - return nil -} - -func (t *GcplogTarget) Labels() model.LabelSet { - return t.config.Labels -} - -func (t *GcplogTarget) Details() interface{} { - return nil -} - -func (t *GcplogTarget) Stop() error { - t.cancel() - t.wg.Wait() - t.handler.Stop() - return nil -} diff --git a/clients/pkg/promtail/targets/gcplog/target_test.go b/clients/pkg/promtail/targets/gcplog/target_test.go index 266026dbb8445..fc311a9436021 100644 --- a/clients/pkg/promtail/targets/gcplog/target_test.go +++ b/clients/pkg/promtail/targets/gcplog/target_test.go @@ -1,235 +1,122 @@ package gcplog import ( - "context" - "sync" + "fmt" + "os" + "reflect" "testing" - "time" - "cloud.google.com/go/pubsub" - "cloud.google.com/go/pubsub/pstest" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/relabel" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "google.golang.org/api/option" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" + "github.com/grafana/loki/clients/pkg/promtail/api" "github.com/grafana/loki/clients/pkg/promtail/client/fake" "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" - "github.com/grafana/loki/clients/pkg/promtail/targets/target" ) -func TestGcplogTarget_Run(t *testing.T) { - // Goal: Check message written to pubsub topic is received by the target. 
- tt, apiclient, pubsubClient, teardown := testGcplogTarget(t) - defer teardown() - - // seed pubsub - ctx := context.Background() - tp, err := pubsubClient.CreateTopic(ctx, topic) - require.NoError(t, err) - _, err = pubsubClient.CreateSubscription(ctx, subscription, pubsub.SubscriptionConfig{ - Topic: tp, - }) - require.NoError(t, err) - - var wg sync.WaitGroup - - wg.Add(1) - go func() { - defer wg.Done() - _ = tt.run() - }() - - publishMessage(t, tp) - - // Wait till message is received by the run loop. - // NOTE(kavi): sleep is not ideal. but not other way to confirm if api.Handler received messages - time.Sleep(1 * time.Second) - - err = tt.Stop() - require.NoError(t, err) - - // wait till `run` stops. - wg.Wait() - - assert.Equal(t, 1, len(apiclient.Received())) -} - -func TestGcplogTarget_Stop(t *testing.T) { - // Goal: To test that `run()` stops when you invoke `target.Stop()` - - errs := make(chan error, 1) - - tt, _, _, teardown := testGcplogTarget(t) - defer teardown() - - var wg sync.WaitGroup - - wg.Add(1) - go func() { - defer wg.Done() - errs <- tt.run() - }() - - // invoke stop - _ = tt.Stop() - - // wait till run returns - wg.Wait() - - // wouldn't block as 1 error is buffered into the channel. 
- err := <-errs - - // returned error should be cancelled context error - assert.Equal(t, tt.ctx.Err(), err) -} - -func TestGcplogTarget_Type(t *testing.T) { - tt, _, _, teardown := testGcplogTarget(t) - defer teardown() - - assert.Equal(t, target.TargetType("Gcplog"), tt.Type()) -} - -func TestGcplogTarget_Ready(t *testing.T) { - tt, _, _, teardown := testGcplogTarget(t) - defer teardown() - - assert.Equal(t, true, tt.Ready()) -} - -func TestGcplogTarget_Labels(t *testing.T) { - tt, _, _, teardown := testGcplogTarget(t) - defer teardown() - - assert.Equal(t, model.LabelSet{"job": "test-gcplogtarget"}, tt.Labels()) -} - -func testGcplogTarget(t *testing.T) (*GcplogTarget, *fake.Client, *pubsub.Client, func()) { - t.Helper() - - ctx, cancel := context.WithCancel(context.Background()) - - mockSvr := pstest.NewServer() - conn, err := grpc.Dial(mockSvr.Addr, grpc.WithTransportCredentials(insecure.NewCredentials())) - require.NoError(t, err) - - mockpubsubClient, err := pubsub.NewClient(ctx, testConfig.ProjectID, option.WithGRPCConn(conn)) - require.NoError(t, err) - - fakeClient := fake.New(func() {}) - - target := newGcplogTarget( - NewMetrics(prometheus.NewRegistry()), - log.NewNopLogger(), - fakeClient, - nil, - "job-test-gcplogtarget", - testConfig, - mockpubsubClient, - ctx, - cancel, - ) - - // cleanup - return target, fakeClient, mockpubsubClient, func() { - cancel() - conn.Close() - mockSvr.Close() +func TestNewGCPLogTarget(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + + // Create fake promtail client + eh := fake.New(func() {}) + defer eh.Stop() + + type args struct { + metrics *Metrics + logger log.Logger + handler api.EntryHandler + relabel []*relabel.Config + jobName string + config *scrapeconfig.GcplogTargetConfig + } + tests := []struct { + name string + args args + wantType interface{} + wantErr assert.ErrorAssertionFunc + }{ + { + name: "defaults to pull target", + args: args{ + metrics: 
NewMetrics(prometheus.NewRegistry()), + logger: logger, + handler: eh, + relabel: nil, + jobName: "test_job", + config: &scrapeconfig.GcplogTargetConfig{ + SubscriptionType: "", + }, + }, + wantType: &pullTarget{}, + wantErr: assert.NoError, + }, + { + name: "pull SubscriptionType creates new pull target", + args: args{ + metrics: NewMetrics(prometheus.NewRegistry()), + logger: logger, + handler: eh, + relabel: nil, + jobName: "test_job", + config: &scrapeconfig.GcplogTargetConfig{ + SubscriptionType: "pull", + }, + }, + wantType: &pullTarget{}, + wantErr: assert.NoError, + }, + { + name: "push SubscriptionType creates new pull target", + args: args{ + metrics: NewMetrics(prometheus.NewRegistry()), + logger: logger, + handler: eh, + relabel: nil, + jobName: "test_job", + config: &scrapeconfig.GcplogTargetConfig{ + SubscriptionType: "push", + }, + }, + wantType: &pushTarget{}, + wantErr: assert.NoError, + }, + { + name: "unknown subscription type fails to create target", + args: args{ + metrics: NewMetrics(prometheus.NewRegistry()), + logger: logger, + handler: eh, + relabel: nil, + jobName: "test_job", + config: &scrapeconfig.GcplogTargetConfig{ + SubscriptionType: "magic", + }, + }, + wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + return assert.ErrorContains(t, err, "invalid subscription type: magic", i...) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Since the push target underlying http server registers metrics in the default registerer, we have to override it to prevent duplicate metrics errors. 
+ prometheus.DefaultRegisterer = prometheus.NewRegistry() + + got, err := NewGCPLogTarget(tt.args.metrics, tt.args.logger, tt.args.handler, tt.args.relabel, tt.args.jobName, tt.args.config) + // If the target was started, stop it after test + if got != nil { + defer func() { _ = got.Stop() }() + } + + if !tt.wantErr(t, err, fmt.Sprintf("NewGCPLogTarget(%v, %v, %v, %v, %v, %v)", tt.args.metrics, tt.args.logger, tt.args.handler, tt.args.relabel, tt.args.jobName, tt.args.config)) { + return + } + if err == nil { + assert.IsType(t, tt.wantType, got, "created target type different than expected: Got: %s", reflect.TypeOf(got).Name()) + } + }) } -} - -func publishMessage(t *testing.T, topic *pubsub.Topic) { - t.Helper() - - ctx := context.Background() - res := topic.Publish(ctx, &pubsub.Message{Data: []byte(gcpLogEntry)}) - _, err := res.Get(ctx) // wait till message is actully published - require.NoError(t, err) -} - -const ( - project = "test-project" - topic = "test-topic" - subscription = "test-subscription" - - gcpLogEntry = ` -{ - "insertId": "ajv4d1f1ch8dr", - "logName": "projects/grafanalabs-dev/logs/cloudaudit.googleapis.com%2Fdata_access", - "protoPayload": { - "@type": "type.googleapis.com/google.cloud.audit.AuditLog", - "authenticationInfo": { - "principalEmail": "1040409107725-compute@developer.gserviceaccount.com", - "serviceAccountDelegationInfo": [ - { - "firstPartyPrincipal": { - "principalEmail": "service-1040409107725@compute-system.iam.gserviceaccount.com" - } - } - ] - }, - "authorizationInfo": [ - { - "granted": true, - "permission": "storage.objects.list", - "resource": "projects/_/buckets/dev-us-central1-cortex-tsdb-dev", - "resourceAttributes": { - } - }, - { - "permission": "storage.objects.get", - "resource": "projects/_/buckets/dev-us-central1-cortex-tsdb-dev/objects/load-generator-20/01EM34PFBC2SCV3ETBGRAQZ090/deletion-mark.json", - "resourceAttributes": { - } - } - ], - "methodName": "storage.objects.get", - "requestMetadata": { - "callerIp": 
"34.66.19.193", - "callerNetwork": "//compute.googleapis.com/projects/grafanalabs-dev/global/networks/__unknown__", - "callerSuppliedUserAgent": "thanos-store-gateway/1.5.0 (go1.14.9),gzip(gfe)", - "destinationAttributes": { - }, - "requestAttributes": { - "auth": { - }, - "time": "2021-01-01T02:17:10.661405637Z" - } - }, - "resourceLocation": { - "currentLocations": [ - "us-central1" - ] - }, - "resourceName": "projects/_/buckets/dev-us-central1-cortex-tsdb-dev/objects/load-generator-20/01EM34PFBC2SCV3ETBGRAQZ090/deletion-mark.json", - "serviceName": "storage.googleapis.com", - "status": { - } - }, - "receiveTimestamp": "2021-01-01T02:17:10.82013623Z", - "resource": { - "labels": { - "bucket_name": "dev-us-central1-cortex-tsdb-dev", - "location": "us-central1", - "project_id": "grafanalabs-dev" - }, - "type": "gcs_bucket" - }, - "severity": "INFO", - "timestamp": "2021-01-01T02:17:10.655982344Z" -} -` -) - -var testConfig = &scrapeconfig.GcplogTargetConfig{ - ProjectID: project, - Subscription: subscription, - Labels: model.LabelSet{ - "job": "test-gcplogtarget", - }, } diff --git a/clients/pkg/promtail/targets/gcplog/targetmanager.go b/clients/pkg/promtail/targets/gcplog/targetmanager.go index 12fbd8724aeed..71f3b5130a2fc 100644 --- a/clients/pkg/promtail/targets/gcplog/targetmanager.go +++ b/clients/pkg/promtail/targets/gcplog/targetmanager.go @@ -15,7 +15,7 @@ import ( // nolint:revive type GcplogTargetManager struct { logger log.Logger - targets map[string]*GcplogTarget + targets map[string]Target } func NewGcplogTargetManager( @@ -26,7 +26,7 @@ func NewGcplogTargetManager( ) (*GcplogTargetManager, error) { tm := &GcplogTargetManager{ logger: logger, - targets: make(map[string]*GcplogTarget), + targets: make(map[string]Target), } for _, cf := range scrape { @@ -38,7 +38,7 @@ func NewGcplogTargetManager( return nil, err } - t, err := NewGcplogTarget(metrics, logger, pipeline.Wrap(client), cf.RelabelConfigs, cf.JobName, cf.GcplogConfig) + t, err := 
NewGCPLogTarget(metrics, logger, pipeline.Wrap(client), cf.RelabelConfigs, cf.JobName, cf.GcplogConfig) if err != nil { return nil, fmt.Errorf("failed to create pubsub target: %w", err) } @@ -60,7 +60,7 @@ func (tm *GcplogTargetManager) Ready() bool { func (tm *GcplogTargetManager) Stop() { for name, t := range tm.targets { if err := t.Stop(); err != nil { - level.Error(t.logger).Log("event", "failed to stop pubsub target", "name", name, "cause", err) + level.Error(tm.logger).Log("event", "failed to stop pubsub target", "name", name, "cause", err) } } } diff --git a/clients/pkg/promtail/targets/heroku/target.go b/clients/pkg/promtail/targets/heroku/target.go index 0b64efbafa402..e40cf4a4f45b6 100644 --- a/clients/pkg/promtail/targets/heroku/target.go +++ b/clients/pkg/promtail/targets/heroku/target.go @@ -1,7 +1,6 @@ package heroku import ( - "flag" "fmt" "net/http" "strings" @@ -10,8 +9,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" herokuEncoding "github.com/heroku/x/logplex/encoding" - "github.com/imdario/mergo" - "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" @@ -21,6 +18,7 @@ import ( "github.com/grafana/loki/clients/pkg/promtail/api" lokiClient "github.com/grafana/loki/clients/pkg/promtail/client" "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" + "github.com/grafana/loki/clients/pkg/promtail/targets/serverutils" "github.com/grafana/loki/clients/pkg/promtail/targets/target" "github.com/grafana/loki/pkg/logproto" @@ -50,26 +48,14 @@ func NewTarget(metrics *Metrics, logger log.Logger, handler api.EntryHandler, jo relabelConfigs: relabel, } - // Bit of a chicken and egg problem trying to register the defaults and apply overrides from the loaded config. - // First create an empty config and set defaults. 
- defaults := server.Config{} - defaults.RegisterFlags(flag.NewFlagSet("empty", flag.ContinueOnError)) - // Then apply any config values loaded as overrides to the defaults. - if err := mergo.Merge(&defaults, config.Server, mergo.WithOverride); err != nil { - return nil, errors.Wrap(err, "failed to parse configs and override defaults when configuring heroku drain target") - } - // The merge won't overwrite with a zero value but in the case of ports 0 value - // indicates the desire for a random port so reset these to zero if the incoming config val is 0 - if config.Server.HTTPListenPort == 0 { - defaults.HTTPListenPort = 0 - } - if config.Server.GRPCListenPort == 0 { - defaults.GRPCListenPort = 0 + mergedServerConfigs, err := serverutils.MergeWithDefaults(config.Server) + if err != nil { + return nil, fmt.Errorf("failed to parse configs and override defaults when configuring heroku drain target: %w", err) } // Set the config to the new combined config. - config.Server = defaults + config.Server = mergedServerConfigs - err := ht.run() + err = ht.run() if err != nil { return nil, err } diff --git a/clients/pkg/promtail/targets/lokipush/pushtarget.go b/clients/pkg/promtail/targets/lokipush/pushtarget.go index f1df431ca71f0..d1d6e862d97d8 100644 --- a/clients/pkg/promtail/targets/lokipush/pushtarget.go +++ b/clients/pkg/promtail/targets/lokipush/pushtarget.go @@ -2,7 +2,7 @@ package lokipush import ( "bufio" - "flag" + "fmt" "io" "net/http" "sort" @@ -11,7 +11,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/imdario/mergo" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -23,6 +22,7 @@ import ( "github.com/grafana/loki/clients/pkg/promtail/api" "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" + "github.com/grafana/loki/clients/pkg/promtail/targets/serverutils" "github.com/grafana/loki/clients/pkg/promtail/targets/target" 
"github.com/grafana/loki/pkg/loghttp/push" @@ -54,26 +54,14 @@ func NewPushTarget(logger log.Logger, config: config, } - // Bit of a chicken and egg problem trying to register the defaults and apply overrides from the loaded config. - // First create an empty config and set defaults. - defaults := server.Config{} - defaults.RegisterFlags(flag.NewFlagSet("empty", flag.ContinueOnError)) - // Then apply any config values loaded as overrides to the defaults. - if err := mergo.Merge(&defaults, config.Server, mergo.WithOverride); err != nil { - level.Error(logger).Log("msg", "failed to parse configs and override defaults when configuring push server", "err", err) - } - // The merge won't overwrite with a zero value but in the case of ports 0 value - // indicates the desire for a random port so reset these to zero if the incoming config val is 0 - if config.Server.HTTPListenPort == 0 { - defaults.HTTPListenPort = 0 - } - if config.Server.GRPCListenPort == 0 { - defaults.GRPCListenPort = 0 + mergedServerConfigs, err := serverutils.MergeWithDefaults(config.Server) + if err != nil { + return nil, fmt.Errorf("failed to parse configs and override defaults when configuring loki push target: %w", err) } // Set the config to the new combined config. - config.Server = defaults + config.Server = mergedServerConfigs - err := pt.run() + err = pt.run() if err != nil { return nil, err } diff --git a/clients/pkg/promtail/targets/serverutils/config.go b/clients/pkg/promtail/targets/serverutils/config.go new file mode 100644 index 0000000000000..39f61088b8dc9 --- /dev/null +++ b/clients/pkg/promtail/targets/serverutils/config.go @@ -0,0 +1,29 @@ +package serverutils + +import ( + "flag" + + "github.com/imdario/mergo" + "github.com/weaveworks/common/server" +) + +// MergeWithDefaults applies server.Config defaults to a given and different server.Config. 
+func MergeWithDefaults(config server.Config) (server.Config, error) { + // Bit of a chicken and egg problem trying to register the defaults and apply overrides from the loaded config. + // First create an empty config and set defaults. + mergee := server.Config{} + mergee.RegisterFlags(flag.NewFlagSet("empty", flag.ContinueOnError)) + // Then apply any config values loaded as overrides to the defaults. + if err := mergo.Merge(&mergee, config, mergo.WithOverride); err != nil { + return server.Config{}, err + } + // The merge won't overwrite with a zero value but in the case of ports 0 value + // indicates the desire for a random port so reset these to zero if the incoming config val is 0 + if config.HTTPListenPort == 0 { + mergee.HTTPListenPort = 0 + } + if config.GRPCListenPort == 0 { + mergee.GRPCListenPort = 0 + } + return mergee, nil +} diff --git a/docs/sources/clients/promtail/configuration.md b/docs/sources/clients/promtail/configuration.md index d9370b74e6e2c..603584e923cc6 100644 --- a/docs/sources/clients/promtail/configuration.md +++ b/docs/sources/clients/promtail/configuration.md @@ -330,6 +330,9 @@ job_name: # Describes how to scrape logs from the Windows event logs. [windows_events: ] +# Configuration describing how to pull/receive Google Cloud Platform (GCP) logs. +[gcplog: ] + # Describes how to fetch logs from Kafka via a Consumer group. [kafka: ] @@ -930,6 +933,54 @@ labels: [use_incoming_timestamp: | default = false] ``` +### GCP Log + +The `gcplog` block configures how Promtail receives GCP logs. There are two strategies, based on the configuration of `subscription_type`: +- **Pull**: Using GCP Pub/Sub [pull subscriptions](https://cloud.google.com/pubsub/docs/pull). Promtail will consume log messages directly from the configured GCP Pub/Sub topic. +- **Push**: Using GCP Pub/Sub [push subscriptions](https://cloud.google.com/pubsub/docs/push). Promtail will expose an HTTP server, and GCP will deliver logs to that server. 
+ +When using the `push` subscription type, keep in mind: +- The `server` configuration is the same as [server](#server), since Promtail exposes an HTTP server for each target that requires one. +- Promtail exposes an endpoint at `POST /gcp/api/v1/push`, which expects requests from the GCP PubSub message delivery system. + +```yaml +# Type of subscription used to fetch logs from GCP. Can be either `pull` (default) or `push`. +[subscription_type: <string> | default = "pull"] + +# If the subscription_type is pull, the GCP project ID +[project_id: <string>] + +# If the subscription_type is pull, the GCP PubSub subscription from which Promtail will pull logs +[subscription: <string>] + +# If the subscription_type is push, the server configuration options +[server: <server_config>] + +# Whether Promtail should pass on the timestamp from the incoming GCP Log message. +# When false, or if no timestamp is present in the GCP Log message, Promtail will assign the current +# timestamp to the log when it was processed. +[use_incoming_timestamp: <boolean> | default = false] + +# Label map to add to every log message. +labels: + [ <labelname>: <labelvalue> ... ] +``` + +### Available Labels + +When Promtail receives GCP logs, various internal labels are made available for [relabeling](#relabeling). This depends on the subscription type chosen. + +**Internal labels available for pull** + +- `__gcp_logname` +- `__gcp_resource_type` +- `__gcp_resource_labels_<NAME>` + +**Internal labels available for push** + +- `__gcp_message_id` +- `__gcp_attributes_*`: All attributes read from `.message.attributes` in the incoming push message. Each attribute key is conveniently renamed, since it might contain unsupported characters. For example, `logging.googleapis.com/timestamp` is converted to `__gcp_attributes_logging_googleapis_com_timestamp`. + ### kafka The `kafka` block configures Promtail to scrape logs from [Kafka](https://kafka.apache.org/) using a group consumer.
diff --git a/docs/sources/clients/promtail/scraping.md b/docs/sources/clients/promtail/scraping.md index e0df0fa083461..96754a991e46a 100644 --- a/docs/sources/clients/promtail/scraping.md +++ b/docs/sources/clients/promtail/scraping.md @@ -190,13 +190,19 @@ resuming the target without skipping logs. see the [configuration](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#windows_events) section for more information. -## Gcplog scraping -Promtail supports scraping cloud resource logs(say GCS bucket logs, Load Balancer logs, Kubernetes Cluster logs) from GCP. -Configs are set in `gcplog` section in `scrape_config` +## GCP Log scraping + +Promtail supports scraping cloud resource logs such as GCS bucket logs, load balancer logs, and Kubernetes cluster logs from GCP. +Configuration is specified in the `gcplog` section, within `scrape_config`. + +There are two kinds of scraping strategies: `pull` and `push`. + +### Pull ```yaml - job_name: gcplog gcplog: + subscription_type: "pull" # If the `subscription_type` field is empty, defaults to `pull` project_id: "my-gcp-project" subscription: "my-pubsub-subscription" use_incoming_timestamp: false # default rewrite timestamps. @@ -223,6 +229,38 @@ When Promtail receives GCP logs, various internal labels are made available for - `__gcp_resource_labels_` In the example above, the `project_id` label from a GCP resource was transformed into a label called `project` through `relabel_configs`.
+### Push + +```yaml + - job_name: gcplog + gcplog: + subscription_type: "push" + use_incoming_timestamp: false + labels: + job: "gcplog-push" + server: + http_listen_address: 0.0.0.0 + http_listen_port: 8080 + relabel_configs: + - source_labels: ['__gcp_message_id'] + target_label: 'message_id' + - source_labels: ['__gcp_attributes_logging_googleapis_com_timestamp'] + target_label: 'incoming_ts' +``` + +When configuring the GCP Log push target, Promtail will start an HTTP server listening on port `8080`, as configured in the `server` +section. This server exposes the single endpoint `POST /gcp/api/v1/push`, responsible for receiving logs from GCP. + +It also supports `relabeling` and `pipeline` stages. + +When Promtail receives GCP logs, various internal labels are made available for [relabeling](#relabeling): +- `__gcp_message_id` +- `__gcp_attributes_<NAME>` + +In the example above, the `__gcp_message_id` and the `__gcp_attributes_logging_googleapis_com_timestamp` labels are +transformed to `message_id` and `incoming_ts` through `relabel_configs`. All other internal labels, for example some other attribute, +will be dropped by the target if not transformed. + ## Syslog Receiver Promtail supports receiving [IETF Syslog (RFC5424)](https://tools.ietf.org/html/rfc5424)