Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] Segmented transaction buffer snapshot segment and index system topic #16931

Merged
merged 31 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
677f765
Implement snapshot segment topic and snapshot index topic.
liangyepianzhou Sep 6, 2022
7b0f1ee
Merge branch 'master' into xiangying/pip/pip196-2
liangyepianzhou Sep 6, 2022
f8fa913
move snapshot to index
liangyepianzhou Sep 8, 2022
88c33e6
Merge remote-tracking branch 'origin/xiangying/pip/pip196-2' into xia…
liangyepianzhou Sep 14, 2022
d98e006
abstract
liangyepianzhou Sep 15, 2022
e8dda12
change reader to cursor for segment snapshot.
liangyepianzhou Sep 18, 2022
bba847e
optimize name
liangyepianzhou Sep 18, 2022
63dec02
optimize
liangyepianzhou Sep 18, 2022
73a4731
change read-only cursor to read-only managedledger.
liangyepianzhou Sep 19, 2022
e8b0c4f
abstract transactionBufferSnapshotService
liangyepianzhou Sep 19, 2022
bc45917
abstract writer and read, and fix read-only managedLedger init.
liangyepianzhou Sep 20, 2022
e99941d
Merge remote-tracking branch 'apache/master' into xiangying/pip/pip196-2
liangyepianzhou Sep 20, 2022
0626685
merge config
liangyepianzhou Sep 20, 2022
b643434
optimize
liangyepianzhou Sep 21, 2022
8cf7ee0
fix
liangyepianzhou Sep 22, 2022
7849206
fix
liangyepianzhou Sep 22, 2022
2737664
fix
liangyepianzhou Sep 22, 2022
f9dc0ae
optimize by class<T>
liangyepianzhou Oct 17, 2022
189fe40
optimize by class<T>
liangyepianzhou Oct 17, 2022
e282cbf
merge
liangyepianzhou Oct 17, 2022
6cba282
fix some comments
liangyepianzhou Oct 18, 2022
8da3749
fix some comments
liangyepianzhou Oct 19, 2022
cd6c909
(T t, String key) to (String key, T t)
liangyepianzhou Oct 20, 2022
d2f3889
optimize inner class and TxnIDData
liangyepianzhou Oct 20, 2022
2dec0a5
CompletableFuture<Void> initialize, and getEventKey
liangyepianzhou Oct 20, 2022
108edc9
rename TransactionBufferSnapshot to TransactionBufferSnapshotSegment
liangyepianzhou Oct 20, 2022
83c380f
license header
liangyepianzhou Oct 20, 2022
6a41ee8
rollback txnID and optimize close
liangyepianzhou Oct 21, 2022
fe6ec1b
fix
liangyepianzhou Oct 21, 2022
6ec7dc6
fix
liangyepianzhou Oct 21, 2022
39da371
fix
liangyepianzhou Oct 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;

/**
Expand Down Expand Up @@ -116,6 +117,15 @@ ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPositi
void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config,
OpenReadOnlyCursorCallback callback, Object ctx);

/**
* Asynchronous open a Read-only managedLedger.
* @param managedLedgerName the unique name that identifies the managed ledger
* @param config the managed ledegr configuratiion.
* @param ctx opaque context
*/
CompletableFuture<ReadOnlyManagedLedgerImpl> asyncOpenReadOnlyManagedLedger(String managedLedgerName,
ManagedLedgerConfig config, Object ctx);

