diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 036ce9223e89d..2c00fd2cdf715 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -139,8 +139,15 @@ public void initiate() { lastInitTime = System.nanoTime(); if (ml.getManagedLedgerInterceptor() != null) { long originalDataLen = data.readableBytes(); - payloadProcessorHandle = ml.getManagedLedgerInterceptor() - .processPayloadBeforeLedgerWrite(this.getCtx(), duplicateBuffer); + try { + payloadProcessorHandle = ml.getManagedLedgerInterceptor() + .processPayloadBeforeLedgerWrite(this.getCtx(), duplicateBuffer); + } catch (Exception e) { + ReferenceCountUtil.safeRelease(duplicateBuffer); + log.error("[{}] Error processing payload before ledger write", ml.getName(), e); + this.failed(new ManagedLedgerException.ManagedLedgerInterceptException(e)); + return; + } if (payloadProcessorHandle != null) { duplicateBuffer = payloadProcessorHandle.getProcessedPayload(); // If data len of entry changes, correct "dataLength" and "currentLedgerSize". diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java index 74a88382b0e0e..26b2d52c194ff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java @@ -21,9 +21,13 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.fail; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -431,4 +435,49 @@ public boolean test(@Nullable Entry entry) { } } + @Test(timeOut = 3000) + public void testManagedLedgerPayloadInputProcessorFailure() throws Exception { + var config = new ManagedLedgerConfig(); + final String failureMsg = "failed to process input payload"; + config.setManagedLedgerInterceptor(new ManagedLedgerInterceptorImpl( + Collections.emptySet(), Set.of(new ManagedLedgerPayloadProcessor() { + @Override + public Processor inputProcessor() { + return new Processor() { + @Override + public ByteBuf process(Object contextObj, ByteBuf inputPayload) { + throw new RuntimeException(failureMsg); + } + + @Override + public void release(ByteBuf processedPayload) { + // no-op + fail("the release method can't be reached"); + } + }; + } + }))); + + var ledger = factory.open("testManagedLedgerPayloadProcessorFailure", config); + var countDownLatch = new CountDownLatch(1); + var expectedException = new ArrayList(); + ledger.asyncAddEntry("test".getBytes(), 1, 1, new AsyncCallbacks.AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + entryData.release(); + countDownLatch.countDown(); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + // expected + expectedException.add(exception); + countDownLatch.countDown(); + } + }, null); + countDownLatch.await(); + assertEquals(expectedException.size(), 1); + assertEquals(expectedException.get(0).getCause().getMessage(), failureMsg); + } + }