vault: fix token revocation during workflow migration #19689

Merged · 4 commits · Jan 10, 2024
3 changes: 3 additions & 0 deletions .changelog/19689.txt
@@ -0,0 +1,3 @@
```release-note:bug
vault: Fixed a bug that could cause errors during leadership transition when migrating to the new JWT and workload identity authentication workflow
```
20 changes: 20 additions & 0 deletions nomad/leader_test.go
@@ -818,6 +818,26 @@ func TestLeader_revokeVaultAccessorsOnRestore(t *testing.T) {
}
}

func TestLeader_revokeVaultAccessorsOnRestore_workloadIdentity(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

// Insert a Vault accessor that should be revoked
fsmState := s1.fsm.State()
va := mock.VaultAccessor()
err := fsmState.UpsertVaultAccessor(100, []*structs.VaultAccessor{va})
must.NoError(t, err)

// Do a restore
err = s1.revokeVaultAccessorsOnRestore()
must.NoError(t, err)
}

func TestLeader_revokeSITokenAccessorsOnRestore(t *testing.T) {
ci.Parallel(t)
r := require.New(t)
148 changes: 148 additions & 0 deletions nomad/node_endpoint_test.go
@@ -676,6 +676,49 @@ func TestClientEndpoint_Deregister_Vault(t *testing.T) {
}
}

func TestClientEndpoint_Deregister_Vault_WorkloadIdentity(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
// Enable Vault config and don't set any connection info to use the
// workload identity flow.
c.VaultConfigs[structs.VaultDefaultCluster].Enabled = pointer.Of(true)
})
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Register mock node.
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)
must.NoError(t, err)

// Put some Vault accessors in the state store for that node
var accessors []*structs.VaultAccessor
for i := 0; i < 3; i++ {
va := mock.VaultAccessor()
va.NodeID = node.ID
accessors = append(accessors, va)
}
state := s1.fsm.State()
state.UpsertVaultAccessor(100, accessors)

// Deregister the mock node and verify no error happens when Vault tokens
// are revoked.
dereg := &structs.NodeDeregisterRequest{
NodeID: node.ID,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.GenericResponse
err = msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp2)
must.NoError(t, err)
}

func TestClientEndpoint_UpdateStatus(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
@@ -815,6 +858,50 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) {
}
}

func TestClientEndpoint_UpdateStatus_Vault_WorkloadIdentity(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
// Enable Vault config and don't set any connection info to use the
// workload identity flow.
c.VaultConfigs[structs.VaultDefaultCluster].Enabled = pointer.Of(true)
})
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Register mock node.
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.NodeUpdateResponse
err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)
must.NoError(t, err)

// Put some Vault accessors in the state store for the node.
var accessors []*structs.VaultAccessor
for i := 0; i < 3; i++ {
va := mock.VaultAccessor()
va.NodeID = node.ID
accessors = append(accessors, va)
}
state := s1.fsm.State()
state.UpsertVaultAccessor(100, accessors)

// Update the status to be down and verify no error when Vault tokens are
// revoked.
updateReq := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusDown,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.NodeUpdateResponse
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", updateReq, &resp2)
must.NoError(t, err)
}

func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
ci.Parallel(t)

@@ -3225,6 +3312,67 @@ func TestClientEndpoint_UpdateAlloc_Vault(t *testing.T) {
}
}

func TestClientEndpoint_UpdateAlloc_VaultWorkloadIdentity(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
// Enable Vault config and don't set any connection info to use the
// workload identity flow.
c.VaultConfigs[structs.VaultDefaultCluster].Enabled = pointer.Of(true)
})
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the node register request.
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)
must.NoError(t, err)

// Inject allocation and a few Vault accessors.
alloc := mock.Alloc()
alloc.NodeID = node.ID
state := s1.fsm.State()
state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
err = state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc})
must.NoError(t, err)

var accessors []*structs.VaultAccessor
for i := 0; i < 3; i++ {
va := mock.VaultAccessor()
va.NodeID = node.ID
va.AllocID = alloc.ID
accessors = append(accessors, va)
}
err = state.UpsertVaultAccessor(101, accessors)
must.NoError(t, err)

// Inject mock job.
job := mock.Job()
job.ID = alloc.JobID
err = state.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job)
must.NoError(t, err)

// Update alloc status and verify no error happens when the orphaned Vault
// tokens are revoked.
clientAlloc := new(structs.Allocation)
*clientAlloc = *alloc
clientAlloc.ClientStatus = structs.AllocClientStatusFailed

update := &structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{clientAlloc},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.NodeAllocsResponse
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2)
must.NoError(t, err)
}

func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
ci.Parallel(t)

23 changes: 20 additions & 3 deletions nomad/server.go
@@ -893,7 +893,24 @@ func (s *Server) Reload(newConfig *Config) error {

// Handle the Vault reload. Vault should never be nil but just guard.
if s.vault != nil {
if err := s.vault.SetConfig(newConfig.GetDefaultVault()); err != nil {
vconfig := newConfig.GetDefaultVault()

// Verify if the new configuration would cause the client type to
// change.
var err error
switch s.vault.(type) {
case *NoopVault:
if vconfig != nil && vconfig.Token != "" {
err = fmt.Errorf("setting a Vault token requires restarting the Nomad agent")
}
case *vaultClient:
if vconfig != nil && vconfig.Token == "" {
err = fmt.Errorf("removing the Vault token requires restarting the Nomad agent")
}
}
if err != nil {
_ = multierror.Append(&mErr, err)
} else if err := s.vault.SetConfig(newConfig.GetDefaultVault()); err != nil {
_ = multierror.Append(&mErr, err)
}
}
@@ -1192,8 +1209,8 @@ func (s *Server) setupConsul(consulConfigFunc consul.ConfigAPIFunc, consulACLs c
// setupVaultClient is used to set up the Vault API client.
func (s *Server) setupVaultClient() error {
vconfig := s.config.GetDefaultVault()
if vconfig != nil && vconfig.DefaultIdentity != nil {
s.vault = &NoopVault{}
if vconfig != nil && vconfig.Token == "" {
s.vault = NewNoopVault(s.logger)
return nil
}

25 changes: 12 additions & 13 deletions nomad/server_test.go
@@ -205,32 +205,31 @@ func TestServer_Regions(t *testing.T) {
func TestServer_Reload_Vault(t *testing.T) {
ci.Parallel(t)

token := uuid.Generate()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.Region = "global"
c.GetDefaultVault().Token = token
})
defer cleanupS1()

if s1.vault.Running() {
t.Fatalf("Vault client should not be running")
}
must.False(t, s1.vault.Running())

tr := true
config := DefaultConfig()
config.GetDefaultVault().Enabled = &tr
config.GetDefaultVault().Token = uuid.Generate()
config.GetDefaultVault().Token = token
config.GetDefaultVault().Namespace = "nondefault"

if err := s1.Reload(config); err != nil {
t.Fatalf("Reload failed: %v", err)
}
err := s1.Reload(config)
must.NoError(t, err)

if !s1.vault.Running() {
t.Fatalf("Vault client should be running")
}
must.True(t, s1.vault.Running())
must.Eq(t, "nondefault", s1.vault.GetConfig().Namespace)

if s1.vault.GetConfig().Namespace != "nondefault" {
t.Fatalf("Vault client did not get new namespace")
}
// Removing the token requires agent restart.
config.GetDefaultVault().Token = ""
err = s1.Reload(config)
must.ErrorContains(t, err, "requires restarting the Nomad agent")
}

func connectionReset(msg string) bool {
26 changes: 20 additions & 6 deletions nomad/vault_noop.go
@@ -9,6 +9,7 @@ import (
"sync"
"time"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
vapi "github.com/hashicorp/vault/api"
@@ -17,6 +18,13 @@
type NoopVault struct {
l sync.Mutex
config *config.VaultConfig
logger log.Logger
}

func NewNoopVault(logger log.Logger) *NoopVault {
return &NoopVault{
logger: logger.Named("vault-noop"),
}
}

func (v *NoopVault) SetActive(_ bool) {}
@@ -37,19 +45,25 @@ func (v *NoopVault) GetConfig() *config.VaultConfig {
}

func (v *NoopVault) CreateToken(_ context.Context, _ *structs.Allocation, _ string) (*vapi.Secret, error) {
return nil, errors.New("Vault client not able to create tokens")
return nil, errors.New("Nomad server is not configured to create tokens")
}

func (v *NoopVault) LookupToken(_ context.Context, _ string) (*vapi.Secret, error) {
return nil, errors.New("Vault client not able to lookup tokens")
return nil, errors.New("Nomad server is not configured to lookup tokens")
}

func (v *NoopVault) RevokeTokens(_ context.Context, _ []*structs.VaultAccessor, _ bool) error {
return errors.New("Vault client not able to revoke tokens")
func (v *NoopVault) RevokeTokens(_ context.Context, tokens []*structs.VaultAccessor, _ bool) error {
for _, t := range tokens {
v.logger.Debug("Vault token is no longer used, but Nomad is not able to revoke it. The token may need to be revoked manually or will expire once its TTL reaches zero.", "accessor", t.Accessor, "ttl", t.CreationTTL)
}
return nil
}

func (v *NoopVault) MarkForRevocation(accessors []*structs.VaultAccessor) error {
return errors.New("Vault client not able to revoke tokens")
func (v *NoopVault) MarkForRevocation(tokens []*structs.VaultAccessor) error {
for _, t := range tokens {
v.logger.Debug("Vault token is no longer used, but Nomad is not able to mark it for revocation. The token may need to be revoked manually or will expire once its TTL reaches zero.", "accessor", t.Accessor, "ttl", t.CreationTTL)
}
return nil
}

func (v *NoopVault) Stop() {}
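As a side note, here is a minimal sketch of how the new no-op revocation behavior could be covered directly in a test (assuming the existing `testlog` helper and the same mocks used elsewhere in this PR):

```go
func TestNoopVault_RevokeTokens(t *testing.T) {
	ci.Parallel(t)

	v := NewNoopVault(testlog.HCLogger(t))
	accessors := []*structs.VaultAccessor{mock.VaultAccessor()}

	// Under the workload identity flow Nomad cannot revoke legacy tokens,
	// so both paths log the orphaned accessor and return nil instead of
	// failing the caller.
	must.NoError(t, v.RevokeTokens(context.Background(), accessors, false))
	must.NoError(t, v.MarkForRevocation(accessors))
}
```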
21 changes: 15 additions & 6 deletions website/content/docs/integrations/vault/acl.mdx
@@ -751,18 +751,26 @@ $ VAULT_TOKEN=s.H39hfS7eHSbb1GpkdzOQLTmz.fvuLy nomad job run vault.nomad
## Migrating to Using Workload Identity with Vault

Migrating from the legacy (pre-1.7) workflow where workloads use the agent's
Vault token requires configuation on your Vault cluster and your Nomad server
Vault token requires configuration on your Vault cluster and your Nomad server
agents. It does not require updating your running Nomad jobs unless you wish to
specify a non-default role. To migrate:

* Create the Vault auth method, default role, and policies on your Vault
cluster.
* Enable [`vault.default_identity`][] blocks in your Nomad server agent
configurations.
* (Optionally) Add [`vault.role`][] fields to any Nomad jobs that will not use
the default role.
* (Optionally) add [`identity`][] blocks to your jobs if you want to use a
different identity because of how your auth method and roles are configured.
configurations, but **do not modify any of the existing Vault
  configuration** (an example server configuration is sketched after this list).
* Upgrade your cluster following the documented [Upgrade
Process][docs_upgrade].
* Resubmit Nomad jobs that need access to Vault to redeploy them with a new
workload identity for Vault.
Comment on lines +765 to +766

Contributor (Author):

I was trying to think of ways we could help with this. Two ideas I had were:

1. A command that looks for the following artifacts and tells users if their clusters are ready for the migration:
   - jobs with `vault` blocks (and maybe templates with Vault-related functions? A `vault` block is not always required to access Vault) but no Vault identities.
   - unused `VaultAccessor`s in the state store (and potentially clean them up without needing a leadership transition).
2. Some kind of metric that operators can monitor and wait until it reaches zero. But I'm not sure yet exactly what to measure.

Member:

I like the idea of a command... maybe we could bake this into a `-check` flag for `nomad setup vault`?

A metric would be nice, but the only reasonable place for it would be in the state store and we'd have to repopulate it during snapshot restore. Kind of messy.

Contributor (Author):

Cool! I will try to think a bit more about where to place the command. I was thinking somewhere under `nomad operator` to indicate that this may require a management token since it needs to look all over the state store.

It could also be used for other upgrade checks in the future.
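For reference, a rough sketch of what the first idea could look like (purely hypothetical; `hasVaultIdentity` and the function itself are assumptions, not existing Nomad APIs):

```go
// checkVaultMigrationReadiness flags jobs that still carry a vault block
// but no workload identity for Vault. Illustrative only: the helper
// hasVaultIdentity does not exist in Nomad.
func checkVaultMigrationReadiness(jobs []*structs.Job) []string {
	var notReady []string
	for _, job := range jobs {
	Groups:
		for _, tg := range job.TaskGroups {
			for _, task := range tg.Tasks {
				if task.Vault != nil && !hasVaultIdentity(task) {
					notReady = append(notReady, job.ID)
					break Groups // one hit is enough to flag the job
				}
			}
		}
	}
	return notReady
}
```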

* (Optionally) Add [`vault.role`][] fields to any Nomad jobs that will not
use the default role.
* (Optionally) Add [`identity`][] blocks to your jobs if you want to use a
  different identity because of how your auth method and roles are
  configured.
* Once all jobs have been resubmitted, you may remove parameters no longer used
by the Nomad server agents from the [`vault`][config] configuration block.
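To make the first two steps concrete, a server agent configuration during the migration window might look like the following sketch (the address, token, and role values are illustrative, not prescribed):

```hcl
vault {
  enabled = true

  # Existing token-based settings stay untouched until all jobs have been
  # resubmitted with workload identities.
  address          = "https://vault.example.com:8200" # illustrative
  token            = "<existing server token>"        # illustrative
  create_from_role = "nomad-cluster"                  # illustrative

  # Enables the workload identity flow introduced in Nomad 1.7.
  default_identity {
    aud = ["vault.io"]
    ttl = "1h"
  }
}
```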

[Variables]: /nomad/docs/concepts/variables
[Vault Namespaces]: /vault/docs/enterprise/namespaces
Expand All @@ -785,6 +793,7 @@ specify a non-default role. To migrate:
[allow_unauth]: /nomad/docs/configuration/vault#allow_unauthenticated
[auth]: /vault/docs/auth/token 'Vault Authentication Backend'
[config]: /nomad/docs/configuration/vault 'Nomad Vault Configuration Block'
[docs_upgrade]: /nomad/docs/upgrade#upgrade-process
[ent]: #enterprise-configuration
[img_vault_auth_method]: /img/vault-integration-auth-method.png
[img_vault_auth_overview]: /img/vault-integration-auth-overview.png
Expand Down