PIP-180: Shadow Topic, an alternative way to support readonly topic ownership. #16153

Open
Jason918 opened this issue Jun 21, 2022 · 5 comments

Jason918 commented Jun 21, 2022

discuss mail-thread: https://lists.apache.org/thread/o9k7trfmxrz89b0woybnshonpkq8ybw1

Motivation

The motivation is the same as PIP-63, with a new broadcast use case of supporting 100K subscriptions in a single topic.

  1. The bandwidth of a broker limits the number of subscriptions for a single topic.
  2. Subscriptions are competing for the network bandwidth on brokers. Different subscriptions might have different levels of severity.
  3. When synchronizing cross-city message reading, cross-city access needs to be minimized.
  4. [New] Broadcast with 100K subscriptions. The number of subscriptions a single topic can support is limited. Tests by Hongjie from NTT Lab show that with 40K subscriptions on a single topic, the client needs about 20 minutes to start all client connections, and at a producer rate of 1 msg/s the average end-to-end latency is about 2.9s. With 100K subscriptions, the connection start-up time and end-to-end latency are too high to be practical.

However, the original PIP-63 proposal is too complicated to implement: the changed code is already over 3K lines (see #11960), and some problems remain:

  1. The LAC in a readonly topic is updated by polling, which increases the load on bookies.
  2. The message data of a readonly topic is not cached in the broker, which increases network usage between broker and bookie when more than one subscriber is tail-reading.
  3. All subscriptions are managed in the original writable topic, so the maximum number of supported subscriptions does not scale.

This PIP tries to come up with a simpler solution to support readonly topic ownership and solve the problems the previous PR left. The main idea of this solution is to reuse the feature of geo-replication, but instead of duplicating storage, it shares underlying bookie ledgers between different topics.

Goal

The goal is to introduce Shadow Topic as a new type of topic to support readonly topic ownership. As its name implies, a shadow topic is the shadow of some normal persistent topic (let's call it the source topic here). The source topic and the shadow topic must have the same number of partitions, or both be non-partitioned. Multiple shadow topics can be created from one source topic.

A shadow topic shares the underlying bookie ledgers of its source topic. Users can't produce messages to a shadow topic directly, and a shadow topic doesn't create any new ledgers for messages; all messages in a shadow topic come from the source topic.

A shadow topic has its own subscriptions and doesn't share them with its source topic. This means the shadow topic has its own cursor ledger to store persistent mark-delete info for each persistent subscription.

The message sync procedure of a shadow topic is supported by shadow replication, which is very similar to geo-replication, with these differences:

  1. Geo-replication only works between topics with the same name in different broker clusters. Shadow topics have no naming limitation and can be in the same cluster as the source topic.
  2. Geo-replication duplicates data storage, but shadow topics don't.
  3. Geo-replication is bidirectional, with clusters replicating data to each other, but shadow replication has a one-way data flow.

API Changes

  1. PulsarApi.proto
    A shadow topic needs to know the original message id of each replicated message in order to track new ledgers and update the LAC.
    So we need to add a message_id field to CommandSend for the replicator.
message CommandSend {
   // ... old fields.

   // Message id of this message; currently used by the replicator for shadow topics.
   optional MessageIdData message_id = 9;
}
  2. Admin API for managing shadow topics with source topic in org.apache.pulsar.client.admin.Topics.
void createShadowTopic(String sourceTopicName, String shadowTopicName);
void deleteShadowTopic(String sourceTopicName, String shadowTopicName);
List<String> getShadowTopics(String sourceTopicName);

// And their async versions.

This requires new REST interfaces in admin server, where

PATH = "/{tenant}/{namespace}/{topic}/shadowTopics";
METHOD = POST/DELETE/GET;
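
To make the relationship between the three admin calls concrete, here is a minimal in-memory sketch. `InMemoryShadowTopicAdmin` is a hypothetical stand-in and not the Pulsar client; only the method names and the (source, shadow) parameter order are taken from the proposal above, and the error handling is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the proposed admin methods (not the real client).
class InMemoryShadowTopicAdmin {
    // source topic name -> shadow topic names, in creation order
    private final Map<String, List<String>> shadows = new LinkedHashMap<>();

    void createShadowTopic(String sourceTopicName, String shadowTopicName) {
        List<String> list = shadows.computeIfAbsent(sourceTopicName, k -> new ArrayList<>());
        if (list.contains(shadowTopicName)) {
            // Assumption: creating the same shadow topic twice is an error.
            throw new IllegalStateException("Shadow topic already exists: " + shadowTopicName);
        }
        list.add(shadowTopicName);
    }

    void deleteShadowTopic(String sourceTopicName, String shadowTopicName) {
        List<String> list = shadows.get(sourceTopicName);
        if (list == null || !list.remove(shadowTopicName)) {
            throw new IllegalStateException("No such shadow topic: " + shadowTopicName);
        }
    }

    List<String> getShadowTopics(String sourceTopicName) {
        List<String> list = shadows.get(sourceTopicName);
        return list == null
                ? Collections.<String>emptyList()
                : Collections.unmodifiableList(new ArrayList<>(list));
    }
}
```

One source topic maps to a list of shadow topics, which matches the statement in the Goal section that multiple shadow topics can be created from a single source topic.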
  3. Service configuration
int shadowReplicatorAutoResetBacklogEntries = 0;

Once the number of backlog entries exceeds this threshold, the shadow replicator
resets the cursor to LATEST automatically.
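
The threshold check described above could look roughly like the following sketch. The class and method names are hypothetical. Treating a non-positive value (including the default of 0) as "auto-reset disabled" is an assumption; the proposal does not spell out what 0 means.

```java
// Hypothetical helper; only the config name comes from the proposal.
class ShadowReplicatorBacklogPolicy {
    private final long autoResetBacklogEntries;

    ShadowReplicatorBacklogPolicy(long shadowReplicatorAutoResetBacklogEntries) {
        this.autoResetBacklogEntries = shadowReplicatorAutoResetBacklogEntries;
    }

    // Assumption: a non-positive threshold disables the automatic reset.
    boolean shouldResetToLatest(long backlogEntries) {
        return autoResetBacklogEntries > 0 && backlogEntries > autoResetBacklogEntries;
    }
}
```

With a threshold of 100, a backlog of exactly 100 entries does not trigger a reset, while 101 does, because the text says the backlog must exceed the threshold.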

Implementation

There are some key changes for implementation:

How to replicate messages to shadow topics.

This part is mostly implemented by ShadowReplicator, which extends the PersistentReplicator introduced for geo-replication. The shadow topic list is added as a new topic policy of the source topic. The source topic manages the lifecycle of all the replicators. The key is to attach the message_id when producing messages to shadow topics.

How shadow topics manage shared ledger info.

This part is mostly implemented by ShadowManagedLedger, which extends the current ManagedLedgerImpl and overrides two key methods.

  1. initialize(..)
    a. Fetch the ManagedLedgerInfo of the source topic instead of the current shadow topic. The source topic name is stored in the topic policy of the shadow topic.
    b. Open the last ledger and read the explicit LAC from the bookie, instead of creating a new ledger. Reading the LAC here requires that the source topic enables the explicit LAC feature by setting bookkeeperExplicitLacIntervalInMills to a non-zero value in broker.conf.
    c. Do not start checkLedgerRollTask, which periodically tries to roll over the ledger.
  2. internalAsyncAddEntry()
    Instead of writing entry data to the bookie, it only updates ledger metadata, such as currentLedger and lastConfirmedEntry, and puts the replicated message into EntryCache.
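
The metadata-only behaviour of internalAsyncAddEntry() can be pictured with the simplified model below. `ShadowLedgerStateSketch` and its members are hypothetical and much smaller than the real ManagedLedgerImpl; the sketch also assumes that ledger ids grow as the source topic rolls over, so a larger ledger id means a newer ledger.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model; not the real ShadowManagedLedger.
class ShadowLedgerStateSketch {
    long currentLedgerId = -1;
    long lastConfirmedLedgerId = -1;
    long lastConfirmedEntryId = -1;
    private final Map<String, byte[]> entryCache = new HashMap<>();

    // ledgerId/entryId stand for the source message id carried in CommandSend.message_id.
    void addReplicatedEntry(long ledgerId, long entryId, byte[] payload) {
        boolean advances = ledgerId > lastConfirmedLedgerId
                || (ledgerId == lastConfirmedLedgerId && entryId > lastConfirmedEntryId);
        if (advances) {
            // Follow the source topic's ledger instead of creating a new one.
            currentLedgerId = ledgerId;
            lastConfirmedLedgerId = ledgerId;
            lastConfirmedEntryId = entryId;
        }
        // No bookie write happens here: the payload only goes into the cache.
        entryCache.put(ledgerId + ":" + entryId, payload);
    }

    byte[] cachedEntry(long ledgerId, long entryId) {
        return entryCache.get(ledgerId + ":" + entryId);
    }
}
```

Keeping the replicated payload in the cache is what lets tail-reading subscribers on the shadow topic avoid a round trip to the bookie.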

Besides, some other problems need to be taken care of.

  • Any ledger metadata update needs to be synced to the shadow topic, including ledger offloading and ledger deletion. The shadow topic needs to watch ledger info updates in the metadata store and apply them in time.
  • The locally cached LAC of the LedgerHandle won't be updated in time, so we need to refresh the LAC when a managed cursor requests entries beyond the known LAC.
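
The second point, refreshing the LAC only when a cursor asks for something beyond the cached value, might be sketched as follows. `LacRefreshSketch` is hypothetical, and the `LongSupplier` is a placeholder for whatever call actually reads the LAC from the bookie.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of lazy LAC refresh; not the real LedgerHandle logic.
class LacRefreshSketch {
    private long knownLac;
    // Placeholder for a LAC read against the bookie.
    private final LongSupplier readLacFromBookie;

    LacRefreshSketch(long initialLac, LongSupplier readLacFromBookie) {
        this.knownLac = initialLac;
        this.readLacFromBookie = readLacFromBookie;
    }

    // Returns true if the entry is readable, refreshing the cached LAC at most once per call.
    boolean isReadable(long requestedEntryId) {
        if (requestedEntryId > knownLac) {
            // Only go to the bookie when the cached value is not enough.
            knownLac = Math.max(knownLac, readLacFromBookie.getAsLong());
        }
        return requestedEntryId <= knownLac;
    }

    long knownLac() {
        return knownLac;
    }
}
```

Reads at or below the cached LAC never touch the bookie, so the refresh cost is paid only by cursors that are actually at the tail.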

How to handle schema for shadow topic consumption.

Schema is part of a topic's metadata, so a shadow topic won't
have its own schema; it shares the schema info of the source topic.

For consumers, we need to support the GetSchema command for shadow topics, and there are
two interfaces for this.

  1. Binary protocol, which handles CommandGetSchema in
    ServerCnx#handleGetSchema. We only need to replace the requested shadow
    topic's schemaName with the schemaName of the source topic; the
    underlying read operation is supported by
    SchemaRegistry#getSchema(String, SchemaVersion).

  2. HTTP protocol, which is handled in SchemasResource#getSchema(...). Similar
    to the approach in the binary protocol, replace the schemaId with that of the
    source topic in SchemasResourceBase#getSchemaId.

For admins, we can support other "read" ops besides getSchema, including
getAllSchemas and getVersionBySchema, all of which can be supported in the
same way as getSchema.
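
The redirection used by both protocols boils down to one lookup, sketched below. `ShadowSchemaResolverSketch` is hypothetical; in the proposal the shadow-to-source link is stored in the shadow topic's policy, which the in-memory map here only imitates.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: resolve which topic's schema a read should use.
class ShadowSchemaResolverSketch {
    // shadow topic -> source topic (stands in for the shadow topic's policy)
    private final Map<String, String> sourceOfShadow = new HashMap<>();

    void registerShadow(String shadowTopic, String sourceTopic) {
        sourceOfShadow.put(shadowTopic, sourceTopic);
    }

    // Schema reads for a shadow topic go to its source; other topics resolve to themselves.
    String schemaTopicFor(String requestedTopic) {
        return sourceOfShadow.getOrDefault(requestedTopic, requestedTopic);
    }
}
```

Because only the lookup key changes, the existing read path can stay untouched, which is why the same approach extends to the other read operations.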

How to handle topic truncation and deletion

  1. Truncation, from command bin/pulsar-admin topics truncate source-topic.
    For source topic truncation, nothing changes. It still moves all cursors to the
    end of the topic and deletes all inactive ledgers.
    As the shadow topic watches ManagedLedgerInfo in the metadata store, once it
    learns that ledgers were deleted, all its cursors skip the deleted ledgers.

  2. Deletion, from command bin/pulsar-admin topics delete source-topic.
    As with geo-replication, topic deletion is forbidden if the topic has shadow
    replicators; users have to delete the shadow topics first.
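
For the truncation case, the cursor adjustment on the shadow side can be sketched as below. The class is hypothetical, positions are reduced to a (ledgerId, entryId) pair, and the sketch assumes a cursor that lands on a deleted ledger restarts at entry 0 of the next surviving ledger.

```java
import java.util.NavigableSet;

// Hypothetical sketch of moving a cursor past deleted ledgers.
class DeletedLedgerSkipSketch {
    // Returns {ledgerId, entryId} of the first readable position at or after the cursor,
    // or null when no ledger at or after the cursor remains.
    static long[] nextValidPosition(NavigableSet<Long> liveLedgers,
                                    long cursorLedgerId, long cursorEntryId) {
        if (liveLedgers.contains(cursorLedgerId)) {
            // The cursor's ledger still exists; nothing to skip.
            return new long[] {cursorLedgerId, cursorEntryId};
        }
        Long next = liveLedgers.higher(cursorLedgerId);
        return next == null ? null : new long[] {next, 0L};
    }
}
```

The set of live ledgers would come from the ManagedLedgerInfo that the shadow topic watches in the metadata store.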

How to handle security permissions about the shadow topic

As a shadow topic is usually in another namespace, it has its own
independent permission settings, and we can configure different permissions
for the source topic and the shadow topic. So there is no guarantee that you are
allowed to consume from the shadow topic if you have permission to consume from
the source topic.

On the other hand, we use a topic policy to store shadow topic settings, so a
new policy permission item needs to be added as PolicyName.SHADOW_TOPIC, and a user
must have PolicyOperation.WRITE on this policy to create/delete shadow topics.

How do Shadow Topics work with Offloaded ledgers?

Offloading a ledger is a write operation on the topic's metadata, so a
shadow topic can't offload ledgers to long-term storage. However,
ledgers that are already offloaded by the source topic are expected to be
readable from the shadow topic, just as they are from the source
topic.

The implementation depends on the shadow topic watching ManagedLedgerInfo in the
metadata store: if LedgerInfo.offloadContext is updated by the source topic's
offloader, the shadow topic has the full information needed to get a readHandle
from ledgerOffload. The precondition is that the shadow topic has
the same offload driver settings.

How to handle changes in the number of partitions

The updates on partition number will be synced to the shadow topic.
A source topic or partition will be responsible for the creation and deletion
of its corresponding shadow topic partitions.
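
As an illustration of the partition sync, the sketch below computes which shadow partitions would need to be created when the source topic's partition count grows. The class and method are hypothetical; the `-partition-N` suffix follows Pulsar's usual naming for the partitions of a partitioned topic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: derive the shadow partitions to create after a partition update.
class ShadowPartitionNamesSketch {
    static final String PARTITION_SUFFIX = "-partition-";

    static List<String> shadowPartitionsToCreate(String shadowTopic, int oldCount, int newCount) {
        List<String> names = new ArrayList<>();
        // Partition indexes are zero-based, so the new ones start at oldCount.
        for (int i = oldCount; i < newCount; i++) {
            names.add(shadowTopic + PARTITION_SUFFIX + i);
        }
        return names;
    }
}
```

Source partition i pairs with shadow partition i, which is consistent with the requirement that both topics have the same number of partitions.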

New topic stats

For topic stats on the source topic, as the shadow replicator reuses most of the current
PersistentReplicator, ReplicatorStatsImpl can also be applied to shadow replicators.
We need to add a new field in TopicStatsImpl, as for geo-replication:

Map<String /*shadow topic name*/, ReplicatorStatsImpl> shadowReplication;

As for topic stats on the shadow topic, the existing TopicStatsImpl still applies.
I don't see any other stats that need to be added at this point.

Rejected Alternatives

See PIP-63.

@github-actions

The issue had no activity for 30 days, mark with Stale label.

@RobertIndie

Hi, @Jason918. What is the status of this PIP? I see that the basic function is ready for this PIP. So my understanding is that some features are still left to be completed, right?

I move it to the 3.1.0 milestone first. Feel free to ping me if there are any problems.

@RobertIndie RobertIndie modified the milestones: 3.0.0, 3.1.0 Apr 11, 2023
@Jason918

OK, let's move it to 3.1 first. We need to add docs and more tests for this. @StevenLuMT will continue the work.
