diff --git a/service/worker/indexer/esProcessor.go b/service/worker/indexer/esProcessor.go index 430c69bf14b..ea59e5b1257 100644 --- a/service/worker/indexer/esProcessor.go +++ b/service/worker/indexer/esProcessor.go @@ -266,6 +266,25 @@ func (p *ESProcessorImpl) shadowBulkAfterAction(id int64, requests []bulk.Generi tag.ESResponseStatus(err.Status), tag.ESRequest(request.String())) } + return + } + responseItems := response.Items + for i := 0; i < len(requests) && i < len(responseItems); i++ { + key := p.retrieveKafkaKey(requests[i]) + if key == "" { + continue + } + responseItem := responseItems[i] + // It is possible for err to be nil while the responses in response.Items might still contain errors or unsuccessful statuses for individual requests. + // This is because the err variable refers to the overall bulk request operation, but each individual request in the bulk operation has its own status code. + for _, resp := range responseItem { + if !isResponseSuccess(resp.Status) { + wid, rid, domainID := p.getMsgWithInfo(key) + p.logger.Error("ES request failed in secondary processor", + tag.ESResponseStatus(resp.Status), tag.ESResponseError(getErrorMsgFromESResp(resp)), tag.WorkflowID(wid), tag.WorkflowRunID(rid), + tag.WorkflowDomainID(domainID)) + } + } } } diff --git a/service/worker/indexer/esProcessor_test.go b/service/worker/indexer/esProcessor_test.go index d8b8f1c9cd5..6476eb5f6d4 100644 --- a/service/worker/indexer/esProcessor_test.go +++ b/service/worker/indexer/esProcessor_test.go @@ -539,3 +539,49 @@ func (s *esProcessorSuite) TestBulkAfterAction_Nack_Shadow_WithError() { // Mocking secondary processor to test shadowBulkAfterAction with error s.esProcessor.shadowBulkAfterAction(0, requests, response, mockErr) } + +func (s *esProcessorSuite) TestBulkAfterAction_Shadow_Fail_WithoutError() { + version := int64(3) + testKey := "testKey" + request := &mocks2.GenericBulkableRequest{} + request.On("String").Return("") + request.On("Source").Return([]string{string(`{"delete":{"_id":"testKey"}}`)}, nil) + requests := []bulk.GenericBulkableRequest{request} + + mFailed := map[string]*bulk.GenericBulkResponseItem{ + "index": { + Index: testIndex, + Type: testType, + ID: testID, + Version: version, + Status: 400, + }, + } + response := &bulk.GenericBulkResponse{ + Took: 3, + Errors: false, + Items: []map[string]*bulk.GenericBulkResponseItem{mFailed}, + } + + wid := "test-workflowID" + rid := "test-runID" + domainID := "test-domainID" + payload := s.getEncodedMsg(wid, rid, domainID) + + mockKafkaMsg := &msgMocks.Message{} + mapVal := newKafkaMessageWithMetrics(mockKafkaMsg, &testStopWatch) + s.esProcessor.mapToKafkaMsg.Put(testKey, mapVal) + + // Add mocked secondary processor + secondaryProcessor := &mocks2.GenericBulkProcessor{} + s.esProcessor.bulkProcessor = append(s.esProcessor.bulkProcessor, secondaryProcessor) + + // Mock Kafka message Nack and Value + mockKafkaMsg.On("Nack").Return(nil).Once() + mockKafkaMsg.On("Value").Return(payload).Once() + s.mockScope.On("IncCounter", mock.AnythingOfType("int")).Return() + // Execute bulkAfterAction for primary processor with error + s.esProcessor.bulkAfterAction(0, requests, response, nil) + // Mocking secondary processor to test shadowBulkAfterAction with error + s.esProcessor.shadowBulkAfterAction(0, requests, response, nil) +}