Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add httppost service (add headers to POST alerts) #1254

Merged
merged 1 commit into from
May 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased [unreleased]

### Features

- [#117](https://github.com/influxdata/kapacitor/issues/117): Add headers to alert POST requests.

### Bugfixes

- [#1294](https://github.com/influxdata/kapacitor/issues/1294): Fix bug where batch queries would be missing all fields after the first nil field.
Expand Down
20 changes: 11 additions & 9 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/influxdata/kapacitor/pipeline"
alertservice "github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/hipchat"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/opsgenie"
"github.com/influxdata/kapacitor/services/pagerduty"
"github.com/influxdata/kapacitor/services/pushover"
Expand Down Expand Up @@ -130,15 +131,6 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
return nil, err
}

// Construct alert handlers
for _, post := range n.PostHandlers {
c := alertservice.PostHandlerConfig{
URL: post.URL,
}
h := alertservice.NewPostHandler(c, l)
an.handlers = append(an.handlers, h)
}

for _, tcp := range n.TcpHandlers {
c := alertservice.TCPHandlerConfig{
Address: tcp.Address,
Expand Down Expand Up @@ -363,6 +355,16 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
an.handlers = append(an.handlers, h)
}

for _, p := range n.HTTPPostHandlers {
c := httppost.HandlerConfig{
URL: p.URL,
Endpoint: p.Endpoint,
Headers: p.Headers,
}
h := et.tm.HTTPPostService.Handler(c, l)
an.handlers = append(an.handlers, h)
}

for _, og := range n.OpsGenieHandlers {
c := opsgenie.HandlerConfig{
TeamsList: og.TeamsList,
Expand Down
24 changes: 24 additions & 0 deletions alert/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ type Event struct {
previousState EventState
}

func (e Event) AlertData() Data {
return Data{
ID: e.State.ID,
Message: e.State.Message,
Details: e.State.Details,
Time: e.State.Time,
Duration: e.State.Duration,
Level: e.State.Level,
Data: e.Data.Result,
}
}

func (e Event) PreviousState() EventState {
return e.previousState
}
Expand Down Expand Up @@ -154,3 +166,15 @@ type TopicState struct {
Level Level
Collected int64
}

// Data is a structure that contains relevant data about an alert event.
// The structure is intended to be JSON encoded, providing a consistent data format.
type Data struct {
ID string `json:"id"`
Message string `json:"message"`
Details string `json:"details"`
Time time.Time `json:"time"`
Duration time.Duration `json:"duration"`
Level Level `json:"level"`
Data models.Result `json:"data"`
}
14 changes: 14 additions & 0 deletions etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,20 @@ default-retention-policy = ""
# The URL for the Pushover API.
url = "https://api.pushover.net/1/messages.json"

##########################################
# Configure Alert POST request Endpoints

# As ENV variables:
# KAPACITOR_HTTPPOST_0_ENDPOINT = "example"
# KAPACITOR_HTTPPOST_0_URL = "http://example.com"
# KAPACITOR_HTTPPOST_0_HEADERS_Example = "header"

# [[httppost]]
# endpoint = "example"
# url = "http://example.com"
# headers = { Example = "your-key" }
# basic-auth = { username = "my-user", password = "my-pass" }

[slack]
# Configure Slack.
enabled = false
Expand Down
44 changes: 38 additions & 6 deletions http_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,50 @@ package kapacitor

import (
"encoding/json"
"fmt"
"log"
"net/http"
"sync"

"github.com/influxdata/kapacitor/bufpool"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/services/httppost"
)

type HTTPPostNode struct {
node
c *pipeline.HTTPPostNode
url string
mu sync.RWMutex
bp *bufpool.Pool
c *pipeline.HTTPPostNode
endpoint *httppost.Endpoint
mu sync.RWMutex
bp *bufpool.Pool
}

// Create a new HTTPPostNode which submits received items via POST to an HTTP endpoint
func newHTTPPostNode(et *ExecutingTask, n *pipeline.HTTPPostNode, l *log.Logger) (*HTTPPostNode, error) {

hn := &HTTPPostNode{
node: node{Node: n, et: et, logger: l},
c: n,
bp: bufpool.New(),
url: n.Url,
}

// Should only ever be 0 or 1 from validation of n
if len(n.URLs) == 1 {
e := httppost.NewEndpoint(n.URLs[0], nil, httppost.BasicAuth{})
hn.endpoint = e
}

// Should only ever be 0 or 1 from validation of n
if len(n.HTTPPostEndpoints) == 1 {
endpointName := n.HTTPPostEndpoints[0].Endpoint
e, ok := et.tm.HTTPPostService.Endpoint(endpointName)
if !ok {
return nil, fmt.Errorf("endpoint '%s' does not exist", endpointName)
}
hn.endpoint = e
}

hn.node.runF = hn.runPost
return hn, nil
}
Expand Down Expand Up @@ -72,14 +91,27 @@ func (h *HTTPPostNode) postRow(group models.GroupID, row *models.Row) {
defer h.bp.Put(body)
err := json.NewEncoder(body).Encode(result)
if err != nil {
h.incrementErrorCount()
h.logger.Printf("E! failed to marshal row data json: %v", err)
return
}
req, err := h.endpoint.NewHTTPRequest(body)
if err != nil {
h.incrementErrorCount()
h.logger.Printf("E! failed to marshal row data json: %v", err)
return
}

resp, err := http.Post(h.url, "application/json", body)
req.Header.Set("Content-Type", "application/json")
for k, v := range h.c.Headers {
req.Header.Set(k, v)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
h.incrementErrorCount()
h.logger.Printf("E! failed to POST row data: %v", err)
return
}
resp.Body.Close()

}
16 changes: 9 additions & 7 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/influxdata/kapacitor/clock"
"github.com/influxdata/kapacitor/models"
alertservice "github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/storage/storagetest"
"github.com/influxdata/wlog"
)
Expand Down Expand Up @@ -1389,15 +1390,15 @@ batch
func TestBatch_AlertStateChangesOnly(t *testing.T) {
requestCount := int32(0)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ad := alertservice.AlertData{}
ad := alert.Data{}
dec := json.NewDecoder(r.Body)
err := dec.Decode(&ad)
if err != nil {
t.Fatal(err)
}
atomic.AddInt32(&requestCount, 1)
if rc := atomic.LoadInt32(&requestCount); rc == 1 {
expAd := alertservice.AlertData{
expAd := alert.Data{
ID: "cpu_usage_idle:cpu=cpu-total",
Message: "cpu_usage_idle:cpu=cpu-total is CRITICAL",
Time: time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
Expand All @@ -1408,7 +1409,7 @@ func TestBatch_AlertStateChangesOnly(t *testing.T) {
t.Error(msg)
}
} else {
expAd := alertservice.AlertData{
expAd := alert.Data{
ID: "cpu_usage_idle:cpu=cpu-total",
Message: "cpu_usage_idle:cpu=cpu-total is OK",
Time: time.Date(1971, 1, 1, 0, 0, 38, 0, time.UTC),
Expand Down Expand Up @@ -1454,27 +1455,27 @@ batch
func TestBatch_AlertStateChangesOnlyExpired(t *testing.T) {
requestCount := int32(0)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ad := alertservice.AlertData{}
ad := alert.Data{}
dec := json.NewDecoder(r.Body)
err := dec.Decode(&ad)
if err != nil {
t.Fatal(err)
}
// We don't care about the data for this test
ad.Data = models.Result{}
var expAd alertservice.AlertData
var expAd alert.Data
atomic.AddInt32(&requestCount, 1)
rc := atomic.LoadInt32(&requestCount)
if rc < 3 {
expAd = alertservice.AlertData{
expAd = alert.Data{
ID: "cpu_usage_idle:cpu=cpu-total",
Message: "cpu_usage_idle:cpu=cpu-total is CRITICAL",
Time: time.Date(1971, 1, 1, 0, 0, int(rc-1)*20, 0, time.UTC),
Duration: time.Duration(rc-1) * 20 * time.Second,
Level: alert.Critical,
}
} else {
expAd = alertservice.AlertData{
expAd = alert.Data{
ID: "cpu_usage_idle:cpu=cpu-total",
Message: "cpu_usage_idle:cpu=cpu-total is OK",
Time: time.Date(1971, 1, 1, 0, 0, 38, 0, time.UTC),
Expand Down Expand Up @@ -2923,6 +2924,7 @@ func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.Ex
tm.HTTPDService = httpdService
tm.TaskStore = taskStore{}
tm.DeadmanService = deadman{}
tm.HTTPPostService = httppost.NewService(nil, logService.NewLogger("[httppost] ", log.LstdFlags))
as := alertservice.NewService(logService.NewLogger("[alert] ", log.LstdFlags))
as.StorageService = storagetest.New()
as.HTTPDService = httpdService
Expand Down
4 changes: 2 additions & 2 deletions integrations/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"time"

"github.com/influxdata/kapacitor"
"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/influxdb"
"github.com/influxdata/kapacitor/models"
alertservice "github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/httpd"
k8s "github.com/influxdata/kapacitor/services/k8s/client"
"github.com/influxdata/kapacitor/udf"
Expand Down Expand Up @@ -117,7 +117,7 @@ func compareResultsIgnoreSeriesOrder(exp, got models.Result) (bool, string) {
return true, ""
}

func compareAlertData(exp, got alertservice.AlertData) (bool, string) {
func compareAlertData(exp, got alert.Data) (bool, string) {
// Pull out Result for comparison
expData := exp.Data
exp.Data = models.Result{}
Expand Down
Loading