Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP-160] Make transactions work more efficiently by aggregation operation for transaction log and pending ack store #15370

Closed
poorbarcode opened this issue Apr 28, 2022 · 6 comments
Assignees

Comments

@poorbarcode
Copy link
Contributor

poorbarcode commented Apr 28, 2022

Motivation

Transaction coordinator and Pending Ack Store are two core components in Pulsar transactions. For how they work in Pulsar transactions, see PIP-31: Transactional Streaming

Both transaction coordinator and pending ack store use the managed ledger to record data.The transaction coordinator records the transaction metadata in a managed ledger(aka transaction log) to achieve transaction durability. And the pending ack store records all the message ack operations to guarantee the durability of message ack operations.

Currently, the transaction coordinator will append a new entry to the transaction log for each transaction metadata/state change. For example, for the life cycle of a transaction, both of the open transaction, add produced partition to a transaction, add consumed subscription to a transaction, and commit the transaction will append to the transaction log. If there are many parallel transactions, the number of entries added to the transaction log will also increase.

The pending ack store is similar to the transaction coordinator. Too many parallel message acks will add more entries to the managed ledger used by the pending ack store.

So this proposal tries to make the transaction work more efficiently by adding aggregation operations for transaction log and pending ack store.

Goal

Provide a mechanism allowing the Transaction Log Store and Pending Ack Store to aggregate multiple records into a batched record and persist into a single BK entry. This will make Pulsar transactions work more efficiently.

  • Reduce the number of entries written to Bookies
  • Reduce the entry indexes size of Bookie
  • Reduce the number of entries read from the bookie

The new mechanism will provide the following ability to control the batch.

Adjustable thresholds: trigger BookKeeper-write when reaching any one of the following conditions

  • Max size (bytes)
  • Max records count
  • Max delay time
  • Allow users to enable or disable batch-write.
  • Dynamic enable or disable the feature.

Approach

Aggregate multiple records and write to Bookies

We will create a Container(aka TxLogBufferedWriter ) to buffer requests and flush to Managed Ledger. Transaction Log Store And Pending Ack Store will no longer write to Managed Ledger directly, Change to using TxLogBufferedWriter to write Ledger data. The TxLogBufferedWriter caches “write requests” for a certain number(or a certain size of request data) and then writes them to the Managed Ledger in one go. After Managed Ledger has written complete, The TxLogBufferedWriter responds to each request-caller. In this process, Managed Ledger doesn't care how many records(or what to be written) in the Entry, it just treats them as a single block of data.

The first write-request by transaction components that write to TxLogBufferedWriter will take a long time to receive a response because The TxLogBufferedWriter must wait for subsequent requests to accumulate enough data to actually start writing to the Managed Ledger. To control the maximum latency, The TxLogBufferedWriter will mark the first request time for each batch, and additional timing triggers writes.

The TxLogBufferedWriter does not guarantee that logs in the batch are all from the same transaction. Instead, it does not matter which transaction the request belongs to. One batch corresponds to one Managed Ledger addEntryOp, so the Entry looks like the below:
截屏2022-06-16 22 35 33

When Transaction completes(committed or aborted), all Transaction log records and Pending ack log records of this transaction will be deleted(marked acknowledgment). To ensure that users can delete transaction records accurately, So when callback to “write request”, The TxLogBufferedWriter should tell “position(Entry position)”, “how many requests there are in a batch( aka batchSize )” and “the location of that record inside the batch (aka batchIndex)” to the caller.

After enabling this feature, some data is non-batch, and some data is batch which is in Ledger. When batch-data is read, it needs to specifically parse the data that Transaction Components can process. So we append a magic num in front of the Batched Log Data to confirm that it is a batched data and to ensure the future scalability, we will add a version identifier in front of the original Batched Log Data, just like this:

// combine multi records using protobuf record (defined below)
[Record0, Record1, … RecordN ] ⇒ [Records]
// append magic num and version at front of Batched Log (outside protobuf serialization)
// Magic Num 2 bytes
// Version 2 bytes
[Magic Num] [Version] [Records] ⇒ [Entry]

