Skip to content

Commit

Permalink
improve transaction exceptions (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-zh authored May 16, 2021
1 parent 9997ed3 commit d2f1403
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/Exception/KafkaProducerTransactionAbortException.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@
class KafkaProducerTransactionAbortException extends \Exception
{
public const TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE =
'Produce failed. You need to abort your current transaction and start a new one';
'Produce failed. You need to abort your current transaction and start a new one (%s)';
}
2 changes: 1 addition & 1 deletion src/Exception/KafkaProducerTransactionFatalException.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@
class KafkaProducerTransactionFatalException extends \Exception
{
public const FATAL_TRANSACTION_EXCEPTION_MESSAGE =
'Produce failed with a fatal error. This producer instance cannot be used anymore.';
'Produce failed with a fatal error. This producer instance cannot be used anymore (%s)';
}
2 changes: 1 addition & 1 deletion src/Exception/KafkaProducerTransactionRetryException.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@

class KafkaProducerTransactionRetryException extends \Exception
{
public const RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE = 'Produce failed but can be retried';
public const RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE = 'Produce failed but can be retried (%s)';
}
21 changes: 18 additions & 3 deletions src/Producer/KafkaProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -255,18 +255,33 @@ private function handleTransactionError(SkcErrorException $e): void
{
if (true === $e->isRetriable()) {
throw new KafkaProducerTransactionRetryException(
KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE
sprintf(
KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE,
$e->getMessage()
),
$e->getCode(),
$e
);
} elseif (true === $e->transactionRequiresAbort()) {
throw new KafkaProducerTransactionAbortException(
KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE
sprintf(
KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE,
$e->getMessage()
),
$e->getCode(),
$e
);
} else {
$this->transactionInitialized = false;
// according to librdkafka documentation, everything that is not retriable, abortable or fatal is fatal
// fatal errors (so stated), need the producer to be destroyed
throw new KafkaProducerTransactionFatalException(
KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE
sprintf(
KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE,
$e->getMessage()
),
$e->getCode(),
$e
);
}
}
Expand Down
41 changes: 39 additions & 2 deletions tests/Unit/Producer/KafkaProducerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ public function testBeginTransactionConsecutiveSuccess(): void
public function testBeginTransactionWithRetriableError(): void
{
self::expectException(KafkaProducerTransactionRetryException::class);
self::expectExceptionMessage(
sprintf(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, '')
);

$errorMock = $this->createMock(SkcErrorException::class);
$errorMock->expects(self::once())->method('isRetriable')->willReturn(true);
Expand All @@ -389,6 +392,9 @@ public function testBeginTransactionWithRetriableError(): void
public function testBeginTransactionWithAbortError(): void
{
self::expectException(KafkaProducerTransactionAbortException::class);
self::expectExceptionMessage(
sprintf(KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE, '')
);

$errorMock = $this->createMock(SkcErrorException::class);
$errorMock->expects(self::once())->method('isRetriable')->willReturn(false);
Expand All @@ -411,6 +417,9 @@ public function testBeginTransactionWithAbortError(): void
public function testBeginTransactionWithFatalError(): void
{
self::expectException(KafkaProducerTransactionFatalException::class);
self::expectExceptionMessage(
sprintf(KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE, '')
);

$errorMock = $this->createMock(SkcErrorException::class);
$errorMock->expects(self::once())->method('isRetriable')->willReturn(false);
Expand All @@ -435,6 +444,9 @@ public function testBeginTransactionWithFatalErrorWillTriggerInit(): void
$firstExceptionCaught = false;

self::expectException(KafkaProducerTransactionFatalException::class);
self::expectExceptionMessage(
sprintf(KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE, '')
);

$errorMock = $this->createMock(SkcErrorException::class);
$errorMock->expects(self::exactly(2))->method('isRetriable')->willReturn(false);
Expand Down Expand Up @@ -476,7 +488,9 @@ public function testAbortTransactionSuccess(): void
public function testAbortTransactionFailure(): void
{
self::expectException(KafkaProducerTransactionRetryException::class);
self::expectExceptionMessage(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE);
self::expectExceptionMessage(
sprintf(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, 'test')
);

$exception = new SkcErrorException('test', 1, 'some failure', false, true, false);

Expand Down Expand Up @@ -507,7 +521,9 @@ public function testCommitTransactionSuccess(): void
public function testCommitTransactionFailure(): void
{
self::expectException(KafkaProducerTransactionRetryException::class);
self::expectExceptionMessage(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE);
self::expectExceptionMessage(
sprintf(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, 'test')
);

$exception = new SkcErrorException('test', 1, 'some failure', false, true, false);

Expand All @@ -519,4 +535,25 @@ public function testCommitTransactionFailure(): void

$this->kafkaProducer->commitTransaction(10000);
}

/**
* @return void
*/
public function testCommitTransactionFailurePreviousException(): void
{
$exception = new SkcErrorException('test', 1, 'some failure', false, true, false);

$this->rdKafkaProducerMock
->expects(self::once())
->method('commitTransaction')
->with(10000)
->willThrowException($exception);

try {
$this->kafkaProducer->commitTransaction(10000);
} catch (KafkaProducerTransactionRetryException $e) {
self::assertSame($exception, $e->getPrevious());
}

}
}

0 comments on commit d2f1403

Please sign in to comment.