Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for a paste operator in join.as() prefix specifications. #444

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 47 additions & 7 deletions join.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,15 @@ func (g *group) emitAll() error {
}

// emit a single joined set
func (g *group) emitJoinedSet(set *joinset) error {
func (g *group) emitJoinedSet(set *joinset) (err error) {
defer func() {
if p := recover(); p != nil {
var ok bool
if err, ok = p.(error); !ok {
panic(p)
}
}
}()
if set.name == "" {
set.name = set.First().PointName()
}
Expand Down Expand Up @@ -404,6 +412,8 @@ type joinset struct {
tolerance time.Duration
values []models.PointInterface

claims map[string]int

expected int
size int
finished int
Expand Down Expand Up @@ -434,6 +444,7 @@ func newJoinset(
time: time,
tolerance: tolerance,
logger: l,
claims: make(map[string]int),
}
}

Expand Down Expand Up @@ -467,19 +478,19 @@ func (js *joinset) JoinIntoPoint() (models.Point, bool) {
switch js.fill {
case influxql.NullFill:
for k := range js.First().PointFields() {
fields[js.prefixes[i]+"."+k] = nil
fields[js.outName(i, k)] = nil
}
case influxql.NumberFill:
for k := range js.First().PointFields() {
fields[js.prefixes[i]+"."+k] = js.fillValue
fields[js.outName(i, k)] = js.fillValue
}
default:
// inner join no valid point possible
return models.Point{}, false
}
} else {
for k, v := range p.PointFields() {
fields[js.prefixes[i]+"."+k] = v
fields[js.outName(i, k)] = v
}
}
}
Expand Down Expand Up @@ -554,19 +565,19 @@ func (js *joinset) JoinIntoBatch() (models.Batch, bool) {
switch js.fill {
case influxql.NullFill:
for _, k := range fieldNames {
fields[js.prefixes[i]+"."+k] = nil
fields[js.outName(i, k)] = nil
}
case influxql.NumberFill:
for _, k := range fieldNames {
fields[js.prefixes[i]+"."+k] = js.fillValue
fields[js.outName(i, k)] = js.fillValue
}
default:
// inner join no valid point possible
return models.Batch{}, false
}
} else {
for k, v := range bp.Fields {
fields[js.prefixes[i]+"."+k] = v
fields[js.outName(i, k)] = v
}
}
}
Expand All @@ -580,6 +591,35 @@ func (js *joinset) JoinIntoBatch() (models.Batch, bool) {
return newBatch, true
}

// outName returns the fully qualified output name for field k of joined stream i.
//
// If the specified prefix is empty, the output name is k.
//
// If the specified prefix contains a trailing #, the output name is the prefix
// without the trailing #, concatenated with k.
//
// Otherwise, the output name is prefix+"."+k.
//
// A runtime check is performed to guarantee that all output points have unique field
// names.
func (js *joinset) outName(i int, k string) string {
prefix := js.prefixes[i]
if len(prefix) == 0 {
prefix = ""
} else if prefix[len(prefix)-1] == '#' {
prefix = prefix[0 : len(prefix)-1]
} else {
prefix = prefix + "."
}
n := prefix + k
if claim, ok := js.claims[n]; ok && claim != i {
panic(fmt.Errorf("field %s of input %d conflicts with input %d", k, i, claim))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we remove the panic here and return an error instead?

The calling functions JoinInto* can then log the error and return models.Point/Batch{}, false so that the join is skipped.

Copy link
Contributor Author

@jonseymour jonseymour May 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made use of panic because returning an error complicates every caller of outName, each of which would otherwise have to check for an error.

update: I had a quick look at removing the panic - it really does make things quite messy. However, if you really want me to do that, let me know and I will push such a change

} else if !ok {
js.claims[n] = i
}
return n
}

type durationVar struct {
expvar.Int
}
Expand Down
39 changes: 22 additions & 17 deletions pipeline/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@ import (
// errors
// |join(requests)
// // Provide prefix names for the fields of the data points.
// .as('errors', 'requests')
// .as('errors', '#')
// // points that are within 1 second are considered the same time.
// .tolerance(1s)
// // fill missing values with 0, implies outer join.
// .fill(0.0)
// // name the resulting stream
// .streamName('error_rate')
// // Both the "value" fields from each parent have been prefixed
// // with the respective names 'errors' and 'requests'.
// |eval(lambda: "errors.value" / "requests.value")
// // The "value" field from the errors parent has been
// // prefixed with 'errors.' but the "value" field from the requests parent has
// // been copied without prepending an additional prefix.
// |eval(lambda: "errors.value" / "value")
// .as('rate')
// ...
//
Expand Down Expand Up @@ -90,11 +91,23 @@ func newJoinNode(e EdgeType, parents []Node) *JoinNode {
}

// Prefix names for all fields from the respective nodes.
//
// Each field from the parent nodes will be prefixed with the provided name and a '.'.
// See the example above.
//
// The names cannot have a dot '.' character.
//
// If a prefix is not specified or is empty, then field names from parent nodes
// will be copied without attaching a prefix.
//
// If a prefix contains a trailing '#' character, then the prefix
// up to, but not including, the trailing '#' character is prepended to the field name
// from the parent node without adding the '.' character after the prefix.
//
// It is the caller's responsibility to ensure that, when these rules are applied,
// the collection of output field names does not contain any duplicate names. Failure
// to ensure this will result in a runtime error.
Copy link
Contributor

@nathanielc nathanielc May 9, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the entire task be stopped because of a name conflict? I know that in the EvalNode, for example, if invalid data arrives an error is logged and the task continues to process. I see two extremes:

  1. A single point can stop an entire task
  2. A single point can be dropped from the data set.

Either extreme could be detrimental depending on the use case, but I think #1 is the worse of the two since it means possibly lots of data is dropped while the task is off.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that the task should be stopped, since it is likely that if a set of points fails the uniqueness test then all subsequent sets of points will also fail the test, in which case joined data would be silently dropped from the output stream.

//
// tick:property
func (j *JoinNode) As(names ...string) *JoinNode {
j.Names = names
Expand Down Expand Up @@ -137,25 +150,17 @@ func (j *JoinNode) validate() error {
return fmt.Errorf("a call to join.as() is required to specify the output stream prefixes.")
}

if len(j.Names) != len(j.Parents()) {
if len(j.Names) > len(j.Parents()) {
return fmt.Errorf("number of prefixes specified by join.as() must match the number of joined streams")
} else if len(j.Names) < len(j.Parents()) {
tmp := make([]string, len(j.Parents()))
copy(tmp, j.Names)
j.Names = tmp
}

for _, name := range j.Names {
if len(name) == 0 {
return fmt.Errorf("must provide a prefix name for the join node, see .as() property method")
}
if strings.ContainsRune(name, '.') {
return fmt.Errorf("cannot use name %s as field prefix, it contains a '.' character", name)
}
}
names := make(map[string]bool, len(j.Names))
for _, name := range j.Names {
if names[name] {
return fmt.Errorf("cannot use the same prefix name see .as() property method")
}
names[name] = true
}

return nil
}