[PIP-143] Support split bundle by specified boundaries #13761

Closed
aloyszhang opened this issue Jan 14, 2022 · 2 comments · Fixed by #13796
Labels
type/enhancement, type/PIP

aloyszhang commented Jan 14, 2022

Motivation

A namespace bundle may contain many topic partitions belonging to different topics.
The throughput of these topics can vary greatly: some topics have a very high message rate/throughput, while others have a very low one.

Partitions with a high rate/throughput can overload a broker and trigger bundle unloading.
When this happens, splitting the bundle manually with the range_equally_divide or topic_count_equally_divide algorithm may take many splits before the high-rate/throughput partitions end up in different bundles.

For convenience, this PIP calls these high-throughput topics outstanding topics, and their partitions outstanding partitions.

Goal

Our goal is to make it easier to split outstanding partitions into new bundles. This PIP therefore introduces a more flexible algorithm for splitting a namespace bundle.

The main idea is to first get the hash position of every topic in the bundle. Once these hash positions are known, it is much easier to decide where to split the bundle: we can split it into two bundles of equal throughput, or into several bundles of equal throughput.
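A minimal sketch of the throughput-equal idea, assuming per-topic throughput numbers are available alongside the hash positions. The `TopicLoad` record, the `ThroughputSplit` class and the greedy placement rule are hypothetical illustrations, not part of this PIP: topics are sorted by hash position, and a boundary is placed right after the topic at which the cumulative throughput first reaches each 1/k share.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class ThroughputSplit {
    // Hypothetical input: a topic's hash position and its measured throughput (e.g. msg/s).
    record TopicLoad(String topic, long hash, double throughput) {}

    // Returns at most (bundles - 1) boundaries. Each boundary lies strictly above one topic's
    // hash position and at or below the next topic's, so the two topics land in different bundles.
    static List<Long> boundaries(List<TopicLoad> topics, int bundles) {
        List<TopicLoad> sorted = new ArrayList<>(topics);
        sorted.sort(Comparator.comparingLong(TopicLoad::hash));
        double total = sorted.stream().mapToDouble(TopicLoad::throughput).sum();
        List<Long> result = new ArrayList<>();
        double cumulative = 0;
        int target = 1; // looking for the point where cumulative >= target * total / bundles
        for (int i = 0; i + 1 < sorted.size() && target < bundles; i++) {
            cumulative += sorted.get(i).throughput();
            long a = sorted.get(i).hash();
            long c = sorted.get(i + 1).hash();
            // topics with identical hash positions cannot be separated, so skip that gap
            if (cumulative >= target * total / bundles && a < c) {
                result.add(a + (c - a + 1) / 2); // a < boundary <= c
                target++;
            }
        }
        return result;
    }
}
```

A greedy rule like this keeps the topic that crosses a share in the lower bundle; a real implementation might instead compare the imbalance on both sides of that topic before choosing the gap.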

For example, consider a bundle with boundaries 0x00000000 to 0x00000200 and four topics: t1, t2, t3, t4.

Step one. Get the hash position of these topics

t1 with hashcode 10

t2 with hashcode 20

t3 with hashcode 80

t4 with hashcode 90
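To make step one concrete, the sketch below assumes that a topic's hash position is the CRC32 of its full topic name and that each bundle owns the half-open range [lower, upper). Both are assumptions; the hash function the broker actually uses is configured in its bundle factory and should be checked there. `TopicHashPosition`, `hashPosition` and `bundleIndex` are hypothetical helpers.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.CRC32;

final class TopicHashPosition {
    // Assumption: hash position = CRC32 of the full topic name, a value in [0, 0xFFFFFFFF].
    static long hashPosition(String topicName) {
        CRC32 crc = new CRC32();
        crc.update(topicName.getBytes(StandardCharsets.UTF_8));
        return crc.getValue(); // already an unsigned 32-bit value stored in a long
    }

    // Bundles are assumed to be [boundaries[i], boundaries[i + 1]).
    // Returns the index of the bundle owning the hash position, or -1 if it is out of range.
    static int bundleIndex(List<Long> boundaries, long hash) {
        for (int i = 0; i + 1 < boundaries.size(); i++) {
            if (hash >= boundaries.get(i) && hash < boundaries.get(i + 1)) {
                return i;
            }
        }
        return -1;
    }
}
```

With the example bundle split at 15, 50 and 85, the positions 10, 20, 80 and 90 fall into bundles 0, 1, 2 and 3 respectively.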

Step two. Split the bundle

Here we have multiple choices, for example:

  • split the bundle into two bundles with equal topic counts, as the topic_count_equally_divide algorithm does, by splitting at any position from 21 to 80
  • split the bundle into four bundles, each containing one topic, by splitting at positions 15, 50 and 85
  • split based on the topics' throughput
  • ...
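The first two choices can be computed directly from the sorted hash positions. In this sketch (class and method names are hypothetical), a boundary b between adjacent positions a < c is chosen so that a < b <= c, which keeps a in the lower bundle and moves c to the upper one. The two-way helper is only in the spirit of topic_count_equally_divide; the broker's own algorithm may pick a different point inside the same window.

```java
import java.util.ArrayList;
import java.util.List;

final class ExampleSplits {
    // Boundary between adjacent hash positions a < c, with a < boundary <= c.
    static long between(long a, long c) {
        return a + (c - a + 1) / 2;
    }

    // One bundle per topic: a boundary between every pair of adjacent sorted, distinct positions.
    static List<Long> onePerTopic(List<Long> sortedPositions) {
        List<Long> result = new ArrayList<>();
        for (int i = 0; i + 1 < sortedPositions.size(); i++) {
            result.add(between(sortedPositions.get(i), sortedPositions.get(i + 1)));
        }
        return result;
    }

    // Two bundles with equal (or off-by-one) topic counts; needs at least two positions.
    static long topicCountMiddle(List<Long> sortedPositions) {
        int mid = sortedPositions.size() / 2;
        return between(sortedPositions.get(mid - 1), sortedPositions.get(mid));
    }
}
```

For t1..t4 (10, 20, 80, 90), `onePerTopic` gives 15, 50 and 85, and `topicCountMiddle` gives 50, which lies inside the 21 to 80 window.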

API Changes

This PIP requires two API changes.

  1. Add a new API to get the hash positions of one or more topics:
     /**
     * Get hash positions for a list of topics in a bundle.
     *
     * @param namespace
     * @param bundle range of bundle
     * @param topicList topics whose hash positions are requested
     * @return hash positions for all topics in topicList
     * @throws PulsarAdminException
     */
     TopicHashPositions getTopicHashPositions(String namespace, String bundle, List<String> topicList)
             throws PulsarAdminException;
  2. Change the bundle split API to support splitting a bundle at one or more specified hash positions:
     /**
     * Split namespace bundle.
     *
     * @param namespace
     * @param bundle range of bundle to split
     * @param unloadSplitBundles
     * @param splitAlgorithmName
     * @param splitBoundaries
     * @throws PulsarAdminException
     */
    void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                              String splitAlgorithmName, List<Long> splitBoundaries) throws PulsarAdminException;
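A sketch of how a client might combine the two proposed calls so that an outstanding topic's partitions end up in different bundles. `BundleAdmin` is a hypothetical stand-in for the admin interface, and the PIP's `TopicHashPositions` return type is modeled here as a plain `Map<String, Long>` from topic name to hash position, which is an assumption. The boundary rule puts one boundary between each pair of adjacent partition positions.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

final class SplitWorkflow {
    // Hypothetical stand-in for the two proposed admin methods.
    interface BundleAdmin {
        Map<String, Long> getTopicHashPositions(String namespace, String bundle, List<String> topicList);

        void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                                  String splitAlgorithmName, List<Long> splitBoundaries);
    }

    // One boundary between each pair of adjacent positions, so no two of these partitions share a bundle.
    static List<Long> separate(Collection<Long> positions) {
        List<Long> sorted = new ArrayList<>(new TreeSet<>(positions)); // sorted, duplicates removed
        List<Long> boundaries = new ArrayList<>();
        for (int i = 0; i + 1 < sorted.size(); i++) {
            long a = sorted.get(i);
            long c = sorted.get(i + 1);
            boundaries.add(a + (c - a + 1) / 2); // a < boundary <= c
        }
        return boundaries;
    }

    static void splitOutstanding(BundleAdmin admin, String namespace, String bundle, List<String> partitions) {
        Map<String, Long> positions = admin.getTopicHashPositions(namespace, bundle, partitions);
        admin.splitNamespaceBundle(namespace, bundle, true, "specified_positions_divide",
                separate(positions.values()));
    }
}
```

Because the broker-side algorithm drops boundaries outside the bundle range, the client does not need to know the bundle endpoints to compute these positions.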

Implementation

New API for getting topic hash positions

Add a new admin command, GetTopicHashPositions, to CmdNamespaces:

private class GetTopicHashPositions extends CliCommand {

    @Parameter(
            names = { "--bundle", "-b" },
            description = "{start-boundary}_{end-boundary} format namespace bundle",
            required = false)
    private String bundle;

    @Parameter(
            names = { "--topic-list", "-tl" },
            description = "The list of topics to get hash positions for in this bundle",
            required = false)
    private List<String> topicList;
}

Add a new GET method, getTopicHashPositions, to Namespaces:

    @GET
    @Path("/{tenant}/{namespace}/{bundle}/topicHashPositions")
    @ApiOperation(value = "Get hash positions for topics")
    @ApiResponses(value = {
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Namespace does not exist")})
    public TopicHashPositions getTopicHashPositions(
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("bundle") String bundleRange,
            @QueryParam("topicList") List<String> topicList) {
        validateNamespaceName(tenant, namespace);
        return internalGetTopicHashPositions(bundleRange, new ArrayList<>(topicList));
    }
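A List-valued `@QueryParam` is sent as one repeated query parameter per element. The sketch below builds the request path for the new endpoint, assuming the `Namespaces` resource is served under `/admin/v2/namespaces`; `TopicHashPositionsRequest` is a hypothetical helper, and whether `topicList` expects short or fully qualified topic names is not specified here.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

final class TopicHashPositionsRequest {
    // Assumption: base path of the Namespaces resource on the broker.
    static final String BASE = "/admin/v2/namespaces/";

    static String path(String tenant, String namespace, String bundle, List<String> topics) {
        String query = topics.stream()
                // each topic becomes its own "topicList=..." pair, URL-encoded
                .map(t -> "topicList=" + URLEncoder.encode(t, StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        String path = BASE + tenant + "/" + namespace + "/" + bundle + "/topicHashPositions";
        return query.isEmpty() ? path : path + "?" + query;
    }
}
```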

Add support for splitting a bundle at specified hash positions

Change the SplitBundle admin command in CmdNamespaces to support splitting a bundle at specified hash positions (split boundaries):

    private class SplitBundle extends CliCommand {

        @Parameter(names = { "--split-algorithm-name", "-san" }, description = "Algorithm name for split "
                + "namespace bundle. Valid options are: [range_equally_divide, topic_count_equally_divide, "
                + "specified_positions_divide]. Use broker side config if absent", required = false)
        private String splitAlgorithmName;

        @Parameter(names = { "--split-boundaries", "-sb" },
                description = "Specified split boundaries for the bundle split; splits one bundle into "
                + "multiple bundles. Only works with the specified_positions_divide algorithm", required = false)
        private List<Long> splitBoundaries;

        // other parameters and run() omitted
    }

Change the splitNamespaceBundle method of Namespaces to add a parameter for split boundaries:

    public void splitNamespaceBundle(
            @Suspended final AsyncResponse asyncResponse,
            @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace,
            @PathParam("bundle") String bundleRange,
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @QueryParam("unload") @DefaultValue("false") boolean unload,
            @QueryParam("splitAlgorithmName") String splitAlgorithmName,
            @QueryParam("splitBoundaries") List<Long> splitBoundaries) {
        // method body omitted
    }

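For code consistency, encapsulate all the parameters for a bundle split in a new class, BundleSplitOption:

public class BundleSplitOption {
    private NamespaceService service;
    private NamespaceBundle bundle;
    private List<Long> positions;

    // getters used by the split algorithms
    public NamespaceService getService() { return service; }
    public NamespaceBundle getBundle() { return bundle; }
    public List<Long> getPositions() { return positions; }
}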

Then add a new NamespaceBundleSplitAlgorithm, SpecifiedPositionsBundleSplitAlgorithm, which validates the requested split boundaries and returns the final ones:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class SpecifiedPositionsBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
    @Override
    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
        NamespaceService service = bundleSplitOption.getService();
        NamespaceBundle bundle = bundleSplitOption.getBundle();
        List<Long> positions = bundleSplitOption.getPositions();
        if (positions == null || positions.isEmpty()) {
            // no positions specified: nothing to split
            return CompletableFuture.completedFuture(null);
        }
        return service.getOwnedTopicListForNamespaceBundle(bundle).thenApply(topics -> {
            // a bundle with at most one topic gains nothing from being split
            if (topics == null || topics.size() <= 1) {
                return null;
            }
            // keep positions strictly inside the bundle range, sorted and de-duplicated;
            // sorting a stream copy leaves the caller's list unmodified
            List<Long> splitBoundaries = positions.stream()
                    .filter(position -> position > bundle.getLowerEndpoint()
                            && position < bundle.getUpperEndpoint())
                    .sorted()
                    .distinct()
                    .collect(Collectors.toList());
            return splitBoundaries.isEmpty() ? null : splitBoundaries;
        });
    }
}
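The boundary filtering can be exercised without a broker. This standalone sketch (`SplitBoundaryValidation` is a hypothetical name) repeats only the filtering step, with the bundle endpoints passed in as plain longs, and leaves out the owned-topic check.

```java
import java.util.List;
import java.util.stream.Collectors;

final class SplitBoundaryValidation {
    // Keep only positions strictly inside (lower, upper), sorted ascending with duplicates removed.
    // Returns null when nothing usable remains, the same "no split" signal the algorithm uses.
    static List<Long> validate(List<Long> positions, long lower, long upper) {
        if (positions == null || positions.isEmpty()) {
            return null;
        }
        List<Long> boundaries = positions.stream()
                .filter(p -> p > lower && p < upper)
                .sorted()
                .distinct()
                .collect(Collectors.toList());
        return boundaries.isEmpty() ? null : boundaries;
    }
}
```

For the example bundle 0x00000000 to 0x00000200, the request [85, 15, 50, 15, 0, 512, 600] is reduced to [15, 50, 85]: the duplicate 15 is dropped, and 0, 512 and 600 are rejected because they are not strictly inside the range.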

Also, add the new bundle split algorithm to conf/broker.conf:

supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide

Rejected Alternatives

Splitting the bundle by outstanding topic: each split divides the bundle into two new bundles that each contain an equal number of the outstanding topic's partitions. This algorithm has a disadvantage: it can only handle one outstanding topic at a time.
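A sketch of the rejected approach, for comparison (`OutstandingTopicSplit` is a hypothetical name): each call takes the hash positions of a single outstanding topic's partitions and returns one boundary that divides them evenly, so separating several outstanding topics would require repeated splits.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class OutstandingTopicSplit {
    // One boundary per call: the two halves receive an equal (or off-by-one) number of the
    // outstanding topic's partitions, assuming their hash positions are distinct.
    static long medianBoundary(List<Long> partitionPositions) {
        if (partitionPositions.size() < 2) {
            throw new IllegalArgumentException("need at least two partitions");
        }
        List<Long> sorted = new ArrayList<>(partitionPositions);
        Collections.sort(sorted);
        int mid = sorted.size() / 2;
        long a = sorted.get(mid - 1);
        long c = sorted.get(mid);
        return a + (c - a + 1) / 2; // a < boundary <= c when a < c
    }
}
```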

aloyszhang added the type/enhancement label Jan 14, 2022
aloyszhang changed the title to "Support split paritions belonging to specified topic in a bundle" Feb 17, 2022

aloyszhang changed the title to "[PIP-143] Support split paritions belonging to specified topics in a bundle" Feb 26, 2022