Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix KeyError on solitary abort marker. #782

Merged
merged 5 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion aiokafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ def _unpack_records(self):

if next_batch.is_control_batch:
if self._contains_abort_marker(next_batch):
self._aborted_producers.remove(next_batch.producer_id)
# Using `discard` instead of `remove`, because Kafka
# may return an abort marker for an otherwise empty
# topic-partition.
self._aborted_producers.discard(next_batch.producer_id)

if next_batch.is_transactional and \
next_batch.producer_id in self._aborted_producers:
Expand Down
28 changes: 27 additions & 1 deletion tests/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from kafka.protocol.offset import OffsetResponse
from aiokafka.record.legacy_records import LegacyRecordBatchBuilder
from aiokafka.record.default_records import DefaultRecordBatchBuilder
from aiokafka.record.memory_records import MemoryRecords

from aiokafka.protocol.fetch import (
FetchRequest_v0 as FetchRequest, FetchResponse_v0 as FetchResponse)
Expand All @@ -18,7 +20,7 @@
from aiokafka.client import AIOKafkaClient
from aiokafka.consumer.fetcher import (
Fetcher, FetchResult, FetchError, ConsumerRecord, OffsetResetStrategy,
PartitionRecords, READ_UNCOMMITTED
PartitionRecords, READ_COMMITTED, READ_UNCOMMITTED
)
from aiokafka.consumer.subscription_state import SubscriptionState
from aiokafka.util import create_future, create_task, get_running_loop
Expand Down Expand Up @@ -534,3 +536,27 @@ async def mock_send(node_id, request):
if cm is not None:
self.assertIn(
"Received unknown topic or partition error", cm.output[0])

@run_until_complete
async def test_solitary_abort_marker(self):
# An abort marker may not be preceded by any aborted messages

builder = DefaultRecordBatchBuilder(
magic=2, compression_type=0, is_transactional=1,
producer_id=3, producer_epoch=1, base_sequence=-1,
batch_size=999)
buffer = builder.build()

records = MemoryRecords(bytes(buffer))

partition_recs = PartitionRecords(
tp=TopicPartition('test-topic', 0),
records=records,
aborted_transactions=[],
fetch_offset=0,
key_deserializer=None,
value_deserializer=None,
check_crcs=True,
isolation_level=READ_COMMITTED)

self.assertEqual(len(list(partition_recs)), 0)
ods marked this conversation as resolved.
Show resolved Hide resolved