Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] Segmented transaction buffer snapshot segment and index system topic #16931

Merged
merged 31 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
677f765
Implement snapshot segment topic and snapshot index topic.
liangyepianzhou Sep 6, 2022
7b0f1ee
Merge branch 'master' into xiangying/pip/pip196-2
liangyepianzhou Sep 6, 2022
f8fa913
move snapshot to index
liangyepianzhou Sep 8, 2022
88c33e6
Merge remote-tracking branch 'origin/xiangying/pip/pip196-2' into xia…
liangyepianzhou Sep 14, 2022
d98e006
abstract
liangyepianzhou Sep 15, 2022
e8dda12
change reader to cursor for segment snapshot.
liangyepianzhou Sep 18, 2022
bba847e
optimize name
liangyepianzhou Sep 18, 2022
63dec02
optimize
liangyepianzhou Sep 18, 2022
73a4731
change read-only cursor to read-only managedledger.
liangyepianzhou Sep 19, 2022
e8b0c4f
abstract transactionBufferSnapshotService
liangyepianzhou Sep 19, 2022
bc45917
abstract writer and read, and fix read-only managedLedger init.
liangyepianzhou Sep 20, 2022
e99941d
Merge remote-tracking branch 'apache/master' into xiangying/pip/pip196-2
liangyepianzhou Sep 20, 2022
0626685
merge config
liangyepianzhou Sep 20, 2022
b643434
optimize
liangyepianzhou Sep 21, 2022
8cf7ee0
fix
liangyepianzhou Sep 22, 2022
7849206
fix
liangyepianzhou Sep 22, 2022
2737664
fix
liangyepianzhou Sep 22, 2022
f9dc0ae
optimize by class<T>
liangyepianzhou Oct 17, 2022
189fe40
optimize by class<T>
liangyepianzhou Oct 17, 2022
e282cbf
merge
liangyepianzhou Oct 17, 2022
6cba282
fix some comments
liangyepianzhou Oct 18, 2022
8da3749
fix some comments
liangyepianzhou Oct 19, 2022
cd6c909
(T t, String key) to (String key, T t)
liangyepianzhou Oct 20, 2022
d2f3889
optimize inner class and TxnIDData
liangyepianzhou Oct 20, 2022
2dec0a5
CompletableFuture<Void> initialize, and getEventKey
liangyepianzhou Oct 20, 2022
108edc9
rename TransactionBufferSnapshot to TransactionBufferSnapshotSegment
liangyepianzhou Oct 20, 2022
83c380f
license header
liangyepianzhou Oct 20, 2022
6a41ee8
rollback txnID and optimize close
liangyepianzhou Oct 21, 2022
fe6ec1b
fix
liangyepianzhou Oct 21, 2022
6ec7dc6
fix
liangyepianzhou Oct 21, 2022
39da371
fix
liangyepianzhou Oct 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Optional;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl;

/**
* Definition of all the callbacks used for the ManagedLedger asynchronous API.
Expand All @@ -46,6 +47,12 @@ interface OpenReadOnlyCursorCallback {
void openReadOnlyCursorFailed(ManagedLedgerException exception, Object ctx);
}

interface OpenReadOnlyManagedLedgerCallback {
void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl managedLedger, Object ctx);

void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx);
}

interface DeleteLedgerCallback {
void deleteLedgerComplete(Object ctx);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,17 @@ ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPositi
void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config,
OpenReadOnlyCursorCallback callback, Object ctx);

/**
* Asynchronous open a Read-only managedLedger.
* @param managedLedgerName the unique name that identifies the managed ledger
* @param callback
* @param config the managed ledger configuration.
* @param ctx opaque context
*/
void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback,
ManagedLedgerConfig config, Object ctx);

