Skip to content

Commit

Permalink
[feat][broker] Segmented transaction buffer snapshot segment and inde…
Browse files Browse the repository at this point in the history
…x system topic (#16931)

Master Issue: #16913
### Motivation
Implement system topic client for snapshot segment topic and index topic to send segment snapshots or indexes.
The configuration `transactionBufferSegmentedSnapshotEnabled` is used in the Transaction Buffer to determine which `AbortedTxnProcessor` is adopted by this TB.
### Modification

In the new implementation of the Transaction Buffer Snapshot System topic, because the system topic that needs to be processed has changed from the original one to three with different schemes, we have added generics to the TransactionBufferSnapshotBaseSystemTopicClient class and the SystemTopicTxnBufferSnapshotService<T> class.
And Pulsar Service maintains a factory class TransactionBufferSnapshotServiceFactory used to obtain SystemTopicTxnBufferSnapshotService.
This way, we can obtain the required System topic client through pulsarService to read and send snapshots.
<img width="1336" alt="image" src="https://user-images.githubusercontent.com/55571188/197467173-9028e58a-79cc-4fe4-81e2-c299c568caee.png">
  • Loading branch information
liangyepianzhou authored Oct 24, 2022
1 parent 39270f0 commit d211766
Show file tree
Hide file tree
Showing 29 changed files with 859 additions and 416 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Optional;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl;

/**
* Definition of all the callbacks used for the ManagedLedger asynchronous API.
Expand All @@ -46,6 +47,12 @@ interface OpenReadOnlyCursorCallback {
void openReadOnlyCursorFailed(ManagedLedgerException exception, Object ctx);
}

interface OpenReadOnlyManagedLedgerCallback {
void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl managedLedger, Object ctx);

void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx);
}

interface DeleteLedgerCallback {
void deleteLedgerComplete(Object ctx);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,17 @@ ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPositi
void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config,
OpenReadOnlyCursorCallback callback, Object ctx);

/**
* Asynchronous open a Read-only managedLedger.
* @param managedLedgerName the unique name that identifies the managed ledger
* @param callback
* @param config the managed ledger configuration.
* @param ctx opaque context
*/
void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback,
ManagedLedgerConfig config, Object ctx);

