Fix panic in tailer when an ingester is removed from the ring while tailing #897

Merged · 1 commit · Aug 14, 2019
pkg/querier/querier.go (28 additions, 12 deletions)

```diff
@@ -321,34 +321,50 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
 
 // passed to tailer for (re)connecting to new or disconnected ingesters
 func (q *Querier) tailDisconnectedIngesters(ctx context.Context, req *logproto.TailRequest, connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) {
-	tailClients := make(map[string]logproto.Querier_TailClient)
-	for i := range connectedIngestersAddr {
-		tailClients[connectedIngestersAddr[i]] = nil
+	// Build a map to easily check if an ingester address is already connected
+	connected := make(map[string]bool)
+	for _, addr := range connectedIngestersAddr {
+		connected[addr] = true
 	}
 
-	disconnectedIngesters := []ring.IngesterDesc{}
+	// Get the current replication set from the ring
 	replicationSet, err := q.ring.GetAll()
 	if err != nil {
 		return nil, err
 	}
 
+	// Look for disconnected ingesters or new ones we should (re)connect to
+	reconnectIngesters := []ring.IngesterDesc{}
+
 	for _, ingester := range replicationSet.Ingesters {
-		if _, isOk := tailClients[ingester.Addr]; isOk {
-			delete(tailClients, ingester.Addr)
-		} else {
-			disconnectedIngesters = append(disconnectedIngesters, ingester)
+		if _, ok := connected[ingester.Addr]; ok {
+			continue
 		}
+
+		// Skip ingesters which are leaving or joining the cluster
+		if ingester.State != ring.ACTIVE {
+			continue
+		}
+
+		reconnectIngesters = append(reconnectIngesters, ingester)
 	}
 
-	clients, err := q.forGivenIngesters(ring.ReplicationSet{Ingesters: disconnectedIngesters}, func(client logproto.QuerierClient) (interface{}, error) {
+	if len(reconnectIngesters) == 0 {
+		return nil, nil
+	}
+
+	// Instance a tail client for each ingester to (re)connect
+	reconnectClients, err := q.forGivenIngesters(ring.ReplicationSet{Ingesters: reconnectIngesters}, func(client logproto.QuerierClient) (interface{}, error) {
 		return client.Tail(ctx, req)
 	})
 	if err != nil {
 		return nil, err
 	}
 
-	for i := range clients {
-		tailClients[clients[i].addr] = clients[i].response.(logproto.Querier_TailClient)
+	reconnectClientsMap := make(map[string]logproto.Querier_TailClient)
+	for _, client := range reconnectClients {
+		reconnectClientsMap[client.addr] = client.response.(logproto.Querier_TailClient)
 	}
-	return tailClients, nil
+
+	return reconnectClientsMap, nil
 }
```
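The panic this PR fixes stems from the old map-seeding approach: every already-connected address was inserted with a nil value, addresses still present in the ring were deleted from the map, and whatever remained, including nil entries for ingesters removed from the ring mid-tail, was returned to the tailer. Invoking any method on such a nil client panics at runtime. Below is a minimal, self-contained sketch of that failure mode; `tailClient` is a simplified stand-in interface, not the generated `logproto.Querier_TailClient`:

```go
package main

import "fmt"

// tailClient is a simplified stand-in for the generated gRPC
// logproto.Querier_TailClient interface (an assumption for illustration).
type tailClient interface {
	Recv() (string, error)
}

func main() {
	// Before the fix: the returned map was seeded with nil for every
	// already-connected address. If that ingester then left the ring,
	// its nil entry was never deleted and was handed back to the tailer.
	clients := map[string]tailClient{
		"1.1.1.1": nil, // ingester removed from the ring while tailing
	}

	for addr, client := range clients {
		fmt.Println("tailing", addr)
		client.Recv() // panics: method call on a nil interface value
	}
}
```

The rewritten function sidesteps this class of bug by never placing placeholders in the returned map: it returns only the clients it just created (or nil when there is nothing to reconnect), and the new test below asserts that the map contains no nil values.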
pkg/querier/querier_mock_test.go (9 additions, 0 deletions)

```diff
@@ -246,6 +246,15 @@ func mockReadRingWithOneActiveIngester() *readRingMock {
 	})
 }
 
+func mockIngesterDesc(addr string, state ring.IngesterState) ring.IngesterDesc {
+	return ring.IngesterDesc{
+		Addr:      addr,
+		Timestamp: time.Now().UnixNano(),
+		State:     state,
+		Tokens:    []uint32{1, 2, 3},
+	}
+}
+
 // mockStreamIterator returns an iterator with 1 stream and quantity entries,
 // where entries timestamp and line string are constructed as sequential numbers
 // starting at from
```
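The new helper builds the `ring.IngesterDesc` values the test feeds to `newReadRingMock`. For context, here is a plausible shape for such a ring mock, centered on `GetAll`, the only ring method `tailDisconnectedIngesters` calls; the names and structure are assumptions for illustration, since the actual `readRingMock` lives in querier_mock_test.go and may differ:

```go
import "github.com/cortexproject/cortex/pkg/ring"

// readRingStub is a hypothetical stand-in for the repo's readRingMock
// (an illustrative assumption, not the actual test code).
type readRingStub struct {
	ingesters []ring.IngesterDesc
}

// GetAll returns every configured ingester as a single replication set,
// matching the q.ring.GetAll() call in tailDisconnectedIngesters.
func (r *readRingStub) GetAll() (ring.ReplicationSet, error) {
	return ring.ReplicationSet{Ingesters: r.ingesters}, nil
}
```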
pkg/querier/querier_test.go (92 additions, 0 deletions)

```diff
@@ -5,6 +5,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/cortexproject/cortex/pkg/ring"
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/assert"
@@ -162,6 +163,97 @@ func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) {
 	store.AssertExpectations(t)
 }
 
+func TestQuerier_tailDisconnectedIngesters(t *testing.T) {
+	t.Parallel()
+
+	tests := map[string]struct {
+		connectedIngestersAddr []string
+		ringIngesters          []ring.IngesterDesc
+		expectedClientsAddr    []string
+	}{
+		"no connected ingesters and empty ring": {
+			connectedIngestersAddr: []string{},
+			ringIngesters:          []ring.IngesterDesc{},
+			expectedClientsAddr:    []string{},
+		},
+		"no connected ingesters and ring containing new ingesters": {
+			connectedIngestersAddr: []string{},
+			ringIngesters:          []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE)},
+			expectedClientsAddr:    []string{"1.1.1.1"},
+		},
+		"connected ingesters and ring contain the same ingesters": {
+			connectedIngestersAddr: []string{"1.1.1.1", "2.2.2.2"},
+			ringIngesters:          []ring.IngesterDesc{mockIngesterDesc("2.2.2.2", ring.ACTIVE), mockIngesterDesc("1.1.1.1", ring.ACTIVE)},
+			expectedClientsAddr:    []string{},
+		},
+		"ring contains new ingesters compared to the connected ones": {
+			connectedIngestersAddr: []string{"1.1.1.1"},
+			ringIngesters:          []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE), mockIngesterDesc("2.2.2.2", ring.ACTIVE), mockIngesterDesc("3.3.3.3", ring.ACTIVE)},
+			expectedClientsAddr:    []string{"2.2.2.2", "3.3.3.3"},
+		},
+		"connected ingesters contain ingesters not in the ring anymore": {
+			connectedIngestersAddr: []string{"1.1.1.1", "2.2.2.2", "3.3.3.3"},
+			ringIngesters:          []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE), mockIngesterDesc("3.3.3.3", ring.ACTIVE)},
+			expectedClientsAddr:    []string{},
+		},
+		"connected ingesters contain ingesters not in the ring anymore and the ring contains new ingesters too": {
+			connectedIngestersAddr: []string{"1.1.1.1", "2.2.2.2", "3.3.3.3"},
+			ringIngesters:          []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE), mockIngesterDesc("3.3.3.3", ring.ACTIVE), mockIngesterDesc("4.4.4.4", ring.ACTIVE)},
+			expectedClientsAddr:    []string{"4.4.4.4"},
+		},
+		"ring contains an ingester in LEAVING state not listed among the connected ingesters": {
+			connectedIngestersAddr: []string{"1.1.1.1"},
+			ringIngesters:          []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE), mockIngesterDesc("2.2.2.2", ring.LEAVING)},
+			expectedClientsAddr:    []string{},
+		},
+		"ring contains an ingester in PENDING state not listed among the connected ingesters": {
+			connectedIngestersAddr: []string{"1.1.1.1"},
+			ringIngesters:          []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE), mockIngesterDesc("2.2.2.2", ring.PENDING)},
+			expectedClientsAddr:    []string{},
+		},
+	}
+
+	for testName, testData := range tests {
+		testData := testData
+
+		t.Run(testName, func(t *testing.T) {
+			req := logproto.TailRequest{
+				Query:    "{type=\"test\"}",
+				Regex:    "",
+				DelayFor: 0,
+				Limit:    10,
+				Start:    time.Now(),
+			}
+
+			// For this test's purpose, whenever a new ingester client needs to
+			// be created, the factory will always return the same mock instance
+			ingesterClient := newQuerierClientMock()
+			ingesterClient.On("Tail", mock.Anything, &req, mock.Anything).Return(newTailClientMock(), nil)
+
+			q, err := newQuerier(
+				mockQuerierConfig(),
+				mockIngesterClientConfig(),
+				newIngesterClientMockFactory(ingesterClient),
+				newReadRingMock(testData.ringIngesters),
+				newStoreMock())
+			require.NoError(t, err)
+
+			actualClients, err := q.tailDisconnectedIngesters(context.Background(), &req, testData.connectedIngestersAddr)
+			require.NoError(t, err)
+
+			actualClientsAddr := make([]string, 0, len(actualClients))
+			for addr, client := range actualClients {
+				actualClientsAddr = append(actualClientsAddr, addr)
+
+				// The returned map of clients should never contain nil values
+				assert.NotNil(t, client)
+			}
+
+			assert.ElementsMatch(t, testData.expectedClientsAddr, actualClientsAddr)
+		})
+	}
+}
+
 func mockQuerierConfig() Config {
 	return Config{
 		TailMaxDuration: 1 * time.Minute,
```
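To exercise just the new test locally, something like `go test ./pkg/querier -run TestQuerier_tailDisconnectedIngesters` from the repo root should do it. Note the test's key assertion: every client in the returned map is non-nil, which is precisely the invariant whose violation caused the original panic.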