e.g, Aggregate multiple records and write to Bookies

Request: 
    → [Log 1]
    → [Log 2]
    → …
    → [Log N]
Aggregate to one:
   → [Log1, Log2 … LogN ] ⇒ [Batched Log]
   → [Magic Num] [version] [Batched Log] ⇒ [Entry]
 Managed Ledger  async write Entry:
   → async write [Entry]
   → callback: {ledgerId=1, entryId=1}
Request-Callback:
   → Callback 1: {ledgerId=1, entryId=1, batchSize=N, batchIndex=0}
   → Callback 2: {ledgerId=1, entryId=1, batchSize=N, batchIndex=1}
   → …
   → Callback N: {ledgerId=1, entryId=1, batchSize=N, batchIndex=N-1}

—---------------------
Transaction Log Store Logs and Pending Ack Store logs are stored in different ledgers, so they each hold their own TxLogBufferedWriter objects.

Acknowledge entry with the aggregated records

In the previous section, we described how to merge multiple logs into a single block and write them to the Bookie. We also described how log-records from multiple transactions are mixed into the same Entry, but each log-record deleted (acknowledged) at a different time (because each transaction ends at a different time). It is possible that some transactions have been completed for a long time, and some transactions are still in progress. In this case, a mechanism is needed to mark each log in the (now batched) Entry as invalid. When all records are invalid, the Entry can be marked for deletion.

We can use an existing mechanism(PIP 45 supported batch index delete) to solve this problem. As described in the previous section, after writing to the transaction log, the transaction component is told two properties to indicate the specific location of the log in the Entry: batchIndex and batchSize. Transaction Components could manage the mapping of transaction-id and log-position after the data has been written(Yes, That's what it's doing now at ​​MLTransactionLogImpl, the difference is that the code does not manage the batchSize and batchIndex. The MLPendingAckStore uses a more subtle mechanism to maintain log-position). We can rely on features provided by PIP-45 to identify which logs are ready to be deleted, and ultimately to delete the whole Entry. When the topic reloads or the broker crashes, Transaction Components can read logs from the ledger to rebuild The Mapping, so we do not worry about state loss (in fact, that's what it's doing now).

Note: TxLogBufferedWriter does not support reading and deleting data. Transaction Components should control the data reading and deleting by itself.

Changes

API Changes in CmdTransactions

Enable/disable transaction coordinator batch log

pulsar-admin transactions –set-txn-tc-batch-log –enable [boolean]

Get transaction coordinator batch log stat

pulsar-admin transactions –get-txn-tc-batch-log-stat

Response like this:

{
  "txnTcBatchLogEnabled": true,
  "coodinatorArray": [{
    "1": true
   },
  {
    "2": true
  }]
}

Note: If the broker has just restarted and no messages have been processed using transaction, $.coodinatorArray will be empty.

Enable/disable transaction pending ack batch log

pulsar-admin transactions –set-txn-pending-ack-batch-log –enable [boolean]

Get transaction pending ack batch log stat

pulsar-admin transactions –get-txn-pending-ack-batch-log-stat

Response like this:

{
  "txnPendingAckBatchLogEnabled": true,
  "subscriptionArray": [{
    "my-topic-my-subscription-1": true
   },
  {
    "my-topic-my-subscription-2": true
  }]
}

Note: If the broker has just restarted and no messages have been processed using transaction, $.subscriptionArray will be empty.

Configuration Changes

broker.conf

transactionLogBatchedWriteEnabled = false;
transactionLogBatchedWriteMaxRecords= 512;
transactionLogBatchedWriteMaxSize= 1024 * 1024 * 4;
transactionLogBatchedWriteMaxDelayInMillis= 1;
     
transactionPendingAckBatchedWriteEnabled = false;
transactionPendingAckBatchedWriteMaxRecords= 512;
transactionPendingAckBatchedWriteMaxSize= 1024 * 1024 * 4;
transactionPendingAckBatchedWriteMaxDelayInMillis= 1;

