Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] Segmented transaction buffer snapshot segment and index system topic #16931

Merged
merged 31 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
677f765
Implement snapshot segment topic and snapshot index topic.
liangyepianzhou Sep 6, 2022
7b0f1ee
Merge branch 'master' into xiangying/pip/pip196-2
liangyepianzhou Sep 6, 2022
f8fa913
move snapshot to index
liangyepianzhou Sep 8, 2022
88c33e6
Merge remote-tracking branch 'origin/xiangying/pip/pip196-2' into xia…
liangyepianzhou Sep 14, 2022
d98e006
abstract
liangyepianzhou Sep 15, 2022
e8dda12
change reader to cursor for segment snapshot.
liangyepianzhou Sep 18, 2022
bba847e
optimize name
liangyepianzhou Sep 18, 2022
63dec02
optimize
liangyepianzhou Sep 18, 2022
73a4731
change read-only cursor to read-only managedledger.
liangyepianzhou Sep 19, 2022
e8b0c4f
abstract transactionBufferSnapshotService
liangyepianzhou Sep 19, 2022
bc45917
abstract writer and read, and fix read-only managedLedger init.
liangyepianzhou Sep 20, 2022
e99941d
Merge remote-tracking branch 'apache/master' into xiangying/pip/pip196-2
liangyepianzhou Sep 20, 2022
0626685
merge config
liangyepianzhou Sep 20, 2022
b643434
optimize
liangyepianzhou Sep 21, 2022
8cf7ee0
fix
liangyepianzhou Sep 22, 2022
7849206
fix
liangyepianzhou Sep 22, 2022
2737664
fix
liangyepianzhou Sep 22, 2022
f9dc0ae
optimize by class<T>
liangyepianzhou Oct 17, 2022
189fe40
optimize by class<T>
liangyepianzhou Oct 17, 2022
e282cbf
merge
liangyepianzhou Oct 17, 2022
6cba282
fix some comments
liangyepianzhou Oct 18, 2022
8da3749
fix some comments
liangyepianzhou Oct 19, 2022
cd6c909
(T t, String key) to (String key, T t)
liangyepianzhou Oct 20, 2022
d2f3889
optimize inner class and TxnIDData
liangyepianzhou Oct 20, 2022
2dec0a5
CompletableFuture<Void> initialize, and getEventKey
liangyepianzhou Oct 20, 2022
108edc9
rename TransactionBufferSnapshot to TransactionBufferSnapshotSegment
liangyepianzhou Oct 20, 2022
83c380f
license header
liangyepianzhou Oct 20, 2022
6a41ee8
rollback txnID and optimize close
liangyepianzhou Oct 21, 2022
fe6ec1b
fix
liangyepianzhou Oct 21, 2022
6ec7dc6
fix
liangyepianzhou Oct 21, 2022
39da371
fix
liangyepianzhou Oct 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPositi
void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config,
OpenReadOnlyCursorCallback callback, Object ctx);

/**
* Asynchronous open a Read-only managedLedger.
* @param managedLedgerName the unique name that identifies the managed ledger
* @param config the managed ledegr configuratiion.
* @param callback callback object
* @param ctx opaque context
*/
void asyncOpenReadOnlyManagedLedger(String managedLedgerName, ManagedLedgerConfig config,
OpenLedgerCallback callback, Object ctx);

