From 1c31d7e072d682e472a95bbb298326c5650ecbb7 Mon Sep 17 00:00:00 2001 From: Jakub Dyszkiewicz Date: Tue, 26 Sep 2023 13:52:51 +0200 Subject: [PATCH] feat(kds): better error handling (#7868) Signed-off-by: Jakub Dyszkiewicz --- pkg/kds/mux/server.go | 1 + pkg/kds/mux/zone_sync.go | 89 ++++++++++++++++++++++ pkg/kds/service/server.go | 49 +++++++++++- pkg/kds/v2/server/error_recorder_stream.go | 46 +++++++++++ pkg/kds/v2/server/kds.go | 59 ++++++++++++++ pkg/kds/v2/server/streamwrapper.go | 64 ++++++++++++++++ 6 files changed, 305 insertions(+), 3 deletions(-) create mode 100644 pkg/kds/mux/zone_sync.go create mode 100644 pkg/kds/v2/server/error_recorder_stream.go create mode 100644 pkg/kds/v2/server/kds.go create mode 100644 pkg/kds/v2/server/streamwrapper.go diff --git a/pkg/kds/mux/server.go b/pkg/kds/mux/server.go index d6f4ecf7a079..f7b4ca9a61b4 100644 --- a/pkg/kds/mux/server.go +++ b/pkg/kds/mux/server.go @@ -139,6 +139,7 @@ func (s *server) Start(stop <-chan struct{}) error { } } +// StreamMessage handles Mux messages for KDS V1. 
It's not used in KDS V2 func (s *server) StreamMessage(stream mesh_proto.MultiplexService_StreamMessageServer) error { clientID, err := util.ClientIDFromIncomingCtx(stream.Context()) if err != nil { diff --git a/pkg/kds/mux/zone_sync.go b/pkg/kds/mux/zone_sync.go new file mode 100644 index 000000000000..f605bc4d95e2 --- /dev/null +++ b/pkg/kds/mux/zone_sync.go @@ -0,0 +1,89 @@ +package mux + +import ( + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" + "github.com/kumahq/kuma/pkg/core" +) + +type FilterV2 interface { + InterceptServerStream(stream grpc.ServerStream) error + InterceptClientStream(stream grpc.ClientStream) error +} + +type OnGlobalToZoneSyncConnectFunc func(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer, errorCh chan error) + +func (f OnGlobalToZoneSyncConnectFunc) OnGlobalToZoneSyncConnect(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer, errorCh chan error) { + f(stream, errorCh) +} + +type OnZoneToGlobalSyncConnectFunc func(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer, errorCh chan error) + +func (f OnZoneToGlobalSyncConnectFunc) OnZoneToGlobalSyncConnect(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer, errorCh chan error) { + f(stream, errorCh) +} + +var clientLog = core.Log.WithName("kds-delta-client") + +type KDSSyncServiceServer struct { + globalToZoneCb OnGlobalToZoneSyncConnectFunc + zoneToGlobalCb OnZoneToGlobalSyncConnectFunc + filters []FilterV2 + mesh_proto.UnimplementedKDSSyncServiceServer +} + +func NewKDSSyncServiceServer(globalToZoneCb OnGlobalToZoneSyncConnectFunc, zoneToGlobalCb OnZoneToGlobalSyncConnectFunc, filters []FilterV2) *KDSSyncServiceServer { + return &KDSSyncServiceServer{ + globalToZoneCb: globalToZoneCb, + zoneToGlobalCb: zoneToGlobalCb, + filters: filters, + } +} + +var _ mesh_proto.KDSSyncServiceServer = &KDSSyncServiceServer{} + +func (g 
*KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer) error { + for _, filter := range g.filters { + if err := filter.InterceptServerStream(stream); err != nil { + return errors.Wrap(err, "closing KDS stream following a callback error") + } + } + processingErrorsCh := make(chan error) + go g.globalToZoneCb.OnGlobalToZoneSyncConnect(stream, processingErrorsCh) + select { + case <-stream.Context().Done(): + clientLog.Info("GlobalToZoneSync rpc stream stopped") + return nil + case err := <-processingErrorsCh: + if status.Code(err) == codes.Unimplemented { + return errors.Wrap(err, "GlobalToZoneSync rpc stream failed, because Global CP does not implement this rpc. Upgrade Global CP.") + } + clientLog.Error(err, "GlobalToZoneSync rpc stream failed prematurely, will restart in background") + return status.Error(codes.Internal, "stream failed") + } +} + +func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer) error { + for _, filter := range g.filters { + if err := filter.InterceptServerStream(stream); err != nil { + return errors.Wrap(err, "closing KDS stream following a callback error") + } + } + processingErrorsCh := make(chan error) + go g.zoneToGlobalCb.OnZoneToGlobalSyncConnect(stream, processingErrorsCh) + select { + case <-stream.Context().Done(): + clientLog.Info("ZoneToGlobalSync rpc stream stopped") + return nil + case err := <-processingErrorsCh: + if status.Code(err) == codes.Unimplemented { + return errors.Wrap(err, "ZoneToGlobalSync rpc stream failed, because Global CP does not implement this rpc. 
Upgrade Global CP.") + } + clientLog.Error(err, "ZoneToGlobalSync rpc stream failed prematurely, will restart in background") + return status.Error(codes.Internal, "stream failed") + } +} diff --git a/pkg/kds/service/server.go b/pkg/kds/service/server.go index bc21271994a1..f3895f631549 100644 --- a/pkg/kds/service/server.go +++ b/pkg/kds/service/server.go @@ -1,7 +1,18 @@ package service import ( +<<<<<<< HEAD +======= + "context" + "fmt" + "io" + "time" + + "github.com/sethvargo/go-retry" +>>>>>>> 3ab585c92 (feat(kds): better error handling (#7868)) "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" "github.com/kumahq/kuma/pkg/core" @@ -9,6 +20,8 @@ import ( util_grpc "github.com/kumahq/kuma/pkg/util/grpc" ) +var log = core.Log.WithName("kds-service") + type GlobalKDSServiceServer struct { envoyAdminRPCs EnvoyAdminRPCs mesh_proto.UnimplementedGlobalKDSServiceServer @@ -48,19 +61,49 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( ) error { zone, err := util.ClientIDFromIncomingCtx(stream.Context()) if err != nil { - return err + return status.Error(codes.InvalidArgument, err.Error()) } +<<<<<<< HEAD core.Log.Info("Envoy Admin RPC stream started", "rpc", rpcName, "zone", zone) rpc.ClientConnected(zone, stream) defer rpc.ClientDisconnected(zone) +======= + clientID := ClientID(stream.Context(), zone) + logger := log.WithValues("rpc", rpcName, "clientID", clientID) + logger.Info("Envoy Admin RPC stream started") + rpc.ClientConnected(clientID, stream) + if err := g.storeStreamConnection(stream.Context(), zone, rpcName, g.instanceID); err != nil { + logger.Error(err, "could not store stream connection") + return status.Error(codes.Internal, "could not store stream connection") + } + defer func() { + rpc.ClientDisconnected(clientID) + // stream.Context() is cancelled here, we need to use another ctx + ctx := multitenant.CopyIntoCtx(stream.Context(), 
context.Background()) + if err := g.storeStreamConnection(ctx, zone, rpcName, ""); err != nil { + logger.Error(err, "could not clear stream connection information in ZoneInsight") + } + }() +>>>>>>> 3ab585c92 (feat(kds): better error handling (#7868)) for { resp, err := recv() - if err != nil { - return err + if err == io.EOF { + return nil } +<<<<<<< HEAD core.Log.V(1).Info("Envoy Admin RPC response received", "rpc", rpc, "zone", zone, "requestId", resp.GetRequestId()) if err := rpc.ResponseReceived(zone, resp); err != nil { return err +======= + if err != nil { + logger.Error(err, "could not receive a message") + return status.Error(codes.Internal, "could not receive a message") + } + logger.V(1).Info("Envoy Admin RPC response received", "requestId", resp.GetRequestId()) + if err := rpc.ResponseReceived(clientID, resp); err != nil { + logger.Error(err, "could not mark the response as received") + return status.Error(codes.InvalidArgument, "could not mark the response as received") +>>>>>>> 3ab585c92 (feat(kds): better error handling (#7868)) } } } diff --git a/pkg/kds/v2/server/error_recorder_stream.go b/pkg/kds/v2/server/error_recorder_stream.go new file mode 100644 index 000000000000..e108d98c0de9 --- /dev/null +++ b/pkg/kds/v2/server/error_recorder_stream.go @@ -0,0 +1,46 @@ +package server + +import ( + "io" + "sync" + + envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" +) + +// ErrorRecorderStream is a DeltaStream that records an error +// We need this because go-control-plane@v0.11.1/pkg/server/delta/v3/server.go:190 swallows an error on Recv() +type ErrorRecorderStream interface { + stream.DeltaStream + Err() error +} + +type errorRecorderStream struct { + stream.DeltaStream + err error + sync.Mutex +} + +var _ stream.DeltaStream = &errorRecorderStream{} + +func NewErrorRecorderStream(s stream.DeltaStream) ErrorRecorderStream { + return &errorRecorderStream{ + 
DeltaStream: s, + } +} + +func (e *errorRecorderStream) Recv() (*envoy_sd.DeltaDiscoveryRequest, error) { + res, err := e.DeltaStream.Recv() + if err != nil && err != io.EOF { // do not consider "end of stream" an error + e.Lock() + e.err = err + e.Unlock() + } + return res, err +} + +func (e *errorRecorderStream) Err() error { + e.Lock() + defer e.Unlock() + return e.err +} diff --git a/pkg/kds/v2/server/kds.go b/pkg/kds/v2/server/kds.go new file mode 100644 index 000000000000..cb0961c9364a --- /dev/null +++ b/pkg/kds/v2/server/kds.go @@ -0,0 +1,59 @@ +package server + +import ( + "context" + + envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/delta/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" + envoy_server "github.com/envoyproxy/go-control-plane/pkg/server/v3" + "github.com/go-logr/logr" + + mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" +) + +// Server is common for global and zone +type Server interface { + ZoneToGlobal(stream.DeltaStream) error + mesh_proto.KDSSyncServiceServer +} + +func NewServer(config envoy_cache.Cache, callbacks envoy_server.Callbacks, log logr.Logger) Server { + deltaServer := delta.NewServer(context.Background(), config, callbacks) + return &server{Server: deltaServer} +} + +var _ Server = &server{} + +type server struct { + delta.Server + mesh_proto.UnimplementedKDSSyncServiceServer +} + +func (s *server) GlobalToZoneSync(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer) error { + errorStream := NewErrorRecorderStream(stream) + err := s.Server.DeltaStreamHandler(errorStream, "") + if err == nil { + err = errorStream.Err() + } + return err +} + +// Delta xDS server expects `KDSSyncService_ZoneToGlobalSyncServer` to have Send(*v3.DeltaDiscoveryResponse) +// and Recv() (*v3.DeltaDiscoveryRequest, error) but proto has different definition to make it works for +// synchronization from Zone to Global. 
+func (s *server) ZoneToGlobalSync(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer) error { + panic("not implemented") +} + +// ZoneToGlobal is the custom implementation for `ZoneToGlobalSync` to support running delta server +// on zone while kds.proto has different definition of `KDSSyncService_ZoneToGlobalSyncServer` then +// expected by delta xDS server. +func (s *server) ZoneToGlobal(stream stream.DeltaStream) error { + errorStream := NewErrorRecorderStream(stream) + err := s.Server.DeltaStreamHandler(errorStream, "") + if err == nil { + err = errorStream.Err() + } + return err +} diff --git a/pkg/kds/v2/server/streamwrapper.go b/pkg/kds/v2/server/streamwrapper.go new file mode 100644 index 000000000000..87c9c3d4b119 --- /dev/null +++ b/pkg/kds/v2/server/streamwrapper.go @@ -0,0 +1,64 @@ +package server + +import ( + "context" + + envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" + "google.golang.org/grpc/metadata" + + mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" +) + +type ServerStream interface { + stream.DeltaStream +} + +type serverStream struct { + stream mesh_proto.KDSSyncService_ZoneToGlobalSyncClient +} + +// NewServerStream converts client stream to a server's DeltaStream, so it can be used in DeltaStreamHandler +func NewServerStream(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncClient) ServerStream { + s := &serverStream{ + stream: stream, + } + return s +} + +func (k *serverStream) Send(response *envoy_sd.DeltaDiscoveryResponse) error { + err := k.stream.Send(response) + return err +} + +func (k *serverStream) Recv() (*envoy_sd.DeltaDiscoveryRequest, error) { + res, err := k.stream.Recv() + if err != nil { + return nil, err + } + return res, nil +} + +func (k *serverStream) SetHeader(metadata.MD) error { + panic("not implemented") +} + +func (k *serverStream) SendHeader(metadata.MD) error { + panic("not implemented") +} + +func (k 
*serverStream) SetTrailer(metadata.MD) { + panic("not implemented") +} + +func (k *serverStream) Context() context.Context { + return k.stream.Context() +} + +func (k *serverStream) SendMsg(m interface{}) error { + panic("not implemented") +} + +func (k *serverStream) RecvMsg(m interface{}) error { + panic("not implemented") +}