feat: support connection lifetime for single client #727
base: main
Conversation
@rueian Here is a draft. Could you check your additional points from the discussion? There are no additional tests yet.
pipe.go
Outdated
@@ -1576,6 +1587,7 @@ func (p *pipe) Close() {
 }
 atomic.AddInt32(&p.waits, -1)
 atomic.AddInt32(&p.blcksig, -1)
+p.StopTimer()
Oops. I will fix it...
this should be removed.
pool.go
Outdated
@@ -59,6 +59,7 @@ retry:
 }
 }
 p.cond.L.Unlock()
+v.StopTimer()
If the timer is not stopped successfully, we need to acquire another connection.
Ah, that's right. Thanks!
Fixed 390e19b
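A minimal, self-contained sketch of the acquire-and-retry pattern discussed in this thread: when taking a pooled connection, first try to stop its lifetime timer, and if the timer has already fired, discard the connection and acquire another one. The pool and conn types below are illustrative stand-ins, and StopTimer returning a bool is an assumption taken from the review comment, not the exact rueidis API.

```go
package main

import (
	"fmt"
	"time"
)

// conn is an illustrative stand-in for a pooled connection whose lifetime is
// tracked by a timer.
type conn struct {
	timer *time.Timer
}

// StopTimer reports whether the lifetime timer was stopped before it fired.
func (c *conn) StopTimer() bool {
	return c.timer == nil || c.timer.Stop()
}

type pool struct {
	idle []*conn
}

// Acquire pops an idle connection, but if its lifetime timer has already
// fired (StopTimer returns false), the connection is treated as expired and
// another one is acquired instead of handing the stale one out.
func (p *pool) Acquire() *conn {
retry:
	var v *conn
	if n := len(p.idle); n > 0 {
		v, p.idle = p.idle[n-1], p.idle[:n-1]
	} else {
		v = &conn{} // a real pool would dial a fresh connection here
	}
	if !v.StopTimer() {
		goto retry
	}
	return v
}

func main() {
	expired := &conn{timer: time.AfterFunc(time.Nanosecond, func() {})}
	time.Sleep(10 * time.Millisecond) // let the lifetime timer fire
	p := &pool{idle: []*conn{expired}}
	fmt.Println("got a usable connection:", p.Acquire() != nil)
}
```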
pipe.go
Outdated
@@ -89,6 +91,10 @@ type pipe struct {
 recvs int32
 r2ps bool // identify this pipe is used for resp2 pubsub or not
 noNoDelay bool
+lftm time.Duration // lifetime
+lftmMu sync.Mutex // guards lifetime timer
Do we really need the mutex and the bool flag?
I reviewed it again; we don't need the bool flag.
I thought time.Reset and time.Stop needed a mutex when using Go <= 1.22. Maybe I've got it wrong.
The source looks like it is thread-safe: https://cs.opensource.google/go/go/+/refs/tags/go1.22.0:src/runtime/time.go;l=314
Thanks, you're right. It looks thread-safe; I will remove it.
I misread "This cannot be done concurrent to other receives from the Timer's channel or other calls to the Timer's Stop method." from https://pkg.go.dev/[email protected]#Timer.Stop. Sorry.
And we are using the AfterFunc timer, which has no channel associated.
That's right.
Fixed f950c1e
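For reference, a minimal sketch of the time.AfterFunc-based lifetime timer being discussed; the field and method names below are approximations, not the PR's actual code. Because AfterFunc timers have no channel, Stop needs no extra mutex or drain pattern, which is the conclusion reached in the thread above.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// pipe is an illustrative stand-in for the rueidis pipe; only the lifetime
// timer handling is sketched here.
type pipe struct {
	lftm    time.Duration // configured connection lifetime
	timer   *time.Timer
	expired atomic.Bool
}

// StartTimer arms the lifetime timer with time.AfterFunc; when it fires, the
// real pipe would close itself so later commands fail with errConnExpired.
func (p *pipe) StartTimer() {
	if p.lftm <= 0 {
		return
	}
	p.timer = time.AfterFunc(p.lftm, func() {
		p.expired.Store(true) // stand-in for closing the pipe with errConnExpired
	})
}

// StopTimer reports whether the timer was stopped before firing. AfterFunc
// timers have no channel to drain, so no mutex is needed around Stop.
func (p *pipe) StopTimer() bool {
	return p.timer == nil || p.timer.Stop()
}

func main() {
	p := &pipe{lftm: 20 * time.Millisecond}
	p.StartTimer()
	time.Sleep(50 * time.Millisecond)
	fmt.Println("stopped before firing:", p.StopTimer(), "expired:", p.expired.Load())
}
```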
Force-pushed from 31ffe91 to 88c8d7e
The rest is the implementation of retrying on the single client.
pipe.go
Outdated
@@ -1576,6 +1587,7 @@ func (p *pipe) Close() {
 }
 atomic.AddInt32(&p.waits, -1)
 atomic.AddInt32(&p.blcksig, -1)
+p.StopTimer()
this should be removed.
I think we should use your original proposal and keep it independent of the retry handler:

	retry:
		resp = c.conn.Do(ctx, cmd)
		if resp.Error() == errConnExpired {
			goto retry
		}
		if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.Error(), ctx) {
		...

Because whenever an errConnExpired occurs, we know the connection was closed by ourselves, so it should be safe to retry immediately.
@rueian Thanks. Surely we know the error, and it's not good to expose errConnExpired to the outside when retry is disabled either. The retry logic is almost done; I just need to add the tests.
Co-authored-by: Rueian <[email protected]>
client.go
Outdated
@@ -86,6 +90,13 @@ func (c *singleClient) DoMulti(ctx context.Context, multi ...Completed) (resps [
 attempts := 1
 retry:
 resps = c.conn.DoMulti(ctx, multi...).s
+if c.hasConnLftm {
+for _, resp := range resps {
+if resp.Error() == errConnExpired {
Is it possible that errConnExpired happens in the middle of DoMulti? I am not sure, but if it is possible then we should not retry the preceding requests that didn't receive the error.
Ah, I think it's unlikely. Surely all responses have the same error when p.state changes.
I will change it to something like the following:

	if resps[0].Error() == errConnExpired {
		goto retry
	}
Fixed c0c3657
OK, could you leave a comment in the code to explain why it won't happen?
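For illustration, here is a self-contained toy model of the invariant such a comment would document: when the pipe is closed by its lifetime timer, the whole DoMulti batch fails with the same errConnExpired, so checking only the first response before retrying is sufficient. The names below are stand-ins for rueidis internals, not the PR's actual code.

```go
package main

import (
	"errors"
	"fmt"
)

// errConnExpired mirrors the sentinel error discussed above.
var errConnExpired = errors.New("connection is expired")

type result struct{ err error }

func (r result) Error() error { return r.err }

// doMulti simulates a batch sent over a connection whose lifetime elapsed:
// the expired pipe fails the entire batch with the same error, never only a
// suffix of it, because the error is set when the pipe's state changes.
func doMulti(expired bool, n int) []result {
	resps := make([]result, n)
	if expired {
		for i := range resps {
			resps[i] = result{err: errConnExpired}
		}
	}
	return resps
}

func main() {
	expired := true
	var resps []result
retry:
	resps = doMulti(expired, 3)
	// errConnExpired is only produced when the connection was closed by its own
	// lifetime timer, and in that case every response in the batch carries it,
	// so inspecting resps[0] is enough to decide whether to retry.
	if len(resps) > 0 && resps[0].Error() == errConnExpired {
		expired = false // the real client reconnects here before retrying
		goto retry
	}
	fmt.Println("batch completed, responses:", len(resps))
}
```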
Background
Recently I noticed that requests become unbalanced after a replica failover on GCP Memorystore for Redis when connections are kept alive. So I am considering a connection lifetime that forces a reconnect to the Redis endpoint, because existing connections are not rerouted when a node is reintroduced.
Here are the documents about the architecture and connection balance management:
https://cloud.google.com/memorystore/docs/redis/about-read-replicas#architecture
https://cloud.google.com/memorystore/docs/redis/about-read-replicas#connection_balance_management
Ref: #725
Solution
Support a connection lifetime for the single client so it reconnects to the fixed read endpoint.
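A hedged usage sketch of the proposed feature: it assumes the option is exposed as a ConnLifetime duration on rueidis.ClientOption (the final field name in this PR may differ). With a bounded lifetime, a single client periodically re-dials the fixed read endpoint, letting Memorystore re-balance the connection after a failover.

```go
package main

import (
	"context"
	"time"

	"github.com/redis/rueidis"
)

func main() {
	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress:  []string{"127.0.0.1:6379"}, // fixed read endpoint (example address)
		ConnLifetime: 10 * time.Minute,           // assumed option name: recycle the connection every 10 minutes
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Commands issued around the lifetime boundary would be retried internally
	// on a fresh connection, so callers never observe errConnExpired.
	ctx := context.Background()
	if err := client.Do(ctx, client.B().Ping().Build()).Error(); err != nil {
		panic(err)
	}
}
```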