Skip to content

Commit

Permalink
Merge pull request #535 from influxdb/fix-535-wal-hang
Browse files Browse the repository at this point in the history
WAL replay hangs if the response wasn't received
  • Loading branch information
pauldix committed May 13, 2014
2 parents a170204 + f202547 commit 730d4d4
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 11 deletions.
10 changes: 8 additions & 2 deletions src/cluster/cluster_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ClusterServer struct {
// ServerConnection is the set of operations a connection to a remote server
// exposes: connecting, closing, clearing requests and making requests.
type ServerConnection interface {
Connect()
Close()
// ClearRequests is invoked by ClusterServer.markServerAsDown when the server
// is flagged as down. NOTE(review): presumably implementations fail every
// pending request so callers stop waiting — confirm against each implementer.
ClearRequests()
// MakeRequest issues request, passing responseStream as the channel for
// responses. A non-nil error reports that the request could not be made.
MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error
}

Expand Down Expand Up @@ -94,12 +95,12 @@ func (self *ClusterServer) Connect() {
// MakeRequest forwards request to the underlying connection, handing it
// responseStream as the channel on which responses should be delivered.
//
// If the connection reports an error, an end-of-stream response carrying the
// error text is offered to responseStream and the server is marked as down.
// The offer is non-blocking: if nobody can receive it right now (or the
// channel is nil) it is dropped rather than stalling the caller.
func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) {
	err := self.connection.MakeRequest(request, responseStream)
	if err == nil {
		return
	}

	message := err.Error()
	select {
	case responseStream <- &protocol.Response{Type: &endStreamResponse, ErrorMessage: &message}:
	default:
	}

	// markServerAsDown is the single place that flips isUp and clears the
	// connection's requests; no separate isUp write is needed here.
	self.markServerAsDown()
}

Expand Down Expand Up @@ -178,11 +179,16 @@ func (self *ClusterServer) getHeartbeatResponse(responseChan <-chan *protocol.Re
return nil
}

// markServerAsDown records the server as not up and then asks the connection
// to clear its requests. The flag is written before the requests are cleared.
func (self *ClusterServer) markServerAsDown() {
self.isUp = false
self.connection.ClearRequests()
}

func (self *ClusterServer) handleHeartbeatError(err error) {
if self.isUp {
log.Warn("Server marked as down. Hearbeat error for server: %d - %s: %s", self.Id, self.ProtobufConnectionString, err)
}
self.isUp = false
self.markServerAsDown()
self.Backoff *= 2
if self.Backoff > self.MaxBackoff {
self.Backoff = self.MaxBackoff
Expand Down
17 changes: 17 additions & 0 deletions src/coordinator/protobuf_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (self *ProtobufClient) Close() {
self.stopped = true
self.conn = nil
}
self.ClearRequests()
}

func (self *ProtobufClient) getConnection() net.Conn {
Expand All @@ -81,6 +82,22 @@ func (self *ProtobufClient) getConnection() net.Conn {
return self.conn
}

// ClearRequests fails every request currently held in the request buffer and
// then replaces the buffer with an empty one. Each buffered request is offered
// an end-of-stream response whose error message is "clearing all requests";
// the offer never blocks, and a request whose channel cannot accept it right
// now is only logged at debug level. The whole operation runs under
// requestBufferLock.
func (self *ProtobufClient) ClearRequests() {
	self.requestBufferLock.Lock()
	defer self.requestBufferLock.Unlock()

	reason := "clearing all requests"
	for _, pending := range self.requestBuffer {
		// A fresh response per request, so receivers never share one value.
		abort := &protocol.Response{Type: &endStreamResponse, ErrorMessage: &reason}
		select {
		case pending.responseChan <- abort:
		default:
			log.Debug("Cannot send response on channel")
		}
	}

	// Start over with an empty buffer.
	self.requestBuffer = make(map[uint32]*runningRequest)
}

// Makes a request to the server. If the responseStream chan is not nil it will expect a response from the server
// with a matching request.Id. The REQUEST_RETRY_ATTEMPTS constant of 3 and the RECONNECT_RETRY_WAIT of 100ms means
// that an attempt to make a request to a downed server will take 300ms to time out.
Expand Down
25 changes: 16 additions & 9 deletions src/coordinator/protobuf_request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,7 @@ func NewProtobufRequestHandler(coordinator Coordinator, clusterConfig *cluster.C

func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, conn net.Conn) error {
if *request.Type == protocol.Request_WRITE {
shard := self.clusterConfig.GetLocalShardById(*request.ShardId)
log.Debug("HANDLE: (%d):%d:%v", self.clusterConfig.LocalServer.Id, request.GetId(), shard)
err := shard.WriteLocalOnly(request)
if err != nil {
log.Error("ProtobufRequestHandler: error writing local shard: ", err)
return err
}
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}
return self.WriteResponse(conn, response)
go self.handleWrites(request, conn)
} else if *request.Type == protocol.Request_DROP_DATABASE {
go self.handleDropDatabase(request, conn)
return nil
Expand All @@ -55,6 +47,21 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con
return nil
}

// handleWrites applies a write request to the local shard identified by
// request.ShardId and sends the outcome back over conn.
//
// A response is always written, even when the shard write fails: in that case
// the error text is attached as ErrorMessage. Failures are logged rather than
// returned.
// NOTE(review): the response Type is writeOk even when ErrorMessage is set —
// confirm that receivers inspect ErrorMessage and not just the type.
func (self *ProtobufRequestHandler) handleWrites(request *protocol.Request, conn net.Conn) {
	shard := self.clusterConfig.GetLocalShardById(*request.ShardId)
	log.Debug("HANDLE: (%d):%d:%v", self.clusterConfig.LocalServer.Id, request.GetId(), shard)

	var errorMsg *string
	if err := shard.WriteLocalOnly(request); err != nil {
		log.Error("ProtobufRequestHandler: error writing local shard: %s", err)
		errorMsg = protocol.String(err.Error())
	}

	response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk, ErrorMessage: errorMsg}
	if err := self.WriteResponse(conn, response); err != nil {
		// Distinct message so a failed reply is not mistaken for a failed
		// shard write when reading the logs.
		log.Error("ProtobufRequestHandler: error writing response: %s", err)
	}
}

func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn net.Conn) {
// the query should always parse correctly since it was parsed at the originating server.
queries, err := parser.ParseQuery(*request.Query)
Expand Down
1 change: 1 addition & 0 deletions src/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (self *Server) ListenAndServe() error {
if err != nil {
panic(err)
}
log.Info("Connection string changed successfully")
}

go self.ProtobufServer.ListenAndServe()
Expand Down

0 comments on commit 730d4d4

Please sign in to comment.