Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

ISSUE-12402: PIP 107: Introduce the chunk message ID #3161

Open
sijie opened this issue Oct 18, 2021 · 0 comments
Open

ISSUE-12402: PIP 107: Introduce the chunk message ID #3161

sijie opened this issue Oct 18, 2021 · 0 comments

Comments

@sijie
Copy link
Member

sijie commented Oct 18, 2021

Original Issue: apache#12402


Motivation

Currently, when we send chunked messages, the producer returns the message-id of the last chunk. This can cause some problems. For example, when we use this message-id to seek, it will cause the consumer to consume from the position of the last chunk, and the consumer will mistakenly think that the previous chunks are lost and choose to skip the current message. If we use the inclusive seek, the consumer may skip the first message, which brings the wrong behavior.

Here is the simple code used to demonstrate the problem.

var msgId = producer.send(...); // eg. return 0:1:-1

var otherMsg = producer.send(...); // return 0:2:-1

consumer.seek(msgId); // inclusive seek

var receiveMsgId = consumer.receive().getMessageId(); // it may skip the
first message and return like 0:2:-1

Assert.assertEquals(msgId, receiveMsgId); // fail

Earlier, we tried to fix the problem by having the producer and the consumer return the firstChunkMessageID.(Discussion and Draft pull requests). However, this may have some impact on the original business logic. If users rely on the feature of returning lastChunkMessageId, they will be affected. For this reason, we propose a new solution to minimize the impact. In this PIP, the expected impact for the original user will only occur when seeking the chunk message.

Goal

We can solve the above problem by introducing chunk message ID to the producer and consumer. Here are some goals for this PIP:

  • Compatibility: When the Producer and the consumer are processing the chunk Message, the chunk message-id is returned to the user. In order to achieve better compatibility with the original business logic, the chunk message-id need to be consistent with the original behavior.
  • New Feature: The user can get the message-id of the first chunk and the last chunk by the chunk message-id.
  • Fix for consumer.seek: To fix the above problem, the consumer will use firstChunkMessageId if the message-id passed in is a chunk message id when seeking.

API Changes and Implementation

  1. Introduce a new Message ID type: Chunk Message ID. The chunk message id inherits from MessageIdImpl and adds two new methods: getFirstChunkMessageId and getLastChunkMessageID. For other method implementations, the lastChunkMessageID is called directly, which is compatible with much of the existing business logic.

Here is the simple demo codes for the ChunkMessageID:

public class ChunkMessageIdImpl extends MessageIdImpl implements MessageId {
    private final MessageIdImpl firstChunkMsgId;

    public ChunkMessageIdImpl(MessageIdImpl firstChunkMsgId, MessageIdImpl lastChunkMsgId) {
        super(lastChunkMsgId.getLedgerId(), lastChunkMsgId.getEntryId(), lastChunkMsgId.getPartitionIndex());
        this.firstChunkMsgId = firstChunkMsgId;
    }

    public MessageIdImpl getFirstChunkMessageId() {
        return firstChunkMsgId;
    }

    public MessageIdImpl getLastChunkMessageId() {
        return this;
    }
}
  1. The chunk message-id is returned to the user when the Producer produces the chunk message or when the consumer consumes the chunk message.

  2. In cosumer.seek, use the first chunk message-id of the chunk message-id. This will solve the problem caused by seeking chunk messages. This is also the impact of this PIP on the original business logic.

  3. In order to make the chunkMessaegId serializable and deserializable, we need to change the proto definition of MessageIdData. Add the first_chunk_message_id optional field to the MessageIdData in proto file:

message MessageIdData {
    required uint64 ledgerId = 1;
    required uint64 entryId  = 2;
    optional int32 partition = 3 [default = -1];
    optional int32 batch_index = 4 [default = -1];
    repeated int64 ack_set = 5;
    optional int32 batch_size = 6;

    // For the chunk message id, we need to specify the first chunk message id.
    optional MessageIdData first_chunk_message_id = 7;
}
  1. ChunkMessageId.tostirng() will return both firstChunkMessageId and lastChunkMessageId.

Compatibility

For serialization and deserialization of MessageId, it is both forward compatibility and backward compatibility.

The old version of message-id raw data for deserialization, regardless of whether it is a chunk message, will all be serialized into MessageIdImpl in the current client version. In this case, the problem of seeking chunk messages mentioned above will still exist.
Older versions of the client will have no impact when serializing newer versions of chunk message id raw data.

Here is the PR to demonstrate this PIP: apache#12403.

@sijie sijie added the PIP label Oct 18, 2021
@sijie sijie added the Stale label Feb 27, 2022
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

No branches or pull requests

1 participant