Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kds): better error handling (backport of #7868) #7878

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kds/mux/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (s *server) Start(stop <-chan struct{}) error {
}
}

// StreamMessage handle Mux messages for KDS V1. It's not used in KDS V2
func (s *server) StreamMessage(stream mesh_proto.MultiplexService_StreamMessageServer) error {
clientID, err := util.ClientIDFromIncomingCtx(stream.Context())
if err != nil {
Expand Down
89 changes: 89 additions & 0 deletions pkg/kds/mux/zone_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package mux

import (
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core"
)

type FilterV2 interface {
InterceptServerStream(stream grpc.ServerStream) error
InterceptClientStream(stream grpc.ClientStream) error
}

type OnGlobalToZoneSyncConnectFunc func(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer, errorCh chan error)

func (f OnGlobalToZoneSyncConnectFunc) OnGlobalToZoneSyncConnect(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer, errorCh chan error) {
f(stream, errorCh)
}

type OnZoneToGlobalSyncConnectFunc func(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer, errorCh chan error)

func (f OnZoneToGlobalSyncConnectFunc) OnZoneToGlobalSyncConnect(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer, errorCh chan error) {
f(stream, errorCh)
}

var clientLog = core.Log.WithName("kds-delta-client")

type KDSSyncServiceServer struct {
globalToZoneCb OnGlobalToZoneSyncConnectFunc
zoneToGlobalCb OnZoneToGlobalSyncConnectFunc
filters []FilterV2
mesh_proto.UnimplementedKDSSyncServiceServer
}

func NewKDSSyncServiceServer(globalToZoneCb OnGlobalToZoneSyncConnectFunc, zoneToGlobalCb OnZoneToGlobalSyncConnectFunc, filters []FilterV2) *KDSSyncServiceServer {
return &KDSSyncServiceServer{
globalToZoneCb: globalToZoneCb,
zoneToGlobalCb: zoneToGlobalCb,
filters: filters,
}
}

var _ mesh_proto.KDSSyncServiceServer = &KDSSyncServiceServer{}

func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer) error {
for _, filter := range g.filters {
if err := filter.InterceptServerStream(stream); err != nil {
return errors.Wrap(err, "closing KDS stream following a callback error")
}
}
processingErrorsCh := make(chan error)
go g.globalToZoneCb.OnGlobalToZoneSyncConnect(stream, processingErrorsCh)
select {
case <-stream.Context().Done():
clientLog.Info("GlobalToZoneSync rpc stream stopped")
return nil
case err := <-processingErrorsCh:
if status.Code(err) == codes.Unimplemented {
return errors.Wrap(err, "GlobalToZoneSync rpc stream failed, because Global CP does not implement this rpc. Upgrade Global CP.")
}
clientLog.Error(err, "GlobalToZoneSync rpc stream failed prematurely, will restart in background")
return status.Error(codes.Internal, "stream failed")
}
}

func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer) error {
for _, filter := range g.filters {
if err := filter.InterceptServerStream(stream); err != nil {
return errors.Wrap(err, "closing KDS stream following a callback error")
}
}
processingErrorsCh := make(chan error)
go g.zoneToGlobalCb.OnZoneToGlobalSyncConnect(stream, processingErrorsCh)
select {
case <-stream.Context().Done():
clientLog.Info("ZoneToGlobalSync rpc stream stopped")
return nil
case err := <-processingErrorsCh:
if status.Code(err) == codes.Unimplemented {
return errors.Wrap(err, "ZoneToGlobalSync rpc stream failed, because Global CP does not implement this rpc. Upgrade Global CP.")
}
clientLog.Error(err, "ZoneToGlobalSync rpc stream failed prematurely, will restart in background")
return status.Error(codes.Internal, "stream failed")
}
}
49 changes: 46 additions & 3 deletions pkg/kds/service/server.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
package service