Protocol Changes

PulsarTransactionMetadata.proto
New protobuf record to aggregate multi TransactionMetadataEntry records, add to an existing file PulsarTransactionMetadata.proto. After serialization, Magic Num and Version are prefixed with 2 bytes each, and finally written to Bookie.

message BatchedTransactionMetadataEntry{
  // Array for buffer transaction log data.
  repeated TransactionMetadataEntry transaction_logs = 1;
}

TransactionPendingAck.proto
New protobuf record to aggregate multi PendingAckMetadataEntry records, add to an existing file TransactionPendingAck.proto. After serialization, Magic Num and Version are prefixed with 2 bytes each, and finally written to Bookie.

message BatchedPendingAckMetadataEntry{
  // Array for buffer pending ack data.
  repeated PendingAckMetadataEntry pending_ack_logs = 1;
}

Metrics Changes

Pulsar transaction

When the batch feature is enabled, the Configuration needs to be adjusted to optimize performance. We need to provide many metrics to help users understand the current state:
The percentage of the triggering actions of each threshold( The definition of thresholds is explained in Goal

  • How many logs are in each batch.
  • Bytes size of logs per batched log.
  • The time of the oldest record was spent in the buffer before being sent.

Here are the new metrics we want to add:

Name Type Description Label
pulsar_txn_tc_log_records_count_per_entry Histogram count of records in per transaction log batch.
Available bucket:
  le="10" number of  transaction logs per batch to be processed between (0, 10]
  le="50" number of  transaction logs per batch to be processed between (10, 50]
  le="100" number of  transaction logs per batch to be processed between (50, 100]
  le="200" number of  transaction logs per batch to be processed between (100, 200]
  le="500" number of  transaction logs per batch to be processed between (100, 500]
  le="1000" number of  transaction logs per batch to be processed between (500, 1000]
  le="overflow" number of  transaction logs per batch to be processed between (1000, ∞ ]
clustercoordinator_id
pulsar_txn_tc_batched_log_entry_size_bytes Histogram The added entry size of a ledger with a given bucket.
  Available bucket:
  le="0_128" is EntrySize between (0byte, 128byte]
  le="128_512" is EntrySize between (128byte, 512byte]
  le="512_1024" is EntrySize between (512byte, 1KB]
  le="1024_2048" is EntrySize between (1KB, 2KB]
  le="2048_4096" is EntrySize between (2KB, 4KB]le="4096_16384" is EntrySize between (4KB, 16KB]
  le="16384_12400" is EntrySize between (16KB, 100KB]
  le="12400_1232896" is EntrySize between (100KB, 1MB]
  le="overflow" is EntrySize between (1MB, ∞ ]
clustercoordinator_id
pulsar_txn_tc_batched_log_olderst_record_delay_time_seconds Histogram The time of the oldest transaction log spent in the buffer before being sent.
Available bucket:
  le="1" time of the oldest transaction log spent in the buffer before being sent between (0s, 0.001s]
  le="5" time of the oldest transaction log spent in the buffer before being sent between (0.001s, 0.005s]
  le="10" time of the oldest transaction log spent in the buffer before being sent between (0.001s, 0.01s]
  le="overflow" number of  transaction logs per batch to be processed between (0.01s, ∞ ]
clustercoordinator_id
pulsar_txn_tc_batched_log_triggering_count_by_records Counter The count of the triggering transaction log batch flush actions by ${transactionLogBatchedWriteMaxRecords} clustercoordinator_id
pulsar_txn_tc_batched_log_triggering_count_by_size Counter The count of the triggering transaction log batch flush actions by ${transactionLogBatchedWriteMaxSize} clustercoordinator_id
pulsar_txn_tc_batched_log_triggering_count_by_delay_time Counter The count of the triggering transaction log batch flush actions by ${transactionLogBatchedWriteMaxDelayInMillis} clustercoordinator_id
pulsar_pending_ack_batched_log_records_count_per_entry Histogram count of records in per pending ack log batch.
Available bucket:
  le="10" number of  pending ack logs per batch to be processed between (0, 10]
  le="50" number of  pending ack logs per batch to be processed between (10, 50]
  le="100" number of  pending ack logs per batch to be processed between (50, 100]
  le="200" number of  transaction logs per batch to be processed between (100, 200]
  le="500" number of  pending ack logs per batch to be processed between (100, 500]
  le="1000" number of  pending ack logs per batch to be processed between (500, 1000]
  le="overflow" number of  pending ack logs per batch to be processed between (1000, ∞ ]
cluster
pulsar_pending_ack_batched_log_entry_size_bytes   The added entry size of a ledger with a given bucket.
Available bucket:
  le="0_128" is EntrySize between (0byte, 128byte]
  le="128_512" is EntrySize between (128byte, 512byte]
  le="512_1024" is EntrySize between (512byte, 1KB]
  le="1024_2048" is EntrySize between (1KB, 2KB]
  le="2048_4096" is EntrySize between (2KB, 4KB]
  le="4096_16384" is EntrySize between (4KB, 16KB]
  le="16384_12400" is EntrySize between (16KB, 100KB]
  le="12400_1232896" is EntrySize between (100KB, 1MB]
  le="overflow" is EntrySize between (1MB, ∞ ]
 
pulsar_pending_ack_batched_log_olderst_record_delay_time_seconds Histogram The time of the oldest pending ack log spent in the buffer before being sent.
Available bucket:
  le="1" time of the oldest pending ack log spent in the buffer before being sent between (0s, 0.001s]
  le="5" time of the oldest pending ack log spent in the buffer before being sent between (0.001s, 0.005s]
  le="10" time of the oldest pending ack log spent in the buffer before being sent between (0.005s, 0.01s]
  le="overflow" number of  pending ack logs per batch to be processed between (0.005s, ∞ ]
cluster
pulsar_pending_ack_batched_log_triggering_count_by_records Counter The count of the triggering pending ack log batch flush actions by ${transactionPendingAckBatchedWriteMaxRecords} cluster
pulsar_pending_ack_batched_log_triggering_count_by_size Counter The count of the triggering pending ack log batch flush actions by ${transactionPendingAckBatchedWriteMaxSize} cluster
pulsar_pending_ack_batched_log_triggering_count_by_delay_time Counter The count of the triggering pending ack log batch flush actions by ${transactionPendingAckBatchedWriteMaxDelayInMillis} cluster

Label Description

  • cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name that you have configured in the broker.conf file.
  • coordinator_id: coordinator_id=${coordinator_id}. ${coordinator_id} is the transaction coordinator id.

Compatibility

This feature is not forward compatible because the new feature writes new data structures to ledger that the old version of the broker could not parse. To ensure that users can upgrade and downgrade broker versions smoothly, we provide the following sections:

Upgrade

After this feature is released, users can upgrade to the new version (since this feature is turned off by default). Enabling this feature will write new data to Bookie that the old version of the Broker cannot resolve, so do not enable this feature if any nodes in the cluster have not been upgraded to the new version. If you want to use this feature, the correct steps would look like this:
Rolling upgrade broker nodes to the new version, keep batch-log feature disabled.

  1. Waiting for all broker nodes have been upgraded finish.
  2. Rolling enable the batch-log feature of all brokers. see: Enable/disable transaction coordinator batch log and Enable/disable transaction pending ack batch log.
  3. Waiting for all broker nodes have been enabled batch-log feature.
  4. Ensure all transaction coordinators and all pending ack stores has been enabled the feature. see: Get transaction coordinator batch log stat and Get transaction pending ack batch log stat

Downgrade

After the batch feature is enabled, if users need to use an older version of the broker, please strictly follow this step(the old version of the broker cannot recognize the new data with new format):

  1. Disable the Batch Transaction Log feature for all broker nodes.
  1. Wait for all batch logs to be invalidated(this process will be quick, depending on transaction Max timeout and managed ledger max rollover time).
    2-1. lookup coordinator id list
 ./pulsar-admin topics partitioned-lookup persistent://pulsar/system/transaction_coordinator_assign

Response like this:

persistent://pulsar/system/transaction_coordinator_assign-partition-0    pulsar://127.0.0.1:6650
persistent://pulsar/system/transaction_coordinator_assign-partition-1    pulsar://127.0.0.1:6650
persistent://pulsar/system/transaction_coordinator_assign-partition-2    pulsar://127.0.0.1:6650
persistent://pulsar/system/transaction_coordinator_assign-partition-3    pulsar://127.0.0.1:6650

In this response, 0, 1, 2, and 3 are all coordinator ids

2-2. View the ledger list of the current transaction meta log

./pulsar-admin transactions coordinator-internal-stats --coordinator-id [coordinator-id] 

The properties of $.transactionLogStats.ledgers of Response like this:

    [{
        "ledgerId" : 48,
        "entries" : 10,
        "size" : 20,
        "offloaded" : false,
        "underReplicated" : false
      },{
        "ledgerId" : 49,
        "entries" : 10,
        "size" : 20,
        "offloaded" : false,
        "underReplicated" : false
      }]

Note-1: The smallest ledgerId of ledgers means the oldest transaction log ledger which has not been deleted (aka OldestExistsTransactionLogLedger)

Note-2: When query transaction stats immediately after disabled this feature, The largest ledgerId of ledgers means the last batched transaction log ledger (aka LastBatchedTransactionLogLedger)

2-3. Perform step 2-2 for all coordinator ids and remember all LastBatchedTransactionLogLedger.

2-4. Find all subscription names using the transaction feature.

List all topics in the specified namespace.

./pulsar-admin namespaces topics public/default

Response like this:

persistent://public/default/__change_events
persistent://public/default/__transaction_buffer_snapshot
persistent://public/default/my-topic
persistent://public/default/my-topic-my-subscription__transaction_pending_ack

The topic name ending with _pending_ack is actually created by the subscription using transaction feature, this topic named by rule:

 {target topic name}-{subscription-name}__transaction_pending_ack

2-5. View the ledger list of the current pending ack log

./pulsar-admin transactions pending-ack-internal-stats --topic [topic] --sub-name [subscription name]

The properties of $.pendingAckLogStats.ledgers of Response like this:

 [{
    "ledgerId" : 79,
    "entries" : 10,
    "size" : 20,
    "offloaded" : false,
    "underReplicated" : false
  },{
    "ledgerId" : 80,
    "entries" : 10,
    "size" : 20,
    "offloaded" : false,
    "underReplicated" : false
  }]

Note-1: The smallest ledgerId of ledgers means the oldest pending ack log ledger which has not been deleted (aka OldestExistsPendingAckLogLedger)

Note-2: When query transaction stats immediately after disabled this feature, The largest ledgerId of ledgers means the last batched transaction log ledger (aka LastBatchedPendingAckLogLedger)

2-6. Perform step 2-5 for all subscriptions which use transactions feature and remember all LastBatchedPendingAckLogLedger.

2-7. Wait until OldestExistsTransactionLogLedger is greater than LastBatchedTransactionLogLedger already remember, and Wait until OldestExistsPendingAckLogLedger is greater than LastBatchedPendingAckLogLedger already remember. That means all batch logs to be invalidated.

  1. Downgrade all broker nodes to the previous version.

Test plan

The test should cover the following cases:

  • Aggregate multiple records and write to Bookies is correct, and the returned position, batchSize, batchIndex for writing data is correct.
  • The batch mechanism works abides by the total count, total size, and max delay limitation.
  • Delete Batched Transaction Log is correct after transaction end.
  • Metrics data is correct.
  • Performance tests and compare before-after improvement.
@poorbarcode poorbarcode changed the title Add batched ManagedLedger Apr 28, 2022
@poorbarcode poorbarcode changed the title [PIP] Add batched ManagedLedger Apr 29, 2022
@poorbarcode poorbarcode changed the title [PIP-160] Add batched ManagedLedger May 9, 2022
@poorbarcode poorbarcode changed the title [PIP-160] ManagedLedger decorator for batch append enties Jun 10, 2022
@poorbarcode
Copy link
Contributor Author

poorbarcode commented Jun 10, 2022

@poorbarcode poorbarcode changed the title [PIP-160] Batch writing ledger for transaction operation Jun 16, 2022
@eolivelli
Copy link
Contributor

@codelipenghui
Copy link
Contributor

message BatchedTransactionMetadataEntry{
// Array for buffer transaction log data.
repeated TransactionMetadataEntry transaction_log = 1;
}

message BatchedPendingAckMetadataEntry{
// Array for buffer pending ack data.
repeated PendingAckMetadataEntry pending_ack_log=1;
}

we should use plural names for the repeated fields such as entries or records.

@poorbarcode
Copy link
Contributor Author

We should use plural names for the repeated fields such as entries or records.

OK

liangyepianzhou pushed a commit that referenced this issue Jul 17, 2022
…for transaction batch log (#16617)

### Modifications

I will complete proposal #15370 with these pull requests( *current pull request is the step-2* ): 

1. Write the batch transaction log handler: `TxnLogBufferedWriter`
2. Configuration changes and protocol changes.
3. `MLPendingAckStore` and `MLTransactionLogImpl` support reading of batched logs.
4. `MLPendingAckStore` and `MLTransactionLogImpl` support the writing of batched logs and support dynamic configuration.
5. Append admin API for transaction batch log and docs( admin and configuration doc ).
6. Append metrics support for transaction batch log.
congbobo184 pushed a commit that referenced this issue Jul 20, 2022
… leads to an avalanche (#16679)

Master Issue: #15370

### Motivation

see #15370

### Modifications

####  Managed Ledger I/O thread gets busier and busier.

In origin implementation, `TxnLogBufferedWriter` has two thread pools:

- `ExecutorService singleThreadExecutorForWrite` is A single-threaded actuator, used to perform Managed Ledger I/O operations. Includes the following tasks:
  - `internalAsyncAddData` Each execution of the `asyncAddData` adds a task.
  - `doFlush` The execution of `trigFlush` sometimes add a task, and there is also a scheduled task that adds tasks.
- `ScheduledExecutorService scheduledExecutorService` is used to periodically add `doFlush` tasks to the `singleThreadExecutorForWrite`, whether the `singleThreadExecutorForWrite` is idle or not.

If `singleThreadExecutorForWrite` is busy for a period of time, `scheduledExecutorService` will keep adding `doFlush` tasks during that period. Then `singleThreadExecutorForWrite` will be busy with the newly added `doFlush` tasks and allocate fewer resources to `internalAsyncAddData` while `scheduledExecutorService` is adding `doFlush` tasks, which will cause `singleThreadExecutorForWrite` to accumulate more and more tasks, even if that the `doFlush` task appended by `scheduledExecutorService` possible only triggers the check and does nothing else. The net result is that `singleThreadExecutorForWrite` gets busier and busier.

#### Imprecise timing task
If new data is added immediately after a scheduled task is executed, the data cannot be flushed in the next scheduled task. Aka. we set the max delay time to 1 min, the scheduled tasks and new data tasks are executed in the following sequence: 

```
1. scheduled task running at 12:00:00.000
2. scheduled task running at 12:00:01.000
3. add data at 12:00:01.005
4. scheduled task running at 12:00:02.000
```
In the step-4, the flush task will not flush the data to the bookie, because the result of expression `System.currentTimeMillis() - firstAsyncAddArgs.addedTime >= batchedWriteMaxDelayInMillis` is `false`, this data will actually flushed at next scheduled task `12:00:03.000`


### Changes:  

####  Managed Ledger I/O thread gets busier and busier.

Since all C tasks are executed in the same thread, we can change that "after a scheduled task is executed, then add the next one". This reduces the density of `doFlush trig by timer` task execution, thereby somewhat losing timing accuracy (the original implementation was not completely accurate either).

This change can only avoid the task accumulation in `TxnLogBufferedWriter` caused by too many scheduled tasks, but cannot improve the write performance of Managed Ledger.

#### Imprecise timing task
Flush triggered by the scheduled task no longer determines whether the time of the first node reaches the condition. To avoid imprecise scheduled task execution time, the maximum delay time is still checked and flushed in the Flush task triggered by the `asyncAddData`.
congbobo184 pushed a commit that referenced this issue Jul 22, 2022
…ure (#16685)

Master Issue: #15370

### Motivation

see #15370

### Modifications

I will complete proposal #15370 with these pull requests( *current pull request is the step-3* ): 

1. Write the batch transaction log handler: `TxnLogBufferedWriter`
2. Configuration changes and protocol changes.
3. Transaction log store enables the batch feature.
4. Pending ack log store enables the batch feature.
5. Supports dynamic configuration.
6. Append admin API for transaction batch log and docs( admin and configuration doc ).
7. Append metrics support for transaction batch log.
congbobo184 pushed a commit that referenced this issue Jul 27, 2022
…re (#16707)

Master Issue: #15370

### Motivation

see #15370

### Modifications

I will complete proposal #15370 with these pull requests( *current pull request is the step-4* ): 

1. Write the batch transaction log handler: `TxnLogBufferedWriter`
2. Configuration changes and protocol changes.
3. Transaction log store enables the batch feature.
4. Pending ack log store enables the batch feature.
5. Supports dynamic configuration.
6. Append admin API for transaction batch log and docs( admin and configuration doc ).
7. Append metrics support for transaction batch log.
@github-actions
Copy link

The issue had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Jul 31, 2022
Gleiphir2769 pushed a commit to Gleiphir2769/pulsar that referenced this issue Aug 4, 2022
…re (apache#16707)

Master Issue: apache#15370

### Motivation

see apache#15370

### Modifications

I will complete proposal apache#15370 with these pull requests( *current pull request is the step-4* ): 

1. Write the batch transaction log handler: `TxnLogBufferedWriter`
2. Configuration changes and protocol changes.
3. Transaction log store enables the batch feature.
4. Pending ack log store enables the batch feature.
5. Supports dynamic configuration.
6. Append admin API for transaction batch log and docs( admin and configuration doc ).
7. Append metrics support for transaction batch log.
congbobo184 pushed a commit that referenced this issue Sep 14, 2022
…16758)

Master Issue: #15370

### Motivation

see #15370

### Modifications

I will complete proposal #15370 with these pull requests( *current pull request is a part of step 7-1* ): 

1. Write the batch transaction log handler: `TxnLogBufferedWriter`
2. Configuration changes and protocol changes.
3. Transaction log store enables the batch feature.
4. Pending ack log store enables the batch feature.
5. Supports dynamic configuration.
6. Append admin API for transaction batch log and docs( admin and configuration doc ).
   GET /admin/v3/transactions/coordinatorStats
   GET /admin/v3/transactions/pendingAckStats/:tenant/:namespace:/:topic:/:subName
7. Append metrics support for transaction batch log.
  7-1. Metrics of Txn Buffered Writer.
  7-2. `TransactionLog` and `PendingAckStore` enables the Metrics of Txn Buffered Writer

----

### The desired effect

`TransactionLog` should create `TxnLogBufferedWriter` with params: 

```JSON
{
  "metricsPrefix": "pulsar_txn_tc",
  "labelNames": "coordinatorId",
  "labelValues": "1"
}
```

The  metrics output of `TransactionLog` will like this: 

```
# A metrics for how many batches were triggered due to threshold "batchedWriteMaxRecords".
# TYPE pulsar_txn_tc_batched_log_batched_log_triggering_count_by_records Counter
pulsar_txn_tc_batched_log_batched_log_triggering_count_by_records{coordinatorId="1"} 15
...
...
...
# pulsar_txn_tc_batched_log_records_count_per_entry A metrics for how many records in per batch written by the component[pulsar_txn_tc] per batch.
# TYPE pulsar_txn_tc_batched_log_records_count_per_entry Histogram
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="10"} 1
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="50"} 3
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="100"} 5
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="500"} 10
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="1000"} 10
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="+Inf"} 10
pulsar_txn_tc_batched_log_records_count_per_entry_count{coordinatorId="1", le="+Inf"} 10
pulsar_txn_tc_batched_log_records_count_per_entry_sum{coordinatorId="1", le="+Inf"} 5432
```

`PendingAckStore` is the same. But all the PendingackStores will not differentiate the Subscription labels (because there are too many)

----

### Manage the registered collectors ourselves.

To build Metrics Stat, we need to execute these two steps:
1. Create `Collector` and register to `CollectorRegistry`, perhaps the Collector is `Histogram` or `Counter` 
2. Register labels to `Collector` and get `Collector.child`(holds by Metrics Stat). This step can also be omitted because we can execute `collector.labels(labelValues)` to get `Collector.child`. 

In the Transaction log scenario, multiple Transaction Logs share the same `Collector`, and each has its own `Collector.Child`, so when we build metrics stat for each Transaction Log, we call `collector.labels(labelValues)` to get the `Collector.Child`. However, the CollectorRegistry does not provide an API like this:

```java
public Collector getRegistedCollector(String name);
```

and it will throw IllegalArgumentException when we registering collector with the same name more than once, see:


https://github.com/prometheus/client_java/blob/1966186deaaa83aec496d88ff604a90795bad688/simpleclient/src/main/java/io/prometheus/client/CollectorRegistry.java#L49-L65

So we have to manage the registered collectors ourselves.

----

#### Holds the `Collector.child` by each Metrics stat instance

To save the overhead of `collector.labels(labelValues)`, we make each Metrics Stat hold a reference of `Collector.child`, because this method is not light enough:

https://github.com/prometheus/client_java/blob/1966186deaaa83aec496d88ff604a90795bad688/simpleclient/src/main/java/io/prometheus/client/SimpleCollector.java#L63-L80

----

#### Code will be removed in the next PR (7-2)

In the complete design, we should have two implementations like UML blow, one for enabling the batch feature, and another for disabled:

![uml](https://user-images.githubusercontent.com/25195800/182418488-6469a38f-a96c-44e9-8ee6-01273b58b0cd.jpeg)


To reduce later maintenance costs, I'd like to ditch the 'DisabledMetricsStat' and we'll always use the implementation 'MetricsStatimpl' even if the Txn buffer writer disables the batch feature. This constructor without 'param-metricsStats' and these' null checks' will be removed in the next PR. This is compatible only with split PR, making each PR have less code

### Documentation


- [ ] `doc-required` 
  
- [x] `doc-not-needed` 
  
- [ ] `doc` 

- [ ] `doc-complete`
congbobo184 pushed a commit that referenced this issue Sep 29, 2022
…trics (#17701)

Master Issue: #15370

### Modifications

- Make transaction `MLTransactionMetadataStoreProvider` & `MLPendingAckStoreProvider` support buffered writer metrics.
  - Motivation: #15370

----

- Delete constructor of `TxnLogBufferedWriter` without parameter `metrics`.
  - Motivation: it is unnecessary.

---- 

- Add a default `DisabledTxnLogBufferedWriterMetricsStats` implementation.

----

- Previous PR remaining code to optimize: remove the check code `if (metrics != null)`. The motivation see:
  - Motivation: #16758 (comment)

----

- Make transaction log buffered writer only create by the `MLTransactionMetadataStoreProvider` & `MLPendingAckStoreProvider`. 
  - Motivation: #16758 (comment)

### Documentation

- [ ] `doc-required` 

- [x] `doc-not-needed` 

- [ ] `doc` 

- [ ] `doc-complete`


### Matching PR in forked repository

PR in forked repository: 

- poorbarcode#3
@poorbarcode poorbarcode modified the milestone: 3.0.0 Apr 28, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
3 participants