Skip to content

Commit

Permalink
Fix streaming RPCs for agentless. (#20868)
Browse files Browse the repository at this point in the history
* Fix streaming RPCs for agentless.

This PR fixes an issue where cross-dc RPCs were unable to utilize
the streaming backend due to having the node name set. The result
of this was the agent-cache being utilized, which would cause high
cpu utilization and memory consumption due to the fact that it
keeps queries alive for 72 hours before purging inactive entries.

This resource consumption is compounded by the fact that each pod
in consul-k8s gets a unique token. Since the agent-cache uses the
token as a component of the key, the same query is duplicated for
each pod that is deployed.

* Add changelog.
  • Loading branch information
hashi-derek authored Mar 15, 2024
1 parent 0ac8ae6 commit ac83ac1
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 2 deletions.
3 changes: 3 additions & 0 deletions .changelog/20868.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
connect: Fix issue where Consul-dataplane xDS sessions would not utilize the streaming backend for wan-federated queries.
```
1 change: 1 addition & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,7 @@ func (a *Agent) Start(ctx context.Context) error {
Segment: a.config.SegmentName,
Node: a.config.NodeName,
NodePartition: a.config.PartitionOrEmpty(),
DisableNode: true, // Disable for agentless so that streaming RPCs can be used.
},
DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain,
Expand Down
7 changes: 6 additions & 1 deletion agent/rpcclient/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ func (c *Client) Notify(
}

func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool {
return c.UseStreamingBackend && !req.Ingress && req.Source.Node == ""
return c.UseStreamingBackend &&
!req.Ingress &&
// Streaming is incompatible with NearestN queries (due to lack of ordering),
// so we can only use it if the NearestN would never work (Node == "")
// or if we explicitly say to ignore the Node field for queries (agentless xDS).
(req.Source.Node == "" || req.Source.DisableNode)
}

func (c *Client) newServiceRequest(req structs.ServiceSpecificRequest) serviceRequest {
Expand Down
6 changes: 5 additions & 1 deletion agent/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,11 @@ type QuerySource struct {
Segment string
Node string
NodePartition string `json:",omitempty"`
Ip string
// DisableNode indicates that the Node and NodePartition fields should not be used
// for determining the flow of the RPC. This is needed for agentless + wanfed to
// utilize streaming RPCs.
DisableNode bool `json:",omitempty"`
Ip string
}

func (s QuerySource) NodeEnterpriseMeta() *acl.EnterpriseMeta {
Expand Down

0 comments on commit ac83ac1

Please sign in to comment.