diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go
index 1446aafabf8..90bd0a02fd4 100644
--- a/cmd/influxd/server_integration_test.go
+++ b/cmd/influxd/server_integration_test.go
@@ -1443,6 +1443,22 @@ func Test3NodeClusterPartiallyReplicated(t *testing.T) {
 	runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes)-1)
 }
 
+// ensure that all queries work if there are more nodes in a cluster than the replication factor
+func Test5NodeClusterPartiallyReplicated(t *testing.T) {
+	testName := "5-node server integration partial replication"
+	if testing.Short() {
+		t.Skip(fmt.Sprintf("skipping '%s'", testName))
+	}
+	dir := tempfile()
+	defer os.RemoveAll(dir)
+
+	nodes := createCombinedNodeCluster(t, testName, dir, 5, nil)
+	defer nodes.Close()
+
+	runTestsData(t, testName, nodes, "mydb", "myrp", 2)
+	runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", 2)
+}
+
 func TestClientLibrary(t *testing.T) {
 	testName := "single server integration via client library"
 	if testing.Short() {
diff --git a/influxql/engine.go b/influxql/engine.go
index c5e52035d96..7b89632d897 100644
--- a/influxql/engine.go
+++ b/influxql/engine.go
@@ -204,6 +204,50 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) {
 		out <- row
 	}
 
+// mergeOutputs merges two sorted slices of rawQueryMapOutput such that duplicate
+// timestamp entries, if they exist, are removed and the final output is sorted by time
+func (m *MapReduceJob) mergeOutputs(first, second []*rawQueryMapOutput) []*rawQueryMapOutput {
+	var i, j int
+	v := []*rawQueryMapOutput{}
+	for {
+
+		// both indexes are past the ends of their slices
+		if i >= len(first) && j >= len(second) {
+			break
+		}
+
+		// first slice is done, append the rest of second
+		if i >= len(first) {
+			v = append(v, second[j:]...)
+			break
+		}
+
+		// second slice is done, append the rest of first
+		if j >= len(second) {
+			v = append(v, first[i:]...)
+			break
+		}
+
+		f := first[i]
+		s := second[j]
+
+		// append the next smallest value to keep output sorted by time
+		if f.Timestamp < s.Timestamp {
+			v = append(v, f)
+			i += 1
+		} else if f.Timestamp > s.Timestamp {
+			v = append(v, s)
+			j += 1
+		} else {
+			// timestamps are the same so there is a dup, pick the existing and continue
+			v = append(v, f)
+			i += 1
+			j += 1
+		}
+	}
+	return v
+}
+
 // processRawQuery will handle running the mappers and then reducing their output
 // for queries that pull back raw data values without computing any kind of aggregates.
 func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
@@ -313,7 +357,8 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
 			valuesSent += len(values)
 		}
 
-		valuesToReturn = append(valuesToReturn, values...)
+		// merge the existing values with the new ones
+		valuesToReturn = m.mergeOutputs(valuesToReturn, values)
 
 		// hit the chunk size? Send out what has been accumulated, but keep
 		// processing.
