PIP178:Multiple snapshots for transaction buffer #16042

Closed
liangyepianzhou opened this issue Jun 13, 2022 · 2 comments

Motivation

The transaction buffer stores the IDs of aborted transactions so that it can filter out their messages. To support recovery, it takes snapshots that persist these IDs in BookKeeper, but the number of aborted transaction IDs is unbounded. When the aborted transaction IDs no longer fit in a single BookKeeper entry, the transaction buffer needs to split the snapshot into a multiple-snapshot stored across several entries.

Challenges

Because snapshots are subject to topic compaction and a write can be interrupted partway, implementing multiple-snapshot raises two challenges.

  1. If the broker restarts, the transaction buffer may have written only part of a multiple-snapshot.
    • e.g. The transaction buffer needs to write multiple-snapshot (1, 2, 3), but it only writes snapshots 1 and 2 before the broker restarts.

  2. Because of compaction, a new snapshot overwrites the old snapshot with the same key.
    • e.g. A multiple-snapshot (1, 2, 3) may end up with snapshots 1 and 2 from the second write and snapshot 3 from the first write.

Approach

Implementation

  1. Change aborts from LinkedMap to ConcurrentSkipListMap.
  2. Send multiple snapshots with keys (topicName-1, topicName-2, ..., topicName-end) and send a normal snapshot with key (topicName-end).
  3. Store maxReadPosition only in the snapshot with key (topicName-end); the other snapshots set it to earliest.
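
The key scheme in step 2 can be sketched as follows. This is a minimal illustration, not Pulsar code: the class `SnapshotKeys` and the method `keysFor` are hypothetical names, and the sketch assumes the segment count is already known when the keys are generated.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Pulsar: illustrates the proposed key scheme only. */
final class SnapshotKeys {
    private SnapshotKeys() {}

    /**
     * Returns the message keys for a snapshot split into segmentCount parts.
     * The last part always uses the "-end" suffix, so a snapshot that fits in
     * one entry is written with the single key topicName-end.
     */
    static List<String> keysFor(String topicName, int segmentCount) {
        if (segmentCount < 1) {
            throw new IllegalArgumentException("segmentCount must be >= 1");
        }
        List<String> keys = new ArrayList<>(segmentCount);
        // Numbered keys for every segment except the last one.
        for (int i = 1; i < segmentCount; i++) {
            keys.add(topicName + "-" + i);
        }
        // The final segment is the one that carries maxReadPosition.
        keys.add(topicName + "-end");
        return keys;
    }
}
```

Because a normal snapshot and the last segment of a multiple-snapshot share the key topicName-end, compaction always keeps exactly one entry per topic that carries maxReadPosition.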

Goal

  1. Keep aborted transaction IDs sorted by the position of their abort marker, so that aborts becomes a FIFO map.
  2. A new snapshot overwriting an old snapshot with the same key then causes no error.
  3. There is always a snapshot with the correct maxReadPosition to recover from.
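
The first goal can be illustrated with a small sketch of a position-ordered map. It makes several assumptions that are not in the proposal: the map is keyed by the abort-marker position rather than by transaction ID, `Pos` is a simplified stand-in for PositionImpl, transaction IDs are plain strings, and the names `SortedAborts`, `addAbort` and `onLedgerDeleted` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical sketch, not Pulsar code: aborted txns ordered by abort-marker position. */
final class SortedAborts {
    /** Simplified stand-in for PositionImpl: ordered by ledger ID, then entry ID. */
    static final class Pos implements Comparable<Pos> {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Pos o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    // Key: abort-marker position; value: transaction ID (a String here for brevity).
    private final ConcurrentSkipListMap<Pos, String> aborts = new ConcurrentSkipListMap<>();

    void addAbort(String txnId, long ledgerId, long entryId) {
        aborts.put(new Pos(ledgerId, entryId), txnId);
    }

    /** Drops every abort whose marker lives in a ledger up to and including ledgerId. */
    void onLedgerDeleted(long ledgerId) {
        // Everything strictly before (ledgerId + 1, 0) belongs to a deleted ledger.
        aborts.headMap(new Pos(ledgerId + 1, 0), false).clear();
    }

    /** Transaction IDs in abort-marker order, oldest first. */
    List<String> txnIdsInOrder() {
        return new ArrayList<>(aborts.values());
    }
}
```

With this ordering, ledger deletion only ever removes entries from the head of the map, which is the FIFO behaviour the goal describes.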

Examples

Normal Flow

The first snapshot is taken when a new producer starts sending messages, so there is always a snapshot with key (topicName-end) that holds a maxReadPosition to recover from.

Write incompletely

When transaction IDs are sorted by the position of their abort marker and none have been deleted from aborts, snapshots written with the same key contain the same transaction IDs (except for key topicName-end).

Write incompletely after transaction IDs have been removed due to ledger deletion

Because transaction IDs are deleted in the order of their abort-marker positions, no message is lost when compaction replaces an old snapshot with the new one. There is always a valid maxReadPosition that can be used to recover.
As you can see in the figure below, the ledger where txn025 is located has been deleted, and the corresponding txn025 has also been removed from aborts. But this does not affect the information in the snapshot.
image

Code Implementation

handleSnapshot

public void handleSnapshot(TransactionBufferSnapshot snapshot) {
    // Only ever advance maxReadPosition; segments other than topicName-end carry the earliest position.
    PositionImpl newMaxReadPosition = PositionImpl.get(snapshot.getMaxReadPositionLedgerId(),
            snapshot.getMaxReadPositionEntryId());
    if (newMaxReadPosition.compareTo(maxReadPosition) > 0) {
        maxReadPosition = newMaxReadPosition;
    }
    // Merge the aborted transactions of this segment into the in-memory map.
    if (snapshot.getAborts() != null) {
        snapshot.getAborts().forEach(abortTxnMetadata ->
                aborts.put(new TxnID(abortTxnMetadata.getTxnIdMostBits(),
                                abortTxnMetadata.getTxnIdLeastBits()),
                        PositionImpl.get(abortTxnMetadata.getLedgerId(),
                                abortTxnMetadata.getEntryId())));
    }
}

takeSnapshot

private CompletableFuture<Void> takeSnapshot() {
    changeMaxReadPositionAndAddAbortTimes.set(0);
    return takeSnapshotWriter.thenCompose(writer -> {
        TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
        ArrayList<TransactionBufferSnapshot> snapshots = new ArrayList<>();
        synchronized (TopicTransactionBuffer.this) {
            // Copy the in-memory aborts into snapshot metadata.
            List<AbortTxnMetadata> list = new ArrayList<>();
            aborts.forEach((k, v) -> {
                AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
                abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
                abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
                abortTxnMetadata.setLedgerId(v.getLedgerId());
                abortTxnMetadata.setEntryId(v.getEntryId());
                list.add(abortTxnMetadata);
            });
            // Split off full segments of maxSize aborts each.
            while (list.size() > maxSize) {
                List<AbortTxnMetadata> newList = new ArrayList<>();
                while (newList.size() < maxSize) {
                    newList.add(list.remove(0));
                }
                snapshot.setAborts(newList);
                snapshot.setTopicName(topic.getName());
                snapshots.add(snapshot);
                snapshot = new TransactionBufferSnapshot();
            }
            // The last segment holds the remaining aborts and the maxReadPosition.
            snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
            snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
            snapshot.setAborts(list);
            snapshot.setTopicName(topic.getName());
            snapshots.add(snapshot);
        }
        List<CompletableFuture<Void>> completableFutures = new LinkedList<>();
        snapshots.forEach(snapshot_ -> {
            completableFutures.add(writer.writeAsync(snapshot_).thenAccept(messageId -> {
                this.lastSnapshotTimestamps = System.currentTimeMillis();
                if (log.isDebugEnabled()) {
                    log.debug("[{}]Transaction buffer take snapshot success! "
                            + "messageId : {}", topic.getName(), messageId);
                }
            }).exceptionally(e -> {
                log.warn("[{}]Transaction buffer take snapshot fail! ", topic.getName(), e);
                return null;
            }));
        });
        return FutureUtil.waitForAll(completableFutures);
    });
}
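
The splitting step inside takeSnapshot can also be written without repeatedly removing the head of the list. The sketch below is an illustration rather than the proposed implementation: `SnapshotChunks` and `split` are hypothetical names, and the element type is generic so that the example stays self-contained. It keeps the same shape as the loop above: full segments first, then one final segment (possibly empty) that would carry maxReadPosition.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not Pulsar code: splits aborts into snapshot segments. */
final class SnapshotChunks {
    private SnapshotChunks() {}

    /**
     * Splits items into segments of exactly maxSize elements, followed by one
     * final segment with the remainder (between 0 and maxSize elements).
     */
    static <T> List<List<T>> split(List<T> items, int maxSize) {
        if (maxSize < 1) {
            throw new IllegalArgumentException("maxSize must be >= 1");
        }
        List<List<T>> chunks = new ArrayList<>();
        int from = 0;
        // Peel off full segments while more than maxSize items remain.
        while (items.size() - from > maxSize) {
            chunks.add(new ArrayList<>(items.subList(from, from + maxSize)));
            from += maxSize;
        }
        // The remainder is always emitted, even when empty, as the last segment.
        chunks.add(new ArrayList<>(items.subList(from, items.size())));
        return chunks;
    }
}
```

Using subList with a moving index avoids shifting the backing array on every removal, which matters when the list of aborts is large enough to need several segments.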

Rejected Alternatives

Add a snapshotEntryCount field in TransactionBufferSnapshot

Add a snapshotEntryCount field to each TransactionBufferSnapshot. For a normal TransactionBufferSnapshot, snapshotEntryCount is set to 1; for a multiple-snapshot, snapshotEntryCount is set to the number of entries used to store the snapshot.

public class TransactionBufferSnapshot {
    private String topicName;
    private long maxReadPositionLedgerId;
    private long maxReadPositionEntryId;
    private long snapshotEntryCount;
    private List<AbortTxnMetadata> aborts;
}

Mark multiple-snapshot with a null field

For a multiple-snapshot, the front entries carry only the aborts and maxReadPosition data and leave topicName unset. Only the last entry sets topicName. When the reader reads topicName == null, it is at the beginning of a multiple-snapshot; reading topicName != null marks the end of that multiple-snapshot.
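
A reader for this rejected alternative might look like the sketch below. It is only an illustration under stated assumptions: the entries all belong to one topic and arrive in write order, `Entry` is a simplified stand-in for TransactionBufferSnapshot with string transaction IDs, and the names `NullMarkedSnapshotReader` and `completeSnapshots` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch, not Pulsar code: groups null-marked entries into snapshots. */
final class NullMarkedSnapshotReader {
    /** Simplified entry: topicName is null on all but the last entry of a multiple-snapshot. */
    static final class Entry {
        final String topicName;
        final List<String> abortedTxnIds;

        Entry(String topicName, List<String> abortedTxnIds) {
            this.topicName = topicName;
            this.abortedTxnIds = abortedTxnIds;
        }
    }

    private NullMarkedSnapshotReader() {}

    /**
     * Returns the merged aborted-transaction list of each complete snapshot, in read order.
     * Trailing entries that never reach a non-null topicName are treated as an
     * incomplete write and dropped.
     */
    static List<List<String>> completeSnapshots(List<Entry> entries) {
        List<List<String>> result = new ArrayList<>();
        List<String> pending = new ArrayList<>();
        for (Entry e : entries) {
            pending.addAll(e.abortedTxnIds);
            if (e.topicName != null) {
                // A non-null topicName closes the current snapshot.
                result.add(pending);
                pending = new ArrayList<>();
            }
        }
        return result;
    }
}
```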

API changes

interface Writer<T> {
    /**
     * Write event to the system topic.
     * @param t pulsar event
     * @param topic the topicName for the pulsar event
     * @return message id
     * @throws PulsarClientException if writing the event fails
     */
    MessageId write(T t, String topic) throws PulsarClientException;

    /**
     * Async write event to the system topic.
     * @param t pulsar event
     * @param topic the topicName for the pulsar event
     * @return message id future
     */
    CompletableFuture<MessageId> writeAsync(T t, String topic);
}

Implementation

TransactionBufferSnapshotWriter
public CompletableFuture<MessageId> writeAsync(TransactionBufferSnapshot transactionBufferSnapshot, String topicName) {
    return producer.newMessage().key(topicName)
            .value(transactionBufferSnapshot).sendAsync();
}
@Anonymitaet

Hi @liangyepianzhou, this PIP number is the same as #15797, which was created 21 days ago. Would you like to choose another PIP number? Thank you

@liangyepianzhou liangyepianzhou changed the title PIP176:Multiple snapshots for transaction buffer Jun 16, 2022
@github-actions

The issue had no activity for 30 days, mark with Stale label.
