diff --git a/pkg/kds/mux/server.go b/pkg/kds/mux/server.go index 91f76419aa6d..e0b6efc8c7ba 100644 --- a/pkg/kds/mux/server.go +++ b/pkg/kds/mux/server.go @@ -164,6 +164,7 @@ func (s *server) Start(stop <-chan struct{}) error { } } +// StreamMessage handles Mux messages for KDS V1. It's not used in KDS V2. func (s *server) StreamMessage(stream mesh_proto.MultiplexService_StreamMessageServer) error { clientID, err := util.ClientIDFromIncomingCtx(stream.Context()) if err != nil { diff --git a/pkg/kds/mux/zone_sync.go b/pkg/kds/mux/zone_sync.go index 1caa72b4750e..f605bc4d95e2 100644 --- a/pkg/kds/mux/zone_sync.go +++ b/pkg/kds/mux/zone_sync.go @@ -62,7 +62,8 @@ func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService if status.Code(err) == codes.Unimplemented { return errors.Wrap(err, "GlobalToZoneSync rpc stream failed, because Global CP does not implement this rpc. Upgrade Global CP.") } - return errors.Wrap(err, "GlobalToZoneSync rpc stream failed prematurely, will restart in background") + clientLog.Error(err, "GlobalToZoneSync rpc stream failed prematurely, will restart in background") + return status.Error(codes.Internal, "stream failed") } } @@ -82,6 +83,7 @@ func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService if status.Code(err) == codes.Unimplemented { return errors.Wrap(err, "ZoneToGlobalSync rpc stream failed, because Global CP does not implement this rpc. 
Upgrade Global CP.") } - return errors.Wrap(err, "ZoneToGlobalSync rpc stream failed prematurely, will restart in background") + clientLog.Error(err, "ZoneToGlobalSync rpc stream failed prematurely, will restart in background") + return status.Error(codes.Internal, "stream failed") } } diff --git a/pkg/kds/service/server.go b/pkg/kds/service/server.go index 68dcffde461b..49c6b9c0c783 100644 --- a/pkg/kds/service/server.go +++ b/pkg/kds/service/server.go @@ -3,10 +3,13 @@ package service import ( "context" "fmt" + "io" "time" "github.com/sethvargo/go-retry" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" "github.com/kumahq/kuma/api/system/v1alpha1" @@ -20,6 +23,8 @@ import ( util_grpc "github.com/kumahq/kuma/pkg/util/grpc" ) +var log = core.Log.WithName("kds-service") + type GlobalKDSServiceServer struct { envoyAdminRPCs EnvoyAdminRPCs resManager manager.ResourceManager @@ -67,30 +72,37 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( ) error { zone, err := util.ClientIDFromIncomingCtx(stream.Context()) if err != nil { - return err + return status.Error(codes.InvalidArgument, err.Error()) } clientID := ClientID(stream.Context(), zone) - core.Log.Info("Envoy Admin RPC stream started", "rpc", rpcName, "clientID", clientID) + logger := log.WithValues("rpc", rpcName, "clientID", clientID) + logger.Info("Envoy Admin RPC stream started") rpc.ClientConnected(clientID, stream) if err := g.storeStreamConnection(stream.Context(), zone, rpcName, g.instanceID); err != nil { - return err + logger.Error(err, "could not store stream connection") + return status.Error(codes.Internal, "could not store stream connection") } defer func() { rpc.ClientDisconnected(clientID) // stream.Context() is cancelled here, we need to use another ctx ctx := multitenant.CopyIntoCtx(stream.Context(), context.Background()) if err := g.storeStreamConnection(ctx, zone, rpcName, ""); err != nil { - 
core.Log.Error(err, "could not clear stream connection information in ZoneInsight", "rpc", rpcName, "clientID", clientID, "rpc", rpcName) + logger.Error(err, "could not clear stream connection information in ZoneInsight") } }() for { resp, err := recv() + if err == io.EOF { + return nil + } if err != nil { - return err + logger.Error(err, "could not receive a message") + return status.Error(codes.Internal, "could not receive a message") } - core.Log.V(1).Info("Envoy Admin RPC response received", "rpc", rpc, "clientID", clientID, "requestId", resp.GetRequestId()) + logger.V(1).Info("Envoy Admin RPC response received", "requestId", resp.GetRequestId()) if err := rpc.ResponseReceived(clientID, resp); err != nil { - return err + logger.Error(err, "could not mark the response as received") + return status.Error(codes.InvalidArgument, "could not mark the response as received") } } } diff --git a/pkg/kds/v2/server/error_recorder_stream.go b/pkg/kds/v2/server/error_recorder_stream.go new file mode 100644 index 000000000000..e108d98c0de9 --- /dev/null +++ b/pkg/kds/v2/server/error_recorder_stream.go @@ -0,0 +1,46 @@ +package server + +import ( + "io" + "sync" + + envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" +) + +// ErrorRecorderStream is a DeltaStream that records an error +// We need this because go-control-plane@v0.11.1/pkg/server/delta/v3/server.go:190 swallows an error on Recv() +type ErrorRecorderStream interface { + stream.DeltaStream + Err() error +} + +type errorRecorderStream struct { + stream.DeltaStream + err error + sync.Mutex +} + +var _ stream.DeltaStream = &errorRecorderStream{} + +func NewErrorRecorderStream(s stream.DeltaStream) ErrorRecorderStream { + return &errorRecorderStream{ + DeltaStream: s, + } +} + +func (e *errorRecorderStream) Recv() (*envoy_sd.DeltaDiscoveryRequest, error) { + res, err := e.DeltaStream.Recv() + if err != nil && err != io.EOF { // do 
not consider "end of stream" an error + e.Lock() + e.err = err + e.Unlock() + } + return res, err +} + +func (e *errorRecorderStream) Err() error { + e.Lock() + defer e.Unlock() + return e.err +} diff --git a/pkg/kds/v2/server/kds.go b/pkg/kds/v2/server/kds.go index 0507dbb8df0c..cb0961c9364a 100644 --- a/pkg/kds/v2/server/kds.go +++ b/pkg/kds/v2/server/kds.go @@ -31,7 +31,12 @@ type server struct { } func (s *server) GlobalToZoneSync(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer) error { - return s.Server.DeltaStreamHandler(stream, "") + errorStream := NewErrorRecorderStream(stream) + err := s.Server.DeltaStreamHandler(errorStream, "") + if err == nil { + err = errorStream.Err() + } + return err } // Delta xDS server expects `KDSSyncService_ZoneToGlobalSyncServer` to have Send(*v3.DeltaDiscoveryResponse) @@ -45,5 +50,10 @@ func (s *server) ZoneToGlobalSync(stream mesh_proto.KDSSyncService_ZoneToGlobalS // on zone while kds.proto has different definition of `KDSSyncService_ZoneToGlobalSyncServer` then // expected by delta xDS server. func (s *server) ZoneToGlobal(stream stream.DeltaStream) error { - return s.Server.DeltaStreamHandler(stream, "") + errorStream := NewErrorRecorderStream(stream) + err := s.Server.DeltaStreamHandler(errorStream, "") + if err == nil { + err = errorStream.Err() + } + return err } diff --git a/pkg/kds/v2/server/streamwrapper.go b/pkg/kds/v2/server/streamwrapper.go index 4c9772451ea8..87c9c3d4b119 100644 --- a/pkg/kds/v2/server/streamwrapper.go +++ b/pkg/kds/v2/server/streamwrapper.go @@ -18,6 +18,7 @@ type serverStream struct { stream mesh_proto.KDSSyncService_ZoneToGlobalSyncClient } +// NewServerStream converts client stream to a server's DeltaStream, so it can be used in DeltaStreamHandler func NewServerStream(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncClient) ServerStream { s := &serverStream{ stream: stream,