From 40023cd9554af4d3ef59da58fd20a9d5cd1610f8 Mon Sep 17 00:00:00 2001 From: Michael Desa Date: Mon, 1 May 2017 15:29:02 -0400 Subject: [PATCH] Add alertpost service Add enough to make tests pass Add correct POST alert handling Add tests for .endpoint on .post on alert node Add alertpost config tests Remove NewConfig section Add comments to alertpost/config.go Add fixes suggested in PR Move AlertData to alert package from service/alert Implement Test function Allow override variables to set map values Rename alertpost service to httppost Favorite package name is httpposttest :) Add endpoint property method to httpPost Allow variadic httpPost arguments Update config file for httppost Add endpoint test to http post node Make Fixes suggested in PR Add documentation for variadic arguments it httpPost Add test for map enviornment vars Refactor httppost endpoints Reorder code in httpost service package Add support for basic auth to endpoints Make changes from PR Fix httpPost functionality Allow headers to be set for alerts via tickscript Check headers Unredact headers --- CHANGELOG.md | 4 + alert.go | 20 +- alert/types.go | 24 ++ etc/kapacitor/kapacitor.conf | 14 + http_post.go | 44 +- integrations/batcher_test.go | 16 +- integrations/helpers_test.go | 4 +- integrations/streamer_test.go | 388 ++++++++++++++---- pipeline/alert.go | 65 ++- pipeline/http_post.go | 97 ++++- pipeline/node.go | 6 +- server/config.go | 78 ++++ server/config_test.go | 9 + server/server.go | 14 + server/server_test.go | 130 +++++- services/alert/alerttest/alerttest.go | 20 +- services/alert/handlers.go | 33 +- services/alert/service.go | 4 + services/config/override/doc.go | 3 + services/config/override/override.go | 19 +- .../config/override/override_internal_test.go | 46 +++ services/httppost/config.go | 77 ++++ services/httppost/httpposttest/server.go | 53 +++ services/httppost/service.go | 254 ++++++++++++ task_master.go | 5 + 25 files changed, 1256 insertions(+), 171 deletions(-) create mode 100644 services/httppost/config.go create mode 100644 services/httppost/httpposttest/server.go create mode 100644 services/httppost/service.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 7683bf5e1..1fecb8d25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased [unreleased] +### Features + +- [#117](https://github.com/influxdata/kapacitor/issues/117): Add headers to alert POST requests. + ### Bugfixes - [#1294](https://github.com/influxdata/kapacitor/issues/1294): Fix bug where batch queries would be missing all fields after the first nil field. diff --git a/alert.go b/alert.go index 4f05a7131..5b4912bd0 100644 --- a/alert.go +++ b/alert.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/kapacitor/pipeline" alertservice "github.com/influxdata/kapacitor/services/alert" "github.com/influxdata/kapacitor/services/hipchat" + "github.com/influxdata/kapacitor/services/httppost" "github.com/influxdata/kapacitor/services/opsgenie" "github.com/influxdata/kapacitor/services/pagerduty" "github.com/influxdata/kapacitor/services/pushover" @@ -130,15 +131,6 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an * return nil, err } - // Construct alert handlers - for _, post := range n.PostHandlers { - c := alertservice.PostHandlerConfig{ - URL: post.URL, - } - h := alertservice.NewPostHandler(c, l) - an.handlers = append(an.handlers, h) - } - for _, tcp := range n.TcpHandlers { c := alertservice.TCPHandlerConfig{ Address: tcp.Address, @@ -363,6 +355,16 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an * an.handlers = append(an.handlers, h) } + for _, p := range n.HTTPPostHandlers { + c := httppost.HandlerConfig{ + URL: p.URL, + Endpoint: p.Endpoint, + Headers: p.Headers, + } + h := et.tm.HTTPPostService.Handler(c, l) + an.handlers = append(an.handlers, h) + } + for _, og := range n.OpsGenieHandlers { c := opsgenie.HandlerConfig{ TeamsList: og.TeamsList, diff --git a/alert/types.go b/alert/types.go index 3cc521361..4fb131120 100644 --- a/alert/types.go +++ b/alert/types.go @@ -17,6 +17,18 @@ type Event struct { previousState EventState } +func (e Event) AlertData() Data { + return Data{ + ID: e.State.ID, + Message: e.State.Message, + Details: e.State.Details, + Time: e.State.Time, + Duration: e.State.Duration, + Level: e.State.Level, + Data: e.Data.Result, + } +} + func (e Event) PreviousState() EventState { return e.previousState } @@ -154,3 +166,15 @@ type TopicState struct { Level Level Collected int64 } + +// Data is a structure that contains relevant data about an alert event. +// The structure is intended to be JSON encoded, providing a consistent data format. +type Data struct { + ID string `json:"id"` + Message string `json:"message"` + Details string `json:"details"` + Time time.Time `json:"time"` + Duration time.Duration `json:"duration"` + Level Level `json:"level"` + Data models.Result `json:"data"` +} diff --git a/etc/kapacitor/kapacitor.conf b/etc/kapacitor/kapacitor.conf index a9415d4e8..0abd98d54 100644 --- a/etc/kapacitor/kapacitor.conf +++ b/etc/kapacitor/kapacitor.conf @@ -275,6 +275,20 @@ default-retention-policy = "" # The URL for the Pushover API. url = "https://api.pushover.net/1/messages.json" +########################################## +# Configure Alert POST request Endpoints + +# As ENV variables: +# KAPACITOR_HTTPPOST_0_ENDPOINT = "example" +# KAPACITOR_HTTPPOST_0_URL = "http://example.com" +# KAPACITOR_HTTPPOST_0_HEADERS_Example = "header" + +# [[httppost]] +# endpoint = "example" +# url = "http://example.com" +# headers = { Example = "your-key" } +# basic-auth = { username = "my-user", password = "my-pass" } + [slack] # Configure Slack. enabled = false diff --git a/http_post.go b/http_post.go index 42910ffa3..a536351cc 100644 --- a/http_post.go +++ b/http_post.go @@ -2,6 +2,7 @@ package kapacitor import ( "encoding/json" + "fmt" "log" "net/http" "sync" @@ -9,24 +10,42 @@ import ( "github.com/influxdata/kapacitor/bufpool" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" + "github.com/influxdata/kapacitor/services/httppost" ) type HTTPPostNode struct { node - c *pipeline.HTTPPostNode - url string - mu sync.RWMutex - bp *bufpool.Pool + c *pipeline.HTTPPostNode + endpoint *httppost.Endpoint + mu sync.RWMutex + bp *bufpool.Pool } // Create a new HTTPPostNode which submits received items via POST to an HTTP endpoint func newHTTPPostNode(et *ExecutingTask, n *pipeline.HTTPPostNode, l *log.Logger) (*HTTPPostNode, error) { + hn := &HTTPPostNode{ node: node{Node: n, et: et, logger: l}, c: n, bp: bufpool.New(), - url: n.Url, } + + // Should only ever be 0 or 1 from validation of n + if len(n.URLs) == 1 { + e := httppost.NewEndpoint(n.URLs[0], nil, httppost.BasicAuth{}) + hn.endpoint = e + } + + // Should only ever be 0 or 1 from validation of n + if len(n.HTTPPostEndpoints) == 1 { + endpointName := n.HTTPPostEndpoints[0].Endpoint + e, ok := et.tm.HTTPPostService.Endpoint(endpointName) + if !ok { + return nil, fmt.Errorf("endpoint '%s' does not exist", endpointName) + } + hn.endpoint = e + } + hn.node.runF = hn.runPost return hn, nil } @@ -72,14 +91,27 @@ func (h *HTTPPostNode) postRow(group models.GroupID, row *models.Row) { defer h.bp.Put(body) err := json.NewEncoder(body).Encode(result) if err != nil { + h.incrementErrorCount() + h.logger.Printf("E! failed to marshal row data json: %v", err) + return + } + req, err := h.endpoint.NewHTTPRequest(body) + if err != nil { + h.incrementErrorCount() h.logger.Printf("E! failed to marshal row data json: %v", err) return } - resp, err := http.Post(h.url, "application/json", body) + req.Header.Set("Content-Type", "application/json") + for k, v := range h.c.Headers { + req.Header.Set(k, v) + } + resp, err := http.DefaultClient.Do(req) if err != nil { + h.incrementErrorCount() h.logger.Printf("E! failed to POST row data: %v", err) return } resp.Body.Close() + } diff --git a/integrations/batcher_test.go b/integrations/batcher_test.go index b3a33bb26..f4b88a6af 100644 --- a/integrations/batcher_test.go +++ b/integrations/batcher_test.go @@ -18,6 +18,7 @@ import ( "github.com/influxdata/kapacitor/clock" "github.com/influxdata/kapacitor/models" alertservice "github.com/influxdata/kapacitor/services/alert" + "github.com/influxdata/kapacitor/services/httppost" "github.com/influxdata/kapacitor/services/storage/storagetest" "github.com/influxdata/wlog" ) @@ -1389,7 +1390,7 @@ batch func TestBatch_AlertStateChangesOnly(t *testing.T) { requestCount := int32(0) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ad := alertservice.AlertData{} + ad := alert.Data{} dec := json.NewDecoder(r.Body) err := dec.Decode(&ad) if err != nil { @@ -1397,7 +1398,7 @@ func TestBatch_AlertStateChangesOnly(t *testing.T) { } atomic.AddInt32(&requestCount, 1) if rc := atomic.LoadInt32(&requestCount); rc == 1 { - expAd := alertservice.AlertData{ + expAd := alert.Data{ ID: "cpu_usage_idle:cpu=cpu-total", Message: "cpu_usage_idle:cpu=cpu-total is CRITICAL", Time: time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), @@ -1408,7 +1409,7 @@ func TestBatch_AlertStateChangesOnly(t *testing.T) { t.Error(msg) } } else { - expAd := alertservice.AlertData{ + expAd := alert.Data{ ID: "cpu_usage_idle:cpu=cpu-total", Message: "cpu_usage_idle:cpu=cpu-total is OK", Time: time.Date(1971, 1, 1, 0, 0, 38, 0, time.UTC), @@ -1454,7 +1455,7 @@ batch func TestBatch_AlertStateChangesOnlyExpired(t *testing.T) { requestCount := int32(0) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ad := alertservice.AlertData{} + ad := alert.Data{} dec := json.NewDecoder(r.Body) err := dec.Decode(&ad) if err != nil { @@ -1462,11 +1463,11 @@ func TestBatch_AlertStateChangesOnlyExpired(t *testing.T) { } // We don't care about the data for this test ad.Data = models.Result{} - var expAd alertservice.AlertData + var expAd alert.Data atomic.AddInt32(&requestCount, 1) rc := atomic.LoadInt32(&requestCount) if rc < 3 { - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "cpu_usage_idle:cpu=cpu-total", Message: "cpu_usage_idle:cpu=cpu-total is CRITICAL", Time: time.Date(1971, 1, 1, 0, 0, int(rc-1)*20, 0, time.UTC), @@ -1474,7 +1475,7 @@ func TestBatch_AlertStateChangesOnlyExpired(t *testing.T) { Level: alert.Critical, } } else { - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "cpu_usage_idle:cpu=cpu-total", Message: "cpu_usage_idle:cpu=cpu-total is OK", Time: time.Date(1971, 1, 1, 0, 0, 38, 0, time.UTC), @@ -2923,6 +2924,7 @@ func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.Ex tm.HTTPDService = httpdService tm.TaskStore = taskStore{} tm.DeadmanService = deadman{} + tm.HTTPPostService = httppost.NewService(nil, logService.NewLogger("[httppost] ", log.LstdFlags)) as := alertservice.NewService(logService.NewLogger("[alert] ", log.LstdFlags)) as.StorageService = storagetest.New() as.HTTPDService = httpdService diff --git a/integrations/helpers_test.go b/integrations/helpers_test.go index 27cfcc1f3..f682bd918 100644 --- a/integrations/helpers_test.go +++ b/integrations/helpers_test.go @@ -10,9 +10,9 @@ import ( "time" "github.com/influxdata/kapacitor" + "github.com/influxdata/kapacitor/alert" "github.com/influxdata/kapacitor/influxdb" "github.com/influxdata/kapacitor/models" - alertservice "github.com/influxdata/kapacitor/services/alert" "github.com/influxdata/kapacitor/services/httpd" k8s "github.com/influxdata/kapacitor/services/k8s/client" "github.com/influxdata/kapacitor/udf" @@ -117,7 +117,7 @@ func compareResultsIgnoreSeriesOrder(exp, got models.Result) (bool, string) { return true, "" } -func compareAlertData(exp, got alertservice.AlertData) (bool, string) { +func compareAlertData(exp, got alert.Data) (bool, string) { // Pull out Result for comparison expData := exp.Data exp.Data = models.Result{} diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 13bdd9b09..4758ff963 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -33,6 +33,8 @@ import ( "github.com/influxdata/kapacitor/services/alerta/alertatest" "github.com/influxdata/kapacitor/services/hipchat" "github.com/influxdata/kapacitor/services/hipchat/hipchattest" + "github.com/influxdata/kapacitor/services/httppost" + "github.com/influxdata/kapacitor/services/httppost/httpposttest" k8s "github.com/influxdata/kapacitor/services/k8s/client" "github.com/influxdata/kapacitor/services/logging/loggingtest" "github.com/influxdata/kapacitor/services/opsgenie" @@ -2236,6 +2238,159 @@ stream } } +func TestStream_HttpPostEndpoint(t *testing.T) { + headers := map[string]string{"my": "header"} + requestCount := int32(0) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + for k, v := range headers { + nv := r.Header.Get(k) + if nv != v { + t.Fatalf("got '%s:%s', exp '%s:%s'", k, nv, k, v) + } + } + result := models.Result{} + dec := json.NewDecoder(r.Body) + err := dec.Decode(&result) + if err != nil { + t.Fatal(err) + } + atomic.AddInt32(&requestCount, 1) + rc := atomic.LoadInt32(&requestCount) + + var er models.Result + switch rc { + case 1: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "idle"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + 97.1, + }}, + }, + }, + } + case 2: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "idle"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC), + 92.6, + }}, + }, + }, + } + case 3: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "idle"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), + 95.6, + }}, + }, + }, + } + case 4: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "idle"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC), + 93.1, + }}, + }, + }, + } + case 5: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "idle"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + 92.6, + }}, + }, + }, + } + case 6: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "idle"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC), + 95.8, + }}, + }, + }, + } + } + if eq, msg := compareResults(er, result); !eq { + t.Errorf("unexpected alert data for request: %d %s", rc, msg) + } + })) + defer ts.Close() + + var script = ` +stream + |from() + .measurement('cpu') + .where(lambda: "host" == 'serverA') + .groupBy('host') + |httpPost() + .endpoint('test') + .header('my', 'header') + |httpOut('TestStream_HttpPost') +` + + er := models.Result{ + Series: models.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "idle"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC), + 95.8, + }}, + }, + }, + } + + tmInit := func(tm *kapacitor.TaskMaster) { + c := httppost.Config{} + c.URL = ts.URL + c.Endpoint = "test" + sl := httppost.NewService(httppost.Configs{c}, logService.NewLogger("[test_httppost_endpoint] ", log.LstdFlags)) + tm.HTTPPostService = sl + } + + testStreamerWithOutput(t, "TestStream_HttpPost", script, 13*time.Second, er, false, tmInit) + + if rc := atomic.LoadInt32(&requestCount); rc != 6 { + t.Errorf("got %v exp %v", rc, 6) + } +} + func TestStream_HttpOutPassThrough(t *testing.T) { var script = ` @@ -5601,7 +5756,7 @@ stream func TestStream_Alert(t *testing.T) { requestCount := int32(0) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ad := alertservice.AlertData{} + ad := alert.Data{} dec := json.NewDecoder(r.Body) err := dec.Decode(&ad) if err != nil { @@ -5609,7 +5764,7 @@ func TestStream_Alert(t *testing.T) { } atomic.AddInt32(&requestCount, 1) rc := atomic.LoadInt32(&requestCount) - expAd := alertservice.AlertData{ + expAd := alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is CRITICAL", Details: "details", @@ -5691,7 +5846,7 @@ stream func TestStream_Alert_NoRecoveries(t *testing.T) { requestCount := int32(0) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ad := alertservice.AlertData{} + ad := alert.Data{} dec := json.NewDecoder(r.Body) err := dec.Decode(&ad) if err != nil { @@ -5699,10 +5854,10 @@ func TestStream_Alert_NoRecoveries(t *testing.T) { } atomic.AddInt32(&requestCount, 1) rc := atomic.LoadInt32(&requestCount) - var expAd alertservice.AlertData + var expAd alert.Data switch rc { case 1: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is WARNING", Time: time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), @@ -5723,7 +5878,7 @@ func TestStream_Alert_NoRecoveries(t *testing.T) { }, } case 2: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is INFO", Time: time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), @@ -5744,7 +5899,7 @@ func TestStream_Alert_NoRecoveries(t *testing.T) { }, } case 3: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is WARNING", Time: time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC), @@ -5765,7 +5920,7 @@ func TestStream_Alert_NoRecoveries(t *testing.T) { }, } case 4: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is WARNING", Time: time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), @@ -5786,7 +5941,7 @@ func TestStream_Alert_NoRecoveries(t *testing.T) { }, } case 5: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is CRITICAL", Time: time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC), @@ -5807,7 +5962,7 @@ func TestStream_Alert_NoRecoveries(t *testing.T) { }, } case 6: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is INFO", Time: time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC), @@ -5878,7 +6033,7 @@ stream func TestStream_Alert_WithReset_0(t *testing.T) { requestCount := int32(0) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ad := alertservice.AlertData{} + ad := alert.Data{} dec := json.NewDecoder(r.Body) err := dec.Decode(&ad) if err != nil { @@ -5886,10 +6041,10 @@ func TestStream_Alert_WithReset_0(t *testing.T) { } atomic.AddInt32(&requestCount, 1) rc := atomic.LoadInt32(&requestCount) - var expAd alertservice.AlertData + var expAd alert.Data switch rc { case 1: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is INFO", Details: "details", @@ -5910,7 +6065,7 @@ func TestStream_Alert_WithReset_0(t *testing.T) { }, } case 2: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is INFO", Details: "details", @@ -5932,7 +6087,7 @@ func TestStream_Alert_WithReset_0(t *testing.T) { }, } case 3: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is INFO", Details: "details", @@ -5954,7 +6109,7 @@ func TestStream_Alert_WithReset_0(t *testing.T) { }, } case 4: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is OK", Details: "details", @@ -5976,7 +6131,7 @@ func TestStream_Alert_WithReset_0(t *testing.T) { }, } case 5: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is INFO", Details: "details", @@ -5998,7 +6153,7 @@ func TestStream_Alert_WithReset_0(t *testing.T) { }, } case 6: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is WARNING", Details: "details", @@ -6020,7 +6175,7 @@ func TestStream_Alert_WithReset_0(t *testing.T) { }, } case 7: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is WARNING", Details: "details", @@ -6042,7 +6197,7 @@ func TestStream_Alert_WithReset_0(t *testing.T) { }, } case 8: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is OK", Details: "details", @@ -6064,7 +6219,7 @@ func TestStream_Alert_WithReset_0(t *testing.T) { }, } case 9: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is INFO", Details: "details", @@ -6086,7 +6241,7 @@ func TestStream_Alert_WithReset_0(t *testing.T) { }, } case 10: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is WARNING", Details: "details", @@ -6108,7 +6263,7 @@ func TestStream_Alert_WithReset_0(t *testing.T) { }, } case 11: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is CRITICAL", Details: "details", @@ -6130,7 +6285,7 @@ func TestStream_Alert_WithReset_0(t *testing.T) { }, } case 12: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is OK", Details: "details", @@ -6216,7 +6371,7 @@ stream func TestStream_Alert_WithReset_1(t *testing.T) { requestCount := int32(0) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ad := alertservice.AlertData{} + ad := alert.Data{} dec := json.NewDecoder(r.Body) err := dec.Decode(&ad) if err != nil { @@ -6224,10 +6379,10 @@ func TestStream_Alert_WithReset_1(t *testing.T) { } atomic.AddInt32(&requestCount, 1) rc := atomic.LoadInt32(&requestCount) - var expAd alertservice.AlertData + var expAd alert.Data switch rc { case 1: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is INFO", Details: "details", @@ -6248,7 +6403,7 @@ func TestStream_Alert_WithReset_1(t *testing.T) { }, } case 2: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is INFO", Details: "details", @@ -6270,7 +6425,7 @@ func TestStream_Alert_WithReset_1(t *testing.T) { }, } case 3: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is INFO", Details: "details", @@ -6292,7 +6447,7 @@ func TestStream_Alert_WithReset_1(t *testing.T) { }, } case 4: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is OK", Details: "details", @@ -6314,7 +6469,7 @@ func TestStream_Alert_WithReset_1(t *testing.T) { }, } case 5: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is INFO", Details: "details", @@ -6336,7 +6491,7 @@ func TestStream_Alert_WithReset_1(t *testing.T) { }, } case 6: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is WARNING", Details: "details", @@ -6358,7 +6513,7 @@ func TestStream_Alert_WithReset_1(t *testing.T) { }, } case 7: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is INFO", Details: "details", @@ -6380,7 +6535,7 @@ func TestStream_Alert_WithReset_1(t *testing.T) { }, } case 8: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is OK", Details: "details", @@ -6402,7 +6557,7 @@ func TestStream_Alert_WithReset_1(t *testing.T) { }, } case 9: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is INFO", Details: "details", @@ -6424,7 +6579,7 @@ func TestStream_Alert_WithReset_1(t *testing.T) { }, } case 10: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is WARNING", Details: "details", @@ -6446,7 +6601,7 @@ func TestStream_Alert_WithReset_1(t *testing.T) { }, } case 11: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is CRITICAL", Details: "details", @@ -6468,7 +6623,7 @@ func TestStream_Alert_WithReset_1(t *testing.T) { }, } case 12: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is WARNING", Details: "details", @@ -6490,7 +6645,7 @@ func TestStream_Alert_WithReset_1(t *testing.T) { }, } case 13: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is WARNING", Details: "details", @@ -6512,7 +6667,7 @@ func TestStream_Alert_WithReset_1(t *testing.T) { }, } case 14: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is INFO", Details: "details", @@ -6534,7 +6689,7 @@ func TestStream_Alert_WithReset_1(t *testing.T) { }, } case 15: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is OK", Details: "details", @@ -6620,18 +6775,18 @@ stream func TestStream_AlertDuration(t *testing.T) { requestCount := int32(0) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ad := alertservice.AlertData{} + ad := alert.Data{} dec := json.NewDecoder(r.Body) err := dec.Decode(&ad) if err != nil { t.Fatal(err) } atomic.AddInt32(&requestCount, 1) - var expAd alertservice.AlertData + var expAd alert.Data rc := atomic.LoadInt32(&requestCount) switch rc { case 1: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is CRITICAL", Details: "details", @@ -6653,7 +6808,7 @@ func TestStream_AlertDuration(t *testing.T) { }, } case 2: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is WARNING", Details: "details", @@ -6675,7 +6830,7 @@ func TestStream_AlertDuration(t *testing.T) { }, } case 3: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is OK", Details: "details", @@ -6697,7 +6852,7 @@ func TestStream_AlertDuration(t *testing.T) { }, } case 4: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is WARNING", Details: "details", @@ -6719,7 +6874,7 @@ func TestStream_AlertDuration(t *testing.T) { }, } case 5: - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "kapacitor/cpu/serverA", Message: "kapacitor/cpu/serverA is OK", Details: "details", @@ -7029,7 +7184,7 @@ stream testStreamerNoOutput(t, "TestStream_Alert", script, 13*time.Second, nil) exp := []interface{}{ - alertservice.AlertData{ + alert.Data{ ID: "kapacitor.cpu.serverA", Message: "kapacitor.cpu.serverA is CRITICAL", Time: time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), @@ -7292,7 +7447,6 @@ stream t.Error(err) } } - func TestStream_AlertOpsGenie(t *testing.T) { ts := opsgenietest.NewServer() defer ts.Close() @@ -7453,8 +7607,8 @@ stream } } -func TestStream_AlertPost(t *testing.T) { - ts := alerttest.NewPostServer() +func TestStream_AlertHTTPPost(t *testing.T) { + ts := httpposttest.NewAlertServer(nil) defer ts.Close() var script = ` @@ -7479,21 +7633,94 @@ stream testStreamerNoOutput(t, "TestStream_Alert", script, 13*time.Second, nil) exp := []interface{}{ - alertservice.AlertData{ - ID: "kapacitor.cpu.serverA", - Message: "kapacitor.cpu.serverA is CRITICAL", - Time: time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), - Level: alert.Critical, - Data: models.Result{ - Series: models.Rows{ - { - Name: "cpu", - Tags: map[string]string{"host": "serverA"}, - Columns: []string{"time", "count"}, - Values: [][]interface{}{[]interface{}{ - time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), - 10.0, - }}, + httpposttest.AlertRequest{ + MatchingHeaders: true, + Data: alert.Data{ + ID: "kapacitor.cpu.serverA", + Message: "kapacitor.cpu.serverA is CRITICAL", + Time: time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + Level: alert.Critical, + Data: models.Result{ + Series: models.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA"}, + Columns: []string{"time", "count"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 10.0, + }}, + }, + }, + }, + }, + }, + } + + ts.Close() + var got []interface{} + for _, g := range ts.Data() { + got = append(got, g) + } + + if err := compareListIgnoreOrder(got, exp, nil); err != nil { + t.Error(err) + } +} + +func TestStream_AlertHTTPPostEndpoint(t *testing.T) { + headers := map[string]string{"Authorization": "works"} + ts := httpposttest.NewAlertServer(headers) + defer ts.Close() + + var script = ` +stream + |from() + .measurement('cpu') + .where(lambda: "host" == 'serverA') + .groupBy('host') + |window() + .period(10s) + .every(10s) + |count('value') + |alert() + .id('kapacitor.{{ .Name }}.{{ index .Tags "host" }}') + .info(lambda: "count" > 6.0) + .warn(lambda: "count" > 7.0) + .crit(lambda: "count" > 8.0) + .details('') + .post() + .endpoint('test') +` + tmInit := func(tm *kapacitor.TaskMaster) { + c := httppost.Config{} + c.URL = ts.URL + c.Endpoint = "test" + c.Headers = headers + sl := httppost.NewService(httppost.Configs{c}, logService.NewLogger("[test_pushover] ", log.LstdFlags)) + tm.HTTPPostService = sl + } + testStreamerNoOutput(t, "TestStream_Alert", script, 13*time.Second, tmInit) + + exp := []interface{}{ + httpposttest.AlertRequest{ + MatchingHeaders: true, + Data: alert.Data{ + ID: "kapacitor.cpu.serverA", + Message: "kapacitor.cpu.serverA is CRITICAL", + Time: time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + Level: alert.Critical, + Data: models.Result{ + Series: models.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA"}, + Columns: []string{"time", "count"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 10.0, + }}, + }, }, }, }, @@ -7670,7 +7897,7 @@ stream .mode(0644) `, normalPath, modePath) - expAD := []alertservice.AlertData{{ + expAD := []alert.Data{{ ID: "kapacitor.cpu.serverA", Message: "kapacitor.cpu.serverA is CRITICAL", Time: time.Date(1971, 01, 01, 0, 0, 10, 0, time.UTC), @@ -7692,7 +7919,7 @@ stream testStreamerNoOutput(t, "TestStream_Alert", script, 13*time.Second, nil) - testLog := func(name string, expData []alertservice.AlertData, expMode os.FileMode, l *alerttest.Log) error { + testLog := func(name string, expData []alert.Data, expMode os.FileMode, l *alerttest.Log) error { m, err := l.Mode() if err != nil { return err @@ -7740,7 +7967,7 @@ stream .exec('/bin/my-other-script') ` - expAD := alertservice.AlertData{ + expAD := alert.Data{ ID: "kapacitor.cpu.serverA", Message: "kapacitor.cpu.serverA is CRITICAL", Time: time.Date(1971, 01, 01, 0, 0, 10, 0, time.UTC), @@ -8042,17 +8269,17 @@ stream func TestStream_AlertSigma(t *testing.T) { requestCount := int32(0) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ad := alertservice.AlertData{} + ad := alert.Data{} dec := json.NewDecoder(r.Body) err := dec.Decode(&ad) if err != nil { t.Fatal(err) } - var expAd alertservice.AlertData + var expAd alert.Data atomic.AddInt32(&requestCount, 1) rc := atomic.LoadInt32(&requestCount) if rc := atomic.LoadInt32(&requestCount); rc == 1 { - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "cpu:nil", Message: "cpu:nil is INFO", Details: "cpu:nil is INFO", @@ -8074,7 +8301,7 @@ func TestStream_AlertSigma(t *testing.T) { }, } } else { - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "cpu:nil", Message: "cpu:nil is OK", Details: "cpu:nil is OK", @@ -8130,14 +8357,14 @@ func TestStream_AlertComplexWhere(t *testing.T) { requestCount := int32(0) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ad := alertservice.AlertData{} + ad := alert.Data{} dec := json.NewDecoder(r.Body) err := dec.Decode(&ad) if err != nil { t.Fatal(err) } atomic.AddInt32(&requestCount, 1) - expAd := alertservice.AlertData{ + expAd := alert.Data{ ID: "cpu:nil", Message: "cpu:nil is CRITICAL", Details: "", @@ -8209,7 +8436,7 @@ func TestStream_AlertStateChangesOnlyExpired(t *testing.T) { requestCount := int32(0) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ad := alertservice.AlertData{} + ad := alert.Data{} dec := json.NewDecoder(r.Body) err := dec.Decode(&ad) if err != nil { @@ -8217,11 +8444,11 @@ func TestStream_AlertStateChangesOnlyExpired(t *testing.T) { } //We don't care about the data for this test ad.Data = models.Result{} - var expAd alertservice.AlertData + var expAd alert.Data atomic.AddInt32(&requestCount, 1) rc := atomic.LoadInt32(&requestCount) if rc < 6 { - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "cpu:nil", Message: "cpu:nil is CRITICAL", Time: time.Date(1971, 1, 1, 0, 0, int(rc)*2-1, 0, time.UTC), @@ -8229,7 +8456,7 @@ func TestStream_AlertStateChangesOnlyExpired(t *testing.T) { Level: alert.Critical, } } else { - expAd = alertservice.AlertData{ + expAd = alert.Data{ ID: "cpu:nil", Message: "cpu:nil is OK", Time: time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), @@ -9701,6 +9928,7 @@ func testStreamer( tm.HTTPDService = httpdService tm.TaskStore = taskStore{} tm.DeadmanService = deadman{} + tm.HTTPPostService = httppost.NewService(nil, logService.NewLogger("[httppost] ", log.LstdFlags)) as := alertservice.NewService(logService.NewLogger("[alert] ", log.LstdFlags)) as.StorageService = storagetest.New() as.HTTPDService = httpdService diff --git a/pipeline/alert.go b/pipeline/alert.go index 86b9f1537..04b3c9098 100644 --- a/pipeline/alert.go +++ b/pipeline/alert.go @@ -3,6 +3,7 @@ package pipeline import ( "fmt" "reflect" + "strings" "time" "github.com/influxdata/kapacitor/tick/ast" @@ -292,7 +293,7 @@ type AlertNode struct { // Post the JSON alert data to the specified URL. // tick:ignore - PostHandlers []*PostHandler `tick:"Post"` + HTTPPostHandlers []*AlertHTTPPostHandler `tick:"Post"` // Send the JSON alert data to the specified endpoint via TCP. // tick:ignore @@ -379,6 +380,12 @@ func (n *AlertNode) validate() error { return errors.Wrapf(err, "invalid SNMP trap %q", snmp.TrapOid) } } + + for _, post := range n.HTTPPostHandlers { + if err := post.validate(); err != nil { + return errors.Wrap(err, "invalid post") + } + } return nil } @@ -468,23 +475,69 @@ func (a *AlertNode) Flapping(low, high float64) *AlertNode { } // HTTP POST JSON alert data to a specified URL. +// Example with endpoint: +// stream +// |alert() +// .post() +// .endpoint('example') +// +// Example with url: +// stream +// |alert() +// .post('http://example.com') +// // tick:property -func (a *AlertNode) Post(url string) *PostHandler { - post := &PostHandler{ +func (a *AlertNode) Post(urls ...string) *AlertHTTPPostHandler { + post := &AlertHTTPPostHandler{ AlertNode: a, - URL: url, } - a.PostHandlers = append(a.PostHandlers, post) + a.HTTPPostHandlers = append(a.HTTPPostHandlers, post) + + if len(urls) == 0 { + return post + } + + post.URL = urls[0] return post } +// Example: +// stream +// |alert() +// .post() +// .endpoint('example') +// .header('a','b') +// tick:property +func (a *AlertHTTPPostHandler) Header(k, v string) *AlertHTTPPostHandler { + if a.Headers == nil { + a.Headers = map[string]string{} + } + + a.Headers[k] = v + return a +} + // tick:embedded:AlertNode.Post -type PostHandler struct { +type AlertHTTPPostHandler struct { *AlertNode // The POST URL. // tick:ignore URL string + + // Name of the endpoint to be used, as is defined in the configuration file + Endpoint string + + Headers map[string]string `tick:"Header"` +} + +func (a *AlertHTTPPostHandler) validate() error { + for k := range a.Headers { + if strings.ToUpper(k) == "AUTHENTICATE" { + return errors.New("cannot set 'authenticate' header") + } + } + return nil } // Send JSON alert data to a specified address over TCP. diff --git a/pipeline/http_post.go b/pipeline/http_post.go index 232e5b22d..8ecb79a91 100644 --- a/pipeline/http_post.go +++ b/pipeline/http_post.go @@ -1,6 +1,14 @@ package pipeline +import ( + "errors" + "fmt" + "strings" +) + // An HTTPPostNode will take the incoming data stream and POST it to an HTTP endpoint. +// That endpoint may be specified as a positional argument, or as an endpoint property +// method on httpPost. Multiple endpoint property methods may be specified. // // Example: // stream @@ -11,16 +19,99 @@ package pipeline // //Post the top 10 results over the last 10s updated every 5s. // |httpPost('http://example.com/api/top10') // +// Example: +// stream +// |window() +// .period(10s) +// .every(5s) +// |top('value', 10) +// //Post the top 10 results over the last 10s updated every 5s. +// |httpPost() +// .endpoint('example') +// type HTTPPostNode struct { chainnode // tick:ignore - Url string + HTTPPostEndpoints []*HTTPPostEndpoint `tick:"Endpoint"` + + // Headers + Headers map[string]string `tick:"Header"` + + // tick:ignore + URLs []string } -func newHTTPPostNode(wants EdgeType, url string) *HTTPPostNode { +func newHTTPPostNode(wants EdgeType, urls ...string) *HTTPPostNode { return &HTTPPostNode{ chainnode: newBasicChainNode("http_post", wants, wants), - Url: url, + URLs: urls, + } +} + +// tick:ignore +func (p *HTTPPostNode) validate() error { + if len(p.URLs) >= 2 { + return fmt.Errorf("httpPost expects 0 or 1 arguments, got %v", len(p.URLs)) + } + + if len(p.HTTPPostEndpoints) > 1 { + return fmt.Errorf("httpPost expects 0 or 1 endpoints, got %v", len(p.HTTPPostEndpoints)) + } + + if len(p.URLs) == 0 && len(p.HTTPPostEndpoints) == 0 { + return errors.New("must provide url or endpoint") + } + + if len(p.URLs) > 0 && len(p.HTTPPostEndpoints) > 0 { + return errors.New("only one endpoint and url may be specified") } + + for k := range p.Headers { + if strings.ToUpper(k) == "AUTHENTICATE" { + return errors.New("cannot set 'authenticate' header") + } + } + + return nil +} + +// Example: +// stream +// |httpPost() +// .endpoint('example') +// +// tick:property +func (p *HTTPPostNode) Endpoint(endpoint string) *HTTPPostEndpoint { + post := &HTTPPostEndpoint{ + HTTPPostNode: p, + Endpoint: endpoint, + } + p.HTTPPostEndpoints = append(p.HTTPPostEndpoints, post) + + return post +} + +// Example: +// stream +// |httpPost() +// .endpoint('example') +// .header('my', 'header') +// +// tick:property +func (p *HTTPPostNode) Header(k, v string) *HTTPPostNode { + if p.Headers == nil { + p.Headers = map[string]string{} + } + p.Headers[k] = v + + return p +} + +// tick:embedded:HTTPPostNode.Endpoint +type HTTPPostEndpoint struct { + *HTTPPostNode + + // Name of the endpoint to be used, as is defined in the configuration file + Endpoint string } diff --git a/pipeline/node.go b/pipeline/node.go index 684432503..e50daec80 100644 --- a/pipeline/node.go +++ b/pipeline/node.go @@ -327,8 +327,10 @@ func (n *chainnode) HttpOut(endpoint string) *HTTPOutNode { } // Creates an HTTP Post node that POSTS received data to the provided HTTP endpoint. -func (n *chainnode) HttpPost(url string) *HTTPPostNode { - h := newHTTPPostNode(n.provides, url) +// HttpPost expects 0 or 1 arguments. If 0 arguments are provided, you must specify an +// endpoint property method. +func (n *chainnode) HttpPost(url ...string) *HTTPPostNode { + h := newHTTPPostNode(n.provides, url...) n.linkChild(h) return h } diff --git a/server/config.go b/server/config.go index 035781ae7..5408e0ec4 100644 --- a/server/config.go +++ b/server/config.go @@ -23,6 +23,7 @@ import ( "github.com/influxdata/kapacitor/services/gce" "github.com/influxdata/kapacitor/services/hipchat" "github.com/influxdata/kapacitor/services/httpd" + "github.com/influxdata/kapacitor/services/httppost" "github.com/influxdata/kapacitor/services/influxdb" "github.com/influxdata/kapacitor/services/k8s" "github.com/influxdata/kapacitor/services/logging" @@ -77,6 +78,7 @@ type Config struct { OpsGenie opsgenie.Config `toml:"opsgenie" override:"opsgenie"` PagerDuty pagerduty.Config `toml:"pagerduty" override:"pagerduty"` Pushover pushover.Config `toml:"pushover" override:"pushover"` + HTTPPost httppost.Configs `toml:"httppost" override:"httppost,element-key=endpoint"` SMTP smtp.Config `toml:"smtp" override:"smtp"` SNMPTrap snmptrap.Config `toml:"snmptrap" override:"snmptrap"` Sensu sensu.Config `toml:"sensu" override:"sensu"` @@ -138,6 +140,7 @@ func NewConfig() *Config { c.OpsGenie = opsgenie.NewConfig() c.PagerDuty = pagerduty.NewConfig() c.Pushover = pushover.NewConfig() + c.HTTPPost = httppost.Configs{} c.SMTP = smtp.NewConfig() c.Sensu = sensu.NewConfig() c.Slack = slack.NewConfig() @@ -250,6 +253,9 @@ func (c *Config) Validate() error { if err := c.Pushover.Validate(); err != nil { return err } + if err := c.HTTPPost.Validate(); err != nil { + return err + } if err := c.SMTP.Validate(); err != nil { return err } @@ -360,6 +366,72 @@ func (c *Config) ApplyEnvOverrides() error { return c.applyEnvOverrides("KAPACITOR", "", reflect.ValueOf(c)) } +func (c *Config) applyEnvOverridesToMap(prefix string, fieldDesc string, mapValue, key, spec reflect.Value) error { + // If we have a pointer, dereference it + s := spec + if spec.Kind() == reflect.Ptr { + s = spec.Elem() + } + + var value string + + if s.Kind() != reflect.Struct { + value = os.Getenv(prefix) + // Skip any fields we don't have a value to set + if value == "" { + return nil + } + + if fieldDesc != "" { + fieldDesc = " to " + fieldDesc + } + } + + switch s.Kind() { + case reflect.String: + mapValue.SetMapIndex(key, reflect.ValueOf(value)) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + + var intValue int64 + + // Handle toml.Duration + if s.Type().Name() == "Duration" { + dur, err := time.ParseDuration(value) + if err != nil { + return fmt.Errorf("failed to apply %v%v using type %v and value '%v'", prefix, fieldDesc, s.Type().String(), value) + } + intValue = dur.Nanoseconds() + } else { + var err error + intValue, err = strconv.ParseInt(value, 0, s.Type().Bits()) + if err != nil { + return fmt.Errorf("failed to apply %v%v using type %v and value '%v'", prefix, fieldDesc, s.Type().String(), value) + } + } + + mapValue.SetMapIndex(key, reflect.ValueOf(intValue)) + case reflect.Bool: + boolValue, err := strconv.ParseBool(value) + if err != nil { + return fmt.Errorf("failed to apply %v%v using type %v and value '%v'", prefix, fieldDesc, s.Type().String(), value) + + } + mapValue.SetMapIndex(key, reflect.ValueOf(boolValue)) + case reflect.Float32, reflect.Float64: + floatValue, err := strconv.ParseFloat(value, s.Type().Bits()) + if err != nil { + return fmt.Errorf("failed to apply %v%v using type %v and value '%v'", prefix, fieldDesc, s.Type().String(), value) + + } + mapValue.SetMapIndex(key, reflect.ValueOf(floatValue)) + case reflect.Struct: + if err := c.applyEnvOverridesToStruct(prefix, s); err != nil { + return err + } + } + return nil +} + func (c *Config) applyEnvOverrides(prefix string, fieldDesc string, spec reflect.Value) error { // If we have a pointer, dereference it s := spec @@ -453,6 +525,12 @@ func (c *Config) applyEnvOverridesToStruct(prefix string, s reflect.Value) error return err } } + } else if f.Kind() == reflect.Map { + for _, k := range f.MapKeys() { + if err := c.applyEnvOverridesToMap(fmt.Sprintf("%s_%v", key, k), fieldName, f, k, f.MapIndex(k)); err != nil { + return err + } + } } else if err := c.applyEnvOverrides(key, fieldName, f); err != nil { return err } diff --git a/server/config_test.go b/server/config_test.go index cecb022c5..8355e008a 100644 --- a/server/config_test.go +++ b/server/config_test.go @@ -43,6 +43,9 @@ dir = "/tmp/replay" [storage] boltdb = "/tmp/kapacitor.db" + +[[httppost]] +headers = { Authorization = "your-key" } `, &c); err != nil { t.Fatal(err) } @@ -59,6 +62,10 @@ boltdb = "/tmp/kapacitor.db" t.Fatalf("failed to set env var: %v", err) } + if err := os.Setenv("KAPACITOR_HTTPPOST_0_HEADERS_Authorization", "my-key"); err != nil { + t.Fatalf("failed to set env var: %v", err) + } + if err := c.ApplyEnvOverrides(); err != nil { t.Fatalf("failed to apply env overrides: %v", err) } @@ -70,5 +77,7 @@ boltdb = "/tmp/kapacitor.db" t.Fatalf("unexpected storage boltdb-path: %s", c.Storage.BoltDBPath) } else if c.InfluxDB[0].URLs[0] != "http://localhost:18086" { t.Fatalf("unexpected url 0: %s", c.InfluxDB[0].URLs[0]) + } else if c.HTTPPost[0].Headers["Authorization"] != "my-key" { + t.Fatalf("unexpected header Authorization: %s", c.InfluxDB[0].URLs[0]) } } diff --git a/server/server.go b/server/server.go index fef0e8021..1bf98b357 100644 --- a/server/server.go +++ b/server/server.go @@ -32,6 +32,7 @@ import ( "github.com/influxdata/kapacitor/services/gce" "github.com/influxdata/kapacitor/services/hipchat" "github.com/influxdata/kapacitor/services/httpd" + "github.com/influxdata/kapacitor/services/httppost" "github.com/influxdata/kapacitor/services/influxdb" "github.com/influxdata/kapacitor/services/k8s" "github.com/influxdata/kapacitor/services/logging" @@ -213,6 +214,7 @@ func New(c *Config, buildInfo BuildInfo, logService logging.Interface) (*Server, s.appendOpsGenieService() s.appendPagerDutyService() s.appendPushoverService() + s.appendHTTPPostService() s.appendSMTPService() s.appendTelegramService() if err := s.appendSlackService(); err != nil { @@ -501,6 +503,18 @@ func (s *Server) appendPushoverService() { s.AppendService("pushover", srv) } +func (s *Server) appendHTTPPostService() { + c := s.config.HTTPPost + l := s.LogService.NewLogger("[httppost] ", log.LstdFlags) + srv := httppost.NewService(c, l) + + s.TaskMaster.HTTPPostService = srv + s.AlertService.HTTPPostService = srv + + s.SetDynamicService("httppost", srv) + s.AppendService("httppost", srv) +} + func (s *Server) appendSensuService() { c := s.config.Sensu l := s.LogService.NewLogger("[sensu] ", log.LstdFlags) diff --git a/server/server_test.go b/server/server_test.go index 2a51249dd..e4aa4736f 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -31,10 +31,10 @@ import ( "github.com/influxdata/kapacitor/command/commandtest" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/server" - alertservice "github.com/influxdata/kapacitor/services/alert" "github.com/influxdata/kapacitor/services/alert/alerttest" "github.com/influxdata/kapacitor/services/alerta/alertatest" "github.com/influxdata/kapacitor/services/hipchat/hipchattest" + "github.com/influxdata/kapacitor/services/httppost" "github.com/influxdata/kapacitor/services/k8s" "github.com/influxdata/kapacitor/services/opsgenie" "github.com/influxdata/kapacitor/services/opsgenie/opsgenietest" @@ -4251,7 +4251,7 @@ func TestServer_RecordReplayQuery_Missing(t *testing.T) { t.Fatal(err) } defer f.Close() - exp := []alertservice.AlertData{ + exp := []alert.Data{ { ID: "test-stream-query", Message: "test-stream-query is CRITICAL", @@ -4495,9 +4495,9 @@ func TestServer_RecordReplayQuery_Missing(t *testing.T) { }, } dec := json.NewDecoder(f) - var got []alertservice.AlertData + var got []alert.Data for dec.More() { - g := alertservice.AlertData{} + g := alert.Data{} dec.Decode(&g) got = append(got, g) } @@ -5969,6 +5969,99 @@ func TestServer_UpdateConfig(t *testing.T) { }, }, }, + { + section: "httppost", + element: "test", + setDefaults: func(c *server.Config) { + apc := httppost.Config{ + Endpoint: "test", + URL: "http://httppost.example.com", + Headers: map[string]string{ + "testing": "works", + }, + } + c.HTTPPost = httppost.Configs{apc} + }, + expDefaultSection: client.ConfigSection{ + Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/config/httppost"}, + Elements: []client.ConfigElement{ + { + Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/config/httppost/test"}, + Options: map[string]interface{}{ + "endpoint": "test", + "url": "http://httppost.example.com", + "headers": map[string]interface{}{ + "testing": "works", + }, + "basic-auth": false, + }, + Redacted: []string{ + "basic-auth", + }}, + }, + }, + expDefaultElement: client.ConfigElement{ + Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/config/httppost/test"}, + Options: map[string]interface{}{ + "endpoint": "test", + "url": "http://httppost.example.com", + "headers": map[string]interface{}{ + "testing": "works", + }, + "basic-auth": false, + }, + Redacted: []string{ + "basic-auth", + }, + }, + updates: []updateAction{ + { + element: "test", + updateAction: client.ConfigUpdateAction{ + Set: map[string]interface{}{ + "headers": map[string]string{ + "testing": "more", + }, + "basic-auth": httppost.BasicAuth{ + Username: "usr", + Password: "pass", + }, + }, + }, + expSection: client.ConfigSection{ + Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/config/httppost"}, + Elements: []client.ConfigElement{{ + Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/config/httppost/test"}, + Options: map[string]interface{}{ + "endpoint": "test", + "url": "http://httppost.example.com", + "headers": map[string]interface{}{ + "testing": "more", + }, + "basic-auth": true, + }, + Redacted: []string{ + "basic-auth", + }, + }}, + }, + expElement: client.ConfigElement{ + Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/config/httppost/test"}, + Options: map[string]interface{}{ + "endpoint": "test", + "url": "http://httppost.example.com", + "headers": map[string]interface{}{ + "testing": "more", + }, + "basic-auth": true, + }, + Redacted: []string{ + "basic-auth", + }, + }, + }, + }, + }, { section: "pushover", setDefaults: func(c *server.Config) { @@ -7108,6 +7201,15 @@ func TestServer_ListServiceTests(t *testing.T) { "level": "CRITICAL", }, }, + { + Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/service-tests/httppost"}, + Name: "httppost", + Options: client.ServiceTestOptions{ + "endpoint": "example", + "url": "http://localhost:3000/", + "headers": map[string]interface{}{"Auth": "secret"}, + }, + }, { Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/service-tests/influxdb"}, Name: "influxdb", @@ -7677,7 +7779,7 @@ func TestServer_AlertHandlers(t *testing.T) { resultJSON := `{"series":[{"name":"alert","columns":["time","value"],"values":[["1970-01-01T00:00:00Z",1]]}]}` - alertData := alertservice.AlertData{ + alertData := alert.Data{ ID: "id", Message: "message", Details: "details", @@ -7843,7 +7945,7 @@ func TestServer_AlertHandlers(t *testing.T) { tdir := ctxt.Value("tdir").(string) defer os.RemoveAll(tdir) l := ctxt.Value("log").(*alerttest.Log) - expData := []alertservice.AlertData{alertData} + expData := []alert.Data{alertData} expMode := os.FileMode(0604) m, err := l.Mode() @@ -7994,7 +8096,7 @@ func TestServer_AlertHandlers(t *testing.T) { result: func(ctxt context.Context) error { ts := ctxt.Value("server").(*alerttest.PostServer) ts.Close() - exp := []alertservice.AlertData{alertData} + exp := []alert.Data{alertData} got := ts.Data() if !reflect.DeepEqual(exp, got) { return fmt.Errorf("unexpected post request:\nexp\n%+v\ngot\n%+v\n", exp, got) @@ -8245,7 +8347,7 @@ func TestServer_AlertHandlers(t *testing.T) { result: func(ctxt context.Context) error { ts := ctxt.Value("server").(*alerttest.TCPServer) ts.Close() - exp := []alertservice.AlertData{alertData} + exp := []alert.Data{alertData} got := ts.Data() if !reflect.DeepEqual(exp, got) { return fmt.Errorf("unexpected tcp request:\nexp\n%+v\ngot\n%+v\n", exp, got) @@ -8594,7 +8696,7 @@ alert value=2 0000000000002 time.Sleep(110 * time.Millisecond) // Check TCP handler got event - alertData := alertservice.AlertData{ + alertData := alert.Data{ ID: "id-agg", Message: "Received 3 events in the last 100ms.", Details: "message\nmessage\nmessage", @@ -8631,7 +8733,7 @@ alert value=2 0000000000002 }, } ts.Close() - exp := []alertservice.AlertData{alertData} + exp := []alert.Data{alertData} got := ts.Data() if !reflect.DeepEqual(exp, got) { t.Errorf("unexpected tcp request:\nexp\n%+v\ngot\n%+v\n", exp, got) @@ -8739,7 +8841,7 @@ stream s.Restart() // Check TCP handler got event - alertData := alertservice.AlertData{ + alertData := alert.Data{ ID: "id", Message: "message", Details: "details", @@ -8759,7 +8861,7 @@ stream }, } ts.Close() - exp := []alertservice.AlertData{alertData} + exp := []alert.Data{alertData} got := ts.Data() if !reflect.DeepEqual(exp, got) { t.Errorf("unexpected tcp request:\nexp\n%+v\ngot\n%+v\n", exp, got) @@ -8859,7 +8961,7 @@ alert,host=serverB value=0 0000000004 s.Restart() - alertData := alertservice.AlertData{ + alertData := alert.Data{ ID: "id", Message: "message", Details: "details", @@ -8880,7 +8982,7 @@ alert,host=serverB value=0 0000000004 }, } ts.Close() - exp := []alertservice.AlertData{alertData} + exp := []alert.Data{alertData} got := ts.Data() if !reflect.DeepEqual(exp, got) { t.Errorf("unexpected tcp request:\nexp\n%+v\ngot\n%+v\n", exp, got) diff --git a/services/alert/alerttest/alerttest.go b/services/alert/alerttest/alerttest.go index 353c7dcb4..ff8e0f973 100644 --- a/services/alert/alerttest/alerttest.go +++ b/services/alert/alerttest/alerttest.go @@ -8,9 +8,9 @@ import ( "os" "sync" + "github.com/influxdata/kapacitor/alert" "github.com/influxdata/kapacitor/command" "github.com/influxdata/kapacitor/command/commandtest" - alertservice "github.com/influxdata/kapacitor/services/alert" ) type Log struct { @@ -23,16 +23,16 @@ func NewLog(p string) *Log { } } -func (l *Log) Data() ([]alertservice.AlertData, error) { +func (l *Log) Data() ([]alert.Data, error) { f, err := os.Open(l.path) if err != nil { return nil, err } defer f.Close() dec := json.NewDecoder(f) - var data []alertservice.AlertData + var data []alert.Data for dec.More() { - ad := alertservice.AlertData{} + ad := alert.Data{} err := dec.Decode(&ad) if err != nil { return nil, err @@ -72,7 +72,7 @@ type TCPServer struct { l *net.TCPListener - data []alertservice.AlertData + data []alert.Data wg sync.WaitGroup closed bool @@ -99,7 +99,7 @@ func NewTCPServer() (*TCPServer, error) { return s, nil } -func (s *TCPServer) Data() []alertservice.AlertData { +func (s *TCPServer) Data() []alert.Data { return s.data } @@ -120,7 +120,7 @@ func (s *TCPServer) run() { } func() { defer conn.Close() - ad := alertservice.AlertData{} + ad := alert.Data{} json.NewDecoder(conn).Decode(&ad) s.data = append(s.data, ad) }() @@ -130,14 +130,14 @@ func (s *TCPServer) run() { type PostServer struct { ts *httptest.Server URL string - data []alertservice.AlertData + data []alert.Data closed bool } func NewPostServer() *PostServer { s := new(PostServer) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ad := alertservice.AlertData{} + ad := alert.Data{} dec := json.NewDecoder(r.Body) dec.Decode(&ad) s.data = append(s.data, ad) @@ -147,7 +147,7 @@ func NewPostServer() *PostServer { return s } -func (s *PostServer) Data() []alertservice.AlertData { +func (s *PostServer) Data() []alert.Data { return s.data } diff --git a/services/alert/handlers.go b/services/alert/handlers.go index e87e38343..37be2fed3 100644 --- a/services/alert/handlers.go +++ b/services/alert/handlers.go @@ -17,36 +17,11 @@ import ( "github.com/influxdata/kapacitor/alert" "github.com/influxdata/kapacitor/bufpool" "github.com/influxdata/kapacitor/command" - "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/tick/ast" "github.com/influxdata/kapacitor/tick/stateful" "github.com/pkg/errors" ) -// AlertData is a structure that contains relevant data about an alert event. -// The structure is intended to be JSON encoded, providing a consistent data format. -type AlertData struct { - ID string `json:"id"` - Message string `json:"message"` - Details string `json:"details"` - Time time.Time `json:"time"` - Duration time.Duration `json:"duration"` - Level alert.Level `json:"level"` - Data models.Result `json:"data"` -} - -func alertDataFromEvent(event alert.Event) AlertData { - return AlertData{ - ID: event.State.ID, - Message: event.State.Message, - Details: event.State.Details, - Time: event.State.Time, - Duration: event.State.Duration, - Level: event.State.Level, - Data: event.Data.Result, - } -} - // Default log mode for file const defaultLogFileMode = 0600 @@ -89,7 +64,7 @@ func NewLogHandler(c LogHandlerConfig, l *log.Logger) (alert.Handler, error) { } func (h *logHandler) Handle(event alert.Event) { - ad := alertDataFromEvent(event) + ad := event.AlertData() f, err := os.OpenFile(h.logpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, h.mode) if err != nil { @@ -133,7 +108,7 @@ func NewExecHandler(c ExecHandlerConfig, l *log.Logger) alert.Handler { func (h *execHandler) Handle(event alert.Event) { buf := h.bp.Get() defer h.bp.Put(buf) - ad := alertDataFromEvent(event) + ad := event.AlertData() err := json.NewEncoder(buf).Encode(ad) if err != nil { @@ -179,7 +154,7 @@ func NewTCPHandler(c TCPHandlerConfig, l *log.Logger) alert.Handler { func (h *tcpHandler) Handle(event alert.Event) { buf := h.bp.Get() defer h.bp.Put(buf) - ad := alertDataFromEvent(event) + ad := event.AlertData() err := json.NewEncoder(buf).Encode(ad) if err != nil { @@ -219,7 +194,7 @@ func NewPostHandler(c PostHandlerConfig, l *log.Logger) alert.Handler { func (h *postHandler) Handle(event alert.Event) { body := h.bp.Get() defer h.bp.Put(body) - ad := alertDataFromEvent(event) + ad := event.AlertData() err := json.NewEncoder(body).Encode(ad) if err != nil { diff --git a/services/alert/service.go b/services/alert/service.go index 016c28633..536e74c5b 100644 --- a/services/alert/service.go +++ b/services/alert/service.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/kapacitor/services/alerta" "github.com/influxdata/kapacitor/services/hipchat" "github.com/influxdata/kapacitor/services/httpd" + "github.com/influxdata/kapacitor/services/httppost" "github.com/influxdata/kapacitor/services/opsgenie" "github.com/influxdata/kapacitor/services/pagerduty" "github.com/influxdata/kapacitor/services/pushover" @@ -73,6 +74,9 @@ type Service struct { PushoverService interface { Handler(pushover.HandlerConfig, *log.Logger) alert.Handler } + HTTPPostService interface { + Handler(httppost.HandlerConfig, *log.Logger) alert.Handler + } SensuService interface { Handler(sensu.HandlerConfig, *log.Logger) (alert.Handler, error) } diff --git a/services/config/override/doc.go b/services/config/override/doc.go index 3a201d205..f928758b6 100644 --- a/services/config/override/doc.go +++ b/services/config/override/doc.go @@ -5,11 +5,14 @@ These fields may either be a struct or a slice of structs. As such a section consists of a list of elements. In the case where the field is a struct and not a slice, the section list always contains one element. Further nested levels may exist but Overrider will not interact with them directly. +If a nested field is a struct, then github.com/mitchellh/mapstructure will be used to decode the map into the struct. In order for a section to be overridden an `override` struct tag must be present. The `override` tag defines a name for the section and option. Struct tags can be used to mark options as redacted by adding a `,redact` to the end of the `override` tag value. + + Example: type SectionAConfig struct { Option string `override:"option"` diff --git a/services/config/override/override.go b/services/config/override/override.go index 7c9e2fc50..2e8289022 100644 --- a/services/config/override/override.go +++ b/services/config/override/override.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/mitchellh/copystructure" + "github.com/mitchellh/mapstructure" "github.com/mitchellh/reflectwalk" "github.com/pkg/errors" ) @@ -438,6 +439,19 @@ func weakCopyValue(dst, src reflect.Value) (err error) { } return fmt.Errorf("cannot convert string %q into %s", str, dstK) } + } else if dstK == reflect.Struct && srcK == reflect.Map { + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + ErrorUnused: true, + Result: addrDst.Interface(), + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + }) + if err != nil { + return errors.Wrap(err, "failed to initialize mapstructure decoder") + } + if err := dec.Decode(src.Interface()); err != nil { + return errors.Wrapf(err, "failed to decode options into %s", addrDst.Type()) + } + return nil } else { return fmt.Errorf("wrong kind %s, expected value of kind %s: %t", srcK, dstK, srcK == dstK) } @@ -607,9 +621,8 @@ func isZero(v reflect.Value) bool { // Check structs recusively since not all of its field may be comparable z := true for i := 0; i < v.NumField() && z; i++ { - if f := v.Field(i); f.CanSet() { - z = z && isZero(f) - } + f := v.Field(i) + z = z && isZero(f) } return z default: diff --git a/services/config/override/override_internal_test.go b/services/config/override/override_internal_test.go index 69d15810e..9e34d4523 100644 --- a/services/config/override/override_internal_test.go +++ b/services/config/override/override_internal_test.go @@ -47,3 +47,49 @@ func TestGetSectionName(t *testing.T) { } } } + +func Test_isZero(t *testing.T) { + tt := []struct { + exp bool + value struct { + X string + Y int + } + }{ + { + exp: true, + value: struct { + X string + Y int + }{}, + }, + { + exp: false, + value: struct { + X string + Y int + }{X: "hello"}, + }, + { + exp: false, + value: struct { + X string + Y int + }{Y: 10}, + }, + { + exp: false, + value: struct { + X string + Y int + }{X: "hello", Y: 10}, + }, + } + + for _, tst := range tt { + if got, exp := isZero(reflect.ValueOf(tst.value)), tst.exp; got != exp { + t.Errorf("unexpected result for isZero of %v got %t exp %t", tst.value, got, exp) + } + } + +} diff --git a/services/httppost/config.go b/services/httppost/config.go new file mode 100644 index 000000000..8e7578d72 --- /dev/null +++ b/services/httppost/config.go @@ -0,0 +1,77 @@ +package httppost + +import ( + "net/url" + + "github.com/pkg/errors" +) + +type BasicAuth struct { + Username string `toml:"username" json:"username"` + Password string `toml:"password" json:"password"` +} + +func (b BasicAuth) valid() bool { + return b.Username != "" && b.Password != "" +} + +func (b BasicAuth) validate() error { + if !b.valid() { + return errors.New("basic-auth must set both \"username\" and \"password\" parameters") + } + + return nil +} + +// Config is the configuration for a single [[httppost]] section of the kapacitor +// configuration file. +type Config struct { + Endpoint string `toml:"endpoint" override:"endpoint"` + URL string `toml:"url" override:"url"` + Headers map[string]string `toml:"headers" override:"headers"` + BasicAuth BasicAuth `toml:"basic-auth" override:"basic-auth,redact"` +} + +// Validate ensures that all configurations options are valid. The Endpoint, +// and URL parameters must be set to be considered valid. +func (c Config) Validate() error { + if c.Endpoint == "" { + return errors.New("must specify endpoint name") + } + + if c.URL == "" { + return errors.New("must specify url") + } + + if _, err := url.Parse(c.URL); err != nil { + return errors.Wrapf(err, "invalid URL %q", c.URL) + } + + return nil +} + +// Configs is the configuration for all [[alertpost]] sections of the kapacitor +// configuration file. +type Configs []Config + +// Validate calls config.Validate for each element in Configs +func (cs Configs) Validate() error { + for _, c := range cs { + err := c.Validate() + if err != nil { + return err + } + } + return nil +} + +// index generates a map from config.Endpoint to config +func (cs Configs) index() map[string]*Endpoint { + m := map[string]*Endpoint{} + + for _, c := range cs { + m[c.Endpoint] = NewEndpoint(c.URL, c.Headers, c.BasicAuth) + } + + return m +} diff --git a/services/httppost/httpposttest/server.go b/services/httppost/httpposttest/server.go new file mode 100644 index 000000000..7a51b62be --- /dev/null +++ b/services/httppost/httpposttest/server.go @@ -0,0 +1,53 @@ +package httpposttest + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + + "github.com/influxdata/kapacitor/alert" +) + +type AlertServer struct { + ts *httptest.Server + URL string + data []AlertRequest + closed bool +} + +func NewAlertServer(headers map[string]string) *AlertServer { + s := new(AlertServer) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + req := AlertRequest{MatchingHeaders: true} + for k, v := range headers { + nv := r.Header.Get(k) + if nv != v { + req.MatchingHeaders = false + } + } + req.Data = alert.Data{} + dec := json.NewDecoder(r.Body) + dec.Decode(&req.Data) + s.data = append(s.data, req) + })) + s.ts = ts + s.URL = ts.URL + return s +} + +type AlertRequest struct { + MatchingHeaders bool + Data alert.Data +} + +func (s *AlertServer) Data() []AlertRequest { + return s.data +} + +func (s *AlertServer) Close() { + if s.closed { + return + } + s.closed = true + s.ts.Close() +} diff --git a/services/httppost/service.go b/services/httppost/service.go new file mode 100644 index 000000000..8a14088f0 --- /dev/null +++ b/services/httppost/service.go @@ -0,0 +1,254 @@ +package httppost + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net/http" + "sync" + + "github.com/influxdata/kapacitor/alert" + "github.com/influxdata/kapacitor/bufpool" +) + +// Only one of name and url should be non-empty +type Endpoint struct { + mu sync.RWMutex + url string + headers map[string]string + auth BasicAuth + closed bool +} + +func NewEndpoint(url string, headers map[string]string, auth BasicAuth) *Endpoint { + return &Endpoint{ + url: url, + headers: headers, + auth: auth, + } +} +func (e *Endpoint) Close() { + e.mu.Lock() + defer e.mu.Unlock() + e.closed = true + return +} + +func (e *Endpoint) Update(c Config) { + e.mu.Lock() + defer e.mu.Unlock() + e.url = c.URL + e.headers = c.Headers + e.auth = c.BasicAuth +} + +func (e *Endpoint) NewHTTPRequest(body io.Reader) (req *http.Request, err error) { + e.mu.RLock() + defer e.mu.RUnlock() + if e.closed { + return nil, errors.New("endpoint was closed") + } + + req, err = http.NewRequest("POST", e.url, body) + if err != nil { + return nil, fmt.Errorf("failed to create POST request: %v", err) + } + + if e.auth.valid() { + req.SetBasicAuth(e.auth.Username, e.auth.Password) + } + + for k, v := range e.headers { + req.Header.Add(k, v) + } + + return req, nil +} + +type Service struct { + mu sync.RWMutex + endpoints map[string]*Endpoint + logger *log.Logger +} + +func NewService(c Configs, l *log.Logger) *Service { + s := &Service{ + logger: l, + endpoints: c.index(), + } + return s +} + +func (s *Service) Endpoint(name string) (*Endpoint, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + + e, ok := s.endpoints[name] + return e, ok +} + +func (s *Service) Update(newConfigs []interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + + endpointSet := map[string]bool{} + + for _, nc := range newConfigs { + if c, ok := nc.(Config); ok { + if err := c.Validate(); err != nil { + return err + } + e, ok := s.endpoints[c.Endpoint] + if !ok { + s.endpoints[c.Endpoint] = NewEndpoint(c.URL, c.Headers, c.BasicAuth) + continue + } + e.Update(c) + + endpointSet[c.Endpoint] = true + } else { + return fmt.Errorf("unexpected config object type, got %T exp %T", nc, c) + } + } + + // Find any deleted endpoints + for name, endpoint := range s.endpoints { + if !endpointSet[name] { + endpoint.Close() + delete(s.endpoints, name) + } + } + + return nil +} + +func (s *Service) Open() error { + return nil +} + +func (s *Service) Close() error { + return nil +} + +type testOptions struct { + Endpoint string `json:"endpoint"` + URL string `json:"url"` + Headers map[string]string `json:"headers"` +} + +func (s *Service) TestOptions() interface{} { + return &testOptions{ + Endpoint: "example", + URL: "http://localhost:3000/", + Headers: map[string]string{"Auth": "secret"}, + } +} + +func (s *Service) Test(options interface{}) error { + var err error + o, ok := options.(*testOptions) + if !ok { + return fmt.Errorf("unexpected options type %t", options) + } + + event := alert.Event{} + body := bytes.NewBuffer(nil) + ad := event.AlertData() + + err = json.NewEncoder(body).Encode(ad) + if err != nil { + return fmt.Errorf("failed to marshal alert data json: %v", err) + } + + // Create the HTTP request + var req *http.Request + e := &Endpoint{ + url: o.URL, + headers: o.Headers, + } + req, err = e.NewHTTPRequest(body) + + // Execute the request + req.Header.Set("Content-Type", "application/json") + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("failed to POST alert data: %v", err) + } + resp.Body.Close() + return nil +} + +type HandlerConfig struct { + URL string `mapstructure:"url"` + Endpoint string `mapstructure:"endpoint"` + Headers map[string]string `mapstructure:"headers"` +} + +type handler struct { + s *Service + bp *bufpool.Pool + endpoint *Endpoint + logger *log.Logger + headers map[string]string +} + +func (s *Service) Handler(c HandlerConfig, l *log.Logger) alert.Handler { + e, ok := s.Endpoint(c.Endpoint) + if !ok { + e = NewEndpoint(c.URL, nil, BasicAuth{}) + } + + return &handler{ + s: s, + bp: bufpool.New(), + endpoint: e, + logger: l, + headers: c.Headers, + } +} + +func (h *handler) NewHTTPRequest(body io.Reader) (req *http.Request, err error) { + req, err = h.endpoint.NewHTTPRequest(body) + if err != nil { + return + } + + for k, v := range h.headers { + req.Header.Set(k, v) + } + + return +} + +func (h *handler) Handle(event alert.Event) { + var err error + + // Construct the body of the HTTP request + body := h.bp.Get() + defer h.bp.Put(body) + ad := event.AlertData() + + err = json.NewEncoder(body).Encode(ad) + if err != nil { + h.logger.Printf("E! failed to marshal alert data json: %v", err) + return + } + + req, err := h.NewHTTPRequest(body) + if err != nil { + h.logger.Printf("E! fail to create HTTP request: %v", err) + return + } + + // Execute the request + req.Header.Set("Content-Type", "application/json") + resp, err := http.DefaultClient.Do(req) + if err != nil { + h.logger.Printf("E! failed to POST alert data: %v", err) + return + } + resp.Body.Close() +} diff --git a/task_master.go b/task_master.go index 957af6f8b..25e1bc5d3 100644 --- a/task_master.go +++ b/task_master.go @@ -18,6 +18,7 @@ import ( "github.com/influxdata/kapacitor/services/alerta" "github.com/influxdata/kapacitor/services/hipchat" "github.com/influxdata/kapacitor/services/httpd" + "github.com/influxdata/kapacitor/services/httppost" k8s "github.com/influxdata/kapacitor/services/k8s/client" "github.com/influxdata/kapacitor/services/opsgenie" "github.com/influxdata/kapacitor/services/pagerduty" @@ -102,6 +103,10 @@ type TaskMaster struct { PushoverService interface { Handler(pushover.HandlerConfig, *log.Logger) alert.Handler } + HTTPPostService interface { + Handler(httppost.HandlerConfig, *log.Logger) alert.Handler + Endpoint(string) (*httppost.Endpoint, bool) + } SlackService interface { Global() bool StateChangesOnly() bool