/**
* Get the current metadata info for a managed ledger.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,28 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
});
}


@Override
public CompletableFuture<ReadOnlyManagedLedgerImpl> asyncOpenReadOnlyManagedLedger(String managedLedgerName,
ManagedLedgerConfig config, Object ctx) {
CompletableFuture<ReadOnlyManagedLedgerImpl> future = new CompletableFuture<>();
if (closed) {
return FutureUtil.failedFuture(new ManagedLedgerException.ManagedLedgerFactoryClosedException());
}
ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
bookkeeperFactory
.get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor, managedLedgerName);
roManagedLedger.initialize().thenRun(() -> {
log.info("[{}] Successfully initialize Read-only managed ledger", managedLedgerName);
future.complete(roManagedLedger);
}).exceptionally(e -> {
log.error("[{}] Failed to initialize Read-only managed ledger", managedLedgerName, e);
future.completeExceptionally(new ManagedLedgerException.ManagedLedgerFactoryClosedException());
return null;
});
return future;
}

@Override
public ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPosition,
Expand Down Expand Up @@ -460,28 +481,23 @@ public void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosi
return;
}
checkArgument(startPosition instanceof PositionImpl);
ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
bookkeeperFactory
.get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor, managedLedgerName);

roManagedLedger.initializeAndCreateCursor((PositionImpl) startPosition)
.thenAccept(roCursor -> callback.openReadOnlyCursorComplete(roCursor, ctx))
asyncOpenReadOnlyManagedLedger(managedLedgerName, config, null).thenAccept(readOnlyManagedLedger ->
callback.openReadOnlyCursorComplete(readOnlyManagedLedger.
createReadOnlyCursor((PositionImpl) startPosition), ctx))
.exceptionally(ex -> {
Throwable t = ex;
if (t instanceof CompletionException) {
t = ex.getCause();
}
Throwable t = ex;
if (t instanceof CompletionException) {
t = ex.getCause();
}

if (t instanceof ManagedLedgerException) {
callback.openReadOnlyCursorFailed((ManagedLedgerException) t, ctx);
} else {
callback.openReadOnlyCursorFailed(new ManagedLedgerException(t), ctx);
}
if (t instanceof ManagedLedgerException) {
callback.openReadOnlyCursorFailed((ManagedLedgerException) t, ctx);
} else {
callback.openReadOnlyCursorFailed(new ManagedLedgerException(t), ctx);
}

return null;
});
return null;
});
}

void close(ManagedLedger ledger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public ReadOnlyManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bo
super(factory, bookKeeper, store, config, scheduledExecutor, name);
}

CompletableFuture<ReadOnlyCursor> initializeAndCreateCursor(PositionImpl startPosition) {
CompletableFuture<ReadOnlyCursor> future = new CompletableFuture<>();
CompletableFuture<ReadOnlyManagedLedgerImpl> initialize() {
CompletableFuture<ReadOnlyManagedLedgerImpl> future = new CompletableFuture<>();

// Fetch the list of existing ledgers in the managed ledger
store.getManagedLedgerInfo(name, false, new MetaStoreCallback<ManagedLedgerInfo>() {
Expand All @@ -72,15 +72,15 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
.setTimestamp(clock.millis()).build();
ledgers.put(lastLedgerId, info);

future.complete(createReadOnlyCursor(startPosition));
future.complete(ReadOnlyManagedLedgerImpl.this);
}).exceptionally(ex -> {
if (ex instanceof CompletionException
&& ex.getCause() instanceof IllegalArgumentException) {
// The last ledger was empty, so we cannot read the last add confirmed.
LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lastLedgerId)
.setEntries(0).setSize(0).setTimestamp(clock.millis()).build();
ledgers.put(lastLedgerId, info);
future.complete(createReadOnlyCursor(startPosition));
future.complete(ReadOnlyManagedLedgerImpl.this);
} else {
future.completeExceptionally(new ManagedLedgerException(ex));
}
Expand All @@ -93,15 +93,15 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lastLedgerId).setEntries(0)
.setSize(0).setTimestamp(clock.millis()).build();
ledgers.put(lastLedgerId, info);
future.complete(createReadOnlyCursor(startPosition));
future.complete(ReadOnlyManagedLedgerImpl.this);
} else {
future.completeExceptionally(new ManagedLedgerException(ex));
}
return null;
});
} else {
// The read-only managed ledger is ready to use
future.complete(createReadOnlyCursor(startPosition));
future.complete(ReadOnlyManagedLedgerImpl.this);
}
}

Expand All @@ -118,7 +118,7 @@ public void operationFailed(MetaStoreException e) {
return future;
}

private ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) {
ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) {
if (ledgers.isEmpty()) {
lastConfirmedEntry = PositionImpl.EARLIEST;
} else if (ledgers.lastEntry().getValue().getEntries() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@
import org.apache.pulsar.broker.rest.Topics;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceImpl;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
Expand Down Expand Up @@ -259,7 +259,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private PulsarMetadataEventSynchronizer localMetadataSynchronizer;
private CoordinationService coordinationService;
private TransactionBufferSnapshotService transactionBufferSnapshotService;

private MetadataStore configurationMetadataStore;
private PulsarMetadataEventSynchronizer configMetadataSynchronizer;
private boolean shouldShutdownConfigurationMetadataStore;
Expand Down Expand Up @@ -821,7 +820,9 @@ public void start() throws PulsarServerException {

// Register pulsar system namespaces and start transaction meta store service
if (config.isTransactionCoordinatorEnabled()) {
this.transactionBufferSnapshotService = new SystemTopicBaseTxnBufferSnapshotService(getClient());
this.transactionBufferSnapshotService = new TransactionBufferSnapshotServiceImpl(getClient(),
getConfig().isTransactionBufferSegmentedSnapshotEnabled());

this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
transactionBufferClient = TransactionBufferClientImpl.create(this, transactionTimer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.systopic.TopicPoliciesSystemTopicClient.TopicPolicyWriter.getEventKey;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -119,7 +120,8 @@ private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, Action
} else {
PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
CompletableFuture<MessageId> actionFuture =
ActionType.DELETE.equals(actionType) ? writer.deleteAsync(event) : writer.writeAsync(event);
ActionType.DELETE.equals(actionType) ? writer.deleteAsync(event, getEventKey(event))
: writer.writeAsync(event, getEventKey(event));
actionFuture.whenComplete(((messageId, e) -> {
if (e != null) {
result.completeExceptionally(e);
Expand Down Expand Up @@ -451,7 +453,8 @@ private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
systemTopicClient.newWriterAsync().thenAccept(writer
-> writer.deleteAsync(getPulsarEvent(topicName, ActionType.DELETE, null))
-> writer.deleteAsync(getPulsarEvent(topicName, ActionType.DELETE, null),
getEventKey(topicName))
.whenComplete((result, e) -> writer.closeAsync().whenComplete((res, ex) -> {
if (ex != null) {
log.error("close writer failed ", ex);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.TopicName;

public abstract class SystemTopicTxnBufferSnapshotBaseService<T> implements SystemTopicTxnBufferSnapshotService<T> {

protected final Map<TopicName, SystemTopicClient<T>> clients;
protected final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;

public SystemTopicTxnBufferSnapshotBaseService(PulsarClient client) {
this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
this.clients = new ConcurrentHashMap<>();
}

@Override
public CompletableFuture<SystemTopicClient.Writer<T>> createWriter(TopicName topicName) {
return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
}

@Override
public CompletableFuture<SystemTopicClient.Reader<T>> createReader(TopicName topicName) {
return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
}

@Override
public void removeClient(TopicName topicName, SystemTopicClientBase<T> transactionBufferSystemTopicClient) {
if (transactionBufferSystemTopicClient.getReaders().size() == 0
&& transactionBufferSystemTopicClient.getWriters().size() == 0) {
clients.remove(topicName);
}
}

@Override
public void close() throws Exception {
for (Map.Entry<TopicName, SystemTopicClient<T>> entry : clients.entrySet()) {
entry.getValue().close();
}
}

protected abstract CompletableFuture<SystemTopicClient<T>> getTransactionBufferSystemTopicClient(
TopicName topicName);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.matadata.v2.TransactionBufferSnapshotIndexes;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;

public class SystemTopicTxnBufferSnapshotIndexServiceImpl extends
SystemTopicTxnBufferSnapshotBaseService<TransactionBufferSnapshotIndexes> {

public SystemTopicTxnBufferSnapshotIndexServiceImpl(PulsarClient client) {
super(client);
}

@Override
protected CompletableFuture<SystemTopicClient<TransactionBufferSnapshotIndexes>>
getTransactionBufferSystemTopicClient(
TopicName topicName) {
TopicName systemTopicName = NamespaceEventsSystemTopicFactory
.getSystemTopicName(topicName.getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
if (systemTopicName == null) {
return FutureUtil.failedFuture(
new InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotIndexService, "
+ "because the topicName is null!"));
}
return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
(v) -> namespaceEventsSystemTopicFactory
.createTransactionBufferSnapshotIndexSystemTopicClient(topicName.getNamespaceObject(),
this)));
}

}
Loading