Global ratelimiter: everything else #6141

Merged: 29 commits, Jun 28, 2024 (changes shown from 22 commits)

Commits
573ea38 - dynamic config set up, and metrics, and now need more logic to handle… (Groxx, Jun 10, 2024)
8100d32 - got a panic button disable going (Groxx, Jun 10, 2024)
794f664 - Merge remote-tracking branch 'origin/master' into limiter_interface (Groxx, Jun 15, 2024)
4f1cccc - most of shadowing is probably working (Groxx, Jun 18, 2024)
fae6457 - partial copies from rpc branch (Groxx, Jun 18, 2024)
b6df0e5 - metrics, most rpc added, no tests yet (Groxx, Jun 18, 2024)
9eb9ca8 - minor cleanup/fix (Groxx, Jun 18, 2024)
1d7afcf - minor (Groxx, Jun 18, 2024)
84b5167 - it builds (Groxx, Jun 18, 2024)
e7750ad - basic test of collection is passing, boilerplate handler built but no… (Groxx, Jun 18, 2024)
f472646 - tests pass, most things finished, maybe try running (Groxx, Jun 19, 2024)
2a3b361 - fix metric scope location (Groxx, Jun 19, 2024)
2b1b78e - minor fixes and behavior improvements (Groxx, Jun 20, 2024)
816f3e5 - arguable: add a "run 2 instances" helper config (Groxx, Jun 20, 2024)
9361aa1 - use will-be-default ratelimiter mode by default (Groxx, Jun 20, 2024)
8c0d9c1 - more tests for collection-related stuff, pretty good coverage (Groxx, Jun 22, 2024)
9e892fd - lint fix (Groxx, Jun 24, 2024)
62c4c9b - more linting blah (Groxx, Jun 24, 2024)
5c58beb - Feedback from a first review pass (Groxx, Jun 27, 2024)
7403ba9 - more small touchups, test fix (Groxx, Jun 27, 2024)
135fdc7 - test fix (Groxx, Jun 27, 2024)
789edf6 - Merge remote-tracking branch 'origin/master' into limiter_interface (Groxx, Jun 27, 2024)
943b935 - disable -> disabled consistency (Groxx, Jun 27, 2024)
7618574 - Switch to merged IDL change, minor cleanup (Groxx, Jun 27, 2024)
f631296 - dynamic config key description improvements, minor var name consistency (Groxx, Jun 27, 2024)
0942c4c - coverage for new dynamic config types (Groxx, Jun 27, 2024)
02cff69 - fixing dynamic config key docs (Groxx, Jun 27, 2024)
1f37531 - add missing dynamic config filter parsing, and test to catch it next … (Groxx, Jun 28, 2024)
0d7d4cb - also make sure no dups exist (Groxx, Jun 28, 2024)
1,173 changes: 1,171 additions & 2 deletions .gen/go/history/history.go

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion client/clientBean.go
@@ -41,6 +41,7 @@ type (
// Bean is a collection of clients
Bean interface {
GetHistoryClient() history.Client
GetHistoryPeers() history.PeerResolver
GetMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error)
GetFrontendClient() frontend.Client
GetRemoteAdminClient(cluster string) admin.Client
@@ -51,6 +52,7 @@
clientBeanImpl struct {
sync.Mutex
historyClient history.Client
historyPeers history.PeerResolver
matchingClient atomic.Value
frontendClient frontend.Client
remoteAdminClients map[string]admin.Client
@@ -62,7 +64,7 @@ type (
// NewClientBean provides a collection of clients
func NewClientBean(factory Factory, dispatcher *yarpc.Dispatcher, clusterMetadata cluster.Metadata) (Bean, error) {

historyClient, err := factory.NewHistoryClient()
historyClient, historyPeers, err := factory.NewHistoryClient()
if err != nil {
return nil, err
}
@@ -97,6 +99,7 @@ func NewClientBean(factory Factory, dispatcher *yarpc.Dispatcher, clusterMetadat
return &clientBeanImpl{
factory: factory,
historyClient: historyClient,
historyPeers: historyPeers,
frontendClient: remoteFrontendClients[clusterMetadata.GetCurrentClusterName()],
remoteAdminClients: remoteAdminClients,
remoteFrontendClients: remoteFrontendClients,
@@ -107,6 +110,10 @@ func (h *clientBeanImpl) GetHistoryClient() history.Client {
return h.historyClient
}

func (h *clientBeanImpl) GetHistoryPeers() history.PeerResolver {
return h.historyPeers
}

func (h *clientBeanImpl) GetMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error) {
if client := h.matchingClient.Load(); client != nil {
return client.(matching.Client), nil
14 changes: 14 additions & 0 deletions client/clientBean_mock.go

Some generated files are not rendered by default.

10 changes: 5 additions & 5 deletions client/clientfactory.go
@@ -54,10 +54,10 @@ import (
type (
// Factory can be used to create RPC clients for cadence services
Factory interface {
NewHistoryClient() (history.Client, error)
NewHistoryClient() (history.Client, history.PeerResolver, error)
NewMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error)

NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error)
NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, history.PeerResolver, error)
NewMatchingClientWithTimeout(domainIDToName DomainIDToNameFunc, timeout time.Duration, longPollTimeout time.Duration) (matching.Client, error)

NewAdminClientWithTimeoutAndConfig(config transport.ClientConfig, timeout time.Duration, largeTimeout time.Duration) (admin.Client, error)
@@ -96,15 +96,15 @@ func NewRPCClientFactory(
}
}

func (cf *rpcClientFactory) NewHistoryClient() (history.Client, error) {
func (cf *rpcClientFactory) NewHistoryClient() (history.Client, history.PeerResolver, error) {
return cf.NewHistoryClientWithTimeout(timeoutwrapper.HistoryDefaultTimeout)
}

func (cf *rpcClientFactory) NewMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error) {
return cf.NewMatchingClientWithTimeout(domainIDToName, timeoutwrapper.MatchingDefaultTimeout, timeoutwrapper.MatchingDefaultLongPollTimeout)
}

func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error) {
func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, history.PeerResolver, error) {
var rawClient history.Client
var namedPort = membership.PortTchannel

@@ -132,7 +132,7 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (
client = metered.NewHistoryClient(client, cf.metricsClient)
}
client = timeoutwrapper.NewHistoryClient(client, timeout)
return client, nil
return client, peerResolver, nil
}

func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
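For illustration, a minimal sketch of how a call site adapts to the widened factory signature. The helper name is an assumption, not code from this PR, and it assumes imports of "github.com/uber/cadence/client" and "github.com/uber/cadence/client/history":

// buildHistory shows the updated call shape: NewHistoryClient now returns the
// peer resolver alongside the client, so callers thread both through to
// wherever peer-partitioned work (like the global ratelimiter) happens.
func buildHistory(factory client.Factory) (history.Client, history.PeerResolver, error) {
	historyClient, historyPeers, err := factory.NewHistoryClient()
	if err != nil {
		return nil, nil, err
	}
	return historyClient, historyPeers, nil
}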
17 changes: 17 additions & 0 deletions client/history/client.go
@@ -22,6 +22,7 @@ package history

import (
"context"
"fmt"
"math/rand"
"sync"
"time"
@@ -1079,6 +1080,22 @@ func (c *clientImpl) GetFailoverInfo(
return c.client.GetFailoverInfo(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
}

func (c *clientImpl) RatelimitUpdate(ctx context.Context, request *types.RatelimitUpdateRequest, opts ...yarpc.CallOption) (*types.RatelimitUpdateResponse, error) {
if len(opts) == 0 {
Member: you are not checking existence of yarpc.WithShardKey() when len > 0

Groxx (Member Author), Jun 25, 2024: because there isn't a good way to do so, yes. (see the comment lines immediately below)

Groxx (Member Author): WithShardKey is a CallOption: https://github.com/yarpc/yarpc-go/blob/v1.73.0/call.go#L70
Which is an encoding.CallOption: https://github.com/yarpc/yarpc-go/blob/v1.73.0/call.go#L34
Which is this struct: https://github.com/yarpc/yarpc-go/blob/dev/api/encoding/call_option.go#L29-L31
Which is implemented by this here: https://github.com/yarpc/yarpc-go/blob/dev/api/encoding/call_option.go#L60-L70

So our only real option (AFAICT) is to reflect on the internal opt field, and try to read the un-typed field.String() raw contents, and hope it's recognizable as a shard key (because reflection will not give us the actual type on private fields).

Groxx (Member Author), Jun 25, 2024: poking a bit more, I think this works:

reflect.ValueOf(k).FieldByName("opt").Elem().Type().Name() // shardKeyOption

and we should be able to detect it changing in tests, but tbh I'm not sure how reliable / non-panicky that'd be with other args / future yarpc changes 🤔.

A "recover and log and fail" might take care of the edge cases we care about well enough, it just feels like a potential bit of pain in a surprise-future.

Member: how about making shard key an explicit parameter?

Groxx (Member Author), Jun 27, 2024: I think this might end up being the nicest route... but doing that means changing quite a lot of code, as well as a few pieces of codegen :\

I'm happy to stick it on a "try it soon" to-do list (there are a few followups worth doing), but tbh it doesn't feel worth making this PR even bigger. There's only one caller, and that's likely all there ever will be.
(it'll also be a pretty clear / self-contained followup, as opposed to mingling in a few places in this PR)

I did try a different route that @taylanisikdemir suggested (pushing the key-arg into the request object), and while I kinda liked that better than the CallOption slice... it ran into quite a few issues when I fully built it :\ both import cycles (for nicer types) and "this really just moves the flaw, it doesn't eliminate it, and it's more complicated"-like issues.

// unfortunately there is not really any way to ensure "must have a shard key option"
// due to the closed nature of yarpc.CallOption's implementation, outside private-field-reading reflection.
//
// there are a few options to work around this, but they are currently rather high effort or
// run into import cycles or similar. risk is low for now as there is only one caller, and likely
// never will be others.
return nil, fmt.Errorf("invalid arguments, missing yarpc.WithShardKey(peer) at a minimum")
}

// intentionally does not use peer-redirecting retries, as keys in this request
// could end up on multiple different hosts after a peer change.
return c.client.RatelimitUpdate(ctx, request, opts...)
}
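As context for the review thread above, a self-contained sketch of the reflection probe that was discussed and rejected. "shardKeyOption" is a private yarpc type name observed around v1.73.0 and could change without notice, which is exactly the fragility the thread worries about; nothing here is part of this PR:

package main

import (
	"fmt"
	"reflect"

	"go.uber.org/yarpc"
)

// hasShardKey guesses whether any option is yarpc.WithShardKey by reflecting
// on the private "opt" field, with the recover-and-fail fallback suggested in
// the thread. Illustrative only; reflection on private fields has no
// compatibility guarantees across yarpc versions.
func hasShardKey(opts []yarpc.CallOption) (found bool) {
	defer func() {
		if recover() != nil {
			found = false // private fields moved or changed; treat as "not found"
		}
	}()
	for _, o := range opts {
		if reflect.ValueOf(o).FieldByName("opt").Elem().Type().Name() == "shardKeyOption" {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(hasShardKey([]yarpc.CallOption{yarpc.WithShardKey("peer")})) // true, today
	fmt.Println(hasShardKey(nil))                                            // false
}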

func (c *clientImpl) executeWithRedirect(
ctx context.Context,
peer string,
33 changes: 33 additions & 0 deletions client/history/client_test.go
@@ -454,6 +454,22 @@ func TestClient_withResponse(t *testing.T) {
},
want: &types.StartWorkflowExecutionResponse{},
},
{
name: "RatelimitUpdate",
op: func(c Client) (any, error) {
return c.RatelimitUpdate(context.Background(), &types.RatelimitUpdateRequest{
Any: &types.Any{
ValueType: "something",
Value: []byte("data"),
},
}, yarpc.WithShardKey("test-peer"))
},
mock: func(p *MockPeerResolver, c *MockClient) {
c.EXPECT().RatelimitUpdate(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("test-peer")}).
Return(&types.RatelimitUpdateResponse{}, nil).Times(1)
},
want: &types.RatelimitUpdateResponse{},
},
{
name: "StartWorkflowExecution peer resolve failure",
op: func(c Client) (any, error) {
@@ -906,6 +922,23 @@
},
wantError: true,
},
{
name: "RatelimitUpdate requires explicit shard key arg",
op: func(c Client) (any, error) {
// same as successful call...
return c.RatelimitUpdate(context.Background(), &types.RatelimitUpdateRequest{
// Peer: "", // intentionally the zero value
Any: &types.Any{
ValueType: "something",
Value: []byte("data"),
},
})
},
mock: func(p *MockPeerResolver, c *MockClient) {
// no calls expected
},
wantError: true,
},
}

for _, tt := range tests {
8 changes: 8 additions & 0 deletions client/history/interface.go
@@ -81,4 +81,12 @@ type Client interface {
SyncShardStatus(context.Context, *types.SyncShardStatusRequest, ...yarpc.CallOption) error
TerminateWorkflowExecution(context.Context, *types.HistoryTerminateWorkflowExecutionRequest, ...yarpc.CallOption) error
GetFailoverInfo(context.Context, *types.GetFailoverInfoRequest, ...yarpc.CallOption) (*types.GetFailoverInfoResponse, error)

// RatelimitUpdate pushes usage info for the passed ratelimit keys, and requests updated weight info from aggregating hosts.
// Exact semantics beyond this depend on the load-balanced ratelimit implementation.
//
// A peer (via yarpc.WithShardKey) MUST be determined before calling and passed in yarpc opts,
// and unlike most endpoints this will NOT be forwarded to a new peer if the ring membership changes.
// To correctly forward keys to the new hosts, they must be re-sharded to find their new hosts.
RatelimitUpdate(ctx context.Context, request *types.RatelimitUpdateRequest, opts ...yarpc.CallOption) (*types.RatelimitUpdateResponse, error)
}
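To make that contract concrete, a hedged sketch of a compliant caller. pushUsage and the payload contents are illustrative stand-ins, not this PR's actual caller; it assumes imports of "context", "github.com/uber/cadence/client", and "github.com/uber/cadence/common/types":

// pushUsage partitions ratelimit keys by owning peer, then pushes one update
// per peer with the mandatory explicit shard key.
func pushUsage(ctx context.Context, bean client.Bean, keys []string) error {
	byPeer, err := bean.GetHistoryPeers().GlobalRatelimitPeers(keys)
	if err != nil {
		return err
	}
	for peer, peerKeys := range byPeer {
		_ = peerKeys // real code would serialize usage data for these keys
		req := &types.RatelimitUpdateRequest{
			// placeholder payload, mirroring the test above; the real value
			// type and contents are implementation-defined
			Any: &types.Any{ValueType: "something", Value: []byte("data")},
		}
		// no retries or redirects here: RatelimitUpdate intentionally does not
		// follow peers across ring changes, per the doc comment above
		if _, err := bean.GetHistoryClient().RatelimitUpdate(ctx, req, peer.ToYarpcShardKey()); err != nil {
			return err
		}
	}
	return nil
}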
20 changes: 20 additions & 0 deletions client/history/interface_mock.go

Some generated files are not rendered by default.

69 changes: 69 additions & 0 deletions client/history/peer_resolver.go
@@ -21,6 +21,10 @@
package history

import (
"fmt"

"go.uber.org/yarpc"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/service"
@@ -38,6 +42,20 @@ type PeerResolver interface {
FromShardID(shardID int) (string, error)
FromHostAddress(hostAddress string) (string, error)
GetAllPeers() ([]string, error)

// GlobalRatelimitPeers partitions the ratelimit keys into map[yarpc peer][]limits_for_peer
GlobalRatelimitPeers(ratelimits []string) (ratelimitsByPeer map[Peer][]string, err error)
}

// Peer is used to mark a string as the routing information to a peer process.
//
// This is essentially the host:port address of the peer to be contacted,
// but it is meant to be treated as an opaque blob until given to yarpc
// via ToYarpcShardKey.
type Peer string

func (s Peer) ToYarpcShardKey() yarpc.CallOption {
return yarpc.WithShardKey(string(s))
}

type peerResolver struct {
@@ -110,3 +128,54 @@ func (pr peerResolver) GetAllPeers() ([]string, error) {
}
return peers, nil
}

func (pr peerResolver) GlobalRatelimitPeers(ratelimits []string) (map[Peer][]string, error) {
// History was selected simply because it already has a ring and an internal-only API.
// Any service should be fine, it just needs to be shared by both ends of the system.
hosts, err := pr.resolver.Members(service.History)
if err != nil {
return nil, fmt.Errorf("unable to get history peers: %w", err)
}
if len(hosts) == 0 {
// can occur when shutting down the only instance because it calls EvictSelf ASAP.
// this is common in dev, but otherwise *probably* should not be possible in a healthy system.
return nil, fmt.Errorf("unable to get history peers: no peers available")
}

results := make(map[Peer][]string, len(hosts))
initialCapacity := len(ratelimits) / len(hosts)
// add a small buffer to reduce copies, as this is only an estimate
initialCapacity += initialCapacity / 10
// but don't use zero, that'd be pointless
if initialCapacity < 1 {
initialCapacity = 1
}

Member: I'd add debug logs to a few places in this func for the initial rollout.

Member: +1. I'd maybe not bother as part of this PR, but allowing hostnames to be mapped to a particular rate-limit (via sampling or debug logs) would probably be operationally quite helpful in diagnosing any problems.

Groxx (Member Author): since I broadly agree, but I think there are a few overlapping uses: what kind of goals do you think are worth building immediately vs "soon"?

OTOH I think there are:

  • if we think there's a sharding/logic error of some kind, we'd need to know keys and mapped peers. sampling might luckily log the issue, but this isn't hit all that frequently, so like 1% might take several minutes until host X tells us what we need... and I'm not really sure that's good enough tbh. but it feels like too much info to log all the time.
  • if we want to sanity check the behavior, sampling works fine. we should see evidence in other places too, e.g. wrong allow/reject rates, but sampled logs let us ignore it for a bit and check later, and that's kinda convenient.
  • ^ both of these are possibly solved by a debug page that dumps collection info / usage / etc, for single-host investigation (but not bulk), and it'd be nice to have a system to allow that in open-source. I don't believe we have any right now though.

and... feels like maybe other scenarios are useful? I haven't been involved in much of our ring issues, not sure what worked / would have helped in those.
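Sketching the sampled-logging idea from this thread: the helper, the rate, and the log contents below are invented for illustration and are not in the PR:

package main

import (
	"log"
	"math/rand"
)

// logSampled logs roughly rate*100 percent of calls, cheap enough to leave on
// for the "sanity check the behavior" style of debugging discussed above.
func logSampled(rate float64, format string, args ...any) {
	if rand.Float64() < rate {
		log.Printf(format, args...)
	}
}

func main() {
	for i := 0; i < 1000; i++ {
		// e.g. inside GlobalRatelimitPeers' loop: key -> resolved peer
		logSampled(0.01, "ratelimit key %q mapped to peer %q", "domain-x", "10.0.0.5:7833")
	}
}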

for _, r := range ratelimits {
// figure out the destination for this ratelimit
host, err := pr.resolver.Lookup(service.History, r)
if err != nil {
return nil, fmt.Errorf(
"unable to find host for ratelimit key %q: %w", r, err)
}
peer, err := host.GetNamedAddress(pr.namedPort)
if err != nil {
// AFAICT should not happen unless the peer data is incomplete / the ring has no port info.
// retry may work? eventually?
return nil, fmt.Errorf(
"unable to get address from host: %s, %w",
host.String(), // HostInfo has a readable String(), it's good enough
err,
)
}

// add to the peer's partition
current := results[Peer(peer)]
if len(current) == 0 {
Member: nit: more readable version with bool ok:

current, ok := results[peer]
if !ok {
  current = make([]string, 0, initialCapacity)
}
current = append(current, r)
results[peer] = current

Groxx (Member Author), Jun 25, 2024: tbh I don't think I'd label that "more readable" - it adds a var and allows a third category of behavior, where "current" exists but is empty. len(current)==0 has only two cases to consider: empty or not.

though I suppose empty is fine to append to. it just feels odd/worse to me to imply "if something else added here, use it verbatim", particularly with how weird-mutation-prone slices are.

Member: I added it as a nit comment, but since you are also interested in this topic, let me share more of my reasoning (not a blocker for this PR, by the way).

Both options achieve the same thing for this case. The only advantage of the if !ok approach is consistency across the various map-existence-check scenarios in the codebase. I guess we'd agree that when a convention is consistently applied in the codebase it helps with reading experience; if each developer chooses a viable option to their own taste, you lose some readability.

Bool ok is part of the map interface and handles all cases. However, using values to determine existence falls short if the map can contain values that are the defaults, such as integer 0 or a zero struct value; in that case checking if myMap[key] != 0 doesn't work. This means you would have to use if !ok for some maps and zero-value checks for others. Obviously inconsistent => less readable.

We can argue how many milliseconds this adds to reading experience :) From a principles perspective, I see the obvious choice as sticking with the approach that works for all cases, to stay consistent.

Groxx (Member Author), Jun 27, 2024: tbh I don't think we're particularly consistent about this in the codebase. But zero values are valid and safe and comparable in this case, which is why I don't differentiate between "exists" and "not exists". If they were unsafe or ambiguous I'd absolutely agree: ok is the only reasonable choice.

I do wish Go had something like python's defaultdict(constructor). It makes more-efficient-init stuff like this a lot more readable, particularly if there are multiple branches that need the same checks :\
current = make([]string, 0, initialCapacity)
}
current = append(current, r)
results[Peer(peer)] = current
}
return results, nil
}
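On the defaultdict wish in the thread above, a generic Go approximation is small enough to sketch. Purely illustrative, not proposed for the codebase:

package main

import "fmt"

// DefaultMap mimics Python's defaultdict: Get constructs a missing value on
// first access, so call sites skip the exists-check-then-init dance debated above.
type DefaultMap[K comparable, V any] struct {
	m    map[K]V
	ctor func() V
}

func NewDefaultMap[K comparable, V any](ctor func() V) DefaultMap[K, V] {
	return DefaultMap[K, V]{m: make(map[K]V), ctor: ctor}
}

func (d DefaultMap[K, V]) Get(k K) V {
	v, ok := d.m[k]
	if !ok {
		v = d.ctor()
		d.m[k] = v
	}
	return v
}

func (d DefaultMap[K, V]) Set(k K, v V) { d.m[k] = v }

func main() {
	results := NewDefaultMap[string, []string](func() []string {
		return make([]string, 0, 4) // analogous to initialCapacity above
	})
	results.Set("peer-a", append(results.Get("peer-a"), "some-ratelimit-key"))
	fmt.Println(results.Get("peer-a")) // [some-ratelimit-key]
}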
15 changes: 15 additions & 0 deletions client/history/peer_resolver_mock.go

Some generated files are not rendered by default.
