PIP-187 Add API to analyze a subscription backlog and provide an accurate value #16597

Closed
eolivelli opened this issue Jul 14, 2022 · 1 comment

eolivelli commented Jul 14, 2022

Motivation

Currently there is no way to get an accurate backlog for a subscription:

  • you have only the number of "entries", not messages
  • server side filters (PIP-105) may filter out some messages

Having the number of entries is sometimes not enough: with batch messages, the amount of work on the Consumers is proportional to the number of messages, which may vary from entry to entry.

Goal

The idea of this patch is to provide a dedicated API (REST, pulsar-admin, and Java PulsarAdmin) to "analyze" a subscription and provide detailed information about what is expected to be delivered to Consumers.

The API will allow users to calculate the backlog since the latest unacked position (lastMarkDeletePosition) or since a given Position onwards.

The operation will be quite expensive because we have to load the messages from storage and pass them to the filters, but due to the dynamic nature of Pulsar subscriptions there is no other way to obtain this value.

One good strategy for monitoring/alerting is to set up alerts on the usual "stats" and use this new API to inspect the subscription more deeply, typically by issuing a manual command.

API Changes

internal ManagedCursor API:

CompletableFuture<ScanOutcome> scan(Optional<Position> startingPosition, Predicate<Entry> condition, long maxEntries, long timeOutMs);

This method scans the Cursor from the given position (or from the lastMarkDelete position if startingPosition is not provided) to the tail.
There is a time limit and a maxEntries limit; these are needed to prevent huge (and useless) scans.
The Predicate can stop the scan if it does not want to continue processing for any reason.
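
The stopping rules described above can be illustrated with a minimal, synchronous sketch. It is not the ManagedCursor implementation: the class `BoundedScanSketch`, the in-memory `List` standing in for the ledger, the integer start index standing in for a `Position`, and the `ScanOutcome` constant names are all assumptions made for illustration.

```java
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical, simplified model of a bounded cursor scan; not the real ManagedCursor code. */
final class BoundedScanSketch {

    /** Why the scan ended. The constant names are assumptions for this sketch. */
    enum ScanOutcome { COMPLETED, USER_INTERRUPTED, ABORTED }

    /**
     * Scans entries in order, starting at startIndex, until the tail is reached,
     * the predicate returns false, or one of the two limits is hit.
     */
    static <E> ScanOutcome scan(List<E> entries, int startIndex, Predicate<E> condition,
                                long maxEntries, long timeOutMs) {
        long deadlineNanos = System.nanoTime() + timeOutMs * 1_000_000L;
        long scanned = 0;
        for (int i = startIndex; i < entries.size(); i++) {
            // Check both limits before reading the next entry.
            if (scanned >= maxEntries || System.nanoTime() - deadlineNanos >= 0) {
                return ScanOutcome.ABORTED;
            }
            scanned++;
            if (!condition.test(entries.get(i))) {
                return ScanOutcome.USER_INTERRUPTED; // the predicate asked to stop
            }
        }
        return ScanOutcome.COMPLETED; // reached the tail
    }
}
```

In this sketch a scan that consumes exactly maxEntries entries and then reaches the tail still counts as completed; only a scan that would need to read beyond the limit is reported as aborted.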

New REST API:

    @GET
    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analyzeBacklog")
    @ApiOperation(value = "Analyze a subscription, by scanning all the unprocessed messages")
    public void analyzeSubscriptionBacklog(
            @Suspended final AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Subscription", required = true)
            @PathParam("subName") String encodedSubName,
            @ApiParam(value = "Position", required = false)
            @QueryParam("position") String position,
            @ApiParam(value = "Is authentication required to perform this operation")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
        // body omitted in the proposal
    }

API response model:

public class AnalyzeSubscriptionBacklogResult {
    private long entries;
    private long messages;

    private long filterRejectedEntries;
    private long filterAcceptedEntries;
    private long filterRescheduledEntries;

    private long filterRejectedMessages;
    private long filterAcceptedMessages;
    private long filterRescheduledMessages;

    private boolean aborted;
    private Position startingPosition;
    private Position lastScannedPosition;
}

The response contains "aborted=true" if the request has been aborted because of an internal limit, such as a timeout or the scan reaching the maximum number of entries.
We are not going to provide more details about why the scan stopped: that would make the API too detailed and harder to maintain. The details can be found in the broker logs.
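
Because an aborted scan stops before the tail, its counters cover only part of the backlog and are best read as lower bounds. A minimal sketch of how a caller might report this, assuming a hypothetical `Result` holder with only three of the response fields and a hypothetical `summarize` helper (neither is part of the proposal):

```java
/** Hypothetical helper showing one way to read the "aborted" flag; not part of the proposed API. */
final class BacklogSummarySketch {

    /** Hypothetical subset of the response fields. */
    static final class Result {
        long entries;
        long messages;
        boolean aborted;
    }

    /** Formats the counters, marking them as lower bounds when the scan was cut short. */
    static String summarize(Result r) {
        String prefix = r.aborted ? "at least " : "";
        return prefix + r.messages + " messages in " + prefix + r.entries + " entries";
    }
}
```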

New PulsarAdmin API:

    /**
     * Analyze subscription backlog.
     * This is a potentially expensive operation, as it requires
     * reading the messages from storage.
     * This function takes into consideration batch messages
     * and also Subscription filters.
     * @param topic
     *            Topic name
     * @param subscriptionName
     *            the subscription
     * @param position the position to start the scanning from
     * @return an accurate analysis of the backlog
     * @throws PulsarAdminException
     *            Unexpected error
     */
    AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName, Position position)
            throws PulsarAdminException;

    /**
     * Analyze subscription backlog asynchronously.
     * This is a potentially expensive operation, as it requires
     * reading the messages from storage.
     * This function takes into consideration batch messages
     * and also Subscription filters.
     * @param topic
     *            Topic name
     * @param subscriptionName
     *            the subscription
     * @param position the position to start the scanning from
     * @return a future that completes with an accurate analysis of the backlog
     */
    CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
                                                                                        String subscriptionName, Position position);

A pulsar-admin command will be added as well, as usual.

New configuration entries in broker.conf:

@FieldContext(
         category = CATEGORY_POLICIES,
         doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
 )
 private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
 @FieldContext(
         category = CATEGORY_POLICIES,
         doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog"
 )
 private long subscriptionBacklogScanMaxEntries = 10_000;

Implementation

The implementation is pretty straightforward:

  • add a new API in ManagedCursor to do the Scan
  • add the REST API
  • implement in PersistentSubscription an analyzeBacklog method that does the scan

The PersistentSubscription runs the scan:

  • it applies the filters if they are present
  • it considers individuallyDeletedMessages
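
The counting performed during the scan can be sketched as follows. This is a simplified model under stated assumptions, not the broker code: `Entry`, `Result`, `FilterResult` and `analyze` are hypothetical stand-ins, the backlog is an in-memory list, and entries are counted as accepted when no filter is configured.

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical, simplified model of the backlog analysis; not the actual broker code. */
final class BacklogAnalysisSketch {

    /** Filter decisions, named after the counters in the response model. */
    enum FilterResult { ACCEPT, REJECT, RESCHEDULE }

    /** Hypothetical stand-in for a stored entry. */
    static final class Entry {
        final String key;                  // illustrative field for a filter to inspect
        final int numMessages;             // batch size (1 for a non-batched message)
        final boolean individuallyDeleted; // already acknowledged out of order

        Entry(String key, int numMessages, boolean individuallyDeleted) {
            this.key = key;
            this.numMessages = numMessages;
            this.individuallyDeleted = individuallyDeleted;
        }
    }

    /** Subset of the counters in the response model. */
    static final class Result {
        long entries;
        long messages;
        long filterAcceptedEntries;
        long filterRejectedEntries;
        long filterRescheduledEntries;
        long filterAcceptedMessages;
        long filterRejectedMessages;
        long filterRescheduledMessages;
    }

    static Result analyze(List<Entry> backlog, Function<Entry, FilterResult> filter) {
        Result r = new Result();
        for (Entry e : backlog) {
            if (e.individuallyDeleted) {
                continue; // already acknowledged, so not part of the backlog
            }
            r.entries++;
            r.messages += e.numMessages;
            // Assumption: with no filter configured, every entry counts as accepted.
            FilterResult decision = (filter == null) ? FilterResult.ACCEPT : filter.apply(e);
            switch (decision) {
                case ACCEPT:
                    r.filterAcceptedEntries++;
                    r.filterAcceptedMessages += e.numMessages;
                    break;
                case REJECT:
                    r.filterRejectedEntries++;
                    r.filterRejectedMessages += e.numMessages;
                    break;
                case RESCHEDULE:
                    r.filterRescheduledEntries++;
                    r.filterRescheduledMessages += e.numMessages;
                    break;
            }
        }
        return r;
    }
}
```

Counting messages per entry, rather than entries alone, is what makes the result reflect the actual work awaiting Consumers when batching is in use.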

Rejected Alternatives

  1. We could store a counter of the number of logical messages during writes. But that does not work, for a few reasons:
     • you cannot know which subscriptions will be created on a topic
     • a subscription can be created from the past (Earliest)
     • subscription filters may change over time: they are usually configured using Subscription Properties, and those properties are dynamic
     • doing computations on the write path (like running filters) kills latency and throughput
  2. Use a client to clone the subscription and consume data.
     This doesn't work because you have to transfer the data to the client, which is possibly a huge amount of work and a waste of resources.
@codelipenghui

Closing since the PR has been merged.