import (
<<<<<<< HEAD
=======
"context"
"fmt"
"io"
"time"

"github.com/sethvargo/go-retry"
>>>>>>> 3ab585c92 (feat(kds): better error handling (#7868))
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/kds/util"
util_grpc "github.com/kumahq/kuma/pkg/util/grpc"
)

var log = core.Log.WithName("kds-service")

type GlobalKDSServiceServer struct {
envoyAdminRPCs EnvoyAdminRPCs
mesh_proto.UnimplementedGlobalKDSServiceServer
Expand Down Expand Up @@ -48,19 +61,49 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC(
) error {
zone, err := util.ClientIDFromIncomingCtx(stream.Context())
if err != nil {
return err
return status.Error(codes.InvalidArgument, err.Error())
}
<<<<<<< HEAD
core.Log.Info("Envoy Admin RPC stream started", "rpc", rpcName, "zone", zone)
rpc.ClientConnected(zone, stream)
defer rpc.ClientDisconnected(zone)
=======
clientID := ClientID(stream.Context(), zone)
logger := log.WithValues("rpc", rpcName, "clientID", clientID)
logger.Info("Envoy Admin RPC stream started")
rpc.ClientConnected(clientID, stream)
if err := g.storeStreamConnection(stream.Context(), zone, rpcName, g.instanceID); err != nil {
logger.Error(err, "could not store stream connection")
return status.Error(codes.Internal, "could not store stream connection")
}
defer func() {
rpc.ClientDisconnected(clientID)
// stream.Context() is cancelled here, we need to use another ctx
ctx := multitenant.CopyIntoCtx(stream.Context(), context.Background())
if err := g.storeStreamConnection(ctx, zone, rpcName, ""); err != nil {
logger.Error(err, "could not clear stream connection information in ZoneInsight")
}
}()
>>>>>>> 3ab585c92 (feat(kds): better error handling (#7868))
for {
resp, err := recv()
if err != nil {
return err
if err == io.EOF {
return nil
}
<<<<<<< HEAD
core.Log.V(1).Info("Envoy Admin RPC response received", "rpc", rpc, "zone", zone, "requestId", resp.GetRequestId())
if err := rpc.ResponseReceived(zone, resp); err != nil {
return err
=======
if err != nil {
logger.Error(err, "could not receive a message")
return status.Error(codes.Internal, "could not receive a message")
}
logger.V(1).Info("Envoy Admin RPC response received", "requestId", resp.GetRequestId())
if err := rpc.ResponseReceived(clientID, resp); err != nil {
logger.Error(err, "could not mark the response as received")
return status.Error(codes.InvalidArgument, "could not mark the response as received")
>>>>>>> 3ab585c92 (feat(kds): better error handling (#7868))
}
}
}
46 changes: 46 additions & 0 deletions pkg/kds/v2/server/error_recorder_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package server

import (
"io"
"sync"

envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

// ErrorRecorderStream is a DeltaStream that records an error
// We need this because [email protected]/pkg/server/delta/v3/server.go:190 swallows an error on Recv()
type ErrorRecorderStream interface {
stream.DeltaStream
Err() error
}

type errorRecorderStream struct {
stream.DeltaStream
err error
sync.Mutex
}

var _ stream.DeltaStream = &errorRecorderStream{}

func NewErrorRecorderStream(s stream.DeltaStream) ErrorRecorderStream {
return &errorRecorderStream{
DeltaStream: s,
}
}

func (e *errorRecorderStream) Recv() (*envoy_sd.DeltaDiscoveryRequest, error) {
res, err := e.DeltaStream.Recv()
if err != nil && err != io.EOF { // do not consider "end of stream" an error
e.Lock()
e.err = err
e.Unlock()
}
return res, err
}

func (e *errorRecorderStream) Err() error {
e.Lock()
defer e.Unlock()
return e.err
}
59 changes: 59 additions & 0 deletions pkg/kds/v2/server/kds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package server

import (
"context"

envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/delta/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
envoy_server "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"github.com/go-logr/logr"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
)

// Server is common for global and zone
type Server interface {
ZoneToGlobal(stream.DeltaStream) error
mesh_proto.KDSSyncServiceServer
}

func NewServer(config envoy_cache.Cache, callbacks envoy_server.Callbacks, log logr.Logger) Server {
deltaServer := delta.NewServer(context.Background(), config, callbacks)
return &server{Server: deltaServer}
}

var _ Server = &server{}

type server struct {
delta.Server
mesh_proto.UnimplementedKDSSyncServiceServer
}

func (s *server) GlobalToZoneSync(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer) error {
errorStream := NewErrorRecorderStream(stream)
err := s.Server.DeltaStreamHandler(errorStream, "")
if err == nil {
err = errorStream.Err()
}
return err
}

// Delta xDS server expects `KDSSyncService_ZoneToGlobalSyncServer` to have Send(*v3.DeltaDiscoveryResponse)
// and Recv() (*v3.DeltaDiscoveryRequest, error) but proto has different definition to make it works for
// synchronization from Zone to Global.
func (s *server) ZoneToGlobalSync(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer) error {
panic("not implemented")
}

// ZoneToGlobal is the custom implementation for `ZoneToGlobalSync` to support running delta server
// on zone while kds.proto has different definition of `KDSSyncService_ZoneToGlobalSyncServer` then
// expected by delta xDS server.
func (s *server) ZoneToGlobal(stream stream.DeltaStream) error {
errorStream := NewErrorRecorderStream(stream)
err := s.Server.DeltaStreamHandler(errorStream, "")
if err == nil {
err = errorStream.Err()
}
return err
}
64 changes: 64 additions & 0 deletions pkg/kds/v2/server/streamwrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package server

import (
"context"

envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
"google.golang.org/grpc/metadata"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
)

type ServerStream interface {
stream.DeltaStream
}

type serverStream struct {
stream mesh_proto.KDSSyncService_ZoneToGlobalSyncClient
}

// NewServerStream converts client stream to a server's DeltaStream, so it can be used in DeltaStreamHandler
func NewServerStream(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncClient) ServerStream {
s := &serverStream{
stream: stream,
}
return s
}

func (k *serverStream) Send(response *envoy_sd.DeltaDiscoveryResponse) error {
err := k.stream.Send(response)
return err
}

func (k *serverStream) Recv() (*envoy_sd.DeltaDiscoveryRequest, error) {
res, err := k.stream.Recv()
if err != nil {
return nil, err
}
return res, nil
}

func (k *serverStream) SetHeader(metadata.MD) error {
panic("not implemented")
}

func (k *serverStream) SendHeader(metadata.MD) error {
panic("not implemented")
}

func (k *serverStream) SetTrailer(metadata.MD) {
panic("not implemented")
}

func (k *serverStream) Context() context.Context {
return k.stream.Context()
}

func (k *serverStream) SendMsg(m interface{}) error {
panic("not implemented")
}

func (k *serverStream) RecvMsg(m interface{}) error {
panic("not implemented")
}