Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional to be able to show a task in the context of a replay #652

Merged
merged 1 commit into from
Jun 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### Features

- [#636](https://github.com/influxdata/kapacitor/pull/636): Change HTTP logs to be in Common Log format.
- [#](https://github.com/influxdata/kapacitor/pull/): Add optional replay ID to the task API so that you can get information about a task inside a running replay.

### Bugfixes

Expand Down
1 change: 1 addition & 0 deletions client/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ To get information about a task make a GET request to the `/kapacitor/v1/tasks/T
| --------------- | ------- | ------- |
| dot-view | attributes | One of `labels` or `attributes`. Labels is less readable but will correctly render with all the information contained in labels. |
| script-format | formatted | One of `formatted` or `raw`. Raw will return the script identical to how it was defined. Formatted will first format the script. |
| replay-id | | Optional ID of a running replay. The returned task information will be in the context of the task for the running replay. |


A task has these read only properties in addition to the properties listed [above](#define-task).
Expand Down
2 changes: 2 additions & 0 deletions client/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ func (c *Client) UpdateTask(link Link, opt UpdateTaskOptions) (Task, error) {
type TaskOptions struct {
DotView string
ScriptFormat string
ReplayID string
}

func (o *TaskOptions) Default() {
Expand All @@ -708,6 +709,7 @@ func (o *TaskOptions) Values() *url.Values {
v := &url.Values{}
v.Set("dot-view", o.DotView)
v.Set("script-format", o.ScriptFormat)
v.Set("replay-id", o.ReplayID)
return v
}

Expand Down
4 changes: 4 additions & 0 deletions client/v1/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ paths:
required: true
type: string
pattern: (formatted|raw)
- name: replay-id
in: query
description: Optional ID of a running replay. The returned task information will be in the context of the task for the running replay.
type: string
responses:
'200':
description: Task information
Expand Down
20 changes: 16 additions & 4 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"strings"
"time"

"github.com/dustin/go-humanize"
humanize "github.com/dustin/go-humanize"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/kapacitor/client/v1"
"github.com/pkg/errors"
Expand Down Expand Up @@ -143,7 +143,8 @@ func main() {
commandArgs = args
commandF = doList
case "show":
commandArgs = args
showFlags.Parse(args)
commandArgs = showFlags.Args()
commandF = doShow
case "show-template":
commandArgs = args
Expand Down Expand Up @@ -177,6 +178,7 @@ func init() {
replayFlags.Usage = replayUsage
defineFlags.Usage = defineUsage
defineTemplateFlags.Usage = defineTemplateUsage
showFlags.Usage = showUsage

recordStreamFlags.Usage = recordStreamUsage
recordBatchFlags.Usage = recordBatchUsage
Expand Down Expand Up @@ -1180,13 +1182,20 @@ func doReload(args []string) error {
}

// Show
var (
showFlags = flag.NewFlagSet("show", flag.ExitOnError)
sReplayId = showFlags.String("replay", "", "Optional replay ID. If set the task information is in the context of the running replay.")
)

func showUsage() {
var u = `Usage: kapacitor show [task ID]
var u = `Usage: kapacitor show [-replay] [task ID]

Show details about a specific task.

Options:
`
fmt.Fprintln(os.Stderr, u)
showFlags.PrintDefaults()
}

func doShow(args []string) error {
Expand All @@ -1196,7 +1205,10 @@ func doShow(args []string) error {
os.Exit(2)
}

t, err := cli.Task(cli.TaskLink(args[0]), nil)
t, err := cli.Task(
cli.TaskLink(args[0]),
&client.TaskOptions{ReplayID: *sReplayId},
)
if err != nil {
return err
}
Expand Down
10 changes: 7 additions & 3 deletions cmd/kapacitord/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type Server struct {

err chan error

TaskMaster *kapacitor.TaskMaster
TaskMaster *kapacitor.TaskMaster
TaskMasterLookup *kapacitor.TaskMasterLookup

LogService logging.Interface
HTTPDService *httpd.Service
Expand Down Expand Up @@ -118,7 +119,9 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*
s.Logger.Printf("I! ClusterID: %s ServerID: %s", s.ClusterID, s.ServerID)

// Start Task Master
s.TaskMaster = kapacitor.NewTaskMaster("main", logService)
s.TaskMasterLookup = kapacitor.NewTaskMasterLookup()
s.TaskMaster = kapacitor.NewTaskMaster(kapacitor.MainTaskMaster, logService)
s.TaskMasterLookup.Set(s.TaskMaster)
if err := s.TaskMaster.Open(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -224,7 +227,7 @@ func (s *Server) appendTaskStoreService(c task_store.Config) {
srv := task_store.NewService(c, l)
srv.StorageService = s.StorageService
srv.HTTPDService = s.HTTPDService
srv.TaskMaster = s.TaskMaster
srv.TaskMasterLookup = s.TaskMasterLookup

s.TaskStore = srv
s.TaskMaster.TaskStore = srv
Expand All @@ -239,6 +242,7 @@ func (s *Server) appendReplayService(c replay.Config) {
srv.HTTPDService = s.HTTPDService
srv.InfluxDBService = s.InfluxDBService
srv.TaskMaster = s.TaskMaster
srv.TaskMasterLookup = s.TaskMasterLookup

s.ReplayService = srv
s.Services = append(s.Services, srv)
Expand Down
9 changes: 9 additions & 0 deletions influxql.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (c *baseReduceContext) Time() time.Time {
func (n *InfluxQLNode) runStreamInfluxQL() error {
contexts := make(map[models.GroupID]reduceContext)
for p, ok := n.ins[0].NextPoint(); ok; {
n.timer.Start()
context := contexts[p.Group]
// Fisrt point in window
if context == nil {
Expand Down Expand Up @@ -127,13 +128,15 @@ func (n *InfluxQLNode) runStreamInfluxQL() error {
// go through loop again to initialize new iterator.
}
}
n.timer.Stop()
}
return nil
}

func (n *InfluxQLNode) runBatchInfluxQL() error {
var exampleValue interface{}
for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() {
n.timer.Start()
// Create new base context
c := baseReduceContext{
as: n.n.As,
Expand All @@ -148,6 +151,7 @@ func (n *InfluxQLNode) runBatchInfluxQL() error {
if len(b.Points) == 0 {
if !n.n.ReduceCreater.IsEmptyOK {
// If the reduce does not handle empty batches continue
n.timer.Stop()
continue
}
if exampleValue == nil {
Expand All @@ -171,6 +175,7 @@ func (n *InfluxQLNode) runBatchInfluxQL() error {
if err != nil {
n.logger.Println("E! failed to emit batch:", err)
}
n.timer.Stop()
}
return nil
}
Expand All @@ -194,20 +199,24 @@ func (n *InfluxQLNode) emit(context reduceContext) error {
if err != nil {
return err
}
n.timer.Pause()
for _, out := range n.outs {
err := out.CollectPoint(p)
if err != nil {
return err
}
}
n.timer.Resume()
case pipeline.BatchEdge:
b := context.EmitBatch()
n.timer.Pause()
for _, out := range n.outs {
err := out.CollectBatch(b)
if err != nil {
return err
}
}
n.timer.Resume()
}
return nil
}
8 changes: 8 additions & 0 deletions services/replay/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ type Service struct {
NewDefaultClient() (client.Client, error)
NewNamedClient(name string) (client.Client, error)
}
TaskMasterLookup interface {
Get(string) *kapacitor.TaskMaster
Set(*kapacitor.TaskMaster)
Delete(*kapacitor.TaskMaster)
}
TaskMaster interface {
NewFork(name string, dbrps []kapacitor.DBRP, measurements []string) (*kapacitor.Edge, error)
DelFork(name string)
Expand Down Expand Up @@ -1165,6 +1170,9 @@ func (r *Service) doLiveQueryReplay(id string, task *kapacitor.Task, clk clock.C
func (r *Service) doReplay(id string, task *kapacitor.Task, runReplay func(tm *kapacitor.TaskMaster) error) error {
// Create new isolated task master
tm := r.TaskMaster.New(id)
r.TaskMasterLookup.Set(tm)
defer r.TaskMasterLookup.Delete(tm)

tm.Open()
defer tm.Close()
et, err := tm.StartTask(task)
Expand Down
Loading