/**
* Get the current metadata info for a managed ledger.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,33 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
});
}

@Override
public void asyncOpenReadOnlyManagedLedger(String managedLedgerName, ManagedLedgerConfig config,
OpenLedgerCallback callback, Object ctx) {
if (closed) {
callback.openLedgerFailed(new ManagedLedgerException.ManagedLedgerFactoryClosedException(), ctx);
return;
}
ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
bookkeeperFactory
.get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor, managedLedgerName);

roManagedLedger.initialize(new ManagedLedgerInitializeLedgerCallback() {
@Override
public void initializeComplete() {
log.info("[{}] Successfully initialize Read-only managed ledger", managedLedgerName);
callback.openLedgerComplete(roManagedLedger, ctx);
}

@Override
public void initializeFailed(ManagedLedgerException e) {
log.error("[{}] Failed to initialize Read-only managed ledger", managedLedgerName, e);
callback.openLedgerFailed(e, ctx);
}
}, ctx);
}

@Override
public ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPosition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,12 @@
import org.apache.pulsar.broker.rest.Topics;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotSegmentServiceImpl;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceImpl;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
import org.apache.pulsar.broker.stats.MetricsGenerator;
Expand Down Expand Up @@ -261,7 +262,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private PulsarMetadataEventSynchronizer localMetadataSynchronizer;
private CoordinationService coordinationService;
private TransactionBufferSnapshotService transactionBufferSnapshotService;

private MetadataStore configurationMetadataStore;
private PulsarMetadataEventSynchronizer configMetadataSynchronizer;
private boolean shouldShutdownConfigurationMetadataStore;
Expand Down Expand Up @@ -819,7 +819,11 @@ public void start() throws PulsarServerException {

// Register pulsar system namespaces and start transaction meta store service
if (config.isTransactionCoordinatorEnabled()) {
this.transactionBufferSnapshotService = new SystemTopicBaseTxnBufferSnapshotService(getClient());
// if (transactionBufferSegmentedSnapshotEnabled)
this.transactionBufferSnapshotService = new TransactionBufferSnapshotSegmentServiceImpl(getClient());
// else
this.transactionBufferSnapshotService = new TransactionBufferSnapshotServiceImpl(getClient());

this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
transactionBufferClient = TransactionBufferClientImpl.create(this, transactionTimer,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
import org.apache.pulsar.common.naming.TopicName;

public abstract class SystemTopicBaseTxnBufferSnapshotBaseService<T> implements
SystemTopicTxnBufferSnapshotService<T> {

protected final Map<TopicName, SystemTopicClient<T>> clients;

public SystemTopicBaseTxnBufferSnapshotBaseService(
Map<TopicName, SystemTopicClient<T>> clients) {
this.clients = clients;
}

@Override
public CompletableFuture<SystemTopicClient.Writer<T>> createWriter(TopicName topicName) {
return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);

}

@Override
public CompletableFuture<SystemTopicClient.Reader<T>> createReader(TopicName topicName) {
return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
}

@Override
public void removeClient(TopicName topicName, SystemTopicClientBase<T> transactionBufferSystemTopicClient) {
if (transactionBufferSystemTopicClient.getReaders().size() == 0
&& transactionBufferSystemTopicClient.getWriters().size() == 0) {
clients.remove(topicName);
}
}

@Override
public void close() throws Exception {
for (Map.Entry<TopicName, SystemTopicClient<T>> entry : clients.entrySet()) {
entry.getValue().close();
}
}

protected abstract CompletableFuture<SystemTopicClient<T>> getTransactionBufferSystemTopicClient(
TopicName topicName);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.matadata.v2.TransactionBufferSnapshotIndexes;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;

public class SystemTopicBaseTxnBufferSnapshotIndexService extends
SystemTopicBaseTxnBufferSnapshotBaseService<TransactionBufferSnapshotIndexes> {

private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;

public SystemTopicBaseTxnBufferSnapshotIndexService(PulsarClient client) {
super(new ConcurrentHashMap<>());
this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
}

@Override
protected CompletableFuture<SystemTopicClient<TransactionBufferSnapshotIndexes>>
getTransactionBufferSystemTopicClient(
TopicName topicName) {
TopicName systemTopicName = NamespaceEventsSystemTopicFactory
.getSystemTopicName(topicName.getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
if (systemTopicName == null) {
return FutureUtil.failedFuture(
new InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotIndexService, "
+ "because the topicName is null!"));
}
return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
(v) -> namespaceEventsSystemTopicFactory
.createTransactionBufferSnapshotIndexSystemTopicClient(topicName.getNamespaceObject(),
this)));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.matadata.v2.TransactionBufferSnapshotIndexes;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;

public class SystemTopicBaseTxnBufferSnapshotSegmentService extends
SystemTopicBaseTxnBufferSnapshotBaseService<TransactionBufferSnapshotIndexes.TransactionBufferSnapshot> {
private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;

public SystemTopicBaseTxnBufferSnapshotSegmentService(PulsarClient client) {
super(new ConcurrentHashMap<>());
this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
}

@Override
protected CompletableFuture<SystemTopicClient<TransactionBufferSnapshotIndexes.TransactionBufferSnapshot>>
getTransactionBufferSystemTopicClient(
TopicName topicName) {
TopicName systemTopicName = NamespaceEventsSystemTopicFactory
.getSystemTopicName(topicName.getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT_SEGMENT);
if (systemTopicName == null) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidTopicNameException(
"Can't create SystemTopicBaseTxnBufferSnapshotSegmentService, "
+ "because the topicName is null!"));
}
return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
(v) -> namespaceEventsSystemTopicFactory
.createTransactionBufferSnapshotSegmentSystemTopicClient(topicName.getNamespaceObject(),
this)));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,29 @@
*/
package org.apache.pulsar.broker.service;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;

public class SystemTopicBaseTxnBufferSnapshotService implements TransactionBufferSnapshotService {

private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>> clients;
public class SystemTopicBaseTxnBufferSnapshotService
extends SystemTopicBaseTxnBufferSnapshotBaseService<TransactionBufferSnapshot> {

private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;

public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
super(new ConcurrentHashMap<>());
this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
this.clients = new ConcurrentHashMap<>();
}

@Override
public CompletableFuture<Writer<TransactionBufferSnapshot>> createWriter(TopicName topicName) {
return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
}

private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> getTransactionBufferSystemTopicClient(
protected CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> getTransactionBufferSystemTopicClient(
TopicName topicName) {
TopicName systemTopicName = NamespaceEventsSystemTopicFactory
.getSystemTopicName(topicName.getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
Expand All @@ -62,25 +53,4 @@ private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> getTrans
(v) -> namespaceEventsSystemTopicFactory
.createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(), this)));
}

@Override
public CompletableFuture<Reader<TransactionBufferSnapshot>> createReader(TopicName topicName) {
return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
}

@Override
public void removeClient(TopicName topicName,
TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) {
if (transactionBufferSystemTopicClient.getReaders().size() == 0
&& transactionBufferSystemTopicClient.getWriters().size() == 0) {
clients.remove(topicName);
}
}

@Override
public void close() throws Exception {
for (Map.Entry<TopicName, SystemTopicClient<TransactionBufferSnapshot>> entry : clients.entrySet()) {
entry.getValue().close();
}
}
}
Loading