Skip to content

Commit 2bb67c3

Browse files
tgross and tantra35 committed
client: use Status.RPCServers RPC for Consul discovery
In #16217 we switched clients using Consul discovery to the `Status.Members` endpoint for getting the list of servers so that we're using the correct address. This endpoint has an authorization gate, so this fails if the anonymous policy doesn't have `node:read`. We also can't check the `AuthToken` for the request for the client secret, because the client hasn't yet registered so the server doesn't have anything to compare against.

Create a new `Status.RPCServers` endpoint that mirrors the `Status.Peers` endpoint but provides the RPC server addresses instead of the Raft addresses. This fixes the authentication bug but also ensures we're only registering with servers in the client's region and not with any other servers that might have registered with Consul.

This changeset also expands the test coverage of the RPC endpoint and closes up potential holes in the `ResolveACL` method that aren't currently bugs but easily could become bugs if we called the method without ensuring its invariants are upheld.

Co-authored-by: tantra35 <[email protected]>
1 parent eaf22f2 commit 2bb67c3

File tree

6 files changed

+107
-21
lines changed

6 files changed

+107
-21
lines changed

.changelog/16490.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
client: Fixed a bug where clients using Consul discovery to join the cluster would get permission denied errors
3+
```

.semgrep/rpc_endpoint.yml

+1
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ rules:
103103
- pattern-not: '"CSIPlugin.List"'
104104
- pattern-not: '"Status.Leader"'
105105
- pattern-not: '"Status.Peers"'
106+
- pattern-not: '"Status.RPCServers"'
106107
- pattern-not: '"Status.Version"'
107108
message: "RPC method $METHOD appears to be unauthenticated"
108109
languages:

client/client.go

+16-17
Original file line numberDiff line numberDiff line change
@@ -2906,7 +2906,8 @@ func (c *Client) consulDiscoveryImpl() error {
29062906
dcs = dcs[0:helper.Min(len(dcs), datacenterQueryLimit)]
29072907
}
29082908

2909-
// Query for servers in this client's region only
2909+
// Query for servers in this client's region only. Note this has to be an
2910+
// unauthenticated request because we haven't registered yet.
29102911
region := c.Region()
29112912
rpcargs := structs.GenericRequest{
29122913
QueryOptions: structs.QueryOptions{
@@ -2944,26 +2945,24 @@ DISCOLOOP:
29442945
continue
29452946
}
29462947

2947-
// Query the members from the region that Consul gave us, and
2948-
// extract the client-advertise RPC address from each member
2949-
var membersResp structs.ServerMembersResponse
2950-
if err := c.connPool.RPC(region, addr, "Status.Members", rpcargs, &membersResp); err != nil {
2948+
srv := &servers.Server{Addr: addr}
2949+
nomadServers = append(nomadServers, srv)
2950+
2951+
// Query the client-advertise RPC addresses from the region that
2952+
// Consul gave us
2953+
var members []string
2954+
if err := c.connPool.RPC(region, addr, "Status.RPCServers", rpcargs, &members); err != nil {
29512955
mErr.Errors = append(mErr.Errors, err)
29522956
continue
29532957
}
2954-
for _, member := range membersResp.Members {
2955-
if addrTag, ok := member.Tags["rpc_addr"]; ok {
2956-
if portTag, ok := member.Tags["port"]; ok {
2957-
addr, err := net.ResolveTCPAddr("tcp",
2958-
fmt.Sprintf("%s:%s", addrTag, portTag))
2959-
if err != nil {
2960-
mErr.Errors = append(mErr.Errors, err)
2961-
continue
2962-
}
2963-
srv := &servers.Server{Addr: addr}
2964-
nomadServers = append(nomadServers, srv)
2965-
}
2958+
for _, member := range members {
2959+
addr, err := net.ResolveTCPAddr("tcp", member)
2960+
if err != nil {
2961+
mErr.Errors = append(mErr.Errors, err)
2962+
continue
29662963
}
2964+
srv := &servers.Server{Addr: addr}
2965+
nomadServers = append(nomadServers, srv)
29672966
}
29682967

29692968
if len(nomadServers) > 0 {

nomad/acl.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -161,9 +161,14 @@ func (s *Server) remoteIPFromRPCContext(ctx *RPCContext) (net.IP, error) {
161161
// for the identity they intend the operation to be performed with.
162162
func (s *Server) ResolveACL(args structs.RequestWithIdentity) (*acl.ACL, error) {
163163
identity := args.GetIdentity()
164-
if !s.config.ACLEnabled || identity == nil {
164+
if !s.config.ACLEnabled {
165165
return nil, nil
166166
}
167+
if identity == nil {
168+
// Server.Authenticate should never return a nil identity unless there's
169+
// an authentication error, but enforce that invariant here
170+
return nil, structs.ErrPermissionDenied
171+
}
167172
aclToken := identity.GetACLToken()
168173
if aclToken != nil {
169174
return s.ResolveACLForToken(aclToken)
@@ -172,7 +177,10 @@ func (s *Server) ResolveACL(args structs.RequestWithIdentity) (*acl.ACL, error)
172177
if claims != nil {
173178
return s.ResolveClaims(claims)
174179
}
175-
return nil, nil
180+
181+
// return an error here so that we enforce the invariant that we check for
182+
// Identity.ClientID before trying to resolve ACLs
183+
return nil, structs.ErrPermissionDenied
176184
}
177185

178186
// ResolveACLForToken resolves an ACL from a token only. It should be used only

nomad/status_endpoint.go

+25
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,31 @@ func (s *Status) Peers(args *structs.GenericRequest, reply *[]string) error {
7878
return nil
7979
}
8080

81+
// RPCServers is used to get all the RPC server addresses in a region
82+
func (s *Status) RPCServers(args *structs.GenericRequest, reply *[]string) error {
83+
// note: we're intentionally throwing away any auth error here and only
84+
// authenticate so that we can measure rate metrics
85+
s.srv.Authenticate(s.ctx, args)
86+
s.srv.MeasureRPCRate("status", structs.RateMetricList, args)
87+
88+
if args.Region == "" {
89+
args.Region = s.srv.config.Region
90+
}
91+
if done, err := s.srv.forward("Status.RPCServers", args, args, reply); done {
92+
return err
93+
}
94+
95+
future := s.srv.raft.GetConfiguration()
96+
if err := future.Error(); err != nil {
97+
return err
98+
}
99+
100+
for _, server := range future.Configuration().Servers {
101+
*reply = append(*reply, s.srv.localPeers[server.Address].RPCAddr.String())
102+
}
103+
return nil
104+
}
105+
81106
// Members returns the list of servers in a cluster that a particular server is
82107
// aware of
83108
func (s *Status) Members(args *structs.GenericRequest, reply *structs.ServerMembersResponse) error {

nomad/status_endpoint_test.go

+52-2
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
package nomad
22

33
import (
4+
"fmt"
5+
"net"
46
"testing"
57

68
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
9+
"github.com/shoenig/test/must"
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
713
"github.com/hashicorp/nomad/acl"
814
"github.com/hashicorp/nomad/ci"
915
"github.com/hashicorp/nomad/helper/uuid"
1016
"github.com/hashicorp/nomad/nomad/mock"
1117
"github.com/hashicorp/nomad/nomad/structs"
1218
"github.com/hashicorp/nomad/testutil"
13-
"github.com/stretchr/testify/assert"
14-
"github.com/stretchr/testify/require"
1519
)
1620

1721
func TestStatusPing(t *testing.T) {
@@ -73,6 +77,52 @@ func TestStatusPeers(t *testing.T) {
7377
}
7478
}
7579

80+
func TestStatus_RPCServers(t *testing.T) {
81+
ci.Parallel(t)
82+
83+
advAddr1 := "127.0.1.1:1234"
84+
adv1, err := net.ResolveTCPAddr("tcp", advAddr1)
85+
must.NoError(t, err)
86+
87+
s1, cleanupS1 := TestServer(t, func(c *Config) {
88+
c.Region = "region1"
89+
c.ClientRPCAdvertise = adv1
90+
})
91+
defer cleanupS1()
92+
93+
s2, cleanupS2 := TestServer(t, func(c *Config) {
94+
c.Region = "region2"
95+
})
96+
defer cleanupS2()
97+
98+
// Join them together
99+
s2Addr := fmt.Sprintf("127.0.0.1:%d", s2.config.SerfConfig.MemberlistConfig.BindPort)
100+
n, err := s1.Join([]string{s2Addr})
101+
must.NoError(t, err, must.Sprintf("Failed joining: %v (%d joined)", err, n))
102+
103+
codec := rpcClient(t, s1)
104+
105+
t.Run("own region", func(t *testing.T) {
106+
arg := &structs.GenericRequest{
107+
QueryOptions: structs.QueryOptions{Region: "region1"},
108+
}
109+
var members []string
110+
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Status.RPCServers", arg, &members))
111+
must.Len(t, 1, members)
112+
must.Eq(t, advAddr1, members[0])
113+
})
114+
115+
t.Run("other region", func(t *testing.T) {
116+
arg := &structs.GenericRequest{
117+
QueryOptions: structs.QueryOptions{Region: "region2"},
118+
}
119+
var members []string
120+
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Status.RPCServers", arg, &members))
121+
must.Len(t, 1, members)
122+
must.Eq(t, s2.clientRpcAdvertise.String(), members[0])
123+
})
124+
}
125+
76126
func TestStatusMembers(t *testing.T) {
77127
ci.Parallel(t)
78128

0 commit comments

Comments
 (0)