Skip to content

Commit

Permalink
fix(kds): call CloseSend and exit a goroutine when sync fails to start (
Browse files Browse the repository at this point in the history
#7869)

Signed-off-by: Lukasz Dziedziak <[email protected]>
Co-authored-by: Jakub Dyszkiewicz <[email protected]>
  • Loading branch information
2 people authored and kumahq[bot] committed Sep 26, 2023
1 parent cbdfc7d commit 5041102
Showing 1 changed file with 40 additions and 10 deletions.
50 changes: 40 additions & 10 deletions pkg/kds/mux/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,26 @@ func (c *client) startGlobalToZoneSync(ctx context.Context, log logr.Logger, con
errorCh <- err
return
}
c.globalToZoneCb.OnGlobalToZoneSyncStarted(stream, errorCh)
<-stop
log.Info("Global to Zone Sync rpc stream stopped")
if err := stream.CloseSend(); err != nil {
errorCh <- errors.Wrap(err, "CloseSend returned an error")
processingErrorsCh := make(chan error)
c.globalToZoneCb.OnGlobalToZoneSyncStarted(stream, processingErrorsCh)
select {
case <-stop:
log.Info("Global to Zone Sync rpc stream stopped")
if err := stream.CloseSend(); err != nil {
errorCh <- errors.Wrap(err, "CloseSend returned an error")
}
case err := <-processingErrorsCh:
if status.Code(err) == codes.Unimplemented {
log.Error(err, "Global to Zone Sync rpc stream failed, because Global CP does not implement this rpc. Upgrade Global CP.")
// backwards compatibility. Do not rethrow error, so Admin RPC can still operate.
return
}
log.Error(err, "Global to Zone Sync rpc stream failed, will restart in background")
if err := stream.CloseSend(); err != nil {
log.Error(err, "CloseSend returned an error")
}
errorCh <- err
return
}
}

Expand All @@ -180,11 +195,26 @@ func (c *client) startZoneToGlobalSync(ctx context.Context, log logr.Logger, con
errorCh <- err
return
}
c.zoneToGlobalCb.OnZoneToGlobalSyncStarted(stream, errorCh)
<-stop
log.Info("Zone to Global Sync rpc stream stopped")
if err := stream.CloseSend(); err != nil {
errorCh <- errors.Wrap(err, "CloseSend returned an error")
processingErrorsCh := make(chan error)
c.zoneToGlobalCb.OnZoneToGlobalSyncStarted(stream, processingErrorsCh)
select {
case <-stop:
log.Info("Zone to Global Sync rpc stream stopped")
if err := stream.CloseSend(); err != nil {
errorCh <- errors.Wrap(err, "CloseSend returned an error")
}
case err := <-processingErrorsCh:
if status.Code(err) == codes.Unimplemented {
log.Error(err, "Zone to Global Sync rpc stream failed, because Global CP does not implement this rpc. Upgrade Global CP.")
// backwards compatibility. Do not rethrow error, so Admin RPC can still operate.
return
}
log.Error(err, "Zone to Global Sync rpc stream failed, will restart in background")
if err := stream.CloseSend(); err != nil {
log.Error(err, "CloseSend returned an error")
}
errorCh <- err
return
}
}

Expand Down

0 comments on commit 5041102

Please sign in to comment.