
PIP-137: Pulsar Client Shared State API #13490

Open
eolivelli opened this issue Dec 24, 2021 · 5 comments


@eolivelli
Contributor

eolivelli commented Dec 24, 2021

Motivation

Sometimes in a distributed application or library that already uses Pulsar you need to share some "state" across several instances of the application, for example:

  • metadata
  • dynamic configuration
  • assignments of tasks
  • building a simple key-value database

These cases are also very common when developing Pulsar IO Connectors or Pulsar Broker Protocol Handlers.

Currently you end up adding an additional component to the application, such as a database, or using the internal ZooKeeper or BookKeeper/DistributedLog components that support Pulsar.
This is usually awkward both for developers and for system administrators.

We can provide a built-in mechanism in the Pulsar client API to support building such shared data structures.

In fact, since Pulsar 2.8.0 we have the Exclusive Producer, which allows you to use Pulsar as a consistent write-ahead log for replicated state machines.

We can provide an API to handle a shared, distributed Java object: each client can access the object and mutate the State, and the API ensures consistency.

This is a sample implementation: https://github.com/eolivelli/pulsar-shared-state-manager

Goal

  • Add an API to the Java client that makes it easier to maintain a consistent Shared State between instances of an application.
  • Provide some ready-to-use recipes, such as a simple key-value store

Implementing a Pulsar-backed database system is not a goal.

API Changes

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

public interface SharedStateManager<V, O> extends AutoCloseable {

    /**
     * Read from the current state.
     * @param reader a function that accesses the current state and returns a value
     * @param latest ensure that the value is the latest
     * @param <K> the returned data type
     * @return a handle to the result of the operation
     */
    <K> CompletableFuture<K> read(Function<V, K> reader, boolean latest);

    /**
     * Execute a mutation on the state.
     * The operationsGenerator generates a list of mutations to be
     * written to the log; the change log applier configured on the builder
     * is executed to mutate the state after each successful write
     * to the log. Finally the reader function can read from
     * the current state before the write lock is released.
     * @param operationsGenerator generates a list of mutations (of type O)
     * @param reader reads from the state while holding the write lock
     * @param <K> the returned data type
     * @return a handle to the completion of the operation
     */
    <K> CompletableFuture<K> write(Function<V, List<O>> operationsGenerator,
                                   Function<V, K> reader);

    @Override
    void close();

    interface SharedStateManagerBuilder {

        <O> SharedStateManagerBuilder withOpSerializer(Function<O, byte[]> opSerializer);
        <O> SharedStateManagerBuilder withOpDeserializer(Function<byte[], O> opDeserializer);
        <V> SharedStateManagerBuilder withDatabaseInitializer(Supplier<V> databaseInitializer);
        <V, O> SharedStateManagerBuilder withChangeLogApplier(BiConsumer<V, O> changeLogApplier);
        <V, O> SharedStateManager<V, O> build();
    }
}
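To make the `read`/`write` contract concrete, here is a minimal single-process sketch. It is not the proposed implementation: the class name `InMemorySharedState` is hypothetical, an in-memory list stands in for the topic, and a `ReentrantLock` stands in for the Exclusive Producer. It only shows the order of operations described in the `write` Javadoc: generate the mutations from the current state, append each one to the log, apply it with the change log applier, then run the reader while the lock is still held.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical single-process stand-in for SharedStateManager<V, O>:
// an in-memory list plays the topic, a lock plays the Exclusive Producer.
class InMemorySharedState<V, O> {
    private final List<O> log = new ArrayList<>();
    private final ReentrantLock exclusiveProducer = new ReentrantLock();
    private final BiConsumer<V, O> changeLogApplier;
    private final V state;

    InMemorySharedState(Supplier<V> databaseInitializer, BiConsumer<V, O> changeLogApplier) {
        this.state = databaseInitializer.get();
        this.changeLogApplier = changeLogApplier;
    }

    // Reads the local copy without taking the lock, so V should be safe for
    // concurrent reads (e.g. ConcurrentHashMap). With a single local log there
    // is nothing to catch up on, so "latest" is ignored in this sketch.
    <K> CompletableFuture<K> read(Function<V, K> reader, boolean latest) {
        return CompletableFuture.completedFuture(reader.apply(state));
    }

    <K> CompletableFuture<K> write(Function<V, List<O>> operationsGenerator, Function<V, K> reader) {
        exclusiveProducer.lock();
        try {
            for (O op : operationsGenerator.apply(state)) {
                log.add(op);                        // 1. append the mutation to the log
                changeLogApplier.accept(state, op); // 2. apply it only after the append
            }
            return CompletableFuture.completedFuture(reader.apply(state)); // 3. read under the lock
        } catch (RuntimeException e) {
            return CompletableFuture.failedFuture(e);
        } finally {
            exclusiveProducer.unlock();
        }
    }

    int logSize() {
        return log.size();
    }
}

class SharedStateContractDemo {
    public static void main(String[] args) {
        // State: word -> count. Operation: a word whose count is incremented.
        InMemorySharedState<Map<String, Integer>, String> counters = new InMemorySharedState<>(
                ConcurrentHashMap::new, (map, word) -> map.merge(word, 1, Integer::sum));
        Integer a = counters.write(map -> List.of("a", "a", "b"), map -> map.get("a")).join();
        System.out.println(a + " " + counters.logSize()); // prints "2 3"
    }
}
```

Because the reader runs before the lock is released, `write` can return a value that is consistent with the mutations it has just appended.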

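PulsarMap recipe interface:

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

public interface PulsarMap<K,V> extends AutoCloseable {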

    /**
     * Get the value associated to a Key.
     * @param key the key
     * @param latest ensure that the value is the latest
     * @return a handle to the result of the operation
     */
    default CompletableFuture<V> get(K key, boolean latest) {
        return getOrDefault(key, null, latest);
    }

    /**
     * Get the value associated to a Key.
     * @param key the key
     * @param defaultValue a value in case that the key is not bound to any value
     * @param latest ensure that the value is the latest
     * @return a handle to the result of the operation
     */
    CompletableFuture<V> getOrDefault(K key, V defaultValue, boolean latest);

    /**
     * Scan all the bindings.
     * @param filter a filter on the key
     * @param processor the function to process each matching binding
     * @param latest ensure that the observed values are the latest
     * @return a handle to the completion of the operation
     */
    CompletableFuture<?> scan(Function<K, Boolean> filter, BiConsumer<K, V> processor, boolean latest);

    /**
     * Update a binding. The operation may be executed multiple times, until it succeeds,
     * so it should be free of side effects.
     * If the operation returns null, the binding will be removed.
     * @param key the key
     * @param operation a function that computes the new value from the key and the current value
     * @return a handle to the value bound to the key after the update
     */
    CompletableFuture<V> update(K key, BiFunction<K, V, V> operation);

    /**
     * Update multiple bindings. The operation may be executed multiple times, until it succeeds.
     * For each key for which the operation returns null, the binding will be removed.
     * @param filter a filter to skip processing some keys and reduce the usage of resources
     * @param operation a function that modifies the value
     * @return a handle to the completion of the operation
     */
    CompletableFuture<?> updateMultiple(Function<K, Boolean> filter, BiFunction<K, V, V> operation);

    /**
     * Delete all bindings.
     * @return a handle to the completion of the operation
     */
    CompletableFuture<?> clear();

    /**
     * List all keys.
     * @param latest ensure that we are up-to-date
     * @return a handle to the completion of the operation
     */
    default CompletableFuture<Collection<K>> listKeys(boolean latest) {
        List<K> result = new CopyOnWriteArrayList<>();
        return scan((k) -> true, (k,v) -> {
            result.add(k);
        }, latest).thenApply(___ -> result);
    }

    /**
     * Delete a binding.
     * @param key the key
     * @return a handle to the completion of the operation
     */
    default CompletableFuture<V> delete(K key) {
        return update(key, (k, v) -> null);
    }

    /**
     * Update a binding only if the current value matches the expected value.
     * @param key the key
     * @param expectedValue the expected value, null means that the binding does not exist
     * @param value the new value
     * @return a handle to the value bound to the key after the operation
     */
    default CompletableFuture<V> replace(K key, V expectedValue, V value) {
        return update(key, (k, v) -> Objects.equals(v, expectedValue) ? value : v);
    }

    /**
     * Create or update a binding.
     * @param key the key
     * @param value the new value
     * @return a handle to the completion of the operation
     */
    default CompletableFuture<V> put(K key, V value) {
        return update(key, (k, v) -> value);
    }

    /**
     * Create a binding only if the key is not bound yet.
     * @param key the key
     * @param value the value
     * @return a handle to the value bound to the key after the operation
     */
    default CompletableFuture<V> putIfAbsent(K key, V value) {
        return replace(key, null, value);
    }

}
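All the default write methods of PulsarMap reduce to `update`, so each of them has the cost of a single `update`. The sketch below checks those semantics in memory. `UpdateOnlyMap` is a hypothetical name, and `ConcurrentHashMap.compute` stands in for the log-backed `update` (like `update`, it removes the binding when the function returns null).

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical in-memory stand-in: only update() touches the storage, and the
// other write methods are derived from it the same way as the PulsarMap defaults.
class UpdateOnlyMap<K, V> {
    private final Map<K, V> data = new ConcurrentHashMap<>();

    // Atomically computes the new value; a null result removes the binding.
    CompletableFuture<V> update(K key, BiFunction<K, V, V> operation) {
        return CompletableFuture.completedFuture(data.compute(key, operation));
    }

    CompletableFuture<V> get(K key) {
        return CompletableFuture.completedFuture(data.get(key));
    }

    CompletableFuture<V> put(K key, V value) {
        return update(key, (k, v) -> value);
    }

    CompletableFuture<V> replace(K key, V expectedValue, V value) {
        return update(key, (k, v) -> Objects.equals(v, expectedValue) ? value : v);
    }

    CompletableFuture<V> putIfAbsent(K key, V value) {
        return replace(key, null, value);
    }

    CompletableFuture<V> delete(K key) {
        return update(key, (k, v) -> null);
    }
}

class UpdateOnlyMapDemo {
    public static void main(String[] args) {
        UpdateOnlyMap<String, String> assignments = new UpdateOnlyMap<>();
        System.out.println(assignments.putIfAbsent("task-1", "node-a").join());       // node-a
        System.out.println(assignments.putIfAbsent("task-1", "node-b").join());       // node-a (already bound)
        System.out.println(assignments.replace("task-1", "node-a", "node-c").join()); // node-c
        System.out.println(assignments.delete("task-1").join());                      // null
    }
}
```

Note that `replace` returns the current value unchanged when the expectation does not match, so callers can detect a failed compare-and-set by comparing the result with the value they tried to write.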

Implementation

The proposal is to add this SharedStateManager API as part of the Java Pulsar Client API:

  • the interfaces will be in the pulsar-client-api module
  • the implementation will be in the pulsar-client implementation module

This way the API and the implementation will be available to every Pulsar client user, including Pulsar IO Connectors and Pulsar Broker Protocol Handlers.

An alternative is to put it in the pulsar-adapters repository, but that would make the API harder to discover, and it would also require Pulsar IO Connectors and Broker Protocol Handlers to bundle copies of this new API into their .nar files.

The SharedStateManager holds in memory a reference to a Java object that represents the State.
A non-partitioned Pulsar topic stores all the changes to that Java object.

To update the State, the local SharedStateManager performs these steps:

  • Acquire an Exclusive Producer on the topic
  • Read the topic fully (using the Reader API)
  • Write each mutation to the topic
  • Apply the changes to the local copy of the Object
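The four steps can be simulated in a single JVM to see why the second step matters: an instance must catch up with the log before it generates new mutations, otherwise it would compute them from a stale State. This sketch is not the proposed implementation; `SimulatedTopic` and `Replica` are hypothetical names, an in-memory list stands in for the non-partitioned topic, a shared `ReentrantLock` stands in for the Exclusive Producer, and each `Replica` models one application instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for the non-partitioned topic plus the Exclusive Producer lock.
class SimulatedTopic<O> {
    final ReentrantLock exclusiveProducer = new ReentrantLock();
    private final List<O> entries = new ArrayList<>();

    synchronized void append(O op) {
        entries.add(op);
    }

    synchronized List<O> readFrom(int offset) {
        return new ArrayList<>(entries.subList(offset, entries.size()));
    }

    synchronized int size() {
        return entries.size();
    }
}

// One application instance with its own local copy of the State.
class Replica<V, O> {
    private final SimulatedTopic<O> topic;
    private final BiConsumer<V, O> changeLogApplier;
    private final V state;
    private int offset = 0; // index of the next log entry this replica has not applied

    Replica(SimulatedTopic<O> topic, Supplier<V> databaseInitializer, BiConsumer<V, O> changeLogApplier) {
        this.topic = topic;
        this.changeLogApplier = changeLogApplier;
        this.state = databaseInitializer.get();
    }

    // Reads the topic up to the tail and applies every entry not seen yet.
    void catchUp() {
        for (O op : topic.readFrom(offset)) {
            changeLogApplier.accept(state, op);
            offset++;
        }
    }

    void write(Function<V, List<O>> operationsGenerator) {
        topic.exclusiveProducer.lock();             // 1. acquire the Exclusive Producer
        try {
            catchUp();                              // 2. read the topic fully
            for (O op : operationsGenerator.apply(state)) {
                topic.append(op);                   // 3. write the mutation to the topic
                changeLogApplier.accept(state, op); // 4. apply it to the local copy
                offset++;
            }
        } finally {
            topic.exclusiveProducer.unlock();
        }
    }

    V state() {
        return state;
    }
}

class ReplicaDemo {
    public static void main(String[] args) {
        SimulatedTopic<Map.Entry<String, Integer>> topic = new SimulatedTopic<>();
        BiConsumer<Map<String, Integer>, Map.Entry<String, Integer>> applier =
                (map, e) -> map.put(e.getKey(), e.getValue());
        Replica<Map<String, Integer>, Map.Entry<String, Integer>> a = new Replica<>(topic, ConcurrentHashMap::new, applier);
        Replica<Map<String, Integer>, Map.Entry<String, Integer>> b = new Replica<>(topic, ConcurrentHashMap::new, applier);

        // Read-modify-write: the new value depends on the current State.
        Function<Map<String, Integer>, List<Map.Entry<String, Integer>>> increment =
                map -> List.of(Map.entry("x", map.getOrDefault("x", 0) + 1));
        a.write(increment); // x = 1
        b.write(increment); // b first catches up to x = 1, then writes x = 2
        a.catchUp();
        System.out.println(a.state() + " " + b.state() + " " + topic.size()); // {x=2} {x=2} 2
    }
}
```

Without the catch-up in step 2, `b` would compute `x = 1` from its empty local copy and overwrite the first increment.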

When reading the State you have two options:

  • Read the local value
  • Read the topic fully (using the Reader API), then read the local value

If you need strong consistency, perform the "read" together with a dummy write operation, so that it runs inside the implicit lock acquired through the Exclusive Producer.
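The two read paths, and the "dummy write" used for strong consistency, can be compared in a simplified single-JVM sketch where the log holds integer deltas and the State is their sum. `CounterReplica` and its methods are hypothetical names; a shared list stands in for the topic and a shared `ReentrantLock` for the Exclusive Producer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical instance whose State is the sum of all deltas in the shared log.
class CounterReplica {
    private final List<Integer> topic;             // shared log, stand-in for the topic
    private final ReentrantLock exclusiveProducer; // shared lock, stand-in for the Exclusive Producer
    private int offset = 0;                        // next log entry to apply
    private int value = 0;                         // local copy of the State

    CounterReplica(List<Integer> topic, ReentrantLock exclusiveProducer) {
        this.topic = topic;
        this.exclusiveProducer = exclusiveProducer;
    }

    // Reads the log up to the current tail.
    private synchronized void catchUp() {
        synchronized (topic) {
            while (offset < topic.size()) {
                value += topic.get(offset++);
            }
        }
    }

    // Option 1: read the local value (cheap, possibly stale).
    synchronized int readLocal() {
        return value;
    }

    // Option 2: read the topic fully, then read the local value.
    synchronized int readLatest() {
        catchUp();
        return value;
    }

    // Strong consistency: a "dummy write" with no mutations, executed while
    // holding the Exclusive Producer, so no other writer can interleave.
    int readConsistent() {
        exclusiveProducer.lock();
        try {
            return readLatest();
        } finally {
            exclusiveProducer.unlock();
        }
    }

    void add(int delta) {
        exclusiveProducer.lock();
        try {
            synchronized (this) {
                catchUp();
                synchronized (topic) {
                    topic.add(delta);
                }
                value += delta;
                offset++;
            }
        } finally {
            exclusiveProducer.unlock();
        }
    }
}

class CounterReplicaDemo {
    public static void main(String[] args) {
        List<Integer> topic = new ArrayList<>();
        ReentrantLock exclusiveProducer = new ReentrantLock();
        CounterReplica a = new CounterReplica(topic, exclusiveProducer);
        CounterReplica b = new CounterReplica(topic, exclusiveProducer);
        a.add(5);
        System.out.println(b.readLocal());      // 0: b has not read the topic yet
        System.out.println(b.readLatest());     // 5
        a.add(2);
        System.out.println(b.readConsistent()); // 7
    }
}
```

The trade-off is the one discussed later in this thread: the consistent read pays for acquiring the Exclusive Producer, while the local read costs nothing but may be stale.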

At bootstrap we read the topic fully (from the beginning to the tail) in order to build the State.
We do not want to require the client application to store the State locally.
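Bootstrapping is a fold over the log: start from the value returned by the database initializer and apply every entry in order. The sketch below assumes a hypothetical helper named `Bootstrap.replay` and encodes operations as `{type, key, value}` string arrays, loosely mirroring `MapOp`; it is an illustration, not the proposed implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

final class Bootstrap {
    private Bootstrap() {
    }

    // Rebuilds the State from scratch by applying every log entry, from the
    // beginning of the topic to the tail, in order.
    static <V, O> V replay(List<O> log, Supplier<V> databaseInitializer, BiConsumer<V, O> changeLogApplier) {
        V state = databaseInitializer.get();
        for (O op : log) {
            changeLogApplier.accept(state, op);
        }
        return state;
    }
}

class BootstrapDemo {
    public static void main(String[] args) {
        // Operations encoded as {type, key, value}, loosely mirroring MapOp.
        BiConsumer<Map<String, String>, String[]> applier = (map, op) -> {
            switch (op[0]) {
                case "PUT":
                    map.put(op[1], op[2]);
                    break;
                case "DELETE":
                    map.remove(op[1]);
                    break;
                case "CLEAR":
                    map.clear();
                    break;
                default:
                    break; // unknown operations are skipped (applyOp also logs a warning)
            }
        };
        List<String[]> log = List.of(
                new String[] {"PUT", "a", "1"},
                new String[] {"PUT", "b", "2"},
                new String[] {"DELETE", "a"},
                new String[] {"PUT", "c", "3"});
        Map<String, String> state = Bootstrap.replay(log, TreeMap::new, applier);
        System.out.println(state); // {b=2, c=3}
    }
}
```

Since every instance applies the same entries in the same order with the same applier, all instances end up with the same State; this is why the applier must be deterministic.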

This sample PulsarMap implementation shows how to use the SharedStateManager:

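import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

// SerDe<T> (T <-> byte[]) is not part of this proposal; it is assumed to come
// from the sample repository linked above.
@Slf4j
public class PulsarMapImpl<K, V> implements PulsarMap<K, V> {
    private static final ObjectMapper mapper = new ObjectMapper();
    private final SharedStateManager<Map<K, V>, MapOp<K, V>> stateManager;
    private final SerDe<K> keySerDe;
    private final SerDe<V> valueSerDe;

    public PulsarMapImpl(SharedStateManager.SharedStateManagerBuilder builder,
                         SerDe<K> keySerDe,
                         SerDe<V> valueSerDe) {
        this.keySerDe = keySerDe;
        this.valueSerDe = valueSerDe;
        this.stateManager = builder
                .withOpSerializer(this::serializeOp)
                .withOpDeserializer(this::deserializeOp)
                .withDatabaseInitializer(() -> new ConcurrentHashMap<K, V>())
                .<Map<K, V>, MapOp<K, V>>withChangeLogApplier(this::applyOp)
                .build();
    }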

    @Override
    public CompletableFuture<V> getOrDefault(K key, V defaultValue, boolean latest) {
        return stateManager.read(map -> map.getOrDefault(key, defaultValue), latest);
    }

    @Override
    public CompletableFuture<?> scan(Function<K, Boolean> filter, BiConsumer<K, V> processor, boolean latest) {
        return stateManager.read(map -> {
            map.forEach((k,v) -> {
                if (filter.apply(k)) {
                    processor.accept(k, v);
                }
            });
            return null;
        }, latest);
    }

    @Override
    public CompletableFuture<V> update(K key, BiFunction<K, V, V> operation) {
        return stateManager.write(map -> {
            V currentValue = map.get(key);
            V finalValue = operation.apply(key, currentValue);
            if (finalValue == null) {
                return Collections.singletonList(MapOp.DELETE(key));
            } else {
                return Collections.singletonList(MapOp.PUT(key, finalValue));
            }
        }, map -> map.get(key));
    }

    @Override
    public CompletableFuture<?> updateMultiple(Function<K, Boolean> filter, BiFunction<K, V, V> operation) {
        return stateManager.write(map -> {
            List<MapOp<K, V>> updates = new ArrayList<>();
            map.forEach((key, currentValue) -> {
                if (!filter.apply(key)) {
                    return; // skip the keys rejected by the filter
                }
                V finalValue = operation.apply(key, currentValue);
                if (finalValue == null) {
                    updates.add(MapOp.DELETE(key));
                } else {
                    updates.add(MapOp.PUT(key, finalValue));
                }
            });
            return updates;
        }, map -> null); // nothing to read back inside the lock
    }

    @Override
    public CompletableFuture<?> clear() {
        return stateManager.write(map -> Collections.singletonList(MapOp.CLEAR()),
                map -> null); // do not leak a reference to the internal map
    }

    @AllArgsConstructor
    @Data
    private static class MapOp<K,V> {
        private final static int TYPE_CLEAR = 0;
        private final static int TYPE_PUT =  1;
        private final static int TYPE_DELETE =  2;
        private final int type;
        private final K key;
        private final V value;

        static <K,V> MapOp<K,V> CLEAR() {
            return new MapOp<>(TYPE_CLEAR, null, null);
        }
        static <K,V> MapOp<K,V> PUT(K key, V value) {
            return new MapOp<>(TYPE_PUT, key, value);
        }
        static <K,V> MapOp<K,V> DELETE(K key) {
            return new MapOp<>(TYPE_DELETE, key, null);
        }
    }

    private void applyOp(Map<K, V> map, MapOp<K, V> op) {
        switch (op.getType()) {
            case MapOp.TYPE_CLEAR:
                map.clear();
                break;
            case MapOp.TYPE_PUT:
                map.put(op.getKey(), op.getValue());
                break;
            case MapOp.TYPE_DELETE:
                map.remove(op.getKey());
                break;
            default:
                log.warn("Ignore MapOp {} on {}", op, this);
                break;
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    private static final class SerializedMapOp {
        private int type;
        private byte[] key;
        private byte[] value;
    }

    private byte[] serializeOp(MapOp<K,V> op) {
        try {
            SerializedMapOp ser = new SerializedMapOp(op.getType(),
                    op.getKey() != null ? keySerDe.serialize(op.getKey()) : null,
                    op.getValue() != null ? valueSerDe.serialize(op.getValue()) : null);
            return mapper.writeValueAsBytes(ser);
        } catch (IOException err) {
            throw new RuntimeException(err);
        }
    }

    private MapOp<K,V> deserializeOp(byte[] value) {
        try {
            SerializedMapOp ser =  mapper.readValue(value, SerializedMapOp.class);
            return new MapOp<K, V>(ser.getType(),
                    ser.getKey() != null ? keySerDe.deserialize(ser.getKey()) : null,
                    ser.getValue() != null ? valueSerDe.deserialize(ser.getValue()) : null);
        } catch (IOException err) {
            throw new RuntimeException(err);
        }
    }

    @Override
    public void close() {
        stateManager.close();
    }
}

Future works and other considerations

Depending on the implementation of the Shared State (this is up to the developer, i.e. the user of the new API), you need to set infinite retention on the backing topic; otherwise some changes may be lost from the commit log.

Pulsar is very flexible, so initially we can let users configure the system properly themselves: the goal is to provide the basic API to easily build a Shared State Manager, using the Exclusive Producer API together with the Reader API.

In the future we can implement more advanced features, such as checkpoints or leveraging compacted topics, as follow-up work.
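Why retention matters, and how checkpoints could relax it, can be illustrated with a toy log of `key=value` assignments. This is only a sketch of the follow-up idea, not a proposed design: `Checkpoint` and `CheckpointDemo` are hypothetical names, and trimming by retention is modeled by dropping the first entries of a list.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical checkpoint: a copy of the State plus the number of log entries it covers.
final class Checkpoint {
    final Map<String, Integer> snapshot;
    final int offset;

    Checkpoint(Map<String, Integer> state, int offset) {
        this.snapshot = new TreeMap<>(state);
        this.offset = offset;
    }
}

final class CheckpointDemo {
    // Each log entry is a "key=value" assignment.
    static void apply(Map<String, Integer> state, String entry) {
        String[] kv = entry.split("=", 2);
        state.put(kv[0], Integer.parseInt(kv[1]));
    }

    // Starts from a given State and applies the entries in order.
    static Map<String, Integer> replay(Map<String, Integer> start, List<String> entries) {
        Map<String, Integer> state = new TreeMap<>(start);
        for (String entry : entries) {
            apply(state, entry);
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> fullLog = List.of("a=1", "b=2", "b=3", "c=4");
        Checkpoint checkpoint = new Checkpoint(replay(Map.of(), fullLog.subList(0, 2)), 2);

        // Retention trimmed every entry that the checkpoint already covers.
        List<String> retained = fullLog.subList(checkpoint.offset, fullLog.size());

        System.out.println(replay(Map.of(), fullLog));             // {a=1, b=3, c=4}
        System.out.println(replay(Map.of(), retained));            // {b=3, c=4}: "a" is lost
        System.out.println(replay(checkpoint.snapshot, retained)); // {a=1, b=3, c=4}
    }
}
```

Without a checkpoint, replaying a trimmed log silently produces a different State, which is why infinite retention is needed in the initial version.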

Rejected Alternatives

None

@eolivelli eolivelli self-assigned this Dec 24, 2021
@eolivelli eolivelli changed the title PIP-124: Pulsar Client Shared State API (draft) Dec 24, 2021
@RobertIndie
Member

This PIP number duplicates the one used in #13408.

@eolivelli
Contributor Author

@mattisonchao yes, you could, but most of the code is already in the linked GitHub repo.

@eolivelli
Contributor Author

We talked about this PIP in the community meeting.

The main concern from @merlimat is that exposing a "put" operation gives the false impression of having something like a Map, while every "write" operation will actually be "slow" because we need to acquire the Producer lock.

@merlimat suggested splitting this into two distinct interfaces, an API for readers and one for writers, possibly extending the work done in PIP-104 TableViews (#12356).

I am fine with extending PIP-104 in that direction (adding a writer side to the TableView), but I believe that the API proposed here serves a different goal: having a generic "Java object" as shared state, letting the developer of the Shared State Object deal with its internal representation.
Using the TableView API forces users to represent their Java object as a set of properties, or as a single object under one fixed key. It is feasible, but not straightforward from my point of view.
I would really like to mimic the Pravega.io StateSynchronizer, which does not force you to see your topic as a "table".

@merlimat
I will work in parallel on extending the TableView API to add writer support; I am sure it will bring great value to users.
But I still think that we should add this "Recipe" to the Pulsar Adapters repository.

@merlimat WDYT?

@eolivelli eolivelli changed the title PIP-124: Pulsar Client Shared State API Jan 21, 2022
@github-actions

The issue had no activity for 30 days, mark with Stale label.

@github-actions

The issue had no activity for 30 days, mark with Stale label.
