From 9a2da9ed8c04992baa66a558069f09d0b717a533 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Wed, 9 Mar 2022 13:17:59 -0500 Subject: [PATCH] Adding denies $KV.>/$OBJ.> along leaf connections on differing domain (#2916) * Adding denies $KV.>/$OBJ.> along leaf connections on differing domain Signed-off-by: Matthias Hanel --- go.mod | 2 +- go.sum | 4 +- server/jetstream.go | 12 ++-- server/jetstream_api.go | 29 +++++++++- server/jetstream_cluster_test.go | 2 +- server/leafnode.go | 27 ++++++--- server/leafnode_test.go | 97 ++++++++++++++++++++++++++++++++ server/server.go | 10 ++-- 8 files changed, 157 insertions(+), 26 deletions(-) diff --git a/go.mod b/go.mod index 6336de6360c..add4357a6ba 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/klauspost/compress v1.14.4 github.com/minio/highwayhash v1.0.2 github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 - github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d + github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce diff --git a/go.sum b/go.sum index 6f39303cc38..ddf89a54a3c 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d h1:GRSmEJutHkdoxKsRypP575IIdoXe7Bm6yHQF6GcDBnA= -github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d h1:zJf4l8Kp67RIZhoVeniSLZs69SHNgjLHz0aNsqPPlx8= +github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/server/jetstream.go b/server/jetstream.go index 5a301828c92..88e80029d6e 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -586,18 +586,14 @@ func (a *Account) enableAllJetStreamServiceImportsAndMappings() error { // Check if we have a Domain specified. // If so add in a subject mapping that will allow local connected clients to reach us here as well. if opts := s.getOpts(); opts.JetStreamDomain != _EMPTY_ { - src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain) - found := false + mappings := generateJSMappingTable(opts.JetStreamDomain) a.mu.RLock() for _, m := range a.mappings { - if src == m.src { - found = true - break - } + delete(mappings, m.src) } a.mu.RUnlock() - if !found { - if err := a.AddMapping(src, jsAllAPI); err != nil { + for src, dest := range mappings { + if err := a.AddMapping(src, dest); err != nil { s.Errorf("Error adding JetStream domain mapping: %v", err) } } diff --git a/server/jetstream_api.go b/server/jetstream_api.go index b89e8ceb97d..4f06e543cb4 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -256,7 +256,34 @@ const ( JSAuditAdvisory = "$JS.EVENT.ADVISORY.API" ) -var denyAllJs = []string{jscAllSubj, raftAllSubj, jsAllAPI} +var denyAllClientJs = []string{jsAllAPI, "$KV.>", "$OBJ.>"} +var denyAllJs = []string{jscAllSubj, raftAllSubj, jsAllAPI, "$KV.>", "$OBJ.>"} + +func generateJSMappingTable(domain string) map[string]string { + mappings := map[string]string{} + // This set of mappings is very very very ugly. + // It is a consequence of what we defined the domain prefix to be "$JS.domain.API" and it's mapping to "$JS.API" + // For optics $KV and $OBJ where made to be independent subject spaces. + // As materialized views of JS, they did not simply extend that subject space to say "$JS.API.KV" "$JS.API.OBJ" + // This is very unfortunate!!! + // Furthermore, it seemed bad to require different domain prefixes for JS/KV/OBJ. + // Especially since the actual API for say KV, does use stream create from JS. + // To avoid overlaps KV and OBJ views append the prefix to their API. + // (Replacing $KV with the prefix allows users to create collisions with say the bucket name) + // This mapping therefore needs to have extra token so that the mapping can properly discern between $JS, $KV, $OBJ + for srcMappingSuffix, to := range map[string]string{ + "INFO": JSApiAccountInfo, + "STREAM.>": "$JS.API.STREAM.>", + "CONSUMER.>": "$JS.API.CONSUMER.>", + "META.>": "$JS.API.META.>", + "SERVER.>": "$JS.API.SERVER.>", + "$KV.>": "$KV.>", + "$OBJ.>": "$OBJ.>", + } { + mappings[fmt.Sprintf("$JS.%s.API.%s", domain, srcMappingSuffix)] = to + } + return mappings +} // JSMaxDescription is the maximum description length for streams and consumers. const JSMaxDescriptionLen = 4 * 1024 diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 192da6261a0..4224d0fb238 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -6875,7 +6875,7 @@ func TestJetStreamClusterDomainsWithNoJSHub(t *testing.T) { // Client based API - Connected to the core cluster with no JS but account has JS. s := c.randomServer() // Make sure the JS interest from the LNs has made it to this server. - checkSubInterest(t, s, "NOJS", "$JS.SPOKE.API.>", time.Second) + checkSubInterest(t, s, "NOJS", "$JS.SPOKE.API.INFO", time.Second) nc, _ := jsClientConnect(t, s, nats.UserInfo("nojs", "p")) defer nc.Close() diff --git a/server/leafnode.go b/server/leafnode.go index a129733c253..40f068e1f8b 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1251,8 +1251,8 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c meta.Campaign() } } else { - c.Noticef("JetStream Not Extended, adding deny %q for account %q", jsAllAPI, accName) - c.mergeDenyPermissionsLocked(both, []string{jsAllAPI}) + c.Noticef("JetStream Not Extended, adding deny %+v for account %q", denyAllClientJs, accName) + c.mergeDenyPermissionsLocked(both, denyAllClientJs) } blockMappingOutgoing = true } else if acc == sysAcc { @@ -1274,19 +1274,21 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c // If the system account is shared, jsAllAPI traffic will go through the system account. // So in order to prevent duplicate delivery (from system and actual account) suppress it on the account. // If the system account is NOT shared, jsAllAPI traffic has no business - c.Noticef("Adding deny %q for account %q", jsAllAPI, accName) - c.mergeDenyPermissionsLocked(both, []string{jsAllAPI}) + c.Noticef("Adding deny %+v for account %q", denyAllClientJs, accName) + c.mergeDenyPermissionsLocked(both, denyAllClientJs) } // If we have a specified JetStream domain we will want to add a mapping to // allow access cross domain for each non-system account. if opts.JetStreamDomain != _EMPTY_ && acc != sysAcc && opts.JetStream { - src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain) - if err := acc.AddMapping(src, jsAllAPI); err != nil { - c.Debugf("Error adding JetStream domain mapping: %s", err.Error()) - } else { - c.Noticef("Adding JetStream Domain Mapping %q to account %q", src, accName) + for src, dest := range generateJSMappingTable(opts.JetStreamDomain) { + if err := acc.AddMapping(src, dest); err != nil { + c.Debugf("Error adding JetStream domain mapping: %s", err.Error()) + } else { + c.Noticef("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName) + } } if blockMappingOutgoing { + src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain) // make sure that messages intended for this domain, do not leave the cluster via this leaf node connection // This is a guard against a miss-config with two identical domain names and will only cover some forms // of this issue, not all of them. @@ -1496,6 +1498,10 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { } // Likewise for mappings. for _, m := range acc.mappings { + if c.isSpokeLeafNode() && !c.canSubscribe(m.src) { + c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag) + continue + } ims = append(ims, m.src) } @@ -1695,6 +1701,9 @@ func (c *client) forceAddToSmap(subj string) { c.mu.Lock() defer c.mu.Unlock() + if c.leaf.smap == nil { + return + } n := c.leaf.smap[subj] if n != 0 { return diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 6ab7c38fb3d..c1fcbe154e6 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -5183,3 +5183,100 @@ func TestLeafNodeQueueGroupWithLateLNJoin(t *testing.T) { natsPub(t, cln1, "foo", []byte("hello")) natsNexMsg(t, sln2, time.Second) } + +func TestLeafNodeJetStreamDomainMapCrossTalk(t *testing.T) { + accs := ` +accounts :{ + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, +} +system_account: SYS +` + + sd1 := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sd1) + confA := createConfFile(t, []byte(fmt.Sprintf(` +listen: 127.0.0.1:-1 +%s +jetstream: { domain: da, store_dir: '%s', max_mem: 50Mb, max_file: 50Mb } +leafnodes: { + listen: 127.0.0.1:-1 + no_advertise: true + authorization: { + timeout: 0.5 + } +} +`, accs, sd1))) + defer removeFile(t, confA) + sA, _ := RunServerWithConfig(confA) + defer sA.Shutdown() + + sd2 := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sd2) + confL := createConfFile(t, []byte(fmt.Sprintf(` +listen: 127.0.0.1:-1 +%s +jetstream: { domain: dl, store_dir: '%s', max_mem: 50Mb, max_file: 50Mb } +leafnodes:{ + no_advertise: true + remotes:[{url:nats://a1:a1@127.0.0.1:%d, account: A}, + {url:nats://s1:s1@127.0.0.1:%d, account: SYS}] +} +`, accs, sd2, sA.opts.LeafNode.Port, sA.opts.LeafNode.Port))) + defer removeFile(t, confL) + sL, _ := RunServerWithConfig(confL) + defer sL.Shutdown() + + ncA := natsConnect(t, sA.ClientURL(), nats.UserInfo("a1", "a1")) + defer ncA.Close() + ncL := natsConnect(t, sL.ClientURL(), nats.UserInfo("a1", "a1")) + defer ncL.Close() + + test := func(jsA, jsL nats.JetStreamContext) { + kvA, err := jsA.CreateKeyValue(&nats.KeyValueConfig{Bucket: "bucket"}) + require_NoError(t, err) + kvL, err := jsL.CreateKeyValue(&nats.KeyValueConfig{Bucket: "bucket"}) + require_NoError(t, err) + + _, err = kvA.Put("A", nil) + require_NoError(t, err) + _, err = kvL.Put("L", nil) + require_NoError(t, err) + + // check for unwanted cross talk + _, err = kvA.Get("A") + require_NoError(t, err) + _, err = kvA.Get("l") + require_Error(t, err) + require_True(t, err == nats.ErrKeyNotFound) + + _, err = kvL.Get("A") + require_Error(t, err) + require_True(t, err == nats.ErrKeyNotFound) + _, err = kvL.Get("L") + require_NoError(t, err) + + err = jsA.DeleteKeyValue("bucket") + require_NoError(t, err) + err = jsL.DeleteKeyValue("bucket") + require_NoError(t, err) + } + + jsA, err := ncA.JetStream() + require_NoError(t, err) + jsL, err := ncL.JetStream() + require_NoError(t, err) + test(jsA, jsL) + + jsAL, err := ncA.JetStream(nats.Domain("dl")) + require_NoError(t, err) + jsLA, err := ncL.JetStream(nats.Domain("da")) + require_NoError(t, err) + test(jsAL, jsLA) + + jsAA, err := ncA.JetStream(nats.Domain("da")) + require_NoError(t, err) + jsLL, err := ncL.JetStream(nats.Domain("dl")) + require_NoError(t, err) + test(jsAA, jsLL) +} diff --git a/server/server.go b/server/server.go index 6b2476e6a1b..a15301b9940 100644 --- a/server/server.go +++ b/server/server.go @@ -1405,10 +1405,12 @@ func (s *Server) registerAccountNoLock(acc *Account) *Account { if jsEnabled { s.Warnf("Skipping Default Domain %q, set for JetStream enabled account %q", defDomain, accName) } else if defDomain != _EMPTY_ { - dest := fmt.Sprintf(jsDomainAPI, defDomain) - s.Noticef("Adding default domain mapping %q -> %q to account %q %p", jsAllAPI, dest, accName, acc) - if err := acc.AddMapping(jsAllAPI, dest); err != nil { - s.Errorf("Error adding JetStream default domain mapping: %v", err) + for src, dest := range generateJSMappingTable(defDomain) { + // flip src and dest around so the domain is inserted + s.Noticef("Adding default domain mapping %q -> %q to account %q %p", dest, src, accName, acc) + if err := acc.AddMapping(dest, src); err != nil { + s.Errorf("Error adding JetStream default domain mapping: %v", err) + } } } }