Skip to content

Commit

Permalink
Add more logs to inspect OpenSearch missing updates issue (#6364)
Browse files Browse the repository at this point in the history
  • Loading branch information
neil-xie authored Oct 15, 2024
1 parent 473ccf1 commit 5270ea8
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 0 deletions.
19 changes: 19 additions & 0 deletions service/worker/indexer/esProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,25 @@ func (p *ESProcessorImpl) shadowBulkAfterAction(id int64, requests []bulk.Generi
tag.ESResponseStatus(err.Status),
tag.ESRequest(request.String()))
}
return
}
responseItems := response.Items
for i := 0; i < len(requests) && i < len(responseItems); i++ {
key := p.retrieveKafkaKey(requests[i])
if key == "" {
continue
}
responseItem := responseItems[i]
// It is possible for err to be nil while the responses in response.Items might still contain errors or unsuccessful statuses for individual requests.
// This is because the err variable refers to the overall bulk request operation, but each individual request in the bulk operation has its own status code.
for _, resp := range responseItem {
if !isResponseSuccess(resp.Status) {
wid, rid, domainID := p.getMsgWithInfo(key)
p.logger.Error("ES request failed in secondary processor",
tag.ESResponseStatus(resp.Status), tag.ESResponseError(getErrorMsgFromESResp(resp)), tag.WorkflowID(wid), tag.WorkflowRunID(rid),
tag.WorkflowDomainID(domainID))
}
}
}
}

Expand Down
46 changes: 46 additions & 0 deletions service/worker/indexer/esProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,3 +539,49 @@ func (s *esProcessorSuite) TestBulkAfterAction_Nack_Shadow_WithError() {
// Mocking secondary processor to test shadowBulkAfterAction with error
s.esProcessor.shadowBulkAfterAction(0, requests, response, mockErr)
}

func (s *esProcessorSuite) TestBulkAfterAction_Shadow_Fail_WithoutError() {
version := int64(3)
testKey := "testKey"
request := &mocks2.GenericBulkableRequest{}
request.On("String").Return("")
request.On("Source").Return([]string{string(`{"delete":{"_id":"testKey"}}`)}, nil)
requests := []bulk.GenericBulkableRequest{request}

mFailed := map[string]*bulk.GenericBulkResponseItem{
"index": {
Index: testIndex,
Type: testType,
ID: testID,
Version: version,
Status: 400,
},
}
response := &bulk.GenericBulkResponse{
Took: 3,
Errors: false,
Items: []map[string]*bulk.GenericBulkResponseItem{mFailed},
}

wid := "test-workflowID"
rid := "test-runID"
domainID := "test-domainID"
payload := s.getEncodedMsg(wid, rid, domainID)

mockKafkaMsg := &msgMocks.Message{}
mapVal := newKafkaMessageWithMetrics(mockKafkaMsg, &testStopWatch)
s.esProcessor.mapToKafkaMsg.Put(testKey, mapVal)

// Add mocked secondary processor
secondaryProcessor := &mocks2.GenericBulkProcessor{}
s.esProcessor.bulkProcessor = append(s.esProcessor.bulkProcessor, secondaryProcessor)

// Mock Kafka message Nack and Value
mockKafkaMsg.On("Nack").Return(nil).Once()
mockKafkaMsg.On("Value").Return(payload).Once()
s.mockScope.On("IncCounter", mock.AnythingOfType("int")).Return()
// Execute bulkAfterAction for primary processor with error
s.esProcessor.bulkAfterAction(0, requests, response, nil)
// Mocking secondary processor to test shadowBulkAfterAction with error
s.esProcessor.shadowBulkAfterAction(0, requests, response, nil)
}

0 comments on commit 5270ea8

Please sign in to comment.