
gateway: use keyspace events when buffering requests #8700

Merged: 9 commits merged into vitessio:main on Sep 2, 2021

Conversation

@vmg (Collaborator) commented Aug 27, 2021

Description

Fixes #8462 (lol, hopefully).
Fixes #7059
Fixes #7061

Alright, this is the first draft of a solution for the dreaded request buffering issue. After a lot of investigation, we've arrived at this approach to fix the issue.

Summarizing:

  • We're still performing the request buffering at the shard level, like we were before, but we've replaced the old HealthCheck-based approach to find out when a failover is finished with a brand new KeyspaceEventWatcher.
  • The KeyspaceEventWatcher is a new implementation that augments HealthCheck events with metadata from the topology server for keyspace changes. This allows it to process primary promotion events holistically for the whole keyspace, so when these events are part of a resharding operation, they are only reported once the whole keyspace has been properly resharded and all the new shards are healthy and serving.
  • Because of this, the Buffer code can now distinguish between plain failovers for a single primary (whose buffered requests can be retried) and primary promotions that are part of a resharding operation (whose buffered requests cannot be retried as-is, because the shard they were targeting is gone once the resharding completes).
  • When the latter situation arises, the buffering code reports a special error code that is handled in the execution engine. This handling has been implemented at the primitive level (as opposed to the initial approach that intended to handle this at the plan generation level). This means that the buffered query is only re-executed in the new shard for the primitive subquery, and not for the whole plan -- this is required for handling some corner cases; see engine: add support for finding a plan's affected shards #8681 for examples. (A rough sketch of this primitive-level retry follows right after this list.)
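
Below is a minimal sketch of the primitive-level retry described in the last bullet. It is not the actual Vitess implementation: all names here (ErrShardGone, Primitive, Resolver, ExecuteWithReshardRetry) are illustrative stand-ins for the real CLUSTER_EVENT error code and the engine types.

package sketch

import (
	"context"
	"errors"
)

// ErrShardGone stands in for the special error the buffer reports when a
// buffered request can no longer be retried on its original shard because the
// keyspace was resharded underneath it.
var ErrShardGone = errors.New("shard no longer exists: keyspace was resharded")

// Primitive is a minimal stand-in for an execution-engine primitive.
type Primitive interface {
	Execute(ctx context.Context, shards []string, query string) (string, error)
}

// Resolver re-resolves which shards should serve a query after a keyspace change.
type Resolver func(ctx context.Context, keyspace string) ([]string, error)

// ExecuteWithReshardRetry runs a single primitive and, if the buffer reports
// that the targeted shards vanished mid-flight, re-resolves the destination
// shards and re-executes only this primitive's subquery -- not the whole plan.
func ExecuteWithReshardRetry(ctx context.Context, p Primitive, resolve Resolver, keyspace, query string) (string, error) {
	shards, err := resolve(ctx, keyspace)
	if err != nil {
		return "", err
	}
	res, err := p.Execute(ctx, shards, query)
	if errors.Is(err, ErrShardGone) {
		// The keyspace finished resharding while the request was buffered:
		// the old shard is gone, so re-resolve and retry just this subquery.
		newShards, rerr := resolve(ctx, keyspace)
		if rerr != nil {
			return "", rerr
		}
		return p.Execute(ctx, newShards, query)
	}
	return res, err
}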

As far as we can tell (and as far as I can test manually), this implementation can resume buffering after any kind of resharding event. Notably, however, it does not support MoveTables events, which we're punting on for now.

The implementation has a bit of duplicated code in the Buffer because I wanted to leave the old implementation in place so the two are swappable while we gain confidence in the new KeyspaceEvent-based buffering code.

I need a drink.

cc @deepthi @harshit-gangal @sougou

Related Issue(s)

Checklist

  • Should this PR be backported?
  • Tests were added or are not required
  • Documentation was added or is not required

Deployment Notes

@vmg added labels Component: Cluster management, Component: Query Serving, Type: Enhancement (Logical improvement, somewhere between a bug and feature) on Aug 30, 2021
@vmg marked this pull request as ready for review on August 30, 2021 at 15:26
@vmg mentioned this pull request on Aug 31, 2021
@vmg (Collaborator Author) commented Sep 1, 2021

OK, I think this PR is ready for review and merge. I spent all of today implementing tests for the new functionality and I think I was successful. I've split the existing tabletgateway/buffer end-to-end tests into "reparenting" and "resharding" tests, to verify both behaviors.

The new tests ensure that queries are buffered and do not error out when a Vitess cluster is reparented or resharded. They both use the new Keyspace Events Watcher API for event detection. The reparenting tests are green when using the KEW and the old HealthCheck API, but I decided to run them only with the KEW because we're deprecating the old API and these are expensive tests. The resharding tests are green with the KEW and fail with the HealthCheck API, which is, huh, kinda the whole point of this project.

One last thing I noticed while stress testing the resharding operation is another failure case that we never detected and never discussed before: in a high-traffic Vitess cluster with high QPS, it's possible for a query to arrive at the VTGate during a resharding operation such that the query is planned for shard 0, but that shard has already been marked as unhealthy by the resharding operation by the time the query is executed. This would usually cause TabletGateway to fail fast with an UNAVAILABLE error that is not retried, since the topology server would return no shards capable of serving the query, and the buffering code would not be aware (yet) of any buffering events for the shard (as the shard was removed before any queries could fail on it and hence start the buffering process).

This is easy to reproduce with high enough QPS, and in practice it results in a very short burst of instantly failed queries from the VTGate at the exact moment the reparenting starts. Fortunately, the new Keyspace Event Watcher is smart enough to notice this new corner case, so I've updated TabletGateway to handle it explicitly: if there are no shards available to serve the query, but the KEW knows that no shards are healthy because the keyspace is currently being resharded, we'll buffer the query all the same and retry it at the end of the event so that it lands in the new shard. 👌

Comment on lines +272 to +276
// if we have a keyspace event watcher, check if the reason why our primary is not available is that it's currently being resharded
if target.TabletType == topodatapb.TabletType_PRIMARY && gw.kev != nil && gw.kev.TargetIsBeingResharded(target.Keyspace, target.Shard) {
	err = vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, "current keyspace is being resharded")
	continue
}
@vmg (Collaborator Author):

This is the special handling I was talking about.

Member:

Question to make sure I'm following this change correctly: now, in addition to the previously existing logic, it will use these events to retry.

One follow-up question about these retries: the other errors in this block were not that time-sensitive. For instance, if a connection to a tablet failed, in the next iteration of the block another tablet would be used for the retry. In this case, some amount of time needs to elapse before new primaries become available. Would it make sense to start thinking of exponential backoff between retries?

@vmg (Collaborator Author):

There is no need for exponential backoff here. In this specific case, we're setting an explicit buffering error, so the next iteration of the loop will block directly on the buffering code and will not resume until the primary becomes available. This is not a busy loop, so we don't need to back off. 👌
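
A rough sketch of the loop shape being described (illustrative, not the actual withRetry code): setting a "cluster event" error and continuing means the next iteration blocks inside the buffer until the event is over, which is why no backoff is needed.

package sketch

import (
	"context"
	"errors"
)

var errClusterEvent = errors.New("cluster event in progress")

// withRetry retries execute up to maxAttempts times. When the previous attempt
// failed with a cluster-event error, the next iteration first blocks in
// waitForEventEnd until the failover/resharding is over -- so the loop never spins.
func withRetry(ctx context.Context, maxAttempts int,
	waitForEventEnd func(context.Context) error,
	execute func(context.Context) error,
) error {
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if errors.Is(lastErr, errClusterEvent) {
			// Blocks until the keyspace event watcher reports the primary is back.
			if err := waitForEventEnd(ctx); err != nil {
				return err
			}
		}
		if err := execute(ctx); err != nil {
			lastErr = err
			continue
		}
		return nil
	}
	return lastErr
}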

Member:

Nice, this makes sense.

Member:

Am I right that the "buffer" request here does not really put a "request" into a queue; we are queueing the entry, and the actual request goroutine just gets blocked until the shard split/failover has finished?

@vmg (Collaborator Author):

This is correct. What is stored in memory is a watcher-struct which is shared between all the requests to the same target (i.e. shard + keyspace), and the individual requests for the target become individually blocked on their corresponding goroutines until the buffering has finished.
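
A minimal sketch of that mechanism, with assumed names (not the actual buffer.go types): all requests for the same keyspace/shard share one entry, and each request's goroutine blocks on a channel that is closed when the event ends.

package sketch

import (
	"context"
	"sync"
)

type shardBuffer struct {
	done chan struct{} // closed when the failover/resharding event is over
}

type Buffer struct {
	mu      sync.Mutex
	entries map[string]*shardBuffer // keyed by "keyspace/shard"
}

// wait blocks the calling request until buffering for the target has finished
// (or the request's context expires).
func (b *Buffer) wait(ctx context.Context, keyspace, shard string) error {
	key := keyspace + "/" + shard

	b.mu.Lock()
	if b.entries == nil {
		b.entries = make(map[string]*shardBuffer)
	}
	sb, ok := b.entries[key]
	if !ok {
		sb = &shardBuffer{done: make(chan struct{})}
		b.entries[key] = sb
	}
	b.mu.Unlock()

	select {
	case <-sb.done:
		return nil // event is over; the caller retries against the new primary
	case <-ctx.Done():
		return ctx.Err()
	}
}

// stopBuffering is called when the keyspace event watcher reports that the
// keyspace is consistent again; it unblocks every waiting request at once.
func (b *Buffer) stopBuffering(keyspace, shard string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	key := keyspace + "/" + shard
	if sb, ok := b.entries[key]; ok {
		close(sb.done)
		delete(b.entries, key)
	}
}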

@deepthi (Member) left a comment:

Nice work. Very clean as usual.
I had a couple of nits, otherwise LGTM.

go/vt/vtgate/buffer/buffer.go — review thread resolved
go/vt/vtgate/buffer/flags.go — review thread resolved
go/vt/vtgate/gateway.go — review thread resolved
@deepthi (Member) commented Sep 2, 2021

Cluster test tabletgateway_buffer_reshard is failing right now; that needs to be resolved.

@vmg (Collaborator Author) commented Sep 2, 2021

Fixed all the tests 🍏, I'll merge at the end of the day in case somebody else wants to review.

@vmg vmg merged commit 990d49e into vitessio:main Sep 2, 2021
@deepthi (Member) commented Sep 2, 2021

Use the new flag buffer_implementation=keyspace_events to enable this feature.
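
For illustration, here is a minimal sketch of how such a string flag could gate the implementation choice. The flag name and the keyspace_events value come from the comment above; the default value and the wiring shown here are assumptions, not the actual vtgate code.

package main

import (
	"flag"
	"fmt"
)

// bufferImplementation selects the buffering backend; "keyspace_events"
// enables the new KeyspaceEventWatcher-based buffering described in this PR.
var bufferImplementation = flag.String("buffer_implementation", "healthcheck",
	"buffering implementation to use: healthcheck (legacy) or keyspace_events")

func main() {
	flag.Parse()
	switch *bufferImplementation {
	case "healthcheck":
		fmt.Println("using legacy HealthCheck-based buffering")
	case "keyspace_events":
		fmt.Println("using KeyspaceEventWatcher-based buffering")
	default:
		fmt.Printf("unknown buffer_implementation %q\n", *bufferImplementation)
	}
}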

@rafael (Member) left a comment:

Arriving late to this party. This looks good to me as well. I left a couple of comments that are more for my own curiosity and understanding of the change.

@@ -231,10 +258,22 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target,
defer retryDone()
bufferedOnce = true
}

if bufferErr != nil {
Member:

Could you provide some context on this change in the order of the logic here? We are checking retryDone before checking the error.

I'm not familiar with the details of the logic here, but this change caught my attention.

@vmg (Collaborator Author):

This is a small corner-case fix: there are now cases where the buffering code returns both an error and a cancellation function. It's important to defer the cancellation function whenever it's returned, even if we also have an error and must exit the function right away -- not deferring the function would cause a (tiny) memory leak.
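
A small sketch of that ordering (illustrative names, not the actual TabletGateway code): the buffering call can return both a cleanup callback and an error, and the callback must be deferred before the error is checked so that it still runs on the error path.

package sketch

import "context"

type retryDoneFunc func()

// waitForFailoverEnd stands in for the buffering call; it may return a non-nil
// cleanup function together with a non-nil error.
func waitForFailoverEnd(ctx context.Context, keyspace, shard string) (retryDoneFunc, error) {
	cleanup := func() { /* release the buffered-request slot */ }
	return cleanup, ctx.Err()
}

func executeBuffered(ctx context.Context, keyspace, shard string) error {
	retryDone, bufferErr := waitForFailoverEnd(ctx, keyspace, shard)
	if retryDone != nil {
		// Defer first: even if bufferErr is non-nil and we bail out below,
		// the cleanup still runs and nothing is leaked.
		defer retryDone()
	}
	if bufferErr != nil {
		return bufferErr
	}
	// ... retry the query against the (new) primary ...
	return nil
}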

Member:

Ah cool. Makes sense as well.


Comment on lines +82 to +102
func TestBufferResharding(t *testing.T) {
	t.Run("slow queries", func(t *testing.T) {
		bt := &buffer.BufferingTest{
			Assert:      assertResharding,
			Failover:    reshard02,
			SlowQueries: true,
			VSchema:     vschema,
		}
		bt.Test(t)
	})

	t.Run("fast queries", func(t *testing.T) {
		bt := &buffer.BufferingTest{
			Assert:      assertResharding,
			Failover:    reshard02,
			SlowQueries: false,
			VSchema:     vschema,
		}
		bt.Test(t)
	})
}
Member:

Like we have reparenting tests with reserved connections, we should also check resharding with reserved connections.

for _, shard := range ksevent.Shards {
	sb := b.getOrCreateBuffer(shard.Target.Keyspace, shard.Target.Shard)
	if sb != nil {
		sb.recordKeyspaceEvent(shard.Tablet, shard.Serving)
Member:

For my own understanding, how would this work when vtgate gets a healthcheck saying the primary is down? My mental model is that if we detect the primary is not serving via the health check, we should start buffering requests, but it looks like recordKeyspaceEvent will always call stopBufferingLocked, so nothing will be buffered?

What am I missing?

@vmg (Collaborator Author):

We actually don't trigger any buffering operations when we receive the healthcheck failure for a primary! That's because these healthchecks are processed by the topology engine, which often lags behind the actual availability issue. Instead, what we do is start buffering once the vtgate itself fails to reach the primary (i.e. when we get an error returned during a request), and the Keyspace Events handler is designed to detect when the availability incident is over cluster-wide.

Member:

Thanks @vmg that makes sense.

start buffering once the vtgate itself fails to reach the primary (i.e. when we get an error return during a request)

it looks like the retry is triggered on these 3 kinds of error codes: https://github.com/vitessio/vitess/blob/main/go/vt/vttablet/queryservice/wrapped.go#L75

My current understanding is that vttablet maps to those error codes here: Code_FAILED_PRECONDITION here, Code_UNAVAILABLE here and Code_CLUSTER_EVENT here -- they don't seem related to a primary failure. The other place where we return Code_CLUSTER_EVENT is when the primary tablet has a !serving servingState, which only happens after a reparent event.

I think I'm missing some details here on the failover behavior, could you shed some light on it? Thanks

@vmg (Collaborator Author):

The error handling on the tablet was wired up by @harshit-gangal, so I don't know all the details, but the actual buffering code is triggered with this check, so the only relevant error code when it comes to buffering is CLUSTER_EVENT:

func CausedByFailover(err error) bool {
	log.V(2).Infof("Checking error (type: %T) if it is caused by a failover. err: %v", err, err)
	return vterrors.Code(err) == vtrpcpb.Code_CLUSTER_EVENT
}

@vmg (Collaborator Author):

...In retrospect, this function could use a different name since CLUSTER_EVENT can also be caused by a resharding operation. 😅

Member:

Got it. Looking at the check here, it looks like the buffering logic only covers the window after a reparent (but before vtgate gets notified via the health check), when a vttablet would return Code_CLUSTER_EVENT, since replHealthy is always true for a primary tablet and the serving state only changes via a reparent event. Does that sound right to you?

}

for shard, sstate := range kss.shards {
	if sstate.serving && !activeShardsInPartition[shard] {
Member:

cosmetic nit: it seems like this can't happen because of the early return at L209

@vmg (Collaborator Author):

That's not accurate: the first check at line 209 iterates through all the shards that the topology service knows about, making sure we already know about them and that we know them to be healthy. This second loop iterates through all the shards that we know about, to make sure there are no healthy shards that we know about but the topology service doesn't.
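
A compact sketch of those two complementary checks (assumed names and shapes, not the actual KeyspaceEventWatcher code): the first loop verifies that every shard in the topology view is tracked and serving locally, and the second verifies that no locally-serving shard is missing from the topology view.

package sketch

// keyspaceIsConsistent reports whether the local shard state and the topology
// service's shard list agree; only then is the keyspace event considered over.
func keyspaceIsConsistent(topoShards []string, servingLocally map[string]bool) bool {
	inTopo := make(map[string]bool, len(topoShards))

	// 1) every shard the topology service knows about must be known and serving here
	for _, name := range topoShards {
		inTopo[name] = true
		if !servingLocally[name] {
			return false
		}
	}

	// 2) no shard we consider serving may be missing from the topology view
	for name, serving := range servingLocally {
		if serving && !inTopo[name] {
			return false
		}
	}
	return true
}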

activeShardsInPartition := make(map[string]bool)
for _, shard := range primary.ShardReferences {
	sstate := kss.shards[shard.Name]
	if sstate == nil || !sstate.serving {
Member:

Just to make sure I understand it correctly: this is only for shard split cases, right? i.e., when we mark the source shard as "not_serving", it will be reflected in the srv keyspace, and we only care about the "end" of the shard split -- therefore we can early-return at L209

@vmg (Collaborator Author):

I've documented this behavior in #8890 -- it explains all the different consistency checks.


// If result is nil it must mean the channel has been closed. Stop goroutine in that case
bufferCancel()
gw.setupBuffering(ctx)
gw.QueryService = queryservice.Wrap(nil, gw.withRetry)
@5antelope (Member) commented Sep 24, 2021:

I see we rely on the srv keyspace to detect shard split events; how would a failover be handled here? Say we have a failover in a shard from A to B:
t1: A becomes problematic / not responding
t2: Orchestrator detects the problem and does an external reparent from A to B
t3: the healthcheck detects the reparent event and sets the primary of the shard to B

After t3, requests should be handled properly. I'm wondering what the mechanism (if it exists) is to handle / buffer requests before t3. I'm asking because if we can buffer as many requests as possible between t1 and t3, in theory we should have higher availability.

I see the tablet has logic to return Code_CLUSTER_EVENT when it is not serving.

Since replHealthy should always be true for a primary (or is it?), in order to trigger buffering in vtgate, the orchestrator needs to modify sm.state on the old primary (A) at t2 so that vtgate can buffer requests from t2. Is my understanding correct, or is sm.state somehow set magically by vttablet already when the primary is unhealthy?
