From e27caadca6ed14bf67318098b658377778f50157 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Fri, 1 Jun 2018 14:48:25 -0500 Subject: [PATCH 1/2] Fix unnecessary deregistration in consul sync This commit fixes an issue where if a nomad client and server shared the same consul instance, the server would deregister any services and checks registered by clients for running tasks. --- command/agent/consul/client.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 6a0c789dc9b..6d561402130 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -232,6 +232,10 @@ type ServiceClient struct { // checkWatcher restarts checks that are unhealthy. checkWatcher *checkWatcher + + // agentRoleLock guards state about whether this agent is a client + agentRoleLock sync.Mutex + isClientAgent bool } // NewServiceClient creates a new Consul ServiceClient from an existing Consul API @@ -433,7 +437,7 @@ func (c *ServiceClient) sync() error { // Known service, skip continue } - if !isNomadService(id) { + if !isNomadService(id) || !c.IsClient() { // Not managed by Nomad, skip continue } @@ -470,7 +474,7 @@ func (c *ServiceClient) sync() error { // Known check, leave it continue } - if !isNomadService(check.ServiceID) { + if !isNomadService(check.ServiceID) || !c.IsClient() { // Service not managed by Nomad, skip continue } @@ -519,6 +523,12 @@ func (c *ServiceClient) sync() error { return nil } +func (c *ServiceClient) IsClient() bool { + c.agentRoleLock.Lock() + defer c.agentRoleLock.Unlock() + return c.isClientAgent +} + // RegisterAgent registers Nomad agents (client or server). The // Service.PortLabel should be a literal port to be parsed with SplitHostPort. // Script checks are not supported and will return an error. Registration is @@ -528,6 +538,12 @@ func (c *ServiceClient) sync() error { func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) error { ops := operations{} + if role == "client" { + c.agentRoleLock.Lock() + c.isClientAgent = true + c.agentRoleLock.Unlock() + } + for _, service := range services { id := makeAgentServiceID(role, service) From a7668cd4ec4f08152275a4429a35f98b5f895ec2 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Fri, 1 Jun 2018 15:59:53 -0500 Subject: [PATCH 2/2] Fix tests and move isClient to constructor --- client/task_runner_test.go | 4 ++-- command/agent/agent.go | 2 +- command/agent/consul/client.go | 37 ++++++++++++++++--------------- command/agent/consul/int_test.go | 2 +- command/agent/consul/unit_test.go | 2 +- 5 files changed, 24 insertions(+), 23 deletions(-) diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 1c5dae6bbad..dc22abcaf53 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -143,7 +143,7 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat vclient := vaultclient.NewMockVaultClient() cclient := consul.NewMockAgent() - serviceClient := consul.NewServiceClient(cclient, logger) + serviceClient := consul.NewServiceClient(cclient, logger, true) go serviceClient.Run() tr := NewTaskRunner(logger, conf, db, upd.Update, taskDir, alloc, task, vclient, serviceClient) if !restarts { @@ -1860,7 +1860,7 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) { // backed by a mock consul whose checks are always unhealthy. consulAgent := consul.NewMockAgent() consulAgent.SetStatus("critical") - consulClient := consul.NewServiceClient(consulAgent, ctx.tr.logger) + consulClient := consul.NewServiceClient(consulAgent, ctx.tr.logger, true) go consulClient.Run() defer consulClient.Shutdown() diff --git a/command/agent/agent.go b/command/agent/agent.go index c44b5125325..3b77ddc2225 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -958,7 +958,7 @@ func (a *Agent) setupConsul(consulConfig *config.ConsulConfig) error { a.consulCatalog = client.Catalog() // Create Consul Service client for service advertisement and checks. - a.consulService = consul.NewServiceClient(client.Agent(), a.logger) + a.consulService = consul.NewServiceClient(client.Agent(), a.logger, a.Client() != nil) // Run the Consul service client's sync'ing main loop go a.consulService.Run() diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 6d561402130..7ca1fc845d7 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -233,14 +233,16 @@ type ServiceClient struct { // checkWatcher restarts checks that are unhealthy. checkWatcher *checkWatcher - // agentRoleLock guards state about whether this agent is a client - agentRoleLock sync.Mutex + // isClientAgent specifies whether this Consul client is being used + // by a Nomad client. isClientAgent bool } // NewServiceClient creates a new Consul ServiceClient from an existing Consul API -// Client and logger. -func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient { +// Client, logger and takes whether the client is being used by a Nomad Client agent. +// When being used by a Nomad client, this Consul client reconciles all services and +// checks created by Nomad on behalf of running tasks. +func NewServiceClient(consulClient AgentAPI, logger *log.Logger, isNomadClient bool) *ServiceClient { return &ServiceClient{ client: consulClient, logger: logger, @@ -259,6 +261,7 @@ func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient agentServices: make(map[string]struct{}), agentChecks: make(map[string]struct{}), checkWatcher: newCheckWatcher(logger, consulClient), + isClientAgent: isNomadClient, } } @@ -437,7 +440,12 @@ func (c *ServiceClient) sync() error { // Known service, skip continue } - if !isNomadService(id) || !c.IsClient() { + + // Ignore if this is not a Nomad managed service. Also ignore + // Nomad managed services if this is not a client agent. + // This is to prevent server agents from removing services + // registered by client agents + if !isNomadService(id) || !c.isClientAgent { // Not managed by Nomad, skip continue } @@ -474,7 +482,12 @@ func (c *ServiceClient) sync() error { // Known check, leave it continue } - if !isNomadService(check.ServiceID) || !c.IsClient() { + + // Ignore if this is not a Nomad managed check. Also ignore + // Nomad managed checks if this is not a client agent. + // This is to prevent server agents from removing checks + // registered by client agents + if !isNomadService(check.ServiceID) || !c.isClientAgent { // Service not managed by Nomad, skip continue } @@ -523,12 +536,6 @@ func (c *ServiceClient) sync() error { return nil } -func (c *ServiceClient) IsClient() bool { - c.agentRoleLock.Lock() - defer c.agentRoleLock.Unlock() - return c.isClientAgent -} - // RegisterAgent registers Nomad agents (client or server). The // Service.PortLabel should be a literal port to be parsed with SplitHostPort. // Script checks are not supported and will return an error. Registration is @@ -538,12 +545,6 @@ func (c *ServiceClient) IsClient() bool { func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) error { ops := operations{} - if role == "client" { - c.agentRoleLock.Lock() - c.isClientAgent = true - c.agentRoleLock.Unlock() - } - for _, service := range services { id := makeAgentServiceID(role, service) diff --git a/command/agent/consul/int_test.go b/command/agent/consul/int_test.go index ab4d7ec4343..230b97bb8fb 100644 --- a/command/agent/consul/int_test.go +++ b/command/agent/consul/int_test.go @@ -142,7 +142,7 @@ func TestConsul_Integration(t *testing.T) { consulClient, err := consulapi.NewClient(consulConfig) assert.Nil(err) - serviceClient := consul.NewServiceClient(consulClient.Agent(), logger) + serviceClient := consul.NewServiceClient(consulClient.Agent(), logger, true) defer serviceClient.Shutdown() // just-in-case cleanup consulRan := make(chan struct{}) go func() { diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index dcc5a1dcbf7..2cbb6488e23 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -117,7 +117,7 @@ func setupFake(t *testing.T) *testFakeCtx { fc := NewMockAgent() tt := testTask() return &testFakeCtx{ - ServiceClient: NewServiceClient(fc, testlog.Logger(t)), + ServiceClient: NewServiceClient(fc, testlog.Logger(t), true), FakeConsul: fc, Task: tt, MockExec: tt.DriverExec.(*mockExec),