-
Notifications
You must be signed in to change notification settings - Fork 728
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
mcs: fix watch primary address revision and update cache when meets not leader #6279
Conversation
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
Skipping CI for Draft Pull Request. |
server/server.go
Outdated
revision = wresp.CompactRevision | ||
break | ||
} | ||
if wresp.Canceled { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that, you have forgotten your comment in my pr and didn't check my reply :)
At the same place, you asked "Do we need to solve other errors?", I replied:
I checked the code in etcd watch.go. There is no other errors to solve, but I do think here using wrep.Err() != nil is better than using wresp.Canceled, because the former can accommodate future etcd changes.
Etcd Watch() returns three types of errors -- closed error, compacted error and cancelled error. closed error is captured by range watchChan, compacted error is captured by wresp.CompactRevision != 0, and cancelled error is captured here. Etcd Watch() itself underlying will retry Watch() for all other errors. Using wrep.Err() != nil is better than using wresp.Canceled.
Changed to using wrep.Err() != nil here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, I forgot about it when the copilot finished
server/server.go
Outdated
revision = wresp.CompactRevision | ||
break | ||
} | ||
if wresp.Canceled { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change made some improvement, but mightn't be enough. When wresp.Canceled(), it doesn't mean context has been cancelled, so if the primary address loop returns here but the API service is still serving, we'll have improper function. Do we need to try to recreate Watcher and re-watch in a endless loop until context is cancelled, as what I do in KeyspaceGroupManager?
select {
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
return w.Watch(ctx, key, opts...)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
server/server.go
Outdated
@@ -1726,7 +1726,7 @@ func (s *Server) watchServicePrimaryAddrLoop(serviceName string) { | |||
log.Info("start to watch", zap.String("service-key", serviceKey)) | |||
|
|||
primary := &tsopb.Participant{} | |||
ok, rev, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary) | |||
ok, revision, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we only need to watch keyspace group 0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, serviceKey := fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM for the change having improved things, though we might still need more improvements.
@@ -1737,15 +1737,26 @@ func (s *Server) watchServicePrimaryAddrLoop(serviceName string) { | |||
} else { | |||
log.Warn("service primary addr doesn't exist", zap.String("service-key", serviceKey)) | |||
} | |||
watcher := clientv3.NewWatcher(s.client) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are many codes with the same logic, the only difference is the key. How about abstracting a function for them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I will try it later.
Signed-off-by: lhy1024 <[email protected]>
Codecov ReportPatch coverage:
Additional details and impacted files@@ Coverage Diff @@
## master #6279 +/- ##
==========================================
- Coverage 75.16% 75.04% -0.12%
==========================================
Files 404 404
Lines 39860 39913 +53
==========================================
- Hits 29961 29954 -7
- Misses 7282 7332 +50
- Partials 2617 2627 +10
Flags with carried forward coverage won't be shown. Click here to find out more.
... and 18 files with indirect coverage changes Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. ☔ View full report in Codecov by Sentry. |
Signed-off-by: lhy1024 <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is one more comment which might need you to check and confirm.
case <-ctx.Done(): | ||
return revision, nil | ||
case <-s.updateServicePrimaryAddrCh: | ||
revision, err = s.updateServicePrimaryAddr(serviceName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most likely even after we update the service primary address, we still have one more problem -- s.clientConns stores the forwarded hosts' grpc.ClientConn. We never update the broken connections. if a forwarded host's connection broke,e.g., the forwarded host restarted and broken the existing connection, we'll retrieve the broken connection for this forwarded host continuously.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it will try to create a new connection automatically when the existed connection is closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
pkg/utils/tsoutil/tso_dispatcher.go
Outdated
if len(updateServicePrimaryAddrChs) > 0 { | ||
if strings.Contains(err.Error(), errs.NotLeaderErr) || strings.Contains(err.Error(), errs.MismatchLeaderErr) { | ||
updateServicePrimaryAddrChs[0] <- struct{}{} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest putting this part into a function outside the loop, which may help the compiler to do some inline optimization.
@@ -224,7 +224,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { | |||
} | |||
|
|||
tsoRequest := tsoutil.NewPDProtoRequest(forwardedHost, clientConn, request, stream) | |||
s.tsoDispatcher.DispatchRequest(ctx, tsoRequest, tsoProtoFactory, doneCh, errCh) | |||
s.tsoDispatcher.DispatchRequest(ctx, tsoRequest, tsoProtoFactory, doneCh, errCh, s.updateServicePrimaryAddrCh) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that the channel is always passed to the function, then why use an optional parameter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DispatchRequest
is also used by tso server, tso server is no needed to watch api key
server/server.go
Outdated
zap.Time("retry-at", time.Now().Add(watchKEtcdChangeRetryInterval)), | ||
zap.Error(err)) | ||
revision = nextRevision | ||
time.Sleep(watchKEtcdChangeRetryInterval) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest using a ticker to select rather than sleeping.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, we need to wait for a retry here, instead of periodically going to use tick
server/server.go
Outdated
primary := &tsopb.Participant{} | ||
ok, revision, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary) | ||
listenUrls := primary.GetListenUrls() | ||
if !ok || err != nil || len(listenUrls) == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are two cases we may return 0, nil
and it breaks the retry loop, is it expected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix
server/server.go
Outdated
|
||
for { | ||
WatchChan: | ||
watchChan := watcher.Watch(s.serverLoopCtx, serviceKey, clientv3.WithPrefix(), clientv3.WithRev(revision)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need the prefix?
Signed-off-by: lhy1024 <[email protected]>
Signed-off-by: lhy1024 <[email protected]>
Signed-off-by: lhy1024 <[email protected]>
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-time.After(retryIntervalGetServicePrimary): | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we call s.updateServicePrimaryAddr(serviceName)
at the beginning of the loop? Otherwise, we have to wait for a retryIntervalGetServicePrimary
before the first time update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is a comment still left: https://github.com/tikv/pd/pull/6279/files#r1161400175
server/server.go
Outdated
|
||
// SetServicePrimaryAddr sets the primary address directly. | ||
// Note: This function is only used for test. | ||
func (s *Server) SetServicePrimaryAddr(serviceName, addr string) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where do we use it?
Signed-off-by: lhy1024 <[email protected]>
Signed-off-by: lhy1024 <[email protected]>
/merge |
@lhy1024: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
This pull request has been accepted and is ready to merge. Commit hash: c5c6b5d
|
@lhy1024: Your PR was out of date, I have automatically updated it for you. If the CI test fails, you just re-trigger the test that failed and the bot will merge the PR for you after the CI passes. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
…ot leader (tikv#6279) (tikv#64) * mcs: fix watch primary address revision Signed-off-by: lhy1024 <[email protected]> * add update cache when meets not leader Signed-off-by: lhy1024 <[email protected]> --------- Signed-off-by: lhy1024 <[email protected]>
ref #5895, ref #6279, close #6289 Signed-off-by: lhy1024 <[email protected]>
ref tikv#5895, ref tikv#6279, close tikv#6289 Signed-off-by: lhy1024 <[email protected]>
ref tikv#5895, ref tikv#6279, close tikv#6289 Signed-off-by: lhy1024 <[email protected]> Signed-off-by: zeminzhou <[email protected]>
* mcs: update client when meet transport is closing (tikv#6341) * mcs: update client when meet transport is closing Signed-off-by: lhy1024 <[email protected]> * address comments Signed-off-by: lhy1024 <[email protected]> * add retry Signed-off-by: lhy1024 <[email protected]> --------- Signed-off-by: lhy1024 <[email protected]> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> Signed-off-by: lhy1024 <[email protected]> * mcs: fix watch primary address revision and update cache when meets not leader (tikv#6279) ref tikv#5895 Signed-off-by: lhy1024 <[email protected]> Co-authored-by: Ti Chi Robot <[email protected]> Signed-off-by: lhy1024 <[email protected]> --------- Signed-off-by: lhy1024 <[email protected]> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> Co-authored-by: Ti Chi Robot <[email protected]>
ref tikv#5895, ref tikv#6279, close tikv#6289 Signed-off-by: lhy1024 <[email protected]>
Signed-off-by: lhy1024 <[email protected]>
What problem does this PR solve?
Issue Number: Ref #5895.
What is changed and how does it work?
Check List
Tests
Release note