From f2025477033536b1e40357a1f2ce696290210c4b Mon Sep 17 00:00:00 2001 From: John Shahid Date: Mon, 12 May 2014 17:49:51 -0400 Subject: [PATCH] Fix #535. WAL replay hangs if the remote server crashed before sending response --- src/cluster/cluster_server.go | 10 +++++++-- src/coordinator/protobuf_client.go | 17 ++++++++++++++ src/coordinator/protobuf_request_handler.go | 25 +++++++++++++-------- src/server/server.go | 1 + 4 files changed, 42 insertions(+), 11 deletions(-) diff --git a/src/cluster/cluster_server.go b/src/cluster/cluster_server.go index 43f96583bf1..e34c0873871 100644 --- a/src/cluster/cluster_server.go +++ b/src/cluster/cluster_server.go @@ -34,6 +34,7 @@ type ClusterServer struct { type ServerConnection interface { Connect() Close() + ClearRequests() MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error } @@ -94,12 +95,12 @@ func (self *ClusterServer) Connect() { func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) { err := self.connection.MakeRequest(request, responseStream) if err != nil { - self.isUp = false message := err.Error() select { case responseStream <- &protocol.Response{Type: &endStreamResponse, ErrorMessage: &message}: default: } + self.markServerAsDown() } } @@ -178,11 +179,16 @@ func (self *ClusterServer) getHeartbeatResponse(responseChan <-chan *protocol.Re return nil } +func (self *ClusterServer) markServerAsDown() { + self.isUp = false + self.connection.ClearRequests() +} + func (self *ClusterServer) handleHeartbeatError(err error) { if self.isUp { log.Warn("Server marked as down. Hearbeat error for server: %d - %s: %s", self.Id, self.ProtobufConnectionString, err) } - self.isUp = false + self.markServerAsDown() self.Backoff *= 2 if self.Backoff > self.MaxBackoff { self.Backoff = self.MaxBackoff diff --git a/src/coordinator/protobuf_client.go b/src/coordinator/protobuf_client.go index aea88e45a2b..af5d5b2d4eb 100644 --- a/src/coordinator/protobuf_client.go +++ b/src/coordinator/protobuf_client.go @@ -73,6 +73,7 @@ func (self *ProtobufClient) Close() { self.stopped = true self.conn = nil } + self.ClearRequests() } func (self *ProtobufClient) getConnection() net.Conn { @@ -81,6 +82,22 @@ func (self *ProtobufClient) getConnection() net.Conn { return self.conn } +func (self *ProtobufClient) ClearRequests() { + self.requestBufferLock.Lock() + defer self.requestBufferLock.Unlock() + + message := "clearing all requests" + for _, req := range self.requestBuffer { + select { + case req.responseChan <- &protocol.Response{Type: &endStreamResponse, ErrorMessage: &message}: + default: + log.Debug("Cannot send response on channel") + } + } + + self.requestBuffer = map[uint32]*runningRequest{} +} + // Makes a request to the server. If the responseStream chan is not nil it will expect a response from the server // with a matching request.Id. The REQUEST_RETRY_ATTEMPTS constant of 3 and the RECONNECT_RETRY_WAIT of 100ms means // that an attempt to make a request to a downed server will take 300ms to time out. diff --git a/src/coordinator/protobuf_request_handler.go b/src/coordinator/protobuf_request_handler.go index f64a61408d3..3710bf8814d 100644 --- a/src/coordinator/protobuf_request_handler.go +++ b/src/coordinator/protobuf_request_handler.go @@ -31,15 +31,7 @@ func NewProtobufRequestHandler(coordinator Coordinator, clusterConfig *cluster.C func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, conn net.Conn) error { if *request.Type == protocol.Request_WRITE { - shard := self.clusterConfig.GetLocalShardById(*request.ShardId) - log.Debug("HANDLE: (%d):%d:%v", self.clusterConfig.LocalServer.Id, request.GetId(), shard) - err := shard.WriteLocalOnly(request) - if err != nil { - log.Error("ProtobufRequestHandler: error writing local shard: ", err) - return err - } - response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk} - return self.WriteResponse(conn, response) + go self.handleWrites(request, conn) } else if *request.Type == protocol.Request_DROP_DATABASE { go self.handleDropDatabase(request, conn) return nil @@ -55,6 +47,21 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con return nil } +func (self *ProtobufRequestHandler) handleWrites(request *protocol.Request, conn net.Conn) { + shard := self.clusterConfig.GetLocalShardById(*request.ShardId) + log.Debug("HANDLE: (%d):%d:%v", self.clusterConfig.LocalServer.Id, request.GetId(), shard) + err := shard.WriteLocalOnly(request) + var errorMsg *string + if err != nil { + log.Error("ProtobufRequestHandler: error writing local shard: %s", err) + errorMsg = protocol.String(err.Error()) + } + response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk, ErrorMessage: errorMsg} + if err := self.WriteResponse(conn, response); err != nil { + log.Error("ProtobufRequestHandler: error writing local shard: %s", err) + } +} + func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn net.Conn) { // the query should always parse correctly since it was parsed at the originating server. queries, err := parser.ParseQuery(*request.Query) diff --git a/src/server/server.go b/src/server/server.go index 15fc3d2dd67..f75d952392f 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -111,6 +111,7 @@ func (self *Server) ListenAndServe() error { if err != nil { panic(err) } + log.Info("Connection string changed successfully") } go self.ProtobufServer.ListenAndServe()