-
Notifications
You must be signed in to change notification settings - Fork 489
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: Add httppost service (add headers to POST alerts) #1254
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few changes
alert.go
Outdated
// } | ||
// h := alertservice.NewPostHandler(c, l) | ||
// an.handlers = append(an.handlers, h) | ||
//} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove
alert.go
Outdated
} | ||
if p.Endpoint != "" { | ||
c.Endpoint = p.Endpoint | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not necessary.
Just do
c.URL = p.URL
c.Endpoint = p.Endpoint
etc/kapacitor/kapacitor.conf
Outdated
# endpoint = "example" | ||
# url = "http://example.com" | ||
# headers = { Authorization = "your-key" } | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does this work with environment variables?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question, have we tried it? I think there is support for maps in our env code but we should test it. Even add a test to the server package for it.
integrations/streamer_test.go
Outdated
@@ -9523,6 +9597,7 @@ func testStreamer( | |||
tm.HTTPDService = httpdService | |||
tm.TaskStore = taskStore{} | |||
tm.DeadmanService = deadman{} | |||
tm.PostService = alertpost.NewService(nil, logService.NewLogger("[alertpost] ", log.LstdFlags)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change to
tm.AlertPostService = alertpost.NewService(nil, logService.NewLogger("[alertpost] ", log.LstdFlags))
pipeline/alert.go
Outdated
@@ -485,6 +507,9 @@ type PostHandler struct { | |||
// The POST URL. | |||
// tick:ignore | |||
URL string | |||
|
|||
// Name of the endpoint to be used, as defined in the configuration file |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as is defined by the configuration.
server/server.go
Outdated
@@ -457,6 +459,18 @@ func (s *Server) appendPushoverService() { | |||
s.AppendService("pushover", srv) | |||
} | |||
|
|||
func (s *Server) appendPostService() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
appendAlertPostService
services/alert/service.go
Outdated
@@ -108,6 +109,9 @@ type Service struct { | |||
PushoverService interface { | |||
Handler(pushover.HandlerConfig, *log.Logger) alert.Handler | |||
} | |||
PostService interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AlertPostService
services/alertpost/service.go
Outdated
|
||
func (s *Service) TestOptions() interface{} { | ||
return &testOptions{} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it needs to implement the interface. Also would having and endpoint option make sense so you can test each configured endpoint?
task_master.go
Outdated
@@ -105,6 +106,9 @@ type TaskMaster struct { | |||
PushoverService interface { | |||
Handler(pushover.HandlerConfig, *log.Logger) alert.Handler | |||
} | |||
PostService interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AlertPostService
6043675
to
a975e89
Compare
@nathanielc this should be in a reviewable state. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good. I tried to answer some of your questions and had a few of my own.
etc/kapacitor/kapacitor.conf
Outdated
# endpoint = "example" | ||
# url = "http://example.com" | ||
# headers = { Authorization = "your-key" } | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question, have we tried it? I think there is support for maps in our env code but we should test it. Even add a test to the server package for it.
services/alertpost/service.go
Outdated
|
||
func (s *Service) TestOptions() interface{} { | ||
return &testOptions{} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it needs to implement the interface. Also would having and endpoint option make sense so you can test each configured endpoint?
services/alertpost/service.go
Outdated
|
||
// Create the HTTP request | ||
var req *http.Request | ||
if h.url != "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we validate in pipeline that you do not have both endpoint and url specified?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How would we validate this in pipeline?
services/alertpost/service.go
Outdated
Duration time.Duration `json:"duration"` | ||
Level alert.Level `json:"level"` | ||
Data models.Result `json:"data"` | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, should we move the AlertData struct out of the alert/service package and into the alert package? The purpose of the struct is to provide a consistent view of how alert data is exposed so I'd rather there only be one definition of the type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that sounds like a good idea.
c4bd28d
to
b2ea7e7
Compare
@nathanielc this should be in a reviewable state. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have two requested changes.
First, that we separate the different use cases of the httppost service a bit more.
There are two use cases for the service:
- Post arbitrary data to an endpoint.
- Post alert data to an endpoint.
Both of these uses cases naturally involve posting data. But they also both involve working with endpoints, not handlers.
My suggestion is the have the httppost service expose an Endpoint type that the httpPost node consumes instead of the handler interface.
Then you can create a Handler type that consumes the Endpoint type.
My second request is about how live config updates propagate. In the current state changes to endpoints do not effect running tasks and they should. We need a way for the running alert handlers and httpPost nodes to get these live updates. I think the type separating above will help with this, since we can lock the NewHTTPRequest method and then modify the endpoints instead of having to hand out new ones.
Do those requests make sense? Can I clarify anything?
http_post.go
Outdated
mu sync.RWMutex | ||
bp *bufpool.Pool | ||
c *pipeline.HTTPPostNode | ||
handlers []*httppost.Handler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels odd that the HTTPPost node is dealing with alert handlers. Maybe we should rename the type httppost.Handler
to httppost.Endpoint
since all it really is is an endpoint. This change also helps clarify the purpose of the type since currently the Handler type is not used for its Handle
method, in this context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
http_post.go
Outdated
if n.Url != "" { | ||
c := httppost.HandlerConfig{} | ||
c.URL = n.Url | ||
h := et.tm.HTTPPostService.Handler(c, l) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The type change is also useful here since you can return the concrete type so you do not have to type assert it. Or event add a new method for getting Endpoints.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
http_post.go
Outdated
// Technically we don't need the wait group, but | ||
// having it hear will help contain any leaky go | ||
// routines | ||
var wg sync.WaitGroup |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably defer wg.Wait() here instead of at the end of the function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
http_post.go
Outdated
h.logger.Printf("E! failed to POST row data: %v", err) | ||
return | ||
req.Header.Set("Content-Type", "application/json") | ||
resp, err := http.DefaultClient.Do(req) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this break the HTTP proxy support we added? Or had that not been merged yet?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It hasn't been merged yet, but I also think that it won't break. The default client, I'm pretty sure, does proxy from environment.
alert/types.go
Outdated
|
||
// AlertData is a structure that contains relevant data about an alert event. | ||
// The structure is intended to be JSON encoded, providing a consistent data format. | ||
type AlertData struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The move was necessary since we moved the post handler out of the alertservice package correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also should we rename the type to Data
to remove the stutter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup. Stutter removed.
pipeline/node.go
Outdated
@@ -327,8 +327,8 @@ func (n *chainnode) HttpOut(endpoint string) *HTTPOutNode { | |||
} | |||
|
|||
// Creates an HTTP Post node that POSTS received data to the provided HTTP endpoint. | |||
func (n *chainnode) HttpPost(url string) *HTTPPostNode { | |||
h := newHTTPPostNode(n.provides, url) | |||
func (n *chainnode) HttpPost(url ...string) *HTTPPostNode { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should document that the URL is optional if the endpoint is specified, hence the variadic signature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where should this be documented?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added it in the description of HTTPPostNode.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can also be documented here in the comment for this function. The comment will become part of the auto generated reference docs.
} | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have a test for this code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, I should add that. Is there a place where the logic for array env elements lives?
services/httppost/service.go
Outdated
defer s.mu.Unlock() | ||
for _, nc := range newConfigs { | ||
if c, ok := nc.(Config); ok { | ||
s.endpoints[c.Endpoint] = c |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does look like it is going to work.
The config updates need to effect live code, meaning if you change the URL for an endpoint then the existing alert handlers and httpPost nodes need to update to the new URL. This should be easily accomplished by locking the Handler|Endpoint
type NewHTTPRequest method etc.
As it currently stands the configs are read once so these changes never propagate to running code.
Let me know if I can help clarify this more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could be crazy, but I think this works.
services/httppost/service.go
Outdated
} | ||
|
||
// Create the HTTP request | ||
var req *http.Request |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use the NewHTTPRequest method here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fized
http_post.go
Outdated
bp *bufpool.Pool | ||
c *pipeline.HTTPPostNode | ||
handlers []*httppost.Handler | ||
url string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this field is now unused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
I'll fix that right now.
I think this currently works.
with configuration
and a telegraf reporting every second. when I first enable the task, the data is written to a mock server I have on
the data no longer written to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK I see how the dynamic update is working now. I have another suggestion to further separate out Endpoints vs Handlers.
pipeline/http_post.go
Outdated
} | ||
} | ||
|
||
// tick:ignore | ||
func (p *HTTPPostNode) Validate() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be named validate
in order to be automatically called in the parsing execution steps.
pipeline/http_post.go
Outdated
return nil | ||
} | ||
|
||
func (p *HTTPPostNode) URL() (u string, err error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While this is a nice clean up we can't do this as the public API of this struct is callable via TICKscript. We have to treat pipeline struct types as config structs with no behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah dang :( I'll come up with another solution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simplest thing is to define this method inside http_post.go
like
func nodeURL(p *pipeline.HTTPPostNode) (u string, err error) {
}
it needs a better name but you get the idea.
services/httppost/service.go
Outdated
} | ||
return | ||
} | ||
c, ok := e.s.endpoint(e.name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the bit I missed before, we reread the config on every NewHTTPRequest, which is why it is working.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although my suggestion above invalidates this way of doing it.
services/httppost/service.go
Outdated
return c.NewRequest(body) | ||
} | ||
|
||
func (s *Service) Endpoint(c HandlerConfig) *Endpoint { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to create a HandlerConfig to get an Endpoint?
services/httppost/service.go
Outdated
type Endpoint struct { | ||
s *Service | ||
name string | ||
url string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a preference I would like to see Endpoint as having a url field and not know its name. Then for the Endpoints created with just a URL use the URL as the name of the endpoint in the map.
This way all endpoints are the same and just reference a URL and a set of headers.
Also, where are the headers? Are they in the config struct still?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This way all endpoints are the same and just reference a URL and a set of headers.
I like this idea, but I think we might need the name for the case when an endpoint hasn't been defined, but is used in a tickscript. When the task in initialized and the endpoint doesn't currently exist and then we create the endpoint. When the configuration is updated how do we know which httppost.Endpoints
need to have their values updated? I could be over thinking this.
Also, where are the headers? Are they in the config struct still?
Yeah they're only in the config struct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really like how the Endpoint stuff came together!
I have a few more comments, mostly smaller things I didn't notice before.
pipeline/node.go
Outdated
@@ -327,8 +327,8 @@ func (n *chainnode) HttpOut(endpoint string) *HTTPOutNode { | |||
} | |||
|
|||
// Creates an HTTP Post node that POSTS received data to the provided HTTP endpoint. | |||
func (n *chainnode) HttpPost(url string) *HTTPPostNode { | |||
h := newHTTPPostNode(n.provides, url) | |||
func (n *chainnode) HttpPost(url ...string) *HTTPPostNode { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can also be documented here in the comment for this function. The comment will become part of the auto generated reference docs.
@@ -70,5 +77,7 @@ boltdb = "/tmp/kapacitor.db" | |||
t.Fatalf("unexpected storage boltdb-path: %s", c.Storage.BoltDBPath) | |||
} else if c.InfluxDB[0].URLs[0] != "http://localhost:18086" { | |||
t.Fatalf("unexpected url 0: %s", c.InfluxDB[0].URLs[0]) | |||
} else if c.HTTPPost[0].Headers["Authorization"] != "my-key" { | |||
t.Fatalf("unexpected header Authorization: %s", c.InfluxDB[0].URLs[0]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a test case where you add a key to the headers map that isn't already there in the file?
|
||
type Request struct { | ||
MatchingHeaders bool | ||
Data alert.Data |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems incorrect now. The posted data could be either alert data or just a raw row of data. Should we try and capture both or alternately we could just capture the entire body and let the consumer of this test type decode it.
services/httppost/service.go
Outdated
s.endpoints[c.Endpoint] = NewEndpoint(c.URL, c.Headers) | ||
continue | ||
} | ||
e.SetURL(c.URL) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we want these Set calls to be atomic together. Meaning there is a slight chance that the endpoint is used right after the URL is modified but before the Headers are which could cause issues. Maybe we should just have an Update method that takes a Config? Lots of ways to do it, but all updates should happen with one lock/unlock of the mutex.
services/httppost/service.go
Outdated
resp.Body.Close() | ||
} | ||
|
||
func alertDataFromEvent(event alert.Event) alert.Data { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't this function already exist in another package?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup it is, I'll export and use it in this package.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great, you can move it to the alert package now too. Maybe even just make it a method of the Event type, but what ever looks right to you.
513c8a9
to
5f3dec3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 🎉
CHANGELOG.md
Outdated
@@ -69,6 +69,7 @@ kapacitor define-handler system aggregate_by_1m.yaml | |||
- [#507](https://github.com/influxdata/kapacitor/issues/507): Add API endpoint for performing Kapacitor database backups. | |||
- [#1132](https://github.com/influxdata/kapacitor/issues/1132): Adding source for sensu alert as parameter | |||
- [#1346](https://github.com/influxdata/kapacitor/pull/1346): Add discovery and scraping services. | |||
- [#117](https://github.com/influxdata/kapacitor/issues/117): Add headers to alert POST requests. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you move this up to the next section now that beta1 was cut?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which section? the unreleased section?
Add enough to make tests pass Add correct POST alert handling Add tests for .endpoint on .post on alert node Add alertpost config tests Remove NewConfig section Add comments to alertpost/config.go Add fixes suggested in PR Move AlertData to alert package from service/alert Implement Test function Allow override variables to set map values Rename alertpost service to httppost Favorite package name is httpposttest :) Add endpoint property method to httpPost Allow variadic httpPost arguments Update config file for httppost Add endpoint test to http post node Make Fixes suggested in PR Add documentation for variadic arguments it httpPost Add test for map enviornment vars Refactor httppost endpoints Reorder code in httpost service package Add support for basic auth to endpoints Make changes from PR Fix httpPost functionality Allow headers to be set for alerts via tickscript Check headers Unredact headers
This PR allows users to create POST request
endpoints
for an alert node where arbitrary HTTP headers may be set. It works as a fix for #117.Todo:
Required for all non-trivial PRs