From e7dd714c856a35b61996e50b6edf6cb4c1d8cb6d Mon Sep 17 00:00:00 2001 From: Min Pae Date: Tue, 4 Apr 2017 15:51:03 -0700 Subject: [PATCH] Add a generic HTTPPost node An HTTPPost node to mimic the behavior of HTTPOut, with the difference that data is POST'd to an HTTP endpoint rather than made available at a kapacitor endpoint. Closes #1330 --- CHANGELOG.md | 1 + http_post.go | 84 ++++ integrations/batcher_test.go | 412 ++++++++++++++++++++ integrations/data/TestBatch_HttpPost.0.brpl | 9 + integrations/data/TestStream_HttpPost.srpl | 18 + integrations/streamer_test.go | 136 +++++++ pipeline/http_post.go | 26 ++ pipeline/node.go | 7 + task.go | 2 + 9 files changed, 695 insertions(+) create mode 100644 http_post.go create mode 100644 integrations/data/TestBatch_HttpPost.0.brpl create mode 100644 integrations/data/TestStream_HttpPost.srpl create mode 100644 pipeline/http_post.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 9736569f0..8dcd7c96b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ kapacitor define-handler system aggregate_by_1m.yaml ### Features - [#1322](https://github.com/influxdata/kapacitor/pull/1322): TLS configuration in Slack service for Mattermost compatibility +- [#1330](https://github.com/influxdata/kapacitor/issues/1330): Generic HTTP Post node - [#1159](https://github.com/influxdata/kapacitor/pulls/1159): Go version 1.7.4 -> 1.7.5 - [#1175](https://github.com/influxdata/kapacitor/pull/1175): BREAKING: Add generic error counters to every node type. Renamed `query_errors` to `errors` in batch node. diff --git a/http_post.go b/http_post.go new file mode 100644 index 000000000..c686c9a94 --- /dev/null +++ b/http_post.go @@ -0,0 +1,84 @@ +package kapacitor + +import ( + "encoding/json" + "log" + "net/http" + "sync" + + "github.com/influxdata/kapacitor/bufpool" + "github.com/influxdata/kapacitor/models" + "github.com/influxdata/kapacitor/pipeline" +) + +type HTTPPostNode struct { + node + c *pipeline.HTTPPostNode + url string + mu sync.RWMutex + bp *bufpool.Pool +} + +// Create a new HTTPPostNode which caches the most recent item and exposes it over the HTTP API. +func newHTTPPostNode(et *ExecutingTask, n *pipeline.HTTPPostNode, l *log.Logger) (*HTTPPostNode, error) { + hn := &HTTPPostNode{ + node: node{Node: n, et: et, logger: l}, + c: n, + bp: bufpool.New(), + url: n.Url, + } + hn.node.runF = hn.runPost + return hn, nil +} + +func (h *HTTPPostNode) runPost([]byte) error { + switch h.Wants() { + case pipeline.StreamEdge: + for p, ok := h.ins[0].NextPoint(); ok; p, ok = h.ins[0].NextPoint() { + h.timer.Start() + row := models.PointToRow(p) + h.postRow(p.Group, row) + h.timer.Stop() + for _, child := range h.outs { + err := child.CollectPoint(p) + if err != nil { + return err + } + } + } + case pipeline.BatchEdge: + for b, ok := h.ins[0].NextBatch(); ok; b, ok = h.ins[0].NextBatch() { + h.timer.Start() + row := models.BatchToRow(b) + h.postRow(b.Group, row) + h.timer.Stop() + for _, child := range h.outs { + err := child.CollectBatch(b) + if err != nil { + return err + } + } + } + } + return nil +} + +// Update the result structure with a row. +func (h *HTTPPostNode) postRow(group models.GroupID, row *models.Row) { + result := new(models.Result) + result.Series = []*models.Row{row} + + body := h.bp.Get() + defer h.bp.Put(body) + err := json.NewEncoder(body).Encode(result) + if err != nil { + h.logger.Printf("E! 
failed to marshal row data json: %v", err) + return + } + + resp, err := http.Post(h.url, "application/json", body) + if err != nil { + h.logger.Printf("E! failed to POST row data: %v", err) + } + resp.Body.Close() +} diff --git a/integrations/batcher_test.go b/integrations/batcher_test.go index 07252a6e4..b3a33bb26 100644 --- a/integrations/batcher_test.go +++ b/integrations/batcher_test.go @@ -2497,6 +2497,418 @@ batch testBatcherWithOutput(t, "TestBatch_StateTracking", script, 8*time.Second, er, false) } +func TestBatch_HttpPost(t *testing.T) { + requestCount := int32(0) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + result := models.Result{} + dec := json.NewDecoder(r.Body) + err := dec.Decode(&result) + if err != nil { + t.Fatal(err) + } + atomic.AddInt32(&requestCount, 1) + rc := atomic.LoadInt32(&requestCount) + + var er models.Result + switch rc { + case 1: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu-total"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + 90.38281469458698, + }, + { + time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), + 86.51447101892941, + }, + { + time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + 91.71877558217454, + }, + { + time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC), + 87.10524436107617, + }, + { + time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC), + 90.3900735196668, + }, + }, + }, + }, + } + case 2: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu0"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + 83.56930693069836, + }, + { + time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), + 79.12871287128638, + }, + { + time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + 88.99559823928229, + }, + { + time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC), + 85.50000000000182, + }, + { + time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC), + 86.02860286029956, + }, + }, + }, + }, + } + case 3: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu1"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + 93.49999999999409, + }, + { + time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), + 91.44444444443974, + }, + { + time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + 93.44897959187637, + }, + { + time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC), + 95.99999999995998, + }, + { + time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC), + 97.00970097012197, + }, + }, + }, + }, + } + case 4: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu-total"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 90.8919959776013, + }, + { + time.Date(1971, 1, 1, 0, 0, 12, 0, time.UTC), + 86.54244306420236, + }, + { + time.Date(1971, 1, 1, 0, 0, 14, 0, time.UTC), + 91.01699558842134, + }, + { + time.Date(1971, 1, 1, 0, 0, 16, 0, time.UTC), + 85.66378399063848, + }, + { + time.Date(1971, 1, 1, 0, 0, 18, 0, time.UTC), + 89.90919811320221, + }, + }, + }, + }, + } + case 5: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu0"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 
81.72501716191164, + }, + { + time.Date(1971, 1, 1, 0, 0, 12, 0, time.UTC), + 81.03810381037587, + }, + { + time.Date(1971, 1, 1, 0, 0, 14, 0, time.UTC), + 85.93434343435388, + }, + { + time.Date(1971, 1, 1, 0, 0, 16, 0, time.UTC), + 85.36734693878043, + }, + { + time.Date(1971, 1, 1, 0, 0, 18, 0, time.UTC), + 83.01320528210614, + }, + }, + }, + }, + } + case 6: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu1"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 95.98484848485191, + }, + { + time.Date(1971, 1, 1, 0, 0, 12, 0, time.UTC), + 92.098039215696, + }, + { + time.Date(1971, 1, 1, 0, 0, 14, 0, time.UTC), + 92.99999999998363, + }, + { + time.Date(1971, 1, 1, 0, 0, 16, 0, time.UTC), + 86.54015887023496, + }, + { + time.Date(1971, 1, 1, 0, 0, 18, 0, time.UTC), + 95.48979591840603, + }, + }, + }, + }, + } + case 7: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu-total"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC), + 91.06416290101595, + }, + { + time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC), + 85.9694442394385, + }, + { + time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC), + 90.62985736134186, + }, + { + time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC), + 86.45443196005628, + }, + { + time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC), + 88.97243107764031, + }, + }, + }, + }, + } + case 8: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu0"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC), + 85.08910891088406, + }, + { + time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC), + 78.00000000002001, + }, + { + time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC), + 84.23607066586464, + }, + { + time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC), + 80.85858585861834, + }, + { + time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC), + 80.61224489791657, + }, + }, + }, + }, + } + case 9: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu1"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC), + 96.49999999996908, + }, + { + time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC), + 93.46464646468584, + }, + { + time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC), + 95.00950095007724, + }, + { + time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC), + 92.99999999998636, + }, + { + time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC), + 90.99999999998545, + }, + }, + }, + }, + } + } + if eq, msg := compareResults(er, result); !eq { + t.Errorf("unexpected alert data for request: %d %s", rc, msg) + } + })) + defer ts.Close() + + var script = ` +batch + |query(''' + SELECT mean("value") + FROM "telegraf"."default".cpu_usage_idle + WHERE "host" = 'serverA' AND "cpu" != 'cpu-total' +''') + .period(10s) + .every(10s) + .groupBy(time(2s), 'cpu') + |httpPost('` + ts.URL + `') + |httpOut('TestBatch_HttpPost') +` + + er := models.Result{ + Series: models.Rows{ + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu-total"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC), + 91.06416290101595, + }, + { + time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC), + 85.9694442394385, + }, + { + 
time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC), + 90.62985736134186, + }, + { + time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC), + 86.45443196005628, + }, + { + time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC), + 88.97243107764031, + }, + }, + }, + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu0"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC), + 85.08910891088406, + }, + { + time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC), + 78.00000000002001, + }, + { + time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC), + 84.23607066586464, + }, + { + time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC), + 80.85858585861834, + }, + { + time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC), + 80.61224489791657, + }, + }, + }, + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu1"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC), + 96.49999999996908, + }, + { + time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC), + 93.46464646468584, + }, + { + time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC), + 95.00950095007724, + }, + { + time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC), + 92.99999999998636, + }, + { + time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC), + 90.99999999998545, + }, + }, + }, + }, + } + + testBatcherWithOutput(t, "TestBatch_HttpPost", script, 30*time.Second, er, false) +} + // Helper test function for batcher func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.ExecutingTask, <-chan error, *kapacitor.TaskMaster) { if testing.Verbose() { diff --git a/integrations/data/TestBatch_HttpPost.0.brpl b/integrations/data/TestBatch_HttpPost.0.brpl new file mode 100644 index 000000000..1337caedb --- /dev/null +++ b/integrations/data/TestBatch_HttpPost.0.brpl @@ -0,0 +1,9 @@ +{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":90.38281469458698},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":86.51447101892941},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":91.71877558217454},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":87.10524436107617},"time":"2015-10-30T17:14:18Z"},{"fields":{"mean":90.3900735196668},"time":"2015-10-30T17:14:20Z"}]} +{"name":"cpu_usage_idle","tags":{"cpu":"cpu0"},"points":[{"fields":{"mean":83.56930693069836},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":79.12871287128638},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":88.99559823928229},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":85.50000000000182},"time":"2015-10-30T17:14:18Z"},{"fields":{"mean":86.02860286029956},"time":"2015-10-30T17:14:20Z"}]} +{"name":"cpu_usage_idle","tags":{"cpu":"cpu1"},"points":[{"fields":{"mean":93.49999999999409},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":91.44444444443974},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":93.44897959187637},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":95.99999999995998},"time":"2015-10-30T17:14:18Z"},{"fields":{"mean":97.00970097012197},"time":"2015-10-30T17:14:20Z"}]} +{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":90.8919959776013},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":86.54244306420236},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":91.01699558842134},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":85.66378399063848},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":89.90919811320221},"time":"2015-10-30T17:14:30Z"}]} 
+{"name":"cpu_usage_idle","tags":{"cpu":"cpu0"},"points":[{"fields":{"mean":81.72501716191164},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":81.03810381037587},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":85.93434343435388},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":85.36734693878043},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":83.01320528210614},"time":"2015-10-30T17:14:30Z"}]} +{"name":"cpu_usage_idle","tags":{"cpu":"cpu1"},"points":[{"fields":{"mean":95.98484848485191},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":92.098039215696},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":92.99999999998363},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":86.54015887023496},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":95.48979591840603},"time":"2015-10-30T17:14:30Z"}]} +{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":91.06416290101595},"time":"2015-10-30T17:14:32Z"},{"fields":{"mean":85.9694442394385},"time":"2015-10-30T17:14:34Z"},{"fields":{"mean":90.62985736134186},"time":"2015-10-30T17:14:36Z"},{"fields":{"mean":86.45443196005628},"time":"2015-10-30T17:14:38Z"},{"fields":{"mean":88.97243107764031},"time":"2015-10-30T17:14:40Z"}]} +{"name":"cpu_usage_idle","tags":{"cpu":"cpu0"},"points":[{"fields":{"mean":85.08910891088406},"time":"2015-10-30T17:14:32Z"},{"fields":{"mean":78.00000000002001},"time":"2015-10-30T17:14:34Z"},{"fields":{"mean":84.23607066586464},"time":"2015-10-30T17:14:36Z"},{"fields":{"mean":80.85858585861834},"time":"2015-10-30T17:14:38Z"},{"fields":{"mean":80.61224489791657},"time":"2015-10-30T17:14:40Z"}]} +{"name":"cpu_usage_idle","tags":{"cpu":"cpu1"},"points":[{"fields":{"mean":96.49999999996908},"time":"2015-10-30T17:14:32Z"},{"fields":{"mean":93.46464646468584},"time":"2015-10-30T17:14:34Z"},{"fields":{"mean":95.00950095007724},"time":"2015-10-30T17:14:36Z"},{"fields":{"mean":92.99999999998636},"time":"2015-10-30T17:14:38Z"},{"fields":{"mean":90.99999999998545},"time":"2015-10-30T17:14:40Z"}]} diff --git a/integrations/data/TestStream_HttpPost.srpl b/integrations/data/TestStream_HttpPost.srpl new file mode 100644 index 000000000..27872ec7a --- /dev/null +++ b/integrations/data/TestStream_HttpPost.srpl @@ -0,0 +1,18 @@ +dbname +rpname +cpu,type=idle,host=serverA value=97.1 0000000001 +dbname +rpname +cpu,type=idle,host=serverA value=92.6 0000000002 +dbname +rpname +cpu,type=idle,host=serverA value=95.6 0000000003 +dbname +rpname +cpu,type=idle,host=serverA value=93.1 0000000004 +dbname +rpname +cpu,type=idle,host=serverA value=92.6 0000000005 +dbname +rpname +cpu,type=idle,host=serverA value=95.8 0000000006 diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index b361dbad0..9bba9a25c 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -2099,6 +2099,142 @@ stream testStreamerWithOutput(t, "TestStream_AllMeasurements", script, 15*time.Second, er, false, nil) } +func TestStream_HttpPost(t *testing.T) { + requestCount := int32(0) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + result := models.Result{} + dec := json.NewDecoder(r.Body) + err := dec.Decode(&result) + if err != nil { + t.Fatal(err) + } + atomic.AddInt32(&requestCount, 1) + rc := atomic.LoadInt32(&requestCount) + + var er models.Result + switch rc { + case 1: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "idle"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ 
+ time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + 97.1, + }}, + }, + }, + } + case 2: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "idle"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC), + 92.6, + }}, + }, + }, + } + case 3: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "idle"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), + 95.6, + }}, + }, + }, + } + case 4: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "idle"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC), + 93.1, + }}, + }, + }, + } + case 5: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "idle"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + 92.6, + }}, + }, + }, + } + case 6: + er = models.Result{ + Series: models.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "idle"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC), + 95.8, + }}, + }, + }, + } + } + if eq, msg := compareResults(er, result); !eq { + t.Errorf("unexpected alert data for request: %d %s", rc, msg) + } + })) + defer ts.Close() + + var script = ` +stream + |from() + .measurement('cpu') + .where(lambda: "host" == 'serverA') + .groupBy('host') + |httpPost('` + ts.URL + `') + |httpOut('TestStream_HttpPost') +` + + er := models.Result{ + Series: models.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "idle"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC), + 95.8, + }}, + }, + }, + } + + testStreamerWithOutput(t, "TestStream_HttpPost", script, 13*time.Second, er, false, nil) + + if rc := atomic.LoadInt32(&requestCount); rc != 6 { + t.Errorf("got %v exp %v", rc, 6) + } +} + func TestStream_HttpOutPassThrough(t *testing.T) { var script = ` diff --git a/pipeline/http_post.go b/pipeline/http_post.go new file mode 100644 index 000000000..232e5b22d --- /dev/null +++ b/pipeline/http_post.go @@ -0,0 +1,26 @@ +package pipeline + +// An HTTPPostNode will take the incoming data stream and POST it to an HTTP endpoint. +// +// Example: +// stream +// |window() +// .period(10s) +// .every(5s) +// |top('value', 10) +// //Post the top 10 results over the last 10s updated every 5s. +// |httpPost('http://example.com/api/top10') +// +type HTTPPostNode struct { + chainnode + + // tick:ignore + Url string +} + +func newHTTPPostNode(wants EdgeType, url string) *HTTPPostNode { + return &HTTPPostNode{ + chainnode: newBasicChainNode("http_post", wants, wants), + Url: url, + } +} diff --git a/pipeline/node.go b/pipeline/node.go index edaa6d726..684432503 100644 --- a/pipeline/node.go +++ b/pipeline/node.go @@ -326,6 +326,13 @@ func (n *chainnode) HttpOut(endpoint string) *HTTPOutNode { return h } +// Creates an HTTP Post node that POSTS received data to the provided HTTP endpoint. 
+func (n *chainnode) HttpPost(url string) *HTTPPostNode { + h := newHTTPPostNode(n.provides, url) + n.linkChild(h) + return h +} + // Create an influxdb output node that will store the incoming data into InfluxDB. func (n *chainnode) InfluxDBOut() *InfluxDBOutNode { i := newInfluxDBOutNode(n.provides) diff --git a/task.go b/task.go index f3a6b7424..80dd7bf73 100644 --- a/task.go +++ b/task.go @@ -454,6 +454,8 @@ func (et *ExecutingTask) createNode(p pipeline.Node, l *log.Logger) (n Node, err n, err = newWindowNode(et, t, l) case *pipeline.HTTPOutNode: n, err = newHTTPOutNode(et, t, l) + case *pipeline.HTTPPostNode: + n, err = newHTTPPostNode(et, t, l) case *pipeline.InfluxDBOutNode: n, err = newInfluxDBOutNode(et, t, l) case *pipeline.AlertNode:
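
For reference, a minimal sketch of an endpoint that could receive what the new httpPost node sends. The node JSON-encodes a result (a "series" array whose entries carry "name", "tags", "columns", and "values") and POSTs it with Content-Type application/json, which is what the tests above decode on the receiving side. The struct names, handler path, and port below are illustrative assumptions only and are not part of this change.

package main

import (
	"encoding/json"
	"log"
	"net/http"
)

// row mirrors the assumed shape of each entry in the POSTed "series" array.
type row struct {
	Name    string            `json:"name"`
	Tags    map[string]string `json:"tags"`
	Columns []string          `json:"columns"`
	Values  [][]interface{}   `json:"values"`
}

// result mirrors the assumed top-level JSON body sent by the httpPost node.
type result struct {
	Series []row `json:"series"`
}

func main() {
	http.HandleFunc("/kapacitor", func(w http.ResponseWriter, r *http.Request) {
		var res result
		if err := json.NewDecoder(r.Body).Decode(&res); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		for _, s := range res.Series {
			log.Printf("received %d rows for %s %v", len(s.Values), s.Name, s.Tags)
		}
		w.WriteHeader(http.StatusNoContent)
	})
	// Port and path are arbitrary examples; point the TICKscript at this address.
	log.Fatal(http.ListenAndServe(":9999", nil))
}

A task would then direct its output at such a listener, e.g. |httpPost('http://localhost:9999/kapacitor'), in place of (or alongside) |httpOut(...).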