/**
* Get the current metadata info for a managed ledger.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,29 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
});
}

@Override
public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback,
ManagedLedgerConfig config, Object ctx) {
if (closed) {
callback.openReadOnlyManagedLedgerFailed(
new ManagedLedgerException.ManagedLedgerFactoryClosedException(), ctx);
}
ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
bookkeeperFactory
.get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor, managedLedgerName);
roManagedLedger.initialize().thenRun(() -> {
log.info("[{}] Successfully initialize Read-only managed ledger", managedLedgerName);
callback.openReadOnlyManagedLedgerComplete(roManagedLedger, ctx);

}).exceptionally(e -> {
log.error("[{}] Failed to initialize Read-only managed ledger", managedLedgerName, e);
callback.openReadOnlyManagedLedgerFailed((ManagedLedgerException) e.getCause(), ctx);
return null;
});
}

@Override
public ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPosition,
Expand Down Expand Up @@ -465,28 +487,20 @@ public void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosi
return;
}
checkArgument(startPosition instanceof PositionImpl);
ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
bookkeeperFactory
.get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor, managedLedgerName);

roManagedLedger.initializeAndCreateCursor((PositionImpl) startPosition)
.thenAccept(roCursor -> callback.openReadOnlyCursorComplete(roCursor, ctx))
.exceptionally(ex -> {
Throwable t = ex;
if (t instanceof CompletionException) {
t = ex.getCause();
AsyncCallbacks.OpenReadOnlyManagedLedgerCallback openReadOnlyManagedLedgerCallback =
new AsyncCallbacks.OpenReadOnlyManagedLedgerCallback() {
@Override
public void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl readOnlyManagedLedger, Object ctx) {
callback.openReadOnlyCursorComplete(readOnlyManagedLedger.
createReadOnlyCursor((PositionImpl) startPosition), ctx);
}

if (t instanceof ManagedLedgerException) {
callback.openReadOnlyCursorFailed((ManagedLedgerException) t, ctx);
} else {
callback.openReadOnlyCursorFailed(new ManagedLedgerException(t), ctx);
@Override
public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx) {
callback.openReadOnlyCursorFailed(exception, ctx);
}

return null;
});
};
asyncOpenReadOnlyManagedLedger(managedLedgerName, openReadOnlyManagedLedgerCallback, config, null);
}

void close(ManagedLedger ledger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public ReadOnlyManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bo
super(factory, bookKeeper, store, config, scheduledExecutor, name);
}

CompletableFuture<ReadOnlyCursor> initializeAndCreateCursor(PositionImpl startPosition) {
CompletableFuture<ReadOnlyCursor> future = new CompletableFuture<>();
CompletableFuture<Void> initialize() {
CompletableFuture<Void> future = new CompletableFuture<>();

// Fetch the list of existing ledgers in the managed ledger
store.getManagedLedgerInfo(name, false, new MetaStoreCallback<ManagedLedgerInfo>() {
Expand All @@ -72,15 +72,15 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
.setTimestamp(clock.millis()).build();
ledgers.put(lastLedgerId, info);

future.complete(createReadOnlyCursor(startPosition));
future.complete(null);
}).exceptionally(ex -> {
if (ex instanceof CompletionException
&& ex.getCause() instanceof IllegalArgumentException) {
// The last ledger was empty, so we cannot read the last add confirmed.
LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lastLedgerId)
.setEntries(0).setSize(0).setTimestamp(clock.millis()).build();
ledgers.put(lastLedgerId, info);
future.complete(createReadOnlyCursor(startPosition));
future.complete(null);
} else {
future.completeExceptionally(new ManagedLedgerException(ex));
}
Expand All @@ -93,15 +93,15 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lastLedgerId).setEntries(0)
.setSize(0).setTimestamp(clock.millis()).build();
ledgers.put(lastLedgerId, info);
future.complete(createReadOnlyCursor(startPosition));
future.complete(null);
} else {
future.completeExceptionally(new ManagedLedgerException(ex));
}
return null;
});
} else {
// The read-only managed ledger is ready to use
future.complete(createReadOnlyCursor(startPosition));
future.complete(null);
}
}

Expand All @@ -118,7 +118,7 @@ public void operationFailed(MetaStoreException e) {
return future;
}

private ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) {
ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) {
if (ledgers.isEmpty()) {
lastConfirmedEntry = PositionImpl.EARLIEST;
} else if (ledgers.lastEntry().getValue().getEntries() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,10 @@
import org.apache.pulsar.broker.rest.Topics;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
import org.apache.pulsar.broker.stats.MetricsGenerator;
Expand Down Expand Up @@ -260,8 +259,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private MetadataStoreExtended localMetadataStore;
private PulsarMetadataEventSynchronizer localMetadataSynchronizer;
private CoordinationService coordinationService;
private TransactionBufferSnapshotService transactionBufferSnapshotService;

private TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory;
private MetadataStore configurationMetadataStore;
private PulsarMetadataEventSynchronizer configMetadataSynchronizer;
private boolean shouldShutdownConfigurationMetadataStore;
Expand Down Expand Up @@ -510,9 +508,9 @@ public CompletableFuture<Void> closeAsync() {
adminClient = null;
}

if (transactionBufferSnapshotService != null) {
transactionBufferSnapshotService.close();
transactionBufferSnapshotService = null;
if (transactionBufferSnapshotServiceFactory != null) {
transactionBufferSnapshotServiceFactory.close();
transactionBufferSnapshotServiceFactory = null;
}

if (transactionBufferClient != null) {
Expand Down Expand Up @@ -837,7 +835,8 @@ public void start() throws PulsarServerException {
MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());

this.transactionBufferSnapshotService = new SystemTopicBaseTxnBufferSnapshotService(getClient());
this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(getClient());

this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
transactionBufferClient = TransactionBufferClientImpl.create(this, transactionTimer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, Action
} else {
PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
CompletableFuture<MessageId> actionFuture =
ActionType.DELETE.equals(actionType) ? writer.deleteAsync(event) : writer.writeAsync(event);
ActionType.DELETE.equals(actionType) ? writer.deleteAsync(getEventKey(event), event)
: writer.writeAsync(getEventKey(event), event);
actionFuture.whenComplete(((messageId, e) -> {
if (e != null) {
result.completeExceptionally(e);
Expand Down Expand Up @@ -455,7 +456,8 @@ private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
systemTopicClient.newWriterAsync().thenAccept(writer
-> writer.deleteAsync(getPulsarEvent(topicName, ActionType.DELETE, null))
-> writer.deleteAsync(getEventKey(topicName),
getPulsarEvent(topicName, ActionType.DELETE, null))
.whenComplete((result, e) -> writer.closeAsync().whenComplete((res, ex) -> {
if (ex != null) {
log.error("close writer failed ", ex);
Expand Down Expand Up @@ -539,6 +541,20 @@ private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader<Puls
});
}

public static String getEventKey(PulsarEvent event) {
return TopicName.get(event.getTopicPoliciesEvent().getDomain(),
event.getTopicPoliciesEvent().getTenant(),
event.getTopicPoliciesEvent().getNamespace(),
event.getTopicPoliciesEvent().getTopic()).toString();
}

public static String getEventKey(TopicName topicName) {
return TopicName.get(topicName.getDomain().toString(),
topicName.getTenant(),
topicName.getNamespace(),
TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString();
}

@VisibleForTesting
long getPoliciesCacheSize() {
return policiesCache.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,64 +23,63 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;

public class SystemTopicBaseTxnBufferSnapshotService implements TransactionBufferSnapshotService {
public class SystemTopicTxnBufferSnapshotService<T> {

private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>> clients;
protected final Map<TopicName, SystemTopicClient<T>> clients;
protected final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;

private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
protected final Class<T> schemaType;
protected final EventType systemTopicType;

public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType systemTopicType,
Class<T> schemaType) {
this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
this.systemTopicType = systemTopicType;
this.schemaType = schemaType;
this.clients = new ConcurrentHashMap<>();
}

@Override
public CompletableFuture<Writer<TransactionBufferSnapshot>> createWriter(TopicName topicName) {
public CompletableFuture<SystemTopicClient.Writer<T>> createWriter(TopicName topicName) {
return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
}

private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> getTransactionBufferSystemTopicClient(
TopicName topicName) {
TopicName systemTopicName = NamespaceEventsSystemTopicFactory
.getSystemTopicName(topicName.getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
if (systemTopicName == null) {
return FutureUtil.failedFuture(
new InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotService, "
+ "because the topicName is null!"));
}
return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
(v) -> namespaceEventsSystemTopicFactory
.createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(), this)));
}

@Override
public CompletableFuture<Reader<TransactionBufferSnapshot>> createReader(TopicName topicName) {
public CompletableFuture<SystemTopicClient.Reader<T>> createReader(TopicName topicName) {
return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
}

@Override
public void removeClient(TopicName topicName,
TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) {
public void removeClient(TopicName topicName, SystemTopicClientBase<T> transactionBufferSystemTopicClient) {
if (transactionBufferSystemTopicClient.getReaders().size() == 0
&& transactionBufferSystemTopicClient.getWriters().size() == 0) {
clients.remove(topicName);
}
}

@Override
protected CompletableFuture<SystemTopicClient<T>> getTransactionBufferSystemTopicClient(TopicName topicName) {
TopicName systemTopicName = NamespaceEventsSystemTopicFactory
.getSystemTopicName(topicName.getNamespaceObject(), systemTopicType);
if (systemTopicName == null) {
return FutureUtil.failedFuture(
new PulsarClientException
.InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotIndexService, "
+ "because the topicName is null!"));
}
return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
(v) -> namespaceEventsSystemTopicFactory
.createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(),
this, schemaType)));
}

public void close() throws Exception {
for (Map.Entry<TopicName, SystemTopicClient<TransactionBufferSnapshot>> entry : clients.entrySet()) {
for (Map.Entry<TopicName, SystemTopicClient<T>> entry : clients.entrySet()) {
entry.getValue().close();
}
}

}
Loading

0 comments on commit d211766

Please sign in to comment.