PIP 107: Introduce the chunk message ID #12402

Closed
RobertIndie opened this issue Oct 18, 2021 · 1 comment

RobertIndie commented Oct 18, 2021

Motivation

Currently, when a producer sends a chunked message, it returns the message ID of the last chunk. This causes problems. For example, when that message ID is used to seek, the consumer starts consuming from the position of the last chunk, mistakenly concludes that the earlier chunks are lost, and skips the message. As a result, even with an inclusive seek, the consumer may skip the first message, which is incorrect behavior.

The following code demonstrates the problem:

var msgId = producer.send(...); // e.g. returns 0:1:-1

var otherMsg = producer.send(...); // returns 0:2:-1

consumer.seek(msgId); // inclusive seek

var receiveMsgId = consumer.receive().getMessageId(); // may skip the first message and return e.g. 0:2:-1

Assert.assertEquals(msgId, receiveMsgId); // fails
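
To make the failure mode concrete, here is a toy model of the reassembly logic described above. It is only a sketch: ToyEntry and ToyChunkConsumer are hypothetical stand-ins, not Pulsar client classes, and the real consumer is more involved. The model assumes the first message is split into two chunks at entries 0 and 1, which matches the returned ID 0:1:-1 in the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: ToyEntry and ToyChunkConsumer are hypothetical stand-ins,
// not classes from the Pulsar client.
final class ToyEntry {
    final long entryId;      // position of the entry in the ledger
    final String messageKey; // logical message this entry belongs to
    final int chunkIndex;    // 0-based index of this chunk
    final int totalChunks;   // 1 for a message that is not chunked

    ToyEntry(long entryId, String messageKey, int chunkIndex, int totalChunks) {
        this.entryId = entryId;
        this.messageKey = messageKey;
        this.chunkIndex = chunkIndex;
        this.totalChunks = totalChunks;
    }
}

final class ToyChunkConsumer {
    /** Returns the logical messages delivered when reading starts at startEntryId (inclusive). */
    static List<String> receiveFrom(List<ToyEntry> log, long startEntryId) {
        List<String> delivered = new ArrayList<>();
        String assembling = null; // message currently being reassembled, if any
        int expectedChunk = 0;    // index of the next chunk expected for it
        for (ToyEntry e : log) {
            if (e.entryId < startEntryId) {
                continue; // before the seek position
            }
            if (e.chunkIndex == 0) {
                assembling = e.messageKey; // a first chunk starts a new reassembly
                expectedChunk = 0;
            }
            if (!e.messageKey.equals(assembling) || e.chunkIndex != expectedChunk) {
                assembling = null; // earlier chunks were never seen: skip this message
                continue;
            }
            expectedChunk++;
            if (expectedChunk == e.totalChunks) {
                delivered.add(e.messageKey); // all chunks seen: deliver the message
                assembling = null;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<ToyEntry> log = Arrays.asList(
                new ToyEntry(0, "chunked", 0, 2),
                new ToyEntry(1, "chunked", 1, 2),
                new ToyEntry(2, "other", 0, 1));
        System.out.println(receiveFrom(log, 1)); // seek to the last chunk: [other]
        System.out.println(receiveFrom(log, 0)); // seek to the first chunk: [chunked, other]
    }
}
```

Seeking to entry 1 (the last chunk) drops the chunked message, while seeking to entry 0 (the first chunk) delivers it.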

Earlier, we tried to fix the problem by having the producer and the consumer return the firstChunkMessageId (see the discussion and the draft pull requests). However, this could affect existing business logic: users who rely on lastChunkMessageId being returned would be affected. For this reason, we propose a new solution that minimizes the impact. With this PIP, existing users are affected only when seeking to a chunk message.

Goal

We can solve the above problem by introducing a chunk message ID to the producer and the consumer. The goals of this PIP are:

  • Compatibility: When the producer and the consumer process a chunk message, the chunk message ID is returned to the user. To stay compatible with existing business logic, the chunk message ID needs to behave consistently with the original (last chunk) message ID.
  • New feature: The user can get the message IDs of the first chunk and the last chunk from the chunk message ID.
  • Fix for consumer.seek: When the message ID passed to seek is a chunk message ID, the consumer uses its firstChunkMessageId.

API Changes and Implementation

  1. Introduce a new message ID type: the chunk message ID. It inherits from MessageIdImpl and adds two new methods: getFirstChunkMessageId and getLastChunkMessageId. All other methods behave as they would for the last chunk's message ID, which keeps the type compatible with most existing business logic.

Here is a simple demo of ChunkMessageIdImpl:

public class ChunkMessageIdImpl extends MessageIdImpl implements MessageId {
    private final MessageIdImpl firstChunkMsgId;

    public ChunkMessageIdImpl(MessageIdImpl firstChunkMsgId, MessageIdImpl lastChunkMsgId) {
        super(lastChunkMsgId.getLedgerId(), lastChunkMsgId.getEntryId(), lastChunkMsgId.getPartitionIndex());
        this.firstChunkMsgId = firstChunkMsgId;
    }

    public MessageIdImpl getFirstChunkMessageId() {
        return firstChunkMsgId;
    }

    public MessageIdImpl getLastChunkMessageId() {
        return this;
    }
}
  2. The chunk message ID is returned to the user when the producer produces a chunk message or when the consumer consumes a chunk message.

  3. In consumer.seek, use the first chunk message ID of the chunk message ID. This solves the problem caused by seeking to chunk messages. It is also the only impact of this PIP on existing business logic.

  4. To make the chunk message ID serializable and deserializable, the proto definition of MessageIdData must change. Add the optional first_chunk_message_id field to MessageIdData in the proto file:

message MessageIdData {
    required uint64 ledgerId = 1;
    required uint64 entryId  = 2;
    optional int32 partition = 3 [default = -1];
    optional int32 batch_index = 4 [default = -1];
    repeated int64 ack_set = 5;
    optional int32 batch_size = 6;

    // For the chunk message id, we need to specify the first chunk message id.
    optional MessageIdData first_chunk_message_id = 7;
}
  5. ChunkMessageId.toString() will return both firstChunkMessageId and lastChunkMessageId.
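
The steps above can be sketched as follows. To compile without the Pulsar client on the classpath, the sketch uses hypothetical stand-ins (SketchMessageId, SketchChunkMessageId, SeekSketch) instead of MessageIdImpl and ChunkMessageIdImpl. The equals implementation and the toString format are assumptions for illustration only; the PIP does not specify them.

```java
import java.util.Objects;

// Hypothetical stand-in for MessageIdImpl.
class SketchMessageId {
    final long ledgerId;
    final long entryId;
    final int partition;

    SketchMessageId(long ledgerId, long entryId, int partition) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchMessageId)) {
            return false;
        }
        SketchMessageId other = (SketchMessageId) o;
        return ledgerId == other.ledgerId && entryId == other.entryId && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(ledgerId, entryId, partition);
    }

    @Override
    public String toString() {
        return ledgerId + ":" + entryId + ":" + partition;
    }
}

// Hypothetical stand-in for ChunkMessageIdImpl: it carries the last chunk's
// coordinates itself and keeps a reference to the first chunk.
final class SketchChunkMessageId extends SketchMessageId {
    private final SketchMessageId firstChunkMsgId;

    SketchChunkMessageId(SketchMessageId firstChunkMsgId, SketchMessageId lastChunkMsgId) {
        super(lastChunkMsgId.ledgerId, lastChunkMsgId.entryId, lastChunkMsgId.partition);
        this.firstChunkMsgId = firstChunkMsgId;
    }

    SketchMessageId getFirstChunkMessageId() {
        return firstChunkMsgId;
    }

    SketchMessageId getLastChunkMessageId() {
        return this;
    }

    @Override
    public String toString() {
        // Assumed format: first chunk, then last chunk.
        return firstChunkMsgId + ";" + super.toString();
    }
}

final class SeekSketch {
    /** Step 3: a chunk message ID seeks to its first chunk; any other ID is used as-is. */
    static SketchMessageId resolveSeekPosition(SketchMessageId requested) {
        if (requested instanceof SketchChunkMessageId) {
            return ((SketchChunkMessageId) requested).getFirstChunkMessageId();
        }
        return requested;
    }
}
```

With a first chunk at 0:0:-1 and a last chunk at 0:1:-1, the chunk ID still compares equal to the last chunk's ID in this sketch, while resolveSeekPosition returns 0:0:-1, so the consumer starts from the beginning of the chunked message.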

Compatibility

Serialization and deserialization of MessageId are both forward and backward compatible.

Raw message ID data produced by an older client version, regardless of whether it refers to a chunk message, is always deserialized into MessageIdImpl by the current client version. In this case, the chunk seek problem described above still exists.
Older client versions are not affected when deserializing chunk message ID raw data produced by newer versions.
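
The two compatibility cases can be illustrated with a small model. This is a sketch under stated assumptions, not the actual protobuf code: WireIdData and CompatSketch are hypothetical names, a null firstChunk models the absent optional first_chunk_message_id field, and the "old client" is modeled as a decoder that never looks at that field (protobuf parsers skip fields they do not know).

```java
// Hypothetical model of serialized MessageIdData; firstChunk == null means
// the optional first_chunk_message_id field is absent.
final class WireIdData {
    final long ledgerId;
    final long entryId;
    final int partition;
    final WireIdData firstChunk;

    WireIdData(long ledgerId, long entryId, int partition, WireIdData firstChunk) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partition = partition;
        this.firstChunk = firstChunk;
    }
}

final class CompatSketch {
    private static String position(WireIdData d) {
        return d.ledgerId + ":" + d.entryId + ":" + d.partition;
    }

    /** Newer client: yields a chunk message ID only when the optional field is present. */
    static String decodeWithNewClient(WireIdData d) {
        if (d.firstChunk == null) {
            return "plain(" + position(d) + ")";
        }
        return "chunk(first=" + position(d.firstChunk) + ", last=" + position(d) + ")";
    }

    /** Older client: unaware of the new field, so it only sees the last chunk position. */
    static String decodeWithOldClient(WireIdData d) {
        return "plain(" + position(d) + ")";
    }

    public static void main(String[] args) {
        WireIdData oldData = new WireIdData(0, 1, -1, null);
        WireIdData newData = new WireIdData(0, 1, -1, new WireIdData(0, 0, -1, null));
        System.out.println(decodeWithNewClient(oldData)); // plain(0:1:-1)
        System.out.println(decodeWithNewClient(newData)); // chunk(first=0:0:-1, last=0:1:-1)
        System.out.println(decodeWithOldClient(newData)); // plain(0:1:-1)
    }
}
```

Old data decoded by a new client yields a plain message ID, so the seek problem remains for those IDs; new data decoded by an old client simply loses the first chunk information.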

Here is the PR to demonstrate this PIP: #12403.

codelipenghui pushed a commit that referenced this issue Dec 29, 2021
Master Issue: #12402

### Motivation

This is an implementation for the PIP: #12402

### Modifications

* Introduce a new message ID type: the chunk message ID. It inherits from MessageIdImpl and adds two new methods: getFirstChunkMessageId and getLastChunkMessageId. All other methods behave as they would for the last chunk's message ID, which keeps the type compatible with most existing business logic.

* Return the chunk message-id to the user when the Producer produces the chunk message or when the consumer consumes the chunk message.

* In consumer.seek, use the first chunk message ID of the chunk message ID. This solves the problem caused by seeking to chunk messages. It is also the only impact of this PIP on existing business logic.
wuzhanpeng pushed a commit to wuzhanpeng/pulsar that referenced this issue Jan 5, 2022
Master Issue: apache#12402
@github-actions

The issue had no activity for 30 days, mark with Stale label.

@RobertIndie RobertIndie self-assigned this Jun 20, 2022
RobertIndie added a commit that referenced this issue Jun 22, 2022
…essageID (#16154)

### Motivation

This is the same problem that occurs when the consumer performs an inclusive seek to a chunked message.

See more detail in [PIP-107](#12402)

### Modifications

* Use the first chunk message id as the startMessageId when creating the consumer/reader.
codelipenghui pushed a commit that referenced this issue Jun 28, 2022
…essageID (#16154)

(cherry picked from commit 33cf2d0)
nicoloboschi pushed a commit to datastax/pulsar that referenced this issue Jul 4, 2022
…essageID (apache#16154)

(cherry picked from commit 33cf2d0)
(cherry picked from commit d7f996f)
BewareMyPower pushed a commit to apache/pulsar-client-cpp that referenced this issue Dec 20, 2022
Fixes #79

### Motivation

This is the C++ implementation for apache/pulsar#12402

### Modifications

* Add ChunkMessageIdImpl
* Return ChunkMessageId when the Producer produces the chunk message or when the consumer consumes the chunk message.
* In consumer.seek, use the first chunk message ID of the chunk message ID. This solves the problem caused by seeking to chunk messages. It is also the only impact of this PIP on existing business logic.