Skip to content

Commit

Permalink
Merge pull request #418 from influxdb/fix-418-break-large-requests
Browse files Browse the repository at this point in the history
We shouldn't send requests or responses that are larger than MAX_REQUEST_SIZE
  • Loading branch information
pauldix committed May 30, 2014
2 parents 7498a36 + c89a6d3 commit de0a938
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 9 deletions.
20 changes: 20 additions & 0 deletions src/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,26 @@ func (self *CoordinatorImpl) CommitSeriesData(db string, serieses []*protocol.Se

func (self *CoordinatorImpl) write(db string, series []*protocol.Series, shard cluster.Shard, sync bool) error {
request := &protocol.Request{Type: &write, Database: &db, MultiSeries: series}
// break the request if it's too big
if request.Size() >= MAX_REQUEST_SIZE {
if l := len(series); l > 1 {
// create two requests with half the serie
if err := self.write(db, series[:l/2], shard, sync); err != nil {
return err
}
return self.write(db, series[l/2:], shard, sync)
}

// otherwise, split the points of the only series
s := series[0]
l := len(s.Points)
s1 := &protocol.Series{Name: s.Name, Fields: s.Fields, Points: s.Points[:l/2]}
if err := self.write(db, []*protocol.Series{s1}, shard, sync); err != nil {
return err
}
s2 := &protocol.Series{Name: s.Name, Fields: s.Fields, Points: s.Points[l/2:]}
return self.write(db, []*protocol.Series{s2}, shard, sync)
}
if sync {
return shard.SyncWrite(request)
}
Expand Down
19 changes: 10 additions & 9 deletions src/coordinator/protobuf_request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,10 @@ func (self *ProtobufRequestHandler) handleDropDatabase(request *protocol.Request
}

func (self *ProtobufRequestHandler) WriteResponse(conn net.Conn, response *protocol.Response) error {
data, err := response.Encode()
if err != nil {
log.Error("error encoding response: %s", err)
return err
}
if len(data) >= MAX_RESPONSE_SIZE {
pointCount := len(response.Series.Points)
firstHalfPoints := response.Series.Points[:pointCount]
secondHalfPoints := response.Series.Points[pointCount:]
if response.Size() >= MAX_RESPONSE_SIZE {
l := len(response.Series.Points)
firstHalfPoints := response.Series.Points[:l/2]
secondHalfPoints := response.Series.Points[l/2:]
response.Series.Points = firstHalfPoints
err := self.WriteResponse(conn, response)
if err != nil {
Expand All @@ -133,6 +128,12 @@ func (self *ProtobufRequestHandler) WriteResponse(conn net.Conn, response *proto
return self.WriteResponse(conn, response)
}

data, err := response.Encode()
if err != nil {
log.Error("error encoding response: %s", err)
return err
}

buff := bytes.NewBuffer(make([]byte, 0, len(data)+8))
binary.Write(buff, binary.LittleEndian, uint32(len(data)))
_, err = conn.Write(append(buff.Bytes(), data...))
Expand Down
17 changes: 17 additions & 0 deletions src/integration/multiple_servers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,23 @@ func (self *ServerSuite) TearDownSuite(c *C) {
}
}

func (self *ServerSuite) TestLargeRequestSize(c *C) {
client := self.serverProcesses[0].GetClient("db1", c)
c.Assert(client.CreateDatabase("db1"), IsNil)
numberOfPoints := 2 * 1024 * 1024
data := CreatePoints("test_large_requests", 1, numberOfPoints)
self.serverProcesses[0].WriteData(data, c)
for _, s := range self.serverProcesses {
s.WaitForServerToSync()
}
for _, s := range self.serverProcesses {
data = s.RunQuery("select count(column0) from test_large_requests", "m", c)
c.Assert(data, HasLen, 1)
c.Assert(data[0].Points, HasLen, 1)
c.Assert(data[0].Points[0][1], Equals, float64(numberOfPoints))
}
}

func (self *ServerSuite) TestChangingRootPassword(c *C) {
rootClient := self.serverProcesses[0].GetClient("", c)
c.Assert(rootClient.CreateClusterAdmin("newroot", "root"), IsNil)
Expand Down
8 changes: 8 additions & 0 deletions src/protocol/protocol_extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func (self *Point) GetFieldValueAsString(idx int) string {
}
}

func (self *Request) Size() int {
return proto.Size(self)
}

func DecodeRequest(buff *bytes.Buffer) (request *Request, err error) {
request = &Request{}
err = proto.Unmarshal(buff.Bytes(), request)
Expand All @@ -112,6 +116,10 @@ func (self *Request) Decode(data []byte) error {
return proto.Unmarshal(data, self)
}

func (self *Response) Size() int {
return proto.Size(self)
}

func DecodeResponse(buff *bytes.Buffer) (response *Response, err error) {
response = &Response{}
err = proto.Unmarshal(buff.Bytes(), response)
Expand Down

0 comments on commit de0a938

Please sign in to comment.