diff --git a/influxql/engine_test.go b/influxql/engine_test.go
new file mode 100644
index 00000000000..ae61592642d
--- /dev/null
+++ b/influxql/engine_test.go
@@ -0,0 +1,93 @@
+package influxql
+
+import (
+	"testing"
+	"time"
+)
+
+func TestMergeOutputs(t *testing.T) {
+	job := MapReduceJob{}
+
+	tests := []struct {
+		name     string
+		first    []*rawQueryMapOutput
+		second   []*rawQueryMapOutput
+		expected []*rawQueryMapOutput
+	}{
+		{
+			name:     "empty slices",
+			first:    []*rawQueryMapOutput{},
+			second:   []*rawQueryMapOutput{},
+			expected: []*rawQueryMapOutput{},
+		},
+		{
+			name:     "first empty",
+			first:    []*rawQueryMapOutput{},
+			second:   []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}},
+			expected: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}},
+		},
+		{
+			name:     "second empty",
+			first:    []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}},
+			second:   []*rawQueryMapOutput{},
+			expected: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}},
+		},
+		{
+			name:   "first before",
+			first:  []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}},
+			second: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}},
+			expected: []*rawQueryMapOutput{
+				&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0},
+				&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0},
+			},
+		},
+		{
+			name:   "second before",
+			first:  []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}},
+			second: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0}},
+			expected: []*rawQueryMapOutput{
+				&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0},
+				&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0},
+			},
+		},
+		{
+			name:   "dups removed",
+			first:  []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}},
+			second: []*rawQueryMapOutput{&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0}},
+			expected: []*rawQueryMapOutput{
+				&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0},
+			},
+		},
+		{
+			name: "sorted dups removed",
+			first: []*rawQueryMapOutput{
+				&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0},
+				&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0},
+			},
+			second: []*rawQueryMapOutput{
+				&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0},
+				&rawQueryMapOutput{time.Unix(2, 0).UnixNano(), 0},
+			},
+			expected: []*rawQueryMapOutput{
+				&rawQueryMapOutput{time.Unix(0, 0).UnixNano(), 0},
+				&rawQueryMapOutput{time.Unix(1, 0).UnixNano(), 0},
+				&rawQueryMapOutput{time.Unix(2, 0).UnixNano(), 0},
+			},
+		},
+	}
+
+	for _, c := range tests {
+		got := job.mergeOutputs(c.first, c.second)
+
+		if len(got) != len(c.expected) {
+			t.Errorf("test %s: result length mismatch: got %v, exp %v", c.name, len(got), len(c.expected))
+			continue
+		}
+
+		for j := 0; j < len(c.expected); j++ {
+			if exp := c.expected[j]; exp.Timestamp != got[j].Timestamp {
+				t.Errorf("test %s: timestamp mismatch: got %v, exp %v", c.name, got[j].Timestamp, exp.Timestamp)
+			}
+		}
+	}
+}
diff --git a/tx.go b/tx.go
index e0fb85d331c..6660e794863 100644
--- a/tx.go
+++ b/tx.go
@@ -139,60 +139,56 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
 
 	// create mappers for each shard we need to hit
 	for _, sg := range shardGroups {
-		if len(sg.Shards) != 1 { // we'll only have more than 1 shard in a group when RF < # servers in cluster
-			// TODO: implement distributed queries.
- panic("distributed queries not implemented yet and there are too many shards in this group") - } - - shard := sg.Shards[0] - - var mapper influxql.Mapper - - // create either a remote or local mapper for this shard - if shard.store == nil { - nodes := tx.server.DataNodesByID(shard.DataNodeIDs) - if len(nodes) == 0 { - return nil, ErrShardNotFound + for _, shard := range sg.Shards { + var mapper influxql.Mapper + + // create either a remote or local mapper for this shard + if shard.store == nil { + nodes := tx.server.DataNodesByID(shard.DataNodeIDs) + if len(nodes) == 0 { + return nil, ErrShardNotFound + } + + balancer := NewDataNodeBalancer(nodes) + + mapper = &RemoteMapper{ + dataNodes: balancer, + Database: mm.Database, + MeasurementName: m.Name, + TMin: tmin.UnixNano(), + TMax: tmax.UnixNano(), + SeriesIDs: t.SeriesIDs, + ShardID: shard.ID, + WhereFields: whereFields, + SelectFields: selectFields, + SelectTags: selectTags, + Limit: stmt.Limit, + Offset: stmt.Offset, + Interval: interval, + } + mapper.(*RemoteMapper).SetFilters(t.Filters) + } else { + mapper = &LocalMapper{ + seriesIDs: t.SeriesIDs, + db: shard.store, + job: job, + decoder: NewFieldCodec(m), + filters: t.Filters, + whereFields: whereFields, + selectFields: selectFields, + selectTags: selectTags, + tmax: tmax.UnixNano(), + interval: interval, + // multiple mappers may need to be merged together to get the results + // for a raw query. So each mapper will have to read at least the + // limit plus the offset in data points to ensure we've hit our mark + limit: uint64(stmt.Limit) + uint64(stmt.Offset), + } } - balancer := NewDataNodeBalancer(nodes) - - mapper = &RemoteMapper{ - dataNodes: balancer, - Database: mm.Database, - MeasurementName: m.Name, - TMin: tmin.UnixNano(), - TMax: tmax.UnixNano(), - SeriesIDs: t.SeriesIDs, - ShardID: shard.ID, - WhereFields: whereFields, - SelectFields: selectFields, - SelectTags: selectTags, - Limit: stmt.Limit, - Offset: stmt.Offset, - Interval: interval, - } - mapper.(*RemoteMapper).SetFilters(t.Filters) - } else { - mapper = &LocalMapper{ - seriesIDs: t.SeriesIDs, - db: shard.store, - job: job, - decoder: NewFieldCodec(m), - filters: t.Filters, - whereFields: whereFields, - selectFields: selectFields, - selectTags: selectTags, - tmax: tmax.UnixNano(), - interval: interval, - // multiple mappers may need to be merged together to get the results - // for a raw query. So each mapper will have to read at least the - // limit plus the offset in data points to ensure we've hit our mark - limit: uint64(stmt.Limit) + uint64(stmt.Offset), - } - } + mappers = append(mappers, mapper) - mappers = append(mappers, mapper) + } } job.Mappers = mappers