/**
* Get the current metadata info for a managed ledger.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,30 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
});
}

@Override
public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback,
ManagedLedgerConfig config, Object ctx) {
CompletableFuture<ReadOnlyManagedLedgerImpl> future = new CompletableFuture<>();
if (closed) {
callback.openReadOnlyManagedLedgerFailed(
new ManagedLedgerException.ManagedLedgerFactoryClosedException(), ctx);
}
ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
bookkeeperFactory
.get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor, managedLedgerName);
roManagedLedger.initialize().thenRun(() -> {
log.info("[{}] Successfully initialize Read-only managed ledger", managedLedgerName);
callback.openReadOnlyManagedLedgerComplete(roManagedLedger, ctx);

}).exceptionally(e -> {
log.error("[{}] Failed to initialize Read-only managed ledger", managedLedgerName, e);
callback.openReadOnlyManagedLedgerFailed((ManagedLedgerException) e.getCause(), ctx);
return null;
});
}

@Override
public ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPosition,
Expand Down Expand Up @@ -462,28 +485,20 @@ public void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosi
return;
}
checkArgument(startPosition instanceof PositionImpl);
ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
bookkeeperFactory
.get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor, managedLedgerName);

roManagedLedger.initializeAndCreateCursor((PositionImpl) startPosition)
.thenAccept(roCursor -> callback.openReadOnlyCursorComplete(roCursor, ctx))
.exceptionally(ex -> {
Throwable t = ex;
if (t instanceof CompletionException) {
t = ex.getCause();
AsyncCallbacks.OpenReadOnlyManagedLedgerCallback openReadOnlyManagedLedgerCallback =
new AsyncCallbacks.OpenReadOnlyManagedLedgerCallback() {
@Override
public void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl readOnlyManagedLedger, Object ctx) {
callback.openReadOnlyCursorComplete(readOnlyManagedLedger.
createReadOnlyCursor((PositionImpl) startPosition), ctx);
}

if (t instanceof ManagedLedgerException) {
callback.openReadOnlyCursorFailed((ManagedLedgerException) t, ctx);
} else {
callback.openReadOnlyCursorFailed(new ManagedLedgerException(t), ctx);
@Override
public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx) {
callback.openReadOnlyCursorFailed(exception, ctx);
}

return null;
});
};
asyncOpenReadOnlyManagedLedger(managedLedgerName, openReadOnlyManagedLedgerCallback, config, null);
}

void close(ManagedLedger ledger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public ReadOnlyManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bo
super(factory, bookKeeper, store, config, scheduledExecutor, name);
}

CompletableFuture<ReadOnlyCursor> initializeAndCreateCursor(PositionImpl startPosition) {
CompletableFuture<ReadOnlyCursor> future = new CompletableFuture<>();
CompletableFuture<ReadOnlyManagedLedgerImpl> initialize() {
CompletableFuture<ReadOnlyManagedLedgerImpl> future = new CompletableFuture<>();

// Fetch the list of existing ledgers in the managed ledger
store.getManagedLedgerInfo(name, false, new MetaStoreCallback<ManagedLedgerInfo>() {
Expand All @@ -72,15 +72,15 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
.setTimestamp(clock.millis()).build();
ledgers.put(lastLedgerId, info);

future.complete(createReadOnlyCursor(startPosition));
future.complete(ReadOnlyManagedLedgerImpl.this);
}).exceptionally(ex -> {
if (ex instanceof CompletionException
&& ex.getCause() instanceof IllegalArgumentException) {
// The last ledger was empty, so we cannot read the last add confirmed.
LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lastLedgerId)
.setEntries(0).setSize(0).setTimestamp(clock.millis()).build();
ledgers.put(lastLedgerId, info);
future.complete(createReadOnlyCursor(startPosition));
future.complete(ReadOnlyManagedLedgerImpl.this);
} else {
future.completeExceptionally(new ManagedLedgerException(ex));
}
Expand All @@ -93,15 +93,15 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lastLedgerId).setEntries(0)
.setSize(0).setTimestamp(clock.millis()).build();
ledgers.put(lastLedgerId, info);
future.complete(createReadOnlyCursor(startPosition));
future.complete(ReadOnlyManagedLedgerImpl.this);
} else {
future.completeExceptionally(new ManagedLedgerException(ex));
}
return null;
});
} else {
// The read-only managed ledger is ready to use
future.complete(createReadOnlyCursor(startPosition));
future.complete(ReadOnlyManagedLedgerImpl.this);
}
}

Expand All @@ -118,7 +118,7 @@ public void operationFailed(MetaStoreException e) {
return future;
}

private ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) {
ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) {
if (ledgers.isEmpty()) {
lastConfirmedEntry = PositionImpl.EARLIEST;
} else if (ledgers.lastEntry().getValue().getEntries() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,10 @@
import org.apache.pulsar.broker.rest.Topics;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
import org.apache.pulsar.broker.stats.MetricsGenerator;
Expand Down Expand Up @@ -260,8 +259,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private MetadataStoreExtended localMetadataStore;
private PulsarMetadataEventSynchronizer localMetadataSynchronizer;
private CoordinationService coordinationService;
private TransactionBufferSnapshotService transactionBufferSnapshotService;

private TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory;
private MetadataStore configurationMetadataStore;
private PulsarMetadataEventSynchronizer configMetadataSynchronizer;
private boolean shouldShutdownConfigurationMetadataStore;
Expand Down Expand Up @@ -510,9 +508,9 @@ public CompletableFuture<Void> closeAsync() {
adminClient = null;
}

if (transactionBufferSnapshotService != null) {
transactionBufferSnapshotService.close();
transactionBufferSnapshotService = null;
if (transactionBufferSnapshotServiceFactory != null) {
transactionBufferSnapshotServiceFactory.close();
transactionBufferSnapshotServiceFactory = null;
}

if (transactionBufferClient != null) {
Expand Down Expand Up @@ -835,7 +833,8 @@ public void start() throws PulsarServerException {
MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());

this.transactionBufferSnapshotService = new SystemTopicBaseTxnBufferSnapshotService(getClient());
this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(getClient());

this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
transactionBufferClient = TransactionBufferClientImpl.create(this, transactionTimer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.systopic.TopicPoliciesSystemTopicClient.TopicPolicyWriter.getEventKey;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import java.util.HashSet;
Expand Down Expand Up @@ -123,7 +124,8 @@ private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, Action
} else {
PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
CompletableFuture<MessageId> actionFuture =
ActionType.DELETE.equals(actionType) ? writer.deleteAsync(event) : writer.writeAsync(event);
ActionType.DELETE.equals(actionType) ? writer.deleteAsync(event, getEventKey(event))
: writer.writeAsync(event, getEventKey(event));
actionFuture.whenComplete(((messageId, e) -> {
if (e != null) {
result.completeExceptionally(e);
Expand Down Expand Up @@ -455,7 +457,8 @@ private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
systemTopicClient.newWriterAsync().thenAccept(writer
-> writer.deleteAsync(getPulsarEvent(topicName, ActionType.DELETE, null))
-> writer.deleteAsync(getPulsarEvent(topicName, ActionType.DELETE, null),
getEventKey(topicName))
.whenComplete((result, e) -> writer.closeAsync().whenComplete((res, ex) -> {
if (ex != null) {
log.error("close writer failed ", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,64 +23,63 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;

public class SystemTopicBaseTxnBufferSnapshotService implements TransactionBufferSnapshotService {
public class SystemTopicTxnBufferSnapshotService<T> {

private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>> clients;
protected final Map<TopicName, SystemTopicClient<T>> clients;
protected final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;

private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
protected final Class<T> schemaType;
protected final EventType systemTopicType;

public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType systemTopicType,
Class<T> schemaType) {
this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
this.systemTopicType = systemTopicType;
this.schemaType = schemaType;
this.clients = new ConcurrentHashMap<>();
}

@Override
public CompletableFuture<Writer<TransactionBufferSnapshot>> createWriter(TopicName topicName) {
public CompletableFuture<SystemTopicClient.Writer<T>> createWriter(TopicName topicName) {
return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
}

private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> getTransactionBufferSystemTopicClient(
TopicName topicName) {
TopicName systemTopicName = NamespaceEventsSystemTopicFactory
.getSystemTopicName(topicName.getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
if (systemTopicName == null) {
return FutureUtil.failedFuture(
new InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotService, "
+ "because the topicName is null!"));
}
return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
(v) -> namespaceEventsSystemTopicFactory
.createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(), this)));
}

@Override
public CompletableFuture<Reader<TransactionBufferSnapshot>> createReader(TopicName topicName) {
public CompletableFuture<SystemTopicClient.Reader<T>> createReader(TopicName topicName) {
return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
}

@Override
public void removeClient(TopicName topicName,
TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) {
public void removeClient(TopicName topicName, SystemTopicClientBase<T> transactionBufferSystemTopicClient) {
if (transactionBufferSystemTopicClient.getReaders().size() == 0
&& transactionBufferSystemTopicClient.getWriters().size() == 0) {
clients.remove(topicName);
}
}

@Override
protected CompletableFuture<SystemTopicClient<T>> getTransactionBufferSystemTopicClient(TopicName topicName) {
TopicName systemTopicName = NamespaceEventsSystemTopicFactory
.getSystemTopicName(topicName.getNamespaceObject(), systemTopicType);
if (systemTopicName == null) {
return FutureUtil.failedFuture(
new PulsarClientException
.InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotIndexService, "
+ "because the topicName is null!"));
}
return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
(v) -> namespaceEventsSystemTopicFactory
.createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(),
this, schemaType)));
}

public void close() throws Exception {
for (Map.Entry<TopicName, SystemTopicClient<TransactionBufferSnapshot>> entry : clients.entrySet()) {
for (Map.Entry<TopicName, SystemTopicClient<T>> entry : clients.entrySet()) {
entry.getValue().close();
}
}

}
Loading