From 439ad8d14215b3f2512f86ae61c9b3646011d93d Mon Sep 17 00:00:00 2001
From: Sergei Veselev
Date: Thu, 31 Oct 2024 18:39:55 -0400
Subject: [PATCH 1/2] CNDE-1861 fix multivalued data for coded, reason and txt

---
 .../util/ProcessObservationDataUtil.java | 24 +++--
 .../ObservationDataProcessTests.java     | 97 +++++++++----------
 .../service/ObservationServiceTest.java  |  6 +-
 3 files changed, 68 insertions(+), 59 deletions(-)

diff --git a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/util/ProcessObservationDataUtil.java b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/util/ProcessObservationDataUtil.java
index 0d6428ad..a270378d 100644
--- a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/util/ProcessObservationDataUtil.java
+++ b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/util/ProcessObservationDataUtil.java
@@ -65,6 +65,7 @@ public ObservationTransformed transformObservationData(Observation observation)
         observationTransformed.setObservationUid(observation.getObservationUid());
         observationTransformed.setReportObservationUid(observation.getObservationUid());
+        observationKey.setObservationUid(observation.getObservationUid());

         String obsDomainCdSt1 = observation.getObsDomainCdSt1();
         transformPersonParticipations(observation.getPersonParticipations(), obsDomainCdSt1, observationTransformed);
