From fbd6754fb5dcb42df3207b27d67b3e2b2eeeeb60 Mon Sep 17 00:00:00 2001 From: ebartkus Date: Fri, 12 Jul 2019 12:17:24 +0300 Subject: [PATCH] KCLRecordProcessor handles multiple shutdown calls gracefully In some occasions KCL library calls shutdown multiple times for the same shards KCLRecordProcessor. We have to make sure it doesn't fail because closed dependencies. --- .../dynamodb/kcl/KclRecordProcessor.java | 30 +++++++++++-------- .../dynamodb/kcl/KclRecordProcessorTests.java | 21 +++++++++++++ 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclRecordProcessor.java b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclRecordProcessor.java index a95fabe..2a9901e 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclRecordProcessor.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclRecordProcessor.java @@ -208,18 +208,19 @@ public void shutdown(ShutdownInput shutdownInput) { * Called on ShutdownReason.TERMINATE */ private void onTerminate(ShutdownInput shutdownInput) throws InvalidStateException, ShutdownException, InterruptedException { - if (!lastProcessedSeqNo.isEmpty()) { - ShardInfo processorRegister = shardRegister.get(this.shardId); - - while (!processorRegister.getLastCommittedRecordSeqNo().equals(this.lastProcessedSeqNo)) { - LOGGER.info( - "Shard ended. Waiting for all data table: {} from shard: {} to be committed. " + - "lastCommittedRecordSeqNo: {} lastProcessedSeqNo: {}", - tableName, - shardId, - processorRegister.getLastCommittedRecordSeqNo(), - this.lastProcessedSeqNo); - Thread.sleep(1000); + if (lastProcessedSeqNo != null && !lastProcessedSeqNo.isEmpty()) { + ShardInfo processorRegister = shardRegister.getOrDefault(this.shardId, null); + if (processorRegister != null) { + while (!processorRegister.getLastCommittedRecordSeqNo().equals(this.lastProcessedSeqNo)) { + LOGGER.info( + "Shard ended. Waiting for all data table: {} from shard: {} to be committed. " + + "lastCommittedRecordSeqNo: {} lastProcessedSeqNo: {}", + tableName, + shardId, + processorRegister.getLastCommittedRecordSeqNo(), + this.lastProcessedSeqNo); + Thread.sleep(500); + } } } @@ -231,7 +232,10 @@ private void onTerminate(ShutdownInput shutdownInput) throws InvalidStateExcepti "Shard ended. All data committed. Checkpoint and proceed to next one. Table: {} ShardID: {}", tableName, shardId); - shutdownInput.getCheckpointer().checkpoint(); + IRecordProcessorCheckpointer checkpointer = shutdownInput.getCheckpointer(); + if (checkpointer != null) { + checkpointer.checkpoint(); + } } /** diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclRecordProcessorTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclRecordProcessorTests.java index 0398565..694f4ad 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclRecordProcessorTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclRecordProcessorTests.java @@ -264,6 +264,27 @@ void shutdownWaitsForLastRecordToBeCommittedOnShardEndBeforeCheckpoint() throws verify(checkpointer, only()).checkpoint(); } + @Test + void shutdownSucceddsIfCalledMultipleTimes() throws InvalidStateException, ShutdownException { + // Arrange + IRecordProcessorCheckpointer checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class); + ProcessRecordsInput processRecordsInput = getProcessRecordsInput("SQ1").withCheckpointer(checkpointer); + shardRegister.get(shardId).setLastCommittedRecordSeqNo("SQ1"); + + // Act + processor.processRecords(processRecordsInput); + ShutdownInput shutdownInput = new ShutdownInput() + .withShutdownReason(ShutdownReason.TERMINATE) + .withCheckpointer(checkpointer); + + processor.shutdown(shutdownInput); + processor.shutdown(shutdownInput); + + // Assert + verify(checkpointer, times(2)).checkpoint(); + assertFalse(shardRegister.containsKey(shardId)); + } + @Test void shutdownCheckpointsIfNoDataWasReceivedFromThisShard() throws InvalidStateException, ShutdownException { // Arrange