Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding denies $KV.>/$OBJ.> along leaf connections on differing domain #2916

Merged
merged 3 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/klauspost/compress v1.14.4
github.com/minio/highwayhash v1.0.2
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d h1:GRSmEJutHkdoxKsRypP575IIdoXe7Bm6yHQF6GcDBnA=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d h1:zJf4l8Kp67RIZhoVeniSLZs69SHNgjLHz0aNsqPPlx8=
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
12 changes: 4 additions & 8 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,18 +586,14 @@ func (a *Account) enableAllJetStreamServiceImportsAndMappings() error {
// Check if we have a Domain specified.
// If so add in a subject mapping that will allow local connected clients to reach us here as well.
if opts := s.getOpts(); opts.JetStreamDomain != _EMPTY_ {
src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
found := false
mappings := generateJSMappingTable(opts.JetStreamDomain)
a.mu.RLock()
for _, m := range a.mappings {
if src == m.src {
found = true
break
}
delete(mappings, m.src)
}
a.mu.RUnlock()
if !found {
if err := a.AddMapping(src, jsAllAPI); err != nil {
for src, dest := range mappings {
if err := a.AddMapping(src, dest); err != nil {
s.Errorf("Error adding JetStream domain mapping: %v", err)
}
}
Expand Down
29 changes: 28 additions & 1 deletion server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,34 @@ const (
JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
)

var denyAllJs = []string{jscAllSubj, raftAllSubj, jsAllAPI}
var denyAllClientJs = []string{jsAllAPI, "$KV.>", "$OBJ.>"}
var denyAllJs = []string{jscAllSubj, raftAllSubj, jsAllAPI, "$KV.>", "$OBJ.>"}

func generateJSMappingTable(domain string) map[string]string {
mappings := map[string]string{}
// This set of mappings is very very very ugly.
// It is a consequence of what we defined the domain prefix to be "$JS.domain.API" and it's mapping to "$JS.API"
// For optics $KV and $OBJ where made to be independent subject spaces.
// As materialized views of JS, they did not simply extend that subject space to say "$JS.API.KV" "$JS.API.OBJ"
// This is very unfortunate!!!
// Furthermore, it seemed bad to require different domain prefixes for JS/KV/OBJ.
// Especially since the actual API for say KV, does use stream create from JS.
// To avoid overlaps KV and OBJ views append the prefix to their API.
// (Replacing $KV with the prefix allows users to create collisions with say the bucket name)
// This mapping therefore needs to have extra token so that the mapping can properly discern between $JS, $KV, $OBJ
for srcMappingSuffix, to := range map[string]string{
"INFO": JSApiAccountInfo,
"STREAM.>": "$JS.API.STREAM.>",
"CONSUMER.>": "$JS.API.CONSUMER.>",
"META.>": "$JS.API.META.>",
"SERVER.>": "$JS.API.SERVER.>",
"$KV.>": "$KV.>",
"$OBJ.>": "$OBJ.>",
} {
mappings[fmt.Sprintf("$JS.%s.API.%s", domain, srcMappingSuffix)] = to
}
return mappings
}

// JSMaxDescription is the maximum description length for streams and consumers.
const JSMaxDescriptionLen = 4 * 1024
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6875,7 +6875,7 @@ func TestJetStreamClusterDomainsWithNoJSHub(t *testing.T) {
// Client based API - Connected to the core cluster with no JS but account has JS.
s := c.randomServer()
// Make sure the JS interest from the LNs has made it to this server.
checkSubInterest(t, s, "NOJS", "$JS.SPOKE.API.>", time.Second)
checkSubInterest(t, s, "NOJS", "$JS.SPOKE.API.INFO", time.Second)
nc, _ := jsClientConnect(t, s, nats.UserInfo("nojs", "p"))
defer nc.Close()

Expand Down
27 changes: 18 additions & 9 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1251,8 +1251,8 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c
meta.Campaign()
}
} else {
c.Noticef("JetStream Not Extended, adding deny %q for account %q", jsAllAPI, accName)
c.mergeDenyPermissionsLocked(both, []string{jsAllAPI})
c.Noticef("JetStream Not Extended, adding deny %+v for account %q", denyAllClientJs, accName)
c.mergeDenyPermissionsLocked(both, denyAllClientJs)
}
blockMappingOutgoing = true
} else if acc == sysAcc {
Expand All @@ -1274,19 +1274,21 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c
// If the system account is shared, jsAllAPI traffic will go through the system account.
// So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
// If the system account is NOT shared, jsAllAPI traffic has no business
c.Noticef("Adding deny %q for account %q", jsAllAPI, accName)
c.mergeDenyPermissionsLocked(both, []string{jsAllAPI})
c.Noticef("Adding deny %+v for account %q", denyAllClientJs, accName)
c.mergeDenyPermissionsLocked(both, denyAllClientJs)
}
// If we have a specified JetStream domain we will want to add a mapping to
// allow access cross domain for each non-system account.
if opts.JetStreamDomain != _EMPTY_ && acc != sysAcc && opts.JetStream {
src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
if err := acc.AddMapping(src, jsAllAPI); err != nil {
c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
} else {
c.Noticef("Adding JetStream Domain Mapping %q to account %q", src, accName)
for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
if err := acc.AddMapping(src, dest); err != nil {
c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
} else {
c.Noticef("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
}
}
if blockMappingOutgoing {
src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
// make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
// This is a guard against a miss-config with two identical domain names and will only cover some forms
// of this issue, not all of them.
Expand Down Expand Up @@ -1496,6 +1498,10 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
}
// Likewise for mappings.
for _, m := range acc.mappings {
if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
continue
}
ims = append(ims, m.src)
}

Expand Down Expand Up @@ -1695,6 +1701,9 @@ func (c *client) forceAddToSmap(subj string) {
c.mu.Lock()
defer c.mu.Unlock()

if c.leaf.smap == nil {
return
}
n := c.leaf.smap[subj]
if n != 0 {
return
Expand Down
97 changes: 97 additions & 0 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5183,3 +5183,100 @@ func TestLeafNodeQueueGroupWithLateLNJoin(t *testing.T) {
natsPub(t, cln1, "foo", []byte("hello"))
natsNexMsg(t, sln2, time.Second)
}

func TestLeafNodeJetStreamDomainMapCrossTalk(t *testing.T) {
accs := `
accounts :{
A:{ jetstream: enable, users:[ {user:a1,password:a1}]},
SYS:{ users:[ {user:s1,password:s1}]},
}
system_account: SYS
`

sd1 := createDir(t, JetStreamStoreDir)
defer os.RemoveAll(sd1)
confA := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
%s
jetstream: { domain: da, store_dir: '%s', max_mem: 50Mb, max_file: 50Mb }
leafnodes: {
listen: 127.0.0.1:-1
no_advertise: true
authorization: {
timeout: 0.5
}
}
`, accs, sd1)))
defer removeFile(t, confA)
sA, _ := RunServerWithConfig(confA)
defer sA.Shutdown()

sd2 := createDir(t, JetStreamStoreDir)
defer os.RemoveAll(sd2)
confL := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
%s
jetstream: { domain: dl, store_dir: '%s', max_mem: 50Mb, max_file: 50Mb }
leafnodes:{
no_advertise: true
remotes:[{url:nats://a1:[email protected]:%d, account: A},
{url:nats://s1:[email protected]:%d, account: SYS}]
}
`, accs, sd2, sA.opts.LeafNode.Port, sA.opts.LeafNode.Port)))
defer removeFile(t, confL)
sL, _ := RunServerWithConfig(confL)
defer sL.Shutdown()

ncA := natsConnect(t, sA.ClientURL(), nats.UserInfo("a1", "a1"))
matthiashanel marked this conversation as resolved.
Show resolved Hide resolved
defer ncA.Close()
ncL := natsConnect(t, sL.ClientURL(), nats.UserInfo("a1", "a1"))
defer ncL.Close()

test := func(jsA, jsL nats.JetStreamContext) {
kvA, err := jsA.CreateKeyValue(&nats.KeyValueConfig{Bucket: "bucket"})
require_NoError(t, err)
kvL, err := jsL.CreateKeyValue(&nats.KeyValueConfig{Bucket: "bucket"})
require_NoError(t, err)

_, err = kvA.Put("A", nil)
require_NoError(t, err)
_, err = kvL.Put("L", nil)
require_NoError(t, err)

// check for unwanted cross talk
_, err = kvA.Get("A")
require_NoError(t, err)
_, err = kvA.Get("l")
require_Error(t, err)
require_True(t, err == nats.ErrKeyNotFound)

_, err = kvL.Get("A")
require_Error(t, err)
require_True(t, err == nats.ErrKeyNotFound)
_, err = kvL.Get("L")
require_NoError(t, err)

err = jsA.DeleteKeyValue("bucket")
require_NoError(t, err)
err = jsL.DeleteKeyValue("bucket")
require_NoError(t, err)
}

jsA, err := ncA.JetStream()
require_NoError(t, err)
jsL, err := ncL.JetStream()
require_NoError(t, err)
test(jsA, jsL)

jsAL, err := ncA.JetStream(nats.Domain("dl"))
require_NoError(t, err)
jsLA, err := ncL.JetStream(nats.Domain("da"))
require_NoError(t, err)
test(jsAL, jsLA)

jsAA, err := ncA.JetStream(nats.Domain("da"))
require_NoError(t, err)
jsLL, err := ncL.JetStream(nats.Domain("dl"))
require_NoError(t, err)
test(jsAA, jsLL)
}
10 changes: 6 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,10 +1405,12 @@ func (s *Server) registerAccountNoLock(acc *Account) *Account {
if jsEnabled {
s.Warnf("Skipping Default Domain %q, set for JetStream enabled account %q", defDomain, accName)
} else if defDomain != _EMPTY_ {
dest := fmt.Sprintf(jsDomainAPI, defDomain)
s.Noticef("Adding default domain mapping %q -> %q to account %q %p", jsAllAPI, dest, accName, acc)
if err := acc.AddMapping(jsAllAPI, dest); err != nil {
s.Errorf("Error adding JetStream default domain mapping: %v", err)
for src, dest := range generateJSMappingTable(defDomain) {
// flip src and dest around so the domain is inserted
s.Noticef("Adding default domain mapping %q -> %q to account %q %p", dest, src, accName, acc)
if err := acc.AddMapping(dest, src); err != nil {
s.Errorf("Error adding JetStream default domain mapping: %v", err)
}
}
}
}
Expand Down