From 5041102ea73e285ecdee8b353221cdde954a1a48 Mon Sep 17 00:00:00 2001 From: Lukasz Dziedziak Date: Tue, 26 Sep 2023 15:07:46 +0200 Subject: [PATCH] fix(kds): call CloseSend and exit a goroutine when sync fails to start (#7869) Signed-off-by: Lukasz Dziedziak Co-authored-by: Jakub Dyszkiewicz --- pkg/kds/mux/client.go | 50 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/pkg/kds/mux/client.go b/pkg/kds/mux/client.go index 858a9a862f62..5a9a7e7404d4 100644 --- a/pkg/kds/mux/client.go +++ b/pkg/kds/mux/client.go @@ -164,11 +164,26 @@ func (c *client) startGlobalToZoneSync(ctx context.Context, log logr.Logger, con errorCh <- err return } - c.globalToZoneCb.OnGlobalToZoneSyncStarted(stream, errorCh) - <-stop - log.Info("Global to Zone Sync rpc stream stopped") - if err := stream.CloseSend(); err != nil { - errorCh <- errors.Wrap(err, "CloseSend returned an error") + processingErrorsCh := make(chan error) + c.globalToZoneCb.OnGlobalToZoneSyncStarted(stream, processingErrorsCh) + select { + case <-stop: + log.Info("Global to Zone Sync rpc stream stopped") + if err := stream.CloseSend(); err != nil { + errorCh <- errors.Wrap(err, "CloseSend returned an error") + } + case err := <-processingErrorsCh: + if status.Code(err) == codes.Unimplemented { + log.Error(err, "Global to Zone Sync rpc stream failed, because Global CP does not implement this rpc. Upgrade Global CP.") + // backwards compatibility. Do not rethrow error, so Admin RPC can still operate. + return + } + log.Error(err, "Global to Zone Sync rpc stream failed, will restart in background") + if err := stream.CloseSend(); err != nil { + log.Error(err, "CloseSend returned an error") + } + errorCh <- err + return } } @@ -180,11 +195,26 @@ func (c *client) startZoneToGlobalSync(ctx context.Context, log logr.Logger, con errorCh <- err return } - c.zoneToGlobalCb.OnZoneToGlobalSyncStarted(stream, errorCh) - <-stop - log.Info("Zone to Global Sync rpc stream stopped") - if err := stream.CloseSend(); err != nil { - errorCh <- errors.Wrap(err, "CloseSend returned an error") + processingErrorsCh := make(chan error) + c.zoneToGlobalCb.OnZoneToGlobalSyncStarted(stream, processingErrorsCh) + select { + case <-stop: + log.Info("Zone to Global Sync rpc stream stopped") + if err := stream.CloseSend(); err != nil { + errorCh <- errors.Wrap(err, "CloseSend returned an error") + } + case err := <-processingErrorsCh: + if status.Code(err) == codes.Unimplemented { + log.Error(err, "Zone to Global Sync rpc stream failed, because Global CP does not implement this rpc. Upgrade Global CP.") + // backwards compatibility. Do not rethrow error, so Admin RPC can still operate. + return + } + log.Error(err, "Zone to Global Sync rpc stream failed, will restart in background") + if err := stream.CloseSend(); err != nil { + log.Error(err, "CloseSend returned an error") + } + errorCh <- err + return } }