PIP-186: Introduce two phase deletion protocol based on system topic #16569

Open
horizonzy opened this issue Jul 13, 2022 · 1 comment

horizonzy commented Jul 13, 2022

Motivation

Original issue: #13238

The current ledger deletion is divided into three separate steps.

  • Mark the list of ledger IDs to be removed.

  • Remove those ledger IDs from the full ledger list and write the updated list to the Pulsar meta store.

  • In the callback for a successful meta store update, delete the corresponding ledgers from the storage system, such as BookKeeper or tiered storage.

Examples:

  • TopicA holds ledgers [1,2,3,4,5]. All consumers have consumed all messages up to ledger 2, so Pulsar decides that ledger 1 can be deleted.

  • ManagedLedger removes ledger 1 from [1,2,3,4,5], then TopicA writes [2,3,4,5] to the meta store.

  • When the meta store update succeeds, the BookKeeper delete API is invoked to delete ledger 1.

The deletion touches both the meta store (step 2) and the storage system (step 3), and we can't ensure that the two operations happen in one transaction. If the meta store update succeeds but the storage system operation fails, the ledger stays in the storage system forever: Pulsar no longer indexes it, so there is no further chance to delete it.

The ManagedLedger, ManagedCursor, and SchemaStorage all have this problem.

High-Level Design

To fix this, we introduce a two-phase deletion. The first phase is the metadata deletion, the second phase is the storage deletion.

[Figure image-20230304231011349: first phase, metadata deletion]

Deletion Trigger: the component that wants to delete a ledger: ManagedLedger, ManagedCursor, or SchemaStorage.

System-Topic: stores the ledgers that are waiting to be deleted.

Meta store: stores the ledgers that Pulsar indexes.

[Figure image-20230304230918829: second phase, storage deletion]

Consumer: consumes messages from the system-topic and triggers the deletion.

System-Topic: stores the ledgers that are waiting to be deleted.

Storage System: where the data is actually stored.

The first phase: metadata delete

When we delete ledgers, we do it in a batch so that only one meta store update is needed.
1st step, we mark the ledgers we wish to delete in the managed-ledger ledger list.
2nd step, we build a message called PendingDeleteLedgerInfo for each ledger we wish to delete.
3rd step, we send those messages to a special system topic designated for that purpose.
4th step, we create a new list of ledgers based on the original managed-ledger list, removing each ledger ID whose message was sent successfully.
5th step, we update the managed-ledger list in the meta store with the list of ledger IDs prepared in the previous step.

The system-topic pulsar/system/persistent/__ledger_deletion stores the pending-delete ledgers.

Implementation

When the LedgerDeletionService starts, it creates a producer for sending pending-delete ledgers. When deleting a ledger, the producer sends a PendingDeleteLedgerInfo to the system-topic. If the send succeeds, the ledger ID is removed from the ledger list. If the send fails, nothing is done and the ledger stays in the list. In the end, the remaining ledger list is written to the meta store.

The role of LedgerDeletionService

  • Send PendingDeleteLedgerInfo to the system-topic, for the first phase.
  • Delete the data from the storage system, for the second phase.

Example:

  • TopicA holds ledgers [1,2,3,4,5]. All consumers have consumed all messages up to ledger 3, so Pulsar decides that ledgers [1,2] can be deleted.
  • Send the PendingDeleteLedgerInfo for ledger 1 to the system-topic: succeeds.
  • Send the PendingDeleteLedgerInfo for ledger 2 to the system-topic: fails.
  • Remove ledger 1 from the full ledger list; the remaining ledgers are [2,3,4,5].
  • Write ledgers [2,3,4,5] to the Pulsar meta store.

Pseudocode


// Pick the ledgers that are eligible for deletion.
List<Long> ledgerIds = pickPendingDeleteLedgerIds();
List<PendingDeleteLedgerInfo> pendingDeleteLedgerInfos = buildPendingDeleteLedgerInfos(ledgerIds);
// Returns only the ledger IDs whose message reached the system-topic.
List<Long> succeedSendLedgerIds = sendPendingDeleteLedgerInfos(pendingDeleteLedgerInfos);
// Ledgers whose send failed stay in the remaining list.
List<LedgerInfo> remainingLedgerInfos = subtract(allLedgerInfos, succeedSendLedgerIds);
updateLedgerMetastore(remainingLedgerInfos);
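
The pseudocode above can be turned into a small runnable sketch. It is only an illustration under stated assumptions: `FirstPhaseSketch`, `markForDeletion` and the `sendToSystemTopic` callback are hypothetical names standing in for the real producer, and plain ledger IDs are used instead of LedgerInfo objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Illustrative only: models the first phase with plain ledger IDs.
final class FirstPhaseSketch {

    /**
     * Sends one pending-delete message per candidate ledger and returns the
     * ledger list that should be written to the meta store afterwards.
     */
    static List<Long> markForDeletion(List<Long> allLedgerIds,
                                      List<Long> pendingDeleteIds,
                                      Function<Long, CompletableFuture<Void>> sendToSystemTopic) {
        List<Long> sentIds = new ArrayList<>();
        for (Long ledgerId : pendingDeleteIds) {
            try {
                // Hypothetical stand-in for producer.sendAsync(pendingDeleteLedgerInfo).
                sendToSystemTopic.apply(ledgerId).join();
                sentIds.add(ledgerId);
            } catch (RuntimeException e) {
                // Send failed: do nothing, the ledger stays in the list.
            }
        }
        List<Long> remaining = new ArrayList<>(allLedgerIds);
        remaining.removeAll(sentIds);
        return remaining;
    }
}
```

With ledgers [1,2,3,4,5], candidates [1,2], and a send that fails only for ledger 2, the method returns [2,3,4,5], which matches the example above.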

Tips: We also can't ensure that sending the message and updating the meta store happen in one transaction. So one case remains: the PendingDeleteLedgerInfo is sent to the system-topic, but the meta store update fails, so the ledger can still be read. The ledger is marked for deletion in the system-topic but is still in use on the Pulsar side. The second phase covers this case.

The PendingDeleteLedgerInfo definition
public class PendingDeleteLedgerInfo {

    private String topicName;

    private LedgerComponent ledgerComponent;
  
    private String cursorName;
  
    private Long schemaId;

    private LedgerType ledgerType;

    private Long ledgerId;

    private MLDataFormats.ManagedLedgerInfo.LedgerInfo.OffloadContext offloadContext;
}

topicName: the topic the ledger comes from.

ledgerComponent: indicates what the ledger is used for. Managed-Ledger stores message data, Managed-Cursor stores consumer cursor data, and Schema-Storage stores topic schema data. When deleting a ledger, we need it to check the ledger metadata on the bookkeeper side. (The section on deletion on the bookkeeper side explains why this check is needed.)

cursorName: if the ledger is used for Managed-Cursor, it records the cursor name. The cursor name is equivalent to the consumer subscription name. A topic may have several subscriptions, so we need the cursor name to tell them apart. We need it to check the ledger metadata on the bookkeeper side.

schemaId: if the ledger is used for Schema-Storage, it records the schema id. The schema id is the index for schema storage; Pulsar uses the schemaId to find the schema entry location. We also need it to check the ledger metadata on the bookkeeper side.

ledgerType: Ledger or Offload-Ledger. Pulsar normally stores messages in BookKeeper, but bookkeeper machines are more expensive, so they hold the warm data. Cold data is moved to tiered storage, and such a ledger is called an offload-ledger. See PIP-17 for details.

ledgerId: the ledger id.

offloadContext: if a ledger is offloaded to tiered storage (like GCS or S3), Pulsar records the tiered-storage info of the ledger in offloadContext, and the offloadContext is persisted in the Pulsar ledger metadata. When deleting an offload-ledger, we need the offloadContext to delete the tiered-storage data. For a normal ledger, offloadContext is null.

If a ledger exists on both the bookkeeper side and the tiered-storage side, deleting it requires sending two PendingDeleteLedgerInfo messages to the system topic: one with ledgerType Ledger, the other with ledgerType Offload-Ledger. Two separate messages are easier to handle, because the PendingDeleteLedgerInfo for bookkeeper and the one for tiered-storage can each be processed on its own.
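
As a rough illustration of the "two messages" rule, the sketch below builds the messages for one ledger. It assumes a simplified stand-in class (`PendingDeleteSketch`) with the offload context reduced to a String, and hypothetical enum constants `LEDGER` and `OFFLOAD_LEDGER`; none of these names come from the proposal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical constants; the proposal only names the kinds "Ledger" and "Offload-Ledger".
enum SketchLedgerType { LEDGER, OFFLOAD_LEDGER }

// Simplified stand-in for PendingDeleteLedgerInfo.
final class PendingDeleteSketch {
    final String topicName;
    final long ledgerId;
    final SketchLedgerType ledgerType;
    final String offloadContext; // null for a normal BookKeeper ledger

    PendingDeleteSketch(String topicName, long ledgerId,
                        SketchLedgerType ledgerType, String offloadContext) {
        this.topicName = topicName;
        this.ledgerId = ledgerId;
        this.ledgerType = ledgerType;
        this.offloadContext = offloadContext;
    }

    /** Builds one message per storage system that still holds the ledger. */
    static List<PendingDeleteSketch> messagesFor(String topicName, long ledgerId,
                                                 boolean storedInBookKeeper,
                                                 String offloadContext) {
        List<PendingDeleteSketch> messages = new ArrayList<>();
        if (storedInBookKeeper) {
            messages.add(new PendingDeleteSketch(
                    topicName, ledgerId, SketchLedgerType.LEDGER, null));
        }
        if (offloadContext != null) {
            messages.add(new PendingDeleteSketch(
                    topicName, ledgerId, SketchLedgerType.OFFLOAD_LEDGER, offloadContext));
        }
        return messages;
    }
}
```

A ledger that lives in both places yields two messages, so a failure on the tiered-storage side can be retried without touching the bookkeeper-side message.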

Process flow

[Figure image-20230304233419003: first-phase process flow]

The second phase: storage delete

The first phase sends PendingDeleteLedgerInfo to the system-topic. As long as every PendingDeleteLedgerInfo is eventually processed in the second phase, no orphan ledgers remain in the storage system. If the deletion for a PendingDeleteLedgerInfo succeeds, the message is acked. If it fails, the message is consumed again later.

Implementation

When the LedgerDeletionService starts, it also starts a consumer for PendingDeleteLedgerInfo messages. The consumer subscribes with SubscriptionType.Shared.

When the consumer receives a PendingDeleteLedgerInfo, it starts the deletion process.

1. Check whether the topic exists. Go to 2.1 or 2.2.

2.1 If the topic exists, the corresponding ManagedLedger takes over the ledger deletion. Go to 2.1.1.

2.1.1 The consumer uses pulsar admin to send the PendingDeleteLedgerInfo through the broker RESTful API. Go to 2.1.1.1. (The consumer may not be on the broker that owns the topic, and we want the topic owner to take over the ledger deletion. So the consumer uses pulsar admin to send the PendingDeleteLedgerInfo to the broker, and the request is routed automatically to the owner broker.)

2.1.1.1 The broker receives the PendingDeleteLedgerInfo, checks whether the ledger is still in use, and then checks the PendingDeleteLedgerInfo ledgerType. Go to 3.1 or 3.2.

2.2 If the topic does not exist, the consumer checks the PendingDeleteLedgerInfo ledgerType. Go to 3.1 or 3.2.

3.1 If the ledgerType is Ledger, delete the data on the bookkeeper side.

3.2 If the ledgerType is Offload-Ledger, delete the data on the tiered-storage side.

Pseudocode

// Consumer side.
// LedgerType.LEDGER / LedgerType.OFFLOAD_LEDGER are illustrative constant names.
Message<PendingDeleteLedgerInfo> msg = consumer.receive();
PendingDeleteLedgerInfo info = msg.getValue();
boolean topicExists = checkTheTopicIsExists(info.topicName);
if (topicExists) {
    // Let the broker that owns the topic perform the deletion.
    Response response = pulsarAdmin.topics().deleteLedger(info);
    if (response.isSucceed) {
        consumer.acknowledge(msg);
    } else if ("ledger still in use".equals(response.errorMsg)) {
        // A retry cannot change this outcome, so ack instead of retrying.
        consumer.acknowledge(msg);
    } else {
        consumer.reconsumeLater(msg, reconsumeLaterSeconds, TimeUnit.SECONDS);
    }
} else {
    // The topic no longer exists, so the consumer deletes the data itself.
    boolean isSucceed = false;
    if (info.ledgerType == LedgerType.LEDGER) {
        isSucceed = ledgerDeletionService.deleteLedger(info);
    } else if (info.ledgerType == LedgerType.OFFLOAD_LEDGER) {
        isSucceed = ledgerDeletionService.deleteOffloadLedger(info);
    }
    if (isSucceed) {
        consumer.acknowledge(msg);
    } else {
        consumer.reconsumeLater(msg, reconsumeLaterSeconds, TimeUnit.SECONDS);
    }
}

// Broker side.
// The broker RESTful API receives the request and passes the PendingDeleteLedgerInfo to ManagedLedger.
if (theLedgerStillInTheLedgerLists(info.ledgerId)) {
    // The meta store update of the first phase did not succeed; the ledger may still be read.
    return httpResponse.ERROR("ledger still in use");
}
boolean isSucceed = false;
if (info.ledgerType == LedgerType.LEDGER) {
    isSucceed = ledgerDeletionService.deleteLedger(info);
} else if (info.ledgerType == LedgerType.OFFLOAD_LEDGER) {
    isSucceed = ledgerDeletionService.deleteOffloadLedger(info);
}
if (isSucceed) {
    return httpResponse.OK;
} else {
    return httpResponse.ERROR;
}

The first phase mentioned the case where the send succeeds but the meta store update fails. The check in step 2.1.1.1, whether the ledger is still in the ledger list, covers this case. When the consumer consumes the message, it uses pulsarAdmin to send a delete command to the broker. The broker may find that the ledger ID is still in the ledger list in the meta store, which means the ledger may still be read by consumers; deleting it on the storage side would make those consumers fail with an exception. So the broker refuses to delete the ledger and responds to the consumer with the error message "ledger still in use".
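
The guard described above can be sketched in a few lines of plain Java. This is a simplified model, not the broker code: `DeleteResponse`, `ConsumerAction`, and `SecondPhaseSketch` are hypothetical names, the ledger list is a plain set, and the storage deletion is a callback.

```java
import java.util.Set;
import java.util.function.LongPredicate;

// Hypothetical response and action types used only in this sketch.
enum DeleteResponse { OK, LEDGER_STILL_IN_USE, ERROR }

enum ConsumerAction { ACK, RECONSUME_LATER }

final class SecondPhaseSketch {

    /** Broker side: refuse to delete a ledger that is still in the ledger list. */
    static DeleteResponse handleDeleteRequest(Set<Long> ledgersInMetaStore,
                                              long ledgerId,
                                              LongPredicate storageDelete) {
        if (ledgersInMetaStore.contains(ledgerId)) {
            // The first-phase meta store update failed; consumers may still read it.
            return DeleteResponse.LEDGER_STILL_IN_USE;
        }
        return storageDelete.test(ledgerId) ? DeleteResponse.OK : DeleteResponse.ERROR;
    }

    /** Consumer side: only a genuine failure is worth retrying. */
    static ConsumerAction decide(DeleteResponse response) {
        return response == DeleteResponse.ERROR
                ? ConsumerAction.RECONSUME_LATER
                : ConsumerAction.ACK;
    }
}
```

Acking on "ledger still in use" drops that pending-delete message. The ledger remains indexed in the meta store, so it is not orphaned.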

Delete on the bookkeeper side

For security, an attacker may forge a PendingDeleteLedgerInfo and send it to the system-topic, so we need to run some checks before deleting the data.

In Pulsar, when a new ledger is created, some Pulsar info (key-value pairs) is appended to the bookkeeper ledger metadata.

metadata example

//For Managed-Ledger
key:application, value:pulsar
key:component, value:managed-ledger
key:pulsar/managed-ledger, value: ledgerName

//For Managed-Cursor
key:application, value:pulsar
key:component, value:managed-ledger
key:pulsar/managed-ledger, value: ledgerName
key:pulsar/cursor, value: cursorName

//For Schema-Storage
key:application, value:pulsar
key:component, value:schema
key:pulsar/schemaId, value: schemaId

1. Fetch the bookkeeper ledger metadata.

2. Check whether the bookkeeper ledger metadata matches the incoming PendingDeleteLedgerInfo.

3.1 If it matches, delete the ledger.

3.2 If it does not match, ignore the PendingDeleteLedgerInfo.

Pseudocode

Map<String, String> bkMetadata = fetchBookkeeperLedgerMetadata(info.ledgerId);
boolean isMatch = checkIsMatch(bkMetadata, info);
if (isMatch) {
    return deleteBookkeeperData(info.ledgerId);
} else {
    // If the info doesn't match, treat the deletion as successful to avoid a pointless retry.
    return true;
}
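
One possible shape for the `checkIsMatch` step, using the metadata keys from the example above. This is a sketch under assumptions: `SketchComponent` is a hypothetical stand-in for LedgerComponent, the metadata is modelled as `Map<String, String>` as in the pseudocode, and the rule that a Managed-Ledger data ledger carries no `pulsar/cursor` key is an assumption made here to tell it apart from a cursor ledger.

```java
import java.util.Map;

// Hypothetical stand-in for the proposal's LedgerComponent.
enum SketchComponent { MANAGED_LEDGER, MANAGED_CURSOR, SCHEMA_STORAGE }

final class LedgerMetadataCheckSketch {

    // True only when the expected value is present and equals the stored one.
    private static boolean same(String expected, String actual) {
        return expected != null && expected.equals(actual);
    }

    static boolean matches(Map<String, String> bkMetadata, SketchComponent component,
                           String managedLedgerName, String cursorName, String schemaId) {
        if (!"pulsar".equals(bkMetadata.get("application"))) {
            return false;
        }
        switch (component) {
            case MANAGED_LEDGER:
                return "managed-ledger".equals(bkMetadata.get("component"))
                        && same(managedLedgerName, bkMetadata.get("pulsar/managed-ledger"))
                        // Assumption of this sketch: a data ledger has no cursor key.
                        && !bkMetadata.containsKey("pulsar/cursor");
            case MANAGED_CURSOR:
                return "managed-ledger".equals(bkMetadata.get("component"))
                        && same(managedLedgerName, bkMetadata.get("pulsar/managed-ledger"))
                        && same(cursorName, bkMetadata.get("pulsar/cursor"));
            case SCHEMA_STORAGE:
                return "schema".equals(bkMetadata.get("component"))
                        && same(schemaId, bkMetadata.get("pulsar/schemaId"));
            default:
                return false;
        }
    }
}
```

A forged message that names the wrong cursor or schema id fails the comparison and is ignored instead of deleting someone else's ledger.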

A small optimization: in the mark phase, after the message is sent to the system-topic and the meta store is updated, we know the ledger will be removed later, so we add the ledger id to the belivedLedgerIdSet. In the delete phase, if the topic exists, the consumer sends the PendingDeleteLedgerInfo to the broker using pulsarAdmin. The request is routed to the broker that owns the topic, so that broker can use the belivedLedgerIdSet to check whether the PendingDeleteLedgerInfo can be trusted. If PendingDeleteLedgerInfo.ledgerId exists in the belivedLedgerIdSet, we can skip the bookkeeper ledger metadata match check.
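
A minimal sketch of that optimization, assuming the set is a plain in-memory concurrent set on the owner broker. The class and method names are hypothetical, and removing the id once it has been checked is a design choice of this sketch, not something the proposal specifies.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative in-memory version of the "believed" ledger id set.
final class BelievedLedgerSketch {

    private final Set<Long> believedLedgerIds = ConcurrentHashMap.newKeySet();

    /** Mark phase: called after the send and the meta store update both succeeded. */
    void markBelieved(long ledgerId) {
        believedLedgerIds.add(ledgerId);
    }

    /**
     * Delete phase: returns false when the id was recorded by this broker, so
     * the bookkeeper metadata match check can be skipped. The id is removed
     * here so the set does not grow without bound (sketch assumption).
     */
    boolean needsMetadataCheck(long ledgerId) {
        return !believedLedgerIds.remove(ledgerId);
    }
}
```

Because the set only lives in memory, an id recorded before a broker restart or a topic ownership change is simply unknown afterwards, and the request falls back to the full metadata check.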

Delete at tiered-storage side

Deleting an offloaded ledger is easy to implement: build an offloader according to PendingDeleteLedgerInfo.offloadContext, then call offloader.deleteOffloaded() directly.

Process flow

[Figure 180195243-1237d813-b4f5-4b2b-9509-3f236914e44e: second-phase process flow]

The deletion backoff

Deleting either the bookkeeper data or the tiered-storage data may fail for some reason. We need a deletion backoff that retries the deletion until it succeeds or the retry limit is reached.

We use the pulsar consumer deadLetterPolicy to implement it.

DeadLetterPolicy.builder()
                .retryLetterTopic("pulsar/system/persistent/__ledger_deletion-RETRY")
                .deadLetterTopic("pulsar/system/persistent/__ledger_deletion-DLQ")
                .maxRedeliverCount(10).build();

If deleting a ledger fails, the consumer calls consumer.reconsumeLater(info, reconsumeLaterSeconds, TimeUnit.SECONDS), which sends the PendingDeleteLedgerInfo to the retryLetterTopic. After reconsumeLaterSeconds (configurable) seconds, the consumer receives the PendingDeleteLedgerInfo again and restarts the deletion process for it. If a PendingDeleteLedgerInfo still fails after maxRedeliverCount (configurable) retries, it is sent to the deadLetterTopic and the consumer won't receive it again.

The retry system-topic pulsar/system/persistent/__ledger_deletion-RETRY stores the pending-delete ledgers that are waiting for a retry.

The DLQ system-topic pulsar/system/persistent/__ledger_deletion-DLQ stores the pending-delete ledgers whose deletion has been given up.
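
The retry and dead-letter flow can be simulated without a broker. The sketch below is a model only: it assumes a message goes to the DLQ once its reconsume count exceeds the limit, so the exact boundary should be checked against the Pulsar client, and all names in it are hypothetical.

```java
import java.util.function.IntPredicate;

// Illustrative simulation of the retry topic and DLQ hand-off.
final class DeletionBackoffSketch {

    enum Destination { ACKED, DEAD_LETTER }

    static final class Result {
        final Destination destination;
        final int attempts;

        Result(Destination destination, int attempts) {
            this.destination = destination;
            this.attempts = attempts;
        }
    }

    /**
     * Runs deletion attempts until one succeeds or the reconsume count
     * exceeds maxRedeliverCount. deleteAttempt receives the 1-based attempt
     * number and returns true when the deletion succeeded.
     */
    static Result process(int maxRedeliverCount, IntPredicate deleteAttempt) {
        int reconsumeTimes = 0;
        while (true) {
            int attempt = reconsumeTimes + 1;
            if (deleteAttempt.test(attempt)) {
                return new Result(Destination.ACKED, attempt);
            }
            reconsumeTimes++; // models one reconsumeLater() call
            if (reconsumeTimes > maxRedeliverCount) {
                return new Result(Destination.DEAD_LETTER, attempt);
            }
        }
    }
}
```

Under this assumption, a deletion that never succeeds is tried once on the original delivery plus once per allowed retry before it lands in the DLQ.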

Some errors are treated as success, so that they are not retried.

  • The ledger is still in use (on retry, the ledger would still be in use; nothing changes).
  • The PendingDeleteLedgerInfo does not match the bookkeeper ledger metadata (on retry, it would still not match).
  • The data does not exist in bookkeeper or tiered-storage (the aim is to delete the data; if it does not exist, the deletion is considered successful).
  • The PendingDeleteLedgerInfo is incorrect or misses an essential property (on retry, it would still be incorrect).
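
One way to keep this rule in a single place is an enum that records, per outcome, whether the message should be acked. The constant names below are hypothetical and only mirror the four cases listed above plus plain success and a retryable failure.

```java
// Illustrative classification of deletion outcomes.
enum DeletionOutcome {
    DELETED(true),
    LEDGER_STILL_IN_USE(true),   // a retry would not change the result
    METADATA_MISMATCH(true),     // a retry would still not match
    DATA_NOT_FOUND(true),        // the goal, "data is gone", is already met
    INVALID_REQUEST(true),       // a retry would still be invalid
    TRANSIENT_FAILURE(false);    // worth retrying through the retry topic

    private final boolean ack;

    DeletionOutcome(boolean ack) {
        this.ack = ack;
    }

    /** True when the consumer should ack instead of calling reconsumeLater. */
    boolean shouldAck() {
        return ack;
    }
}
```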

Deletion idempotency

There are some edge cases. For example, the first deletion succeeds but the message is not acked because Pulsar restarts before the ack, so the same ledger may be deleted twice. The first deletion deletes the data; the second deletion receives an exception indicating that the data has already been deleted. We ignore this exception and treat the second deletion as successful too. This also means we can handle the deletion without any locks.
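
A tiny in-memory model of this behaviour, assuming a storage layer that throws when the ledger is already gone. `IdempotentDeleteSketch` and `NoSuchLedgerException` are hypothetical names used only for illustration.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Illustrative in-memory storage with an idempotent delete wrapper.
final class IdempotentDeleteSketch {

    static final class NoSuchLedgerException extends Exception {
        NoSuchLedgerException(long ledgerId) {
            super("ledger " + ledgerId + " does not exist");
        }
    }

    private final Set<Long> storedLedgers = new HashSet<>();

    IdempotentDeleteSketch(Collection<Long> initialLedgers) {
        storedLedgers.addAll(initialLedgers);
    }

    boolean contains(long ledgerId) {
        return storedLedgers.contains(ledgerId);
    }

    // Models the raw storage call, which fails when the data is already gone.
    private void rawDelete(long ledgerId) throws NoSuchLedgerException {
        if (!storedLedgers.remove(ledgerId)) {
            throw new NoSuchLedgerException(ledgerId);
        }
    }

    /** Returns true both for a fresh deletion and for a repeated one. */
    boolean deleteIdempotently(long ledgerId) {
        try {
            rawDelete(ledgerId);
        } catch (NoSuchLedgerException alreadyDeleted) {
            // The data is gone, which is the desired end state.
        }
        return true;
    }
}
```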

Monitor

Metrics

Number of ledgers sent to the system-topic: increased after the message is sent to the system-topic.

Number of offload-ledgers sent to the system-topic: increased after the message is sent to the system-topic.

Number of messages received by the consumer: increased after the consumer receives a message.

Number of ledgers deleted: increased after the LedgerDeletionService deletes a ledger.

Number of offload-ledgers deleted: increased after the LedgerDeletionService deletes an offload-ledger.

Number of failed ledger deletions: increased after the LedgerDeletionService fails to delete a ledger.

Number of failed offload-ledger deletions: increased after the LedgerDeletionService fails to delete an offload-ledger.

Number of messages acked: increased after the consumer acks a message successfully.

Number of messages that reached the max retry delete count: increased when the retry count of a message reaches twoPhaseDeletionMaxRetryDeleteCount (the message property RECONSUMETIMES holds the count).

Description API

Estimated number of in-flight ledger deletions: if the backlog is 0, all the ledgers have been deleted.

Add a new description API in pulsarAdmin.BrokerStats to get it.

The API is implemented as follows.

  1. Get the number of partitions of the system-topic.
  2. Invoke admin.getStat for every partition.
  3. Sum every stat.backlogSize.
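
The three steps amount to a sum over partitions. The sketch below keeps the per-partition lookup abstract: `backlogOfPartition` is a hypothetical callback standing in for the admin stats call, and `InflightDeletionSketch` is an illustrative name.

```java
import java.util.function.IntToLongFunction;
import java.util.stream.IntStream;

// Illustrative aggregation of the system-topic backlog.
final class InflightDeletionSketch {

    /**
     * Sums the backlog of every partition of the system-topic.
     * backlogOfPartition maps a partition index to its backlog.
     */
    static long estimatedInflightDeletions(int partitions,
                                           IntToLongFunction backlogOfPartition) {
        return IntStream.range(0, partitions)
                .mapToLong(backlogOfPartition)
                .sum();
    }
}
```

The value is an estimate: messages may be consumed or added between the individual stats calls.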

API Changes

Introduce LedgerDeletionService interface
public interface LedgerDeletionService {

    void start() throws PulsarClientException, PulsarAdminException;

    CompletableFuture<?> sendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
                                                   LedgerComponent component, LedgerType type);

    CompletableFuture<?> asyncDeleteLedger(String topicName, long ledgerId, LedgerComponent component,
                                           boolean skipAuth);

    CompletableFuture<?> asyncDeleteOffloadedLedger(String topicName, long ledgerId,
                                                    MLDataFormats.OffloadContext offloadContext);

    CompletableFuture<?> asyncClose();
}
Introduce delete ledger API in ManagedLedger

When the broker receives the pulsarAdmin deleteLedger RESTful request, it passes it to ManagedLedger.

public interface ManagedLedger {

    CompletableFuture<?> asyncDeleteLedger(String topicName, long ledgerId, LedgerType ledgerType,
                                              MLDataFormats.OffloadContext offloadContext);

    void deleteLedger(String topicName, long ledgerId, LedgerType ledgerType,
                      MLDataFormats.OffloadContext offloadContext) throws InterruptedException, ManagedLedgerException;
}
Introduce admin api in Topics

The consumer uses pulsar.admin().topics().asyncDeleteLedger() to send the request to the broker.

public interface Topics {

    CompletableFuture<Void> asyncDeleteLedger(DeleteLedgerPayload deleteLedgerPayload);

    void deleteLedger(DeleteLedgerPayload deleteLedgerPayload) throws PulsarAdminException;

}
Introduce admin API in BrokerStats
public interface BrokerStats {

    Long getEstimatedInflightDeletionLedgerCount() throws PulsarAdminException;

    CompletableFuture<Long> getEstimatedInflightDeletionLedgerCountAsync();
}

DeleteLedgerPayload

public class DeleteLedgerPayload {

    private long ledgerId;

    private String topicName;

    private String ledgerType;

    private String ledgerComponent;

    private String schemaId;

    private String cursorName;
    
    //Pass to broker, the broker uses lsb and msb to calculate uuid, 
    //uses driverName to build offloader, and uses metadata to locate the data segment.
    private OffloadContext offloadContext;
    
    public static class OffloadContext {
        private long lsb;
        private long msb;
        private String driverName;
        private Map<String, String> metadata;
    }
}

Configuration Changes

Add new property in ServiceConfiguration

public class ServiceConfiguration {

    private boolean topicTwoPhaseDeletionEnabled = false;
		
    private int twoPhaseDeletionLedgerDeletionParallelism = 4;

    private int twoPhaseDeletionReconsumeLaterInSeconds = 600;

    private int twoPhaseDeletionMaxRetryDeleteCount = 10;
}

twoPhaseDeletionLedgerDeletionParallelism: the number of partitions of the system-topic.

Compatibility

If the user upgrades and enables two-phase deletion, the ledger deletion messages are sent to the system-topic, and they are consumed later to delete the ledgers. But if the user rolls back to the old version before the system-topic messages have been consumed, those ledgers won't be deleted.

There are two tips for users.

  1. You can use pulsarAdmin.BrokerStats().getEstimatedInflightDeletionLedgerCount() to get the estimated number of in-flight ledger deletions. If the value is 0, all the ledgers have already been deleted.
  2. If you roll back to the old version for a while and then upgrade to this version again, the ledgers recorded in the system-topic will still be deleted.
@github-actions

The issue had no activity for 30 days, mark with Stale label.
