Handle distributed queries when shards != data nodes
Fixes #2272

There was previously an explicit panic in the query engine to prevent
queries where the number of shards was not equal to the number of data nodes
in the cluster. That placeholder was waiting for the distributed queries branch
to land, but it was not removed when that branch landed.
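
The gist of the fix (see the diff below): instead of assuming exactly one shard per shard group, the series IDs are bucketed by the shard that owns them, and one mapper is created per shard. A minimal standalone sketch of that idea, using simplified stand-in types rather than the real tx.go structures (the Shard struct and shardBySeriesID placement function here are illustrative, not InfluxDB's actual implementation):

package main

import "fmt"

// Shard is a simplified stand-in for the real shard type in tx.go.
type Shard struct{ ID uint64 }

// shardBySeriesID is an illustrative placement function standing in for
// ShardGroup.ShardBySeriesID: it assigns a series to a shard in the group.
func shardBySeriesID(group []*Shard, seriesID uint64) *Shard {
	return group[seriesID%uint64(len(group))]
}

func main() {
	group := []*Shard{{ID: 1}, {ID: 2}, {ID: 3}}
	seriesIDs := []uint64{10, 11, 12, 13, 14}

	// New behavior: bucket the series IDs by the shard that owns them,
	// rather than panicking when the group holds more than one shard.
	shardSeries := map[*Shard][]uint64{}
	for _, sid := range seriesIDs {
		s := shardBySeriesID(group, sid)
		shardSeries[s] = append(shardSeries[s], sid)
	}

	// One mapper per shard: in tx.go this is where a RemoteMapper (shard on
	// another data node) or a LocalMapper (shard stored locally) is built.
	for shard, sids := range shardSeries {
		fmt.Printf("shard %d -> series %v\n", shard.ID, sids)
	}
}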
jwilder committed Apr 20, 2015
1 parent 3332f09 commit d5bd804
Showing 1 changed file, tx.go, with 51 additions and 49 deletions.
@@ -142,60 +142,62 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
 
     // create mappers for each shard we need to hit
     for _, sg := range shardGroups {
-        if len(sg.Shards) != 1 { // we'll only have more than 1 shard in a group when RF < # servers in cluster
-            // TODO: implement distributed queries.
-            panic("distributed queries not implemented yet and there are too many shards in this group")
-        }
-
-        shard := sg.Shards[0]
-
-        var mapper influxql.Mapper
-
-        // create either a remote or local mapper for this shard
-        if shard.store == nil {
-            nodes := tx.server.DataNodesByID(shard.DataNodeIDs)
-            if len(nodes) == 0 {
-                return nil, ErrShardNotFound
-            }
-
-            balancer := NewDataNodeBalancer(nodes)
-
-            mapper = &RemoteMapper{
-                dataNodes: balancer,
-                Database: mm.Database,
-                MeasurementName: m.Name,
-                TMin: tmin.UnixNano(),
-                TMax: tmax.UnixNano(),
-                SeriesIDs: t.SeriesIDs,
-                ShardID: shard.ID,
-                WhereFields: whereFields,
-                SelectFields: selectFields,
-                SelectTags: selectTags,
-                Limit: stmt.Limit,
-                Offset: stmt.Offset,
-                Interval: interval,
-            }
-            mapper.(*RemoteMapper).SetFilters(t.Filters)
-        } else {
-            mapper = &LocalMapper{
-                seriesIDs: t.SeriesIDs,
-                db: shard.store,
-                job: job,
-                decoder: NewFieldCodec(m),
-                filters: t.Filters,
-                whereFields: whereFields,
-                selectFields: selectFields,
-                selectTags: selectTags,
-                tmax: tmax.UnixNano(),
-                interval: interval,
-                // multiple mappers may need to be merged together to get the results
-                // for a raw query. So each mapper will have to read at least the
-                // limit plus the offset in data points to ensure we've hit our mark
-                limit: uint64(stmt.Limit) + uint64(stmt.Offset),
-            }
-        }
-
-        mappers = append(mappers, mapper)
+        shards := map[*Shard][]uint64{}
+        for _, sid := range t.SeriesIDs {
+            shard := sg.ShardBySeriesID(sid)
+            shards[shard] = append(shards[shard], sid)
+        }
+
+        for shard, _ := range shards {
+            var mapper influxql.Mapper
+
+            // create either a remote or local mapper for this shard
+            if shard.store == nil {
+                nodes := tx.server.DataNodesByID(shard.DataNodeIDs)
+                if len(nodes) == 0 {
+                    return nil, ErrShardNotFound
+                }
+
+                balancer := NewDataNodeBalancer(nodes)
+
+                mapper = &RemoteMapper{
+                    dataNodes: balancer,
+                    Database: mm.Database,
+                    MeasurementName: m.Name,
+                    TMin: tmin.UnixNano(),
+                    TMax: tmax.UnixNano(),
+                    SeriesIDs: t.SeriesIDs,
+                    ShardID: shard.ID,
+                    WhereFields: whereFields,
+                    SelectFields: selectFields,
+                    SelectTags: selectTags,
+                    Limit: stmt.Limit,
+                    Offset: stmt.Offset,
+                    Interval: interval,
+                }
+                mapper.(*RemoteMapper).SetFilters(t.Filters)
+            } else {
+                mapper = &LocalMapper{
+                    seriesIDs: t.SeriesIDs,
+                    db: shard.store,
+                    job: job,
+                    decoder: NewFieldCodec(m),
+                    filters: t.Filters,
+                    whereFields: whereFields,
+                    selectFields: selectFields,
+                    selectTags: selectTags,
+                    tmax: tmax.UnixNano(),
+                    interval: interval,
+                    // multiple mappers may need to be merged together to get the results
+                    // for a raw query. So each mapper will have to read at least the
+                    // limit plus the offset in data points to ensure we've hit our mark
+                    limit: uint64(stmt.Limit) + uint64(stmt.Offset),
+                }
+            }
+
+            mappers = append(mappers, mapper)
+        }
     }
 
     job.Mappers = mappers
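
Note on the LocalMapper limit above: because results from multiple mappers may be merged for a raw query, each mapper reads at least limit plus offset points. For example, a query with LIMIT 10 OFFSET 5 makes every mapper fetch at least 15 points so the merged stream can still skip the first 5 and return 10.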