@@ -320,6 +321,9 @@ private void transformActIds(String actIds, ObservationTransformed observationTr
     }

     private void transformObservationCoded(String observationCoded) {
+        // Tombstone message to delete previous observation coded data for specified uid
+        sendToKafka(observationKey, null, codedTopicName, observationKey.getObservationUid(), null);
+
         try {
             JsonNode observationCodedJsonArray = parseJsonArray(observationCoded);

@@ -342,9 +346,8 @@ private void transformObservationDate(String observationDate) {
             JsonNode observationDateJsonArray = parseJsonArray(observationDate);

             for (JsonNode jsonNode : observationDateJsonArray) {
-                ObservationDate coded = objectMapper.treeToValue(jsonNode, ObservationDate.class);
-                observationKey.setObservationUid(coded.getObservationUid());
-                sendToKafka(observationKey, coded, dateTopicName, coded.getObservationUid(), "Observation Date data (uid={}) sent to {}");
+                ObservationDate obsDate = objectMapper.treeToValue(jsonNode, ObservationDate.class);
+                sendToKafka(observationKey, obsDate, dateTopicName, obsDate.getObservationUid(), "Observation Date data (uid={}) sent to {}");
             }
         } catch (IllegalArgumentException ex) {
             logger.info(ex.getMessage(), "ObservationDate");
@@ -376,7 +379,6 @@ private void transformObservationNumeric(String observationNumeric) {

             for (JsonNode jsonNode : observationNumericJsonArray) {
                 ObservationNumeric numeric = objectMapper.treeToValue(jsonNode, ObservationNumeric.class);
-                observationKey.setObservationUid(numeric.getObservationUid());
                 sendToKafka(observationKey, numeric, numericTopicName, numeric.getObservationUid(), "Observation Numeric data (uid={}) sent to {}");
             }
         } catch (IllegalArgumentException ex) {
@@ -388,6 +390,9 @@ private void transformObservationNumeric(String observationNumeric) {

     private void transformObservationReasons(String observationReasons) {
         try {
+            // Tombstone message to delete previous observation coded data for specified uid
+            sendToKafka(observationKey, null, reasonTopicName, observationKey.getObservationUid(), null);
+
             JsonNode observationReasonsJsonArray = parseJsonArray(observationReasons);
             ObservationReasonKey reasonKey = new ObservationReasonKey();
@@ -405,6 +410,9 @@ private void transformObservationReasons(String observationReasons) {
     }

     private void transformObservationTxt(String observationTxt) {
+        // Tombstone message to delete previous observation coded data for specified uid
+        sendToKafka(observationKey, null, txtTopicName, observationKey.getObservationUid(), null);
+
         try {
             JsonNode observationTxtJsonArray = parseJsonArray(observationTxt);

@@ -424,9 +432,13 @@ private void transformObservationTxt(String observationTxt) {
     private void sendToKafka(Object key, Object value, String topicName, Long uid, String message) {
         String jsonKey = jsonGenerator.generateStringJson(key);
-        String jsonValue = jsonGenerator.generateStringJson(value);
+        String jsonValue = Optional.ofNullable(value).map(jsonGenerator::generateStringJson).orElse(null);
         kafkaTemplate.send(topicName, jsonKey, jsonValue)
-                .whenComplete((res, e) -> logger.info(message, uid, topicName));
+                .whenComplete((res, e) -> {
+                    if (message != null) {
+                        logger.info(message, uid, topicName);
+                    }
+                });
     }

     private JsonNode parseJsonArray(String jsonString) throws JsonProcessingException, IllegalArgumentException {
diff --git a/observation-service/src/test/java/gov/cdc/etldatapipeline/observation/ObservationDataProcessTests.java b/observation-service/src/test/java/gov/cdc/etldatapipeline/observation/ObservationDataProcessTests.java
index c207a556..9bb775d0 100644
--- a/observation-service/src/test/java/gov/cdc/etldatapipeline/observation/ObservationDataProcessTests.java
+++ b/observation-service/src/test/java/gov/cdc/etldatapipeline/observation/ObservationDataProcessTests.java
@@ -64,9 +64,21 @@ class ObservationDataProcessTests {
     void setUp() {
         closeable = MockitoAnnotations.openMocks(this);
         transformer = new ProcessObservationDataUtil(kafkaTemplate);
+
+        transformer.setCodedTopicName(CODED_TOPIC);
+        transformer.setEdxTopicName(EDX_TOPIC);
+        transformer.setDateTopicName(DATE_TOPIC);
+        transformer.setMaterialTopicName(MATERIAL_TOPIC);
+        transformer.setNumericTopicName(NUMERIC_TOPIC);
+        transformer.setReasonTopicName(REASON_TOPIC);
+        transformer.setTxtTopicName(TXT_TOPIC);
+
         Logger logger = (Logger) LoggerFactory.getLogger(ProcessObservationDataUtil.class);
         listAppender.start();
         logger.addAppender(listAppender);
+
+        when(kafkaTemplate.send(anyString(), anyString(), isNull())).thenReturn(CompletableFuture.completedFuture(null));
+        when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null));
     }

     @AfterEach
@@ -79,7 +91,7 @@ void tearDown() throws Exception {
     @Test
     void consolidatedDataTransformationTest() {
         Observation observation = new Observation();
-        observation.setActUid(100000001L);
+        observation.setObservationUid(100000001L);
         observation.setObsDomainCdSt1("Order");

         observation.setPersonParticipations(readFileData(FILE_PREFIX + "PersonParticipations.json"));
@@ -109,7 +121,7 @@ void consolidatedDataTransformationTest() {
     @Test
     void testPersonParticipationTransformation() {
         Observation observation = new Observation();
-        observation.setActUid(100000001L);
+        observation.setObservationUid(100000001L);
         observation.setObsDomainCdSt1("Order");

         final var expected = getObservationTransformed();
@@ -122,11 +134,13 @@ void testMorbReportTransformation() {
         Observation observation = new Observation();
-        observation.setActUid(100000001L);
+        observation.setObservationUid(100000001L);
         observation.setObsDomainCdSt1("Order");

         final var expected = new ObservationTransformed();
+        expected.setObservationUid(100000001L);
+        expected.setReportObservationUid(100000001L);
         expected.setPatientId(10000055L);
         expected.setMorbPhysicianId(10000033L);
         expected.setMorbReporterId(10000044L);
@@ -139,7 +153,7 @@ void testMorbReportTransformation() {
     @Test
     void testOrganizationParticipationTransformation() {
         Observation observation = new Observation();
-        observation.setActUid(100000001L);
+        observation.setObservationUid(100000001L);
         observation.setObsDomainCdSt1("Result");

         observation.setOrganizationParticipations(readFileData(FILE_PREFIX + "OrganizationParticipations.json"));
@@ -157,24 +171,21 @@ void testObservationMaterialTransformation() throws JsonProcessingException {
         Observation observation = new Observation();
-        observation.setActUid(100000003L);
         observation.setObservationUid(100000003L);
         observation.setObsDomainCdSt1("Order");

         observation.setMaterialParticipations(readFileData(FILE_PREFIX + "MaterialParticipations.json"));
-        transformer.setMaterialTopicName(MATERIAL_TOPIC);
-        when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null));

         ObservationMaterial material = constructObservationMaterial(100000003L);
         ObservationTransformed observationTransformed = transformer.transformObservationData(observation);

-        verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
-        assertEquals(MATERIAL_TOPIC, topicCaptor.getValue());
+        verify(kafkaTemplate, times(4)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
+        assertEquals(MATERIAL_TOPIC, topicCaptor.getAllValues().getFirst());
         assertEquals(10000005L, observationTransformed.getMaterialId());

         List logs = listAppender.list;
         assertTrue(logs.get(2).getFormattedMessage().contains("Observation Material data (uid=10000005) sent to "+MATERIAL_TOPIC));

         var actualMaterial = objectMapper.readValue(
-                objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(), ObservationMaterial.class);
+                objectMapper.readTree(messageCaptor.getAllValues().getFirst()).path("payload").toString(), ObservationMaterial.class);
         assertEquals(material, actualMaterial);
     }
@@ -183,7 +194,6 @@ void testObservationMaterialTransformation() throws JsonProcessingException {
     @CsvSource({"'Order'", "'Result'"})
     void testParentObservationsTransformation(String domainCd) {
         Observation observation = new Observation();
-        observation.setActUid(100000003L);
         observation.setObservationUid(100000003L);
         observation.setParentObservations("[{\"parent_type_cd\":\"MorbFrmQ\",\"parent_uid\":234567888,\"parent_domain_cd_st_1\":\"R_Order\"}]");
@@ -197,10 +207,8 @@ void testParentObservationsTransformation(String domainCd) {
     @Test
     void testObservationCodedTransformation() throws JsonProcessingException {
         Observation observation = new Observation();
-        observation.setActUid(10001234L);
         observation.setObservationUid(10001234L);
         observation.setObsCode(readFileData(FILE_PREFIX + "ObservationCoded.json"));
-        transformer.setCodedTopicName(CODED_TOPIC);

         ObservationCoded coded = new ObservationCoded();
         coded.setObservationUid(observation.getObservationUid());
@@ -211,16 +219,14 @@ void testObservationCodedTransformation() throws JsonProcessingException {
         coded.setOvcAltCd("A-124");
         coded.setOvcAltCdDescTxt("NORMAL");

-        when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null));
-
         transformer.transformObservationData(observation);
-        verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
-        assertEquals(CODED_TOPIC, topicCaptor.getValue());
+        verify(kafkaTemplate, times(4)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
+        assertEquals(CODED_TOPIC, topicCaptor.getAllValues().get(1));

         List logs = listAppender.list;
         assertTrue(logs.get(6).getFormattedMessage().contains("Observation Coded data (uid=10001234) sent to "+CODED_TOPIC));

         var actualCoded = objectMapper.readValue(
-                objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(), ObservationCoded.class);
+                objectMapper.readTree(messageCaptor.getAllValues().get(1)).path("payload").toString(), ObservationCoded.class);
         assertEquals(coded, actualCoded);
     }
@@ -228,25 +234,21 @@ void testObservationDateTransformation() throws JsonProcessingException {
         Observation observation = new Observation();
-        observation.setActUid(10001234L);
         observation.setObservationUid(10001234L);
         observation.setObsDate(readFileData(FILE_PREFIX + "ObservationDate.json"));
-        transformer.setDateTopicName(DATE_TOPIC);

         ObservationDate obd = new ObservationDate();
         obd.setObservationUid(observation.getObservationUid());
         obd.setOvdFromDate("2024-08-16T00:00:00");

-        when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null));
-
         transformer.transformObservationData(observation);
-        verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
-        assertEquals(DATE_TOPIC, topicCaptor.getValue());
+        verify(kafkaTemplate, times(4)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
+        assertEquals(DATE_TOPIC, topicCaptor.getAllValues().get(1));

         List logs = listAppender.list;
         assertTrue(logs.get(7).getFormattedMessage().contains("Observation Date data (uid=10001234) sent to "+DATE_TOPIC));

         var actualObd = objectMapper.readValue(
-                objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(), ObservationDate.class);
+                objectMapper.readTree(messageCaptor.getAllValues().get(1)).path("payload").toString(), ObservationDate.class);
         assertEquals(obd, actualObd);
     }
@@ -257,23 +259,20 @@ void testObservationEdxTransformation() throws JsonProcessingException {
         observation.setActUid(10001234L);
         observation.setObservationUid(10001234L);
         observation.setEdxIds(readFileData(FILE_PREFIX + "ObservationEdx.json"));
-        transformer.setEdxTopicName(EDX_TOPIC);

         ObservationEdx edx = new ObservationEdx();
         edx.setEdxDocumentUid(10101L);
         edx.setEdxActUid(observation.getActUid());
         edx.setEdxAddTime("2024-09-30T21:08:19.017");

-        when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null));
-
         transformer.transformObservationData(observation);
-        verify(kafkaTemplate, times(2)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
-        assertEquals(EDX_TOPIC, topicCaptor.getValue());
+        verify(kafkaTemplate, times(5)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
+        assertEquals(EDX_TOPIC, topicCaptor.getAllValues().get(1));

         List logs = listAppender.list;
         assertTrue(logs.get(8).getFormattedMessage().contains("Observation Edx data (edx doc uid=10101) sent to "+EDX_TOPIC));

         var actualEdx = objectMapper.readValue(
-                objectMapper.readTree(messageCaptor.getAllValues().getFirst()).path("payload").toString(), ObservationEdx.class);
+                objectMapper.readTree(messageCaptor.getAllValues().get(1)).path("payload").toString(), ObservationEdx.class);
         assertEquals(edx, actualEdx);
     }
@@ -281,10 +280,8 @@ void testObservationNumericTransformation() throws JsonProcessingException {
         Observation observation = new Observation();
-        observation.setActUid(10001234L);
         observation.setObservationUid(10001234L);
         observation.setObsNum(readFileData(FILE_PREFIX + "ObservationNumeric.json"));
-        transformer.setNumericTopicName(NUMERIC_TOPIC);

         ObservationNumeric numeric = new ObservationNumeric();
         numeric.setObservationUid(observation.getObservationUid());
@@ -296,16 +293,14 @@ void testObservationNumericTransformation() throws JsonProcessingException {
         numeric.setOvnNumericUnitCd("mL");
         numeric.setOvnSeparatorCd(":");

-        when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null));
-
         transformer.transformObservationData(observation);
-        verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
-        assertEquals(NUMERIC_TOPIC, topicCaptor.getValue());
+        verify(kafkaTemplate, times(4)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
+        assertEquals(NUMERIC_TOPIC, topicCaptor.getAllValues().get(1));

         List logs = listAppender.list;
         assertTrue(logs.get(9).getFormattedMessage().contains("Observation Numeric data (uid=10001234) sent to "+NUMERIC_TOPIC));

         var actualNumeric = objectMapper.readValue(
-                objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(), ObservationNumeric.class);
+                objectMapper.readTree(messageCaptor.getAllValues().get(1)).path("payload").toString(), ObservationNumeric.class);
         assertEquals(numeric, actualNumeric);
     }
@@ -313,26 +308,22 @@ void testObservationReasonTransformation() throws JsonProcessingException {
         Observation observation = new Observation();
-        observation.setActUid(10001234L);
         observation.setObservationUid(10001234L);
         observation.setObsReason(readFileData(FILE_PREFIX + "ObservationReason.json"));
-        transformer.setReasonTopicName(REASON_TOPIC);

         ObservationReason reason = new ObservationReason();
         reason.setObservationUid(observation.getObservationUid());
         reason.setReasonCd("80008");
         reason.setReasonDescTxt("PRESENCE OF REASON");

-        when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null));
-
         transformer.transformObservationData(observation);
-        verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
-        assertEquals(REASON_TOPIC, topicCaptor.getValue());
+        verify(kafkaTemplate, times(4)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
+        assertEquals(REASON_TOPIC, topicCaptor.getAllValues().get(2));

         List logs = listAppender.list;
         assertTrue(logs.get(10).getFormattedMessage().contains("Observation Reason data (uid=10001234) sent to "+REASON_TOPIC));

         var actualReason = objectMapper.readValue(
-                objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(), ObservationReason.class);
+                objectMapper.readTree(messageCaptor.getAllValues().get(2)).path("payload").toString(), ObservationReason.class);
         assertEquals(reason, actualReason);
     }
@@ -340,10 +331,8 @@ void testObservationTxtTransformation() throws JsonProcessingException {
         Observation observation = new Observation();
-        observation.setActUid(10001234L);
         observation.setObservationUid(10001234L);
         observation.setObsTxt(readFileData(FILE_PREFIX + "ObservationTxt.json"));
-        transformer.setTxtTopicName(TXT_TOPIC);

         ObservationTxt txt = new ObservationTxt();
         txt.setObservationUid(observation.getObservationUid());
@@ -351,16 +340,14 @@ void testObservationTxtTransformation() throws JsonProcessingException {
         txt.setOvtTxtTypeCd("N");
         txt.setOvtValueTxt("RECOMMENDED IN SUCH INSTANCES.");

-        when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null));
-
         transformer.transformObservationData(observation);
-        verify(kafkaTemplate, times(2)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
-        assertEquals(TXT_TOPIC, topicCaptor.getValue());
+        verify(kafkaTemplate, times(5)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
+        assertEquals(TXT_TOPIC, topicCaptor.getAllValues().get(2));

         List logs = listAppender.list;
         assertTrue(logs.get(11).getFormattedMessage().contains("Observation Txt data (uid=10001234) sent to "+TXT_TOPIC));

         var actualTxt = objectMapper.readValue(
-                objectMapper.readTree(messageCaptor.getAllValues().getFirst()).path("payload").toString(), ObservationTxt.class);
+                objectMapper.readTree(messageCaptor.getAllValues().get(3)).path("payload").toString(), ObservationTxt.class);
         assertEquals(txt, actualTxt);
     }
@@ -368,6 +355,7 @@ void testObservationTxtTransformation() throws JsonProcessingException {
     @Test
     void testTransformNoObservationData() {
         Observation observation = new Observation();
+        observation.setObservationUid(10001234L);
         observation.setOrganizationParticipations("{\"act_uid\": 10000003}");

         transformer.transformObservationData(observation);
@@ -380,6 +368,7 @@ void testTransformObservationDataError(){
         Observation observation = new Observation();
         String invalidJSON = "invalidJSON";

+        observation.setObservationUid(10001234L);
         observation.setPersonParticipations(invalidJSON);
         observation.setOrganizationParticipations(invalidJSON);
         observation.setMaterialParticipations(invalidJSON);
@@ -402,6 +391,7 @@ void testTransformObservationInvalidDomainError(){
         Observation observation = new Observation();
+        observation.setObservationUid(10001234L);

         String dummyJSON = "[{\"type_cd\":\"PRF\",\"subject_class_cd\":\"ORG\",\"entity_id\":45678901,\"domain_cd_st_1\":\"Result\"}]";
         String invalidDomainCode = "Check";
@@ -426,6 +416,7 @@ void testTransformObservationInvalidDomainError(){
     void testTransformObservationNullError(String payload){
         Observation observation = new Observation();
+        observation.setObservationUid(10001234L);
         observation.setObsDomainCdSt1("Order");
         observation.setPersonParticipations(payload);
         observation.setOrganizationParticipations(payload);
@@ -441,6 +432,8 @@ void testTransformObservationNullError(String payload){
     private @NotNull ObservationTransformed getObservationTransformed() {
         ObservationTransformed expected = new ObservationTransformed();
+        expected.setObservationUid(100000001L);
+        expected.setReportObservationUid(100000001L);
         expected.setPatientId(10000066L);
         expected.setOrderingPersonId(10000055L);
         expected.setAssistantInterpreterId(10000077L);
diff --git a/observation-service/src/test/java/gov/cdc/etldatapipeline/observation/service/ObservationServiceTest.java b/observation-service/src/test/java/gov/cdc/etldatapipeline/observation/service/ObservationServiceTest.java
index 33274af8..af1ee613 100644
--- a/observation-service/src/test/java/gov/cdc/etldatapipeline/observation/service/ObservationServiceTest.java
+++ b/observation-service/src/test/java/gov/cdc/etldatapipeline/observation/service/ObservationServiceTest.java
@@ -57,6 +57,9 @@ void setUp() {
         observationService = new ObservationService(observationRepository, kafkaTemplate, transformer);
         observationService.setObservationTopic(inputTopicName);
         observationService.setObservationTopicOutputReporting(outputTopicName);
+        transformer.setCodedTopicName("ObservationCoded");
+        transformer.setReasonTopicName("ObservationReason");
+        transformer.setTxtTopicName("ObservationTxt");
     }

     @AfterEach
@@ -74,6 +77,7 @@ void testProcessMessage() throws JsonProcessingException {
         Observation observation = constructObservation(observationUid, obsDomainCdSt);
         when(observationRepository.computeObservations(String.valueOf(observationUid))).thenReturn(Optional.of(observation));
         when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null));
+        when(kafkaTemplate.send(anyString(), anyString(), isNull())).thenReturn(CompletableFuture.completedFuture(null));

         validateData(payload, observation);
@@ -105,7 +109,7 @@ private void validateData(String payload, Observation observation) throws JsonPr
         var reportingModel = constructObservationReporting(observation.getObservationUid(),
                 observation.getObsDomainCdSt1());

-        verify(kafkaTemplate, times(2)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
+        verify(kafkaTemplate, times(5)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
         String actualTopic = topicCaptor.getValue();
         String actualKey = keyCaptor.getValue();
         String actualValue = messageCaptor.getValue();

From 1b3fb810fa00819ed5a46ac742b4f25f883187ee Mon Sep 17 00:00:00 2001
From: Sergei Veselev
Date: Fri, 1 Nov 2024 15:41:11 -0400
Subject: [PATCH 2/2] CNDE-1861 minor fix for comments

---
 .../observation/util/ProcessObservationDataUtil.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/util/ProcessObservationDataUtil.java b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/util/ProcessObservationDataUtil.java
index a270378d..3ff60434 100644
--- a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/util/ProcessObservationDataUtil.java
+++ b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/util/ProcessObservationDataUtil.java
@@ -390,7 +390,7 @@ private void transformObservationNumeric(String observationNumeric) {

     private void transformObservationReasons(String observationReasons) {
         try {
-            // Tombstone message to delete previous observation coded data for specified uid
+            // Tombstone message to delete previous observation reason data for specified uid
             sendToKafka(observationKey, null, reasonTopicName, observationKey.getObservationUid(), null);

             JsonNode observationReasonsJsonArray = parseJsonArray(observationReasons);
@@ -410,7 +410,7 @@ private void transformObservationReasons(String observationReasons) {
     }

     private void transformObservationTxt(String observationTxt) {
-        // Tombstone message to delete previous observation coded data for specified uid
+        // Tombstone message to delete previous observation txt data for specified uid
         sendToKafka(observationKey, null, txtTopicName, observationKey.getObservationUid(), null);

         try {