
PIP-195: New bucket based delayed message tracker #16763

Closed
coderzc opened this issue Jul 25, 2022 · 13 comments

@coderzc
Member

coderzc commented Jul 25, 2022

PIP-195: New bucket based delayed message tracker

Discussion Mailing list thread: https://lists.apache.org/thread/1krdhrvs803kb6rqzdh17q0f199nroz4
Vote Mailing list thread: https://lists.apache.org/thread/51n8kp64d16vxwh9h6klvyh1zo0owf91

Motivation

Scheduled and delayed message delivery is a widespread feature in messaging systems. Pulsar has supported delayed messages[0] since 2.4.0, using an in-memory delayed message tracker that tracks all delayed message indexes in a priority queue. The blog "Apache pulsar delay message delivery analysis"[1] provided details of delayed messages and shared some challenges for the current implementation.

The memory limitation of the priority queue

A broker's memory is not infinite. For scenarios where users need to store many delayed messages, the in-memory priority queue might be a bottleneck for maintaining an extensive delayed index.

Suppose you want to scale the delayed message capacity. In that case, you can add more partitions so that the delayed index is distributed across multiple brokers, but this does not change the fact that a lot of broker memory is used.

A topic might have several subscriptions, and the in-memory delayed indexes can't be shared across the subscriptions; this also increases the broker's memory overhead.

Expensive delayed index rebuilding

To rebuild the delayed index, the broker needs to read all delayed messages of a topic. If there are too many delayed messages on a topic, the index rebuilding might take a long time, from a few minutes to a few hours. While a subscription is rebuilding the delayed index, its consumers can't consume messages from the topic; this increases consumer unavailability time.

This proposal focuses on the following two major problems of delayed message implementation.

Goal

  • Support delayed message index snapshots to avoid the high index rebuild costs.
  • Make the delayed message scale independent of broker memory.

Approach

The solution introduces a new bucket-based delayed message tracker, which splits the whole delayed message index into multiple buckets based on the ledgers and creates multiple immutable segment snapshots per bucket. The bucket-based delayed message tracker writes all bucket snapshots to the bookie storage nodes and only loads the segment currently in use for each bucket into memory, so that the delayed message scale is not limited by memory.

Delayed message index bucket

The delayed message index bucket contains the indexes of a couple of ledgers. Each bucket mainly includes two parts: a BitSet per ledger for checking whether a message ID is a delayed message (i.e., contained in the delayed message index), and a priority queue for getting the scheduled messages.
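As an illustration, the two parts of a bucket can be sketched as follows. This is a minimal sketch with hypothetical class and field names, not the actual Pulsar implementation:

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Minimal sketch of a delayed message index bucket: a BitSet per ledger
// answers "is this message ID delayed?", and a priority queue ordered by
// delivery timestamp yields the next scheduled messages.
class DelayedIndexBucketSketch {
    // ledgerId -> bit set of entry IDs that hold delayed messages
    final Map<Long, BitSet> delayedIndexBitMap = new HashMap<>();
    // {timestamp, ledgerId, entryId} triples ordered by delivery time
    final PriorityQueue<long[]> priorityQueue =
            new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));

    void addMessage(long deliverAt, long ledgerId, long entryId) {
        delayedIndexBitMap.computeIfAbsent(ledgerId, k -> new BitSet())
                          .set((int) entryId);
        priorityQueue.add(new long[]{deliverAt, ledgerId, entryId});
    }

    // True if the message ID is tracked as delayed in this bucket.
    boolean containsMessage(long ledgerId, long entryId) {
        BitSet bits = delayedIndexBitMap.get(ledgerId);
        return bits != null && bits.get((int) entryId);
    }
}
```

The BitSet gives O(1) membership checks while the queue keeps messages sorted by delivery time, which is why both structures are needed per bucket.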

Figure 2: Delayed message index bucket

A topic can have multiple delayed message index buckets, and the maximum number of buckets is configurable. The delayed message tracker loads the first segment of each bucket (segments are introduced later; one segment maps to one entry of the bucket snapshot) into a shared priority queue, and gets the topic's scheduled messages by polling from that shared queue. After all the messages of a bucket segment have been processed, the next segment of that bucket is loaded.

The delayed message tracker also contains a special bucket (LastMutableBucket) that records the delayed message index of the current last ledger range using an extra priority queue (the last mutable delayed message priority queue). When the tracker receives a message ID with ledgerId > LastMutableBucket.endLedgerId, the tracker creates an immutable bucket and clears LastMutableDelayedMessagePriorityQueue. The tracker moves scheduled messages from LastMutableDelayedMessagePriorityQueue to the shared delayed message queue when the regular task is triggered or when message IDs are polled from the tracker.

The dispatcher of a subscription reads messages by the cursor to dispatch messages to consumers. For the delayed messages, the cursor needs to filter them out based on the delayed message index bucket. For example, if we have ten messages [0,9] and messages [1,8] are delayed, the cursor should only read messages 0 and 9 from the bookies. Note that the current implementation reads all ten messages and filters [1,8] out in the broker, which we need to improve.
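The read-range computation could look like the following sketch, which is illustrative only (the class and method names are hypothetical, not part of the proposal's API):

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Sketch of how a cursor could skip delayed entries when reading from
// bookies: given a BitSet marking delayed entry IDs, compute the
// contiguous [start, end] ranges of non-delayed entries worth reading.
class NonDelayedRangeSketch {
    static List<long[]> readableRanges(BitSet delayed, long first, long last) {
        List<long[]> ranges = new ArrayList<>();
        long start = -1;
        for (long e = first; e <= last; e++) {
            if (!delayed.get((int) e)) {
                if (start == -1) {
                    start = e; // open a new readable range
                }
            } else if (start != -1) {
                ranges.add(new long[]{start, e - 1}); // close the range
                start = -1;
            }
        }
        if (start != -1) {
            ranges.add(new long[]{start, last});
        }
        return ranges;
    }
}
```

With entries [0,9] and entries 1 through 8 delayed, this yields the two single-entry ranges [0,0] and [9,9], matching the example above.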

So if a message is not in the delayed message tracker and has reached its delivery time, the broker can dispatch it to the consumer directly. If a message is not in the delayed tracker but has not reached its delivery time, the subscription just needs to skip it because it will be added back to the delayed message tracker.

Delayed message index bucket snapshot

The bucket snapshot reduces the cost of rebuilding the delayed message index (replaying all the original messages). We can use a ledger to store the bucket snapshot data and maintain the bucket snapshot list in the properties of the cursor that builds the delayed message index. From these, we know how many delayed index buckets the topic has and can read each snapshot from its persistent ledger.

Figure 3: Delayed message index bucket snapshot

The delayed index bucket snapshot data is written to multiple segments according to the delivery time and the index quantity limit. Only the first valid segment is loaded into memory. After all the delayed messages of the current segment are scheduled, the delayed message tracker loads the delayed messages from the next segment. The snapshot data itself is never modified.

The delayed index bucket snapshot data is stored starting from Entry1, because Entry0 records the snapshot metadata. That metadata is described next.

The maxScheduleTimestamps field is used to find the first snapshot segment that still has messages which have not reached their delivery time. When recovering the delayed message index, the bucket skips a snapshot segment if all of its messages have already reached their delivery time (because the broker can dispatch those messages to the consumer directly).
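The recovery scan can be sketched like this. The method name is hypothetical; it only illustrates the skip rule described above:

```java
// Sketch of segment recovery: skip snapshot segments whose
// maxScheduleTimestamp has already passed (those messages can be
// dispatched directly) and return the index of the first segment that
// still holds messages not yet due, or -1 if every segment is due.
class SegmentRecoverySketch {
    static int firstPendingSegment(long[] maxScheduleTimestamps, long nowMs) {
        for (int i = 0; i < maxScheduleTimestamps.length; i++) {
            if (maxScheduleTimestamps[i] > nowMs) {
                return i; // first segment with messages not yet due
            }
        }
        return -1; // all segments already due; the snapshot can be skipped
    }
}
```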

The delayedIndexBitMaps field is used to check whether a message ID is present in the bucket. It records the BitSet key pairs of the delayed message indexes per snapshot segment. When loading a snapshot segment into memory, the delayed message tracker merges the BitSet key pairs from the current snapshot segment through the last snapshot segment.
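The merge step can be sketched as a per-ledger bitwise union over the remaining segments (again a hypothetical helper, not the proposal's API):

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of merging per-segment BitSet maps: when segment i is loaded,
// the in-memory map must be the union of the bit maps of segments
// i..last, so containsMessage still covers all pending messages.
class BitMapMergeSketch {
    static Map<Long, BitSet> union(List<Map<Long, BitSet>> segments, int from) {
        Map<Long, BitSet> merged = new HashMap<>();
        for (int i = from; i < segments.size(); i++) {
            segments.get(i).forEach((ledgerId, bits) ->
                    merged.computeIfAbsent(ledgerId, k -> new BitSet())
                          .or(bits)); // bitwise union per ledger
        }
        return merged;
    }
}
```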

Merge delayed message index buckets

We can configure the maximum number of buckets of a topic. If the number of buckets reaches this limit, sealing a new immutable bucket first triggers a bucket merge.

The delayed message tracker finds the two adjacent buckets with the fewest delayed messages and merges them. For example, if five buckets exist and the numbers of messages they contain are [5,3,2,4,3], we should merge the second and third buckets: the startLedgerId of the merged bucket becomes the startLedgerId of the second bucket, and its endLedgerId becomes the endLedgerId of the third bucket.
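The candidate selection can be sketched as follows (a hypothetical helper illustrating the rule above, not the actual tracker code):

```java
// Sketch of merge-candidate selection: pick the adjacent pair of buckets
// whose combined delayed message count is smallest and return the index
// of the left bucket of that pair.
class BucketMergeSketch {
    static int mergeCandidate(long[] bucketMessageCounts) {
        int best = 0;
        long bestSum = Long.MAX_VALUE;
        for (int i = 0; i + 1 < bucketMessageCounts.length; i++) {
            long sum = bucketMessageCounts[i] + bucketMessageCounts[i + 1];
            if (sum < bestSum) {
                bestSum = sum;
                best = i;
            }
        }
        return best;
    }
}
```

For counts [5,3,2,4,3] this returns index 1, i.e., merge the second and third buckets as in the example.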

Delete delayed message index bucket snapshot

We should delete the bucket snapshot when any of the following happens:

When all the delayed messages of the snapshot in a bucket are scheduled, the tracker deletes that bucket snapshot.
The specific approach is for each bucket to record the entry ID of the current snapshot segment; when loading the next snapshot segment, if snapshotEntryId > lastSnapshotEntryId, the deletion of the bucket snapshot is triggered.

After merging buckets, the delayed message tracker will delete the old bucket.

We should also delete all bucket snapshots before deleting the cursor.

Share the delayed message index across subscriptions (Optional)

A topic can have several subscriptions. The current implementation builds the delayed message indexes for each subscription, which increases broker memory overhead and repeats the cost of replaying the log to build the index multiple times.

Instead, we can use a separate cursor to build the shared delayed message indexes. So that all the subscriptions under the topic can reuse the same delayed message indexes.

Any subscription that triggers the delayed message checking polls the message IDs from the delayed message tracker. But unlike the current implementation, the scheduled message IDs need to be added to the replay queues of all subscriptions. The dispatcher of each subscription takes care of the newly added message IDs and performs the message delivery.

The subscriptions have different mark delete positions. If the scheduled messages are before the mark delete position, the cursor read operation will filter them out.

A risk here is that scheduled messages are removed from the delayed message tracker. If the broker crashes before delivering the scheduled messages to consumers, the messages will not be added back to the delayed tracker, so the broker will not redeliver them. But this is not a problem, because when replaying messages to rebuild the delayed index, the broker skips messages that have expired and sends them directly to the consumer.

Config Changes

broker.conf

# Enable bucket based delayed message index tracker
delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory

# The minimum index count per delayed message index bucket. When the index count of the current bucket exceeds this value and all message indexes of the current ledger have already been added to the tracker, the bucket is sealed.
delayedDeliveryMinIndexCountPerBucket=50000

# The max time step (in seconds) per bucket snapshot segment; after reaching this limit, the snapshot segment is cut off.
delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds=300

# The max number of delayed message indexes per bucket snapshot segment; -1 means no limit.
# After reaching this limit, the snapshot segment is cut off.
delayedDeliveryMaxIndexesPerBucketSnapshotSegment=5000

# The max number of delayed message index buckets; after reaching this limit, adjacent buckets are merged. (Disable with -1.)
delayedDeliveryMaxNumBuckets=-1

# Enable sharing the delayed message index across subscriptions
delayedDeliverySharedIndexEnabled=false

Metrics Changes

We need to add some metrics for the delayed index buckets and snapshots to help users tune the configuration.

The new metrics and change metrics:

| Name | Type | Description |
| --- | --- | --- |
| pulsar_delayed_message_index_bucket_total | Gauge | The number of delayed message index buckets (immutable buckets + LastMutableBucket). |
| pulsar_delayed_message_index_loaded | Gauge | The total number of delayed message indexes loaded in memory. |
| pulsar_delayed_message_index_bucket_op_count | Counter | The total number of delayed message index bucket snapshot operations. The state label can be succeed, failed, or all (all is the total across states), and the type label can be create, load, delete, or merge. |
| pulsar_delayed_message_index_bucket_snapshot_size_bytes | Gauge | The total size of delayed message index bucket snapshots (in bytes). |
| pulsar_delayed_message_index_bucket_op_latency_ms | Histogram | The latency of delayed message index bucket snapshot operations with a given quantile (threshold). The type label can be create, load, delete, or merge. |
The quantile label can be:
  • quantile="50" is operation latency between (0ms, 50ms]
  • quantile="100" is operation latency between (50ms, 100ms]
  • quantile="500" is operation latency between (100ms, 500ms]
  • quantile="1000" is operation latency between (500ms, 1s]
  • quantile="5000" is operation latency between (1s, 5s]
  • quantile="30000" is operation latency between (5s, 30s]
  • quantile="60000" is operation latency between (30s, 60s]
  • quantile="overflow" is operation latency > 1m
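The bucketing above can be sketched as a simple mapping from a measured latency to its quantile label (an illustrative helper, not broker code):

```java
// Sketch of mapping an operation latency to the quantile label used by
// pulsar_delayed_message_index_bucket_op_latency_ms: each label is the
// upper bound (in ms) of its latency interval, with "overflow" past 60s.
class OpLatencyQuantileSketch {
    static String quantileLabel(long latencyMs) {
        long[] upperBoundsMs = {50, 100, 500, 1000, 5000, 30000, 60000};
        for (long bound : upperBoundsMs) {
            if (latencyMs <= bound) {
                return Long.toString(bound);
            }
        }
        return "overflow"; // latency > 1m
    }
}
```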

Note: if sharing the delayed message index across subscriptions is enabled, precise subscription-level metrics cannot be obtained.

Implementation

  • Add new Protobuf definitions for the bucket snapshot

    • DelayedMessageIndexBucketMetadata.proto
    syntax = "proto2";
    
    package pulsar.delay;
    option java_package = "org.apache.pulsar.broker.delayed.proto";
    option optimize_for = SPEED;
    option java_multiple_files = true;
    
    message SnapshotSegmentMetadata {
        map<uint64, bytes> delayed_index_bit_map = 1;
        required uint64 max_schedule_timestamp = 2;
        required uint64 min_schedule_timestamp = 3;
    }
    
    message SnapshotMetadata {
        repeated SnapshotSegmentMetadata metadata_list = 1;
    }
    • DelayedMessageIndexBucketSegment.proto
    syntax = "proto2";
    
    package pulsar.delay;
    option java_package = "org.apache.pulsar.broker.delayed.proto";
    option optimize_for = SPEED;
    option java_multiple_files = true;
    
    message DelayedIndex {
        required uint64 timestamp = 1;
        required uint64 ledger_id = 2;
        required uint64 entry_id = 3;
    }
    
    message SnapshotSegment {
        repeated DelayedIndex indexes = 1;
    }
  • Add an interface BucketSnapshotStorage to store delayed message index bucket snapshots

public interface BucketSnapshotStorage {

    /**
     * Create a delayed message index bucket snapshot with metadata and bucketSnapshotSegments.
     *
     * @param snapshotMetadata       the metadata of snapshot
     * @param bucketSnapshotSegments the list of snapshot segments
     * @param bucketKey              the key of bucket is used to generate custom storage metadata
     * @param topicName              the name of topic is used to generate custom storage metadata
     * @param cursorName             the name of cursor is used to generate custom storage metadata
     * @return the future with bucketId(ledgerId).
     */
    CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
                                                 List<SnapshotSegment> bucketSnapshotSegments,
                                                 String bucketKey, String topicName, String cursorName);

    /**
     * Get delayed message index bucket snapshot metadata.
     *
     * @param bucketId the bucketId of snapshot
     * @return the future with snapshot expanded metadata
     */
    CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId);

    /**
     * Get a sequence of delayed message index bucket snapshot segments.
     * @param bucketId the bucketId of snapshot
     * @param firstSegmentEntryId entryId of first segment of sequence
     * @param lastSegmentEntryId entryId of last segment of sequence
     * @return the future with snapshot segment
     */
    CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId, long lastSegmentEntryId);

    /**
     * Get total byte length of delayed message index bucket snapshot.
     * @param bucketId the bucketId of snapshot
     * @return the future with byte length of snapshot
     */
    CompletableFuture<Long> getBucketSnapshotLength(long bucketId);

    /**
     * Delete delayed message index bucket snapshot by bucketId.
     * @param bucketId the bucketId of snapshot
     */
    CompletableFuture<Void> deleteBucketSnapshot(long bucketId);

    /**
     * Start the bucket snapshot storage service.
     * @throws Exception
     */
    void start() throws Exception;

    /**
     * Close the bucket snapshot storage service.
     * @throws Exception
     */
    void close() throws Exception;
}
  • Abstract AbstractDelayedDeliveryTracker from InMemoryDelayedDeliveryTracker and implement a new delayed message tracker BucketDelayedDeliveryTracker.

  • Add a new BucketDelayedDeliveryTrackerFactory to create BucketDelayedDeliveryTracker.

  • Add a method containsMessage in the BucketDelayedDeliveryTracker class to filter out the delayed messages.

public class BucketDelayedDeliveryTracker {
     //......
     boolean containsMessage(long ledgerId, long entryId);
}
  • The cursor will filter out all delayed messages based on containsMessage and skip them when reading messages from bookies. This change includes making the cursor and ManagedLedger support reading discontinuous entries.

  • Use containsMessage to prevent the tracker from recording a duplicate message index when adding a message to the delayed message tracker.

  • Use a separate cursor to build the delayed message tracker and add the scheduled messages to the replay queues of all subscriptions when any subscription triggers the delayed message checking.

Compatibility

Upgrade

It is possible to enable the bucket based delayed message tracker by doing a rolling upgrade of the brokers, because the delayed message index in the old tracker exists only in memory.

  1. Enable the bucket based delayed message tracker feature on all brokers.
  2. Wait until all broker nodes have finished upgrading.

It is also possible to enable the shared delayed message index by doing a rolling upgrade of the brokers, but the delayed message bucket index will be rebuilt.

Downgrade

It is possible to disable the bucket based delayed message tracker and the shared delayed message index by doing a rolling downgrade of the brokers, because the previous in-memory tracker can rebuild the delayed message index.

  1. Disable the shared delayed message index feature and the bucket based delayed message tracker feature on all brokers.
  2. Wait until all broker nodes have finished downgrading.

References

[0] delayed message delivery
[1] Apache pulsar delay message delivery analysis

@ciyunruoshui

I would like to know when this feature will be submitted

@coderzc
Member Author

coderzc commented Aug 10, 2022

I would like to know when this feature will be submitted

The partial submission of PRs will begin once the community vote is completed.

@codelipenghui codelipenghui added this to the 2.12.0 milestone Aug 11, 2022
@yapxue
Contributor

yapxue commented Aug 15, 2022

Are there any recycle or compaction policies for buckets? A bucket may hold a delayed message that is scheduled for a very long time later; this message can block bucket deletion. If we are unfortunate and every bucket has one such message, it can eventually cause too many buckets.

@coderzc
Member Author

coderzc commented Aug 15, 2022

Are there any recycle or compaction policies for buckets? A bucket may hold a delayed message that is scheduled for a very long time later; this message can block bucket deletion. If we are unfortunate and every bucket has one such message, it can eventually cause too many buckets.

We can limit the number of buckets by merging buckets.
For more details, please see the Merge delayed message index buckets and Delete delayed message index bucket snapshot sections.

@coderzc
Member Author

coderzc commented Aug 18, 2022

@codelipenghui @gaoran10 @liudezhi2098 @eolivelli
I have sent a vote email, please take a look. Thanks!
Vote Mailing list thread: https://lists.apache.org/thread/51n8kp64d16vxwh9h6klvyh1zo0owf91

@codelipenghui
Contributor

codelipenghui commented Aug 18, 2022

Delete message index bucket
After merging buckets, the delayed message tracker will delete the old bucket.
Also, when all the delayed messages of all snapshots in the bucket are scheduled then tracker will delete that bucket.

We need to explain how snapshot can be guaranteed to be deleted without being missed.

Admin-API changes for clear snapshot

Could you please add more context about why we need this API? to rebuild the index?
It should be a risk if only clear the index without any cursor rewind or index rebuilding.
The consumer will not be able to receive delayed messages, no?

For the metrics we are using pulsar_delayed_message_index_* https://github.com/apache/pulsar/pull/15867/files#diff-b3f64ff76fdabb9e471435147298c3c707cbecae09023f0d86cf06f6ad78cbdaR352. It's better to keep the metrics name
consistent.

pulsar_delayed_message_index_bucket_total | Gauge
pulsar_delayed_message_index_snapshot_size_bytes | Gauge
pulsar_delayed_message_index_bucket_op_latency_ms {type="load/merge/del"} | Histogram
pulsar_delayed_message_index_bucket_op_failed_count | Counter

For the BucketSnapshotFormat.proto

Is it better to split SnapshotMetadata and SnapshotData into 2 commands? It's easier to read. The metadata message have bitset and scheduleTime, the data message only has the sorted list. And we'd better change BucketSnapshotFormat.proto to DelayedMessageIndexBucketSnapshotFormat.proto

CompletableFuture<Void> saveBucketAllSnapshots(long bucketId, List<BucketSnapshotFormat.BucketSnapshot> bucketSnapshots);

It's a little confusing for this one. For each bucket, we should only have one snapshot, for each entry in the snapshot, we should use a different name, snapshot record or something.

CompletableFuture createBucket();

I think this one is used to create a snapshot?

CompletableFuture deleteBucket(long bucketId);

is this one used to delete a snapshot?

void start() throws Exception;

What does this method really do? Does it look like initialize()?

BTW, It is hard to comment on the issue, it looks like we need to find a new approach to create and review the proposal.

@coderzc
Member Author

coderzc commented Aug 18, 2022

Could you please add more context about why we need this API? to rebuild the index?
It should be a risk if only clear the index without any cursor rewind or index rebuilding.
The consumer will not be able to receive delayed messages, no?

Oh, it is only meant to clear residual data after a downgrade. I think we can remove it and discuss the cleanup of residual data in the future.

For the metrics we are using pulsar_delayed_message_index_* https://github.com/apache/pulsar/pull/15867/files#diff-b3f64ff76fdabb9e471435147298c3c707cbecae09023f0d86cf06f6ad78cbdaR352. It's better to keep the metrics name
consistent.

Ok, I will improve it.

Is it better to split SnapshotMetadata and SnapshotData into 2 commands? It's easier to read. The metadata message have bitset and scheduleTime, the data message only has the sorted list. And we'd better change BucketSnapshotFormat.proto to DelayedMessageIndexBucketSnapshotFormat.proto

Ok, I agree with it.

It's a little confusing for this one. For each bucket, we should only have one snapshot, for each entry in the snapshot, we should use a different name, snapshot record or something.

CompletableFuture createBucket();
I think this one is used to create a snapshot?
CompletableFuture deleteBucket(long bucketId);
is this one used to delete a snapshot?

Yes, I will improve the method names.

What does this method really do? Does it look like initialize()?

Yes, it is an initialization function.

@codelipenghui I have already improved this proposal, please review it again.

@coderzc
Member Author

coderzc commented Aug 30, 2022

@merlimat @gaoran10 @liudezhi2098 @eolivelli @mattisonchao @Technoboy- @poorbarcode Please take a look at the vote email and help complete the vote, Thanks!
Vote Mailing list thread: https://lists.apache.org/thread/51n8kp64d16vxwh9h6klvyh1zo0owf91

@github-actions

github-actions bot commented Oct 4, 2022

The issue had no activity for 30 days, mark with Stale label.

@RobertIndie
Member

Confirmed with @coderzc. This PIP has been implemented. Closing this PIP as completed.

@coderzc
Member Author

coderzc commented Apr 11, 2023

It should be noted that I haven't implemented the "Share the delayed message index across subscriptions" part yet, but this PIP still works well without it, and I will consider implementing that part in the future.

@Anonymitaet
Member

Hi @coderzc thanks for introducing this great feature!

I see some PRs related to this PIP were labeled with doc-not-need, so I want to double-check: for users, does this feature not pose any influence on usage? If it affects users, we need to add docs of “what is it” and “how to use it” at least. Thanks!

@coderzc
Member Author

coderzc commented Apr 21, 2023

Hi @coderzc thanks for introducing this great feature!

I see some PRs related to this PIP were labeled with doc-not-need, so I want to double-check: for users, does this feature not pose any influence on usage? If it affects users, we need to add docs of “what is it” and “how to use it” at least. Thanks!

@Anonymitaet Thanks for your reminder. This feature is a performance improvement and does not change the API; I can submit some descriptions of this PIP to the site.
