From 85c4d9e9f05c0ebab4471fe574cd02edc057db62 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Mon, 27 Jan 2025 16:25:09 +0100 Subject: [PATCH 1/5] Fix problems and race conditions with topology fetching - preclude TCM of being behind Accord if newer epoch is reported via withEpoch/fetchTopologyInternal - improve topology discovery during first boot and replay - fix races between config service TCM listener reporting topologies, and fetched topologies during Patch by Alex Petrov, reviewed by Ariel Weisberg for CASSANDRA-20245 --- .../accord/AccordConfigurationService.java | 20 ++- .../service/accord/AccordService.java | 124 +++++++++++------- .../service/accord/FetchMinEpoch.java | 74 +++-------- .../service/accord/FetchTopology.java | 37 +++--- .../service/accord/IAccordService.java | 9 +- .../service/accord/FetchMinEpochTest.java | 94 ++++--------- 6 files changed, 149 insertions(+), 209 deletions(-) diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java index 0dbf99635a88..a7673b6f1247 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java @@ -213,6 +213,7 @@ public EpochDiskState truncateTopologyUntil(long epoch, EpochDiskState diskState } } + // TODO: should not be public public final ChangeListener listener = new MetadataChangeListener(); private class MetadataChangeListener implements ChangeListener { @@ -267,8 +268,6 @@ public synchronized void start() Map removedNodes = mapping.removedNodes(); for (Map.Entry e : removedNodes.entrySet()) onNodeRemoved(e.getValue(), currentTopology(), e.getKey()); - - ClusterMetadataService.instance().log().addListener(listener); } @Override @@ -416,7 +415,11 @@ void maybeReportMetadata(ClusterMetadata metadata) long epoch = metadata.epoch.getEpoch(); synchronized (epochs) { - if (epochs.maxEpoch() == 0) + long maxEpoch = epochs.maxEpoch(); + if (maxEpoch >= epoch) + return; + + if (maxEpoch == 0) { getOrCreateEpochState(epoch); // touch epoch state so subsequent calls see it reportMetadata(metadata); @@ -440,9 +443,7 @@ protected void fetchTopologyInternal(long epoch) if (peers.isEmpty()) return; Topology topology; - while ((topology = FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get()) == null) - { - } + while ((topology = FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get()) == null) {} reportTopology(topology); } catch (InterruptedException e) @@ -461,6 +462,13 @@ protected void fetchTopologyInternal(long epoch) }); } + @Override + public void reportTopology(Topology topology, boolean isLoad, boolean startSync) + { + Invariants.require(topology.epoch() <= ClusterMetadata.current().epoch.getEpoch()); + super.reportTopology(topology, isLoad, startSync); + } + @Override protected void localSyncComplete(Topology topology, boolean startSync) { diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 005b856005b0..bd30d105cecc 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -22,13 +22,10 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import 
java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -44,7 +41,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; -import com.google.common.collect.Sets; import com.google.common.primitives.Ints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -138,7 +134,6 @@ import org.apache.cassandra.net.MessageDelivery; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.SharedContext; -import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; @@ -165,7 +160,6 @@ import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.membership.NodeId; -import org.apache.cassandra.tcm.ownership.DataPlacement; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.Dispatcher; import org.apache.cassandra.utils.Blocking; @@ -379,24 +373,33 @@ public synchronized void startup() node.commandStores().restoreShardStateUnsafe(topology -> configService.reportTopology(topology, true, true)); configService.start(); - long minEpoch = fetchMinEpoch(); - if (minEpoch >= 0) + try { - for (long epoch = minEpoch; epoch <= metadata.epoch.getEpoch(); epoch++) - node.configService().fetchTopologyForEpoch(epoch); + // Fetch topologies up to current + List topologies = fetchTopologies(0, metadata); + for (Topology topology : topologies) + configService.reportTopology(topology); - try - { - epochReady(metadata.epoch).get(DatabaseDescriptor.getTransactionTimeout(MILLISECONDS), MILLISECONDS); - } - catch (InterruptedException e) - { - throw new UncheckedInterruptedException(e); - } - catch (ExecutionException | TimeoutException e) + ClusterMetadataService.instance().log().addListener(configService.listener); + ClusterMetadata next = ClusterMetadata.current(); + + // if metadata was updated before we were able to add a listener, fetch remaining topologies + if (metadata != next) { - throw new RuntimeException(e); + topologies = fetchTopologies(metadata.epoch.getEpoch(), next); + for (Topology topology : topologies) + configService.reportTopology(topology); } + + epochReady(metadata.epoch).get(); + } + catch (InterruptedException e) + { + throw new UncheckedInterruptedException(e); + } + catch (ExecutionException e) + { + throw new RuntimeException(e); } fastPathCoordinator.start(); @@ -412,48 +415,67 @@ public synchronized void startup() } /** - * Queries peers to discover min epoch + * Queries peers to discover min epoch, and then fetches all topologies between min and current epochs */ - private long fetchMinEpoch() + private List fetchTopologies(long minEpoch, ClusterMetadata metadata) throws ExecutionException, InterruptedException { - ClusterMetadata metadata = ClusterMetadata.current(); - Map> peers = new HashMap<>(); - for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces()) + if (configService.maxEpoch() >= metadata.epoch.getEpoch()) { - List tables = keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList()); - if (tables.isEmpty()) - continue; - DataPlacement current = metadata.placements.get(keyspace.params.replication); - DataPlacement settled = metadata.writePlacementAllSettled(keyspace); - 
Sets.SetView alive = Sets.intersection(settled.writes.byEndpoint().keySet(), current.writes.byEndpoint().keySet()); - InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort(); - settled.writes.forEach((range, group) -> { - if (group.endpoints().contains(self)) - { - for (InetAddressAndPort peer : group.endpoints()) - { - if (peer.equals(self) || !alive.contains(peer)) continue; - for (TableMetadata table : tables) - peers.computeIfAbsent(peer, i -> new HashSet<>()).add(AccordTopology.fullRange(table.id)); - } - } - }); + logger.info("Accord epoch {} matches TCM. All topologies are known locally", metadata.epoch); + return Collections.singletonList(AccordTopology.createAccordTopology(metadata)); } + + Set peers = new HashSet<>(); + peers.addAll(metadata.directory.allAddresses()); + peers.remove(FBUtilities.getBroadcastAddressAndPort()); + + // No peers: single node cluster or first node to boot if (peers.isEmpty()) - return -1; + return Collections.singletonList(AccordTopology.createAccordTopology(metadata));; + + // Bootstrap, fetch min epoch + if (minEpoch == 0) + { + long fetched = findMinEpoch(SharedContext.Global.instance, peers); + // No other node has advanced epoch just yet + if (fetched == 0) + return Collections.singletonList(AccordTopology.createAccordTopology(metadata)); + + minEpoch = fetched; + } + + long maxEpoch = metadata.epoch.getEpoch(); + + // If we are behind minEpoch, catch up to at least minEpoch + if (metadata.epoch.getEpoch() < minEpoch) + { + minEpoch = metadata.epoch.getEpoch(); + maxEpoch = minEpoch; + } + + List> futures = new ArrayList<>(); + logger.info("Discovered min epoch of {}. Proceeding to fetch epochs up to {}.", minEpoch, maxEpoch); + + for (long epoch = minEpoch; epoch <= maxEpoch; epoch++) + futures.add(FetchTopology.fetch(SharedContext.Global.instance, peers, epoch)); + + FBUtilities.waitOnFutures(futures); + List topologies = new ArrayList<>(futures.size()); + for (Future future : futures) + topologies.add(future.get()); - Long minEpoch = findMinEpoch(SharedContext.Global.instance, peers); - if (minEpoch == null) - return -1; - return minEpoch; + return topologies; } @VisibleForTesting - static Long findMinEpoch(SharedContext context, Map> peers) + static long findMinEpoch(SharedContext context, Set peers) { try { - return FetchMinEpoch.fetch(context, peers).get(); + Long result = FetchMinEpoch.fetch(context, peers).get(); + if (result == null) + return 0L; + return result.longValue(); } catch (InterruptedException e) { @@ -1152,7 +1174,7 @@ private static CommandStoreTxnBlockedGraph.TxnState populate(CommandStoreTxnBloc @Nullable @Override - public Long minEpoch(Collection ranges) + public Long minEpoch() { return node.topology().minEpoch(); } diff --git a/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java b/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java index ef670c572e04..9febcb060953 100644 --- a/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java +++ b/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java @@ -20,9 +20,7 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Set; import javax.annotation.Nullable; @@ -38,7 +36,6 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.IVerbHandler; -import org.apache.cassandra.net.MessageDelivery; import 
org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.Verb; import org.apache.cassandra.repair.SharedContext; @@ -53,40 +50,32 @@ // TODO (required, efficiency): this can be simplified: we seem to always use "entire range" public class FetchMinEpoch { + private static final FetchMinEpoch instance = new FetchMinEpoch(); + public static final IVersionedSerializer serializer = new IVersionedSerializer<>() { @Override - public void serialize(FetchMinEpoch t, DataOutputPlus out, int version) throws IOException + public void serialize(FetchMinEpoch t, DataOutputPlus out, int version) { - out.writeUnsignedVInt32(t.ranges.size()); - for (TokenRange range : t.ranges) - TokenRange.serializer.serialize(range, out, version); } @Override - public FetchMinEpoch deserialize(DataInputPlus in, int version) throws IOException + public FetchMinEpoch deserialize(DataInputPlus in, int version) { - int size = in.readUnsignedVInt32(); - List ranges = new ArrayList<>(size); - for (int i = 0; i < size; i++) - ranges.add(TokenRange.serializer.deserialize(in, version)); - return new FetchMinEpoch(ranges); + return FetchMinEpoch.instance; } @Override public long serializedSize(FetchMinEpoch t, int version) { - long size = TypeSizes.sizeofUnsignedVInt(t.ranges.size()); - for (TokenRange range : t.ranges) - size += TokenRange.serializer.serializedSize(range, version); - return size; + return 0; } }; public static final IVerbHandler handler = message -> { if (AccordService.started()) { - Long epoch = AccordService.instance().minEpoch(message.payload.ranges); + Long epoch = AccordService.instance().minEpoch(); MessagingService.instance().respond(new Response(epoch), message); } else @@ -96,41 +85,15 @@ public long serializedSize(FetchMinEpoch t, int version) } }; - public final Collection ranges; - - public FetchMinEpoch(Collection ranges) - { - this.ranges = ranges; - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - FetchMinEpoch that = (FetchMinEpoch) o; - return Objects.equals(ranges, that.ranges); - } - - @Override - public int hashCode() - { - return Objects.hash(ranges); - } - - @Override - public String toString() + private FetchMinEpoch() { - return "FetchMinEpoch{" + - "ranges=" + ranges + - '}'; } - public static Future fetch(SharedContext context, Map> peers) + public static Future fetch(SharedContext context, Set peers) { List> accum = new ArrayList<>(peers.size()); - for (Map.Entry> e : peers.entrySet()) - accum.add(fetch(context, e.getKey(), e.getValue())); + for (InetAddressAndPort peer : peers) + accum.add(fetch(context, peer)); // TODO (required): we are collecting only successes, but we need some threshold return FutureCombiner.successfulOf(accum).map(epochs -> { Long min = null; @@ -145,21 +108,22 @@ public static Future fetch(SharedContext context, Map fetch(SharedContext context, InetAddressAndPort to, Set value) + static Future fetch(SharedContext context, InetAddressAndPort to) { - FetchMinEpoch req = new FetchMinEpoch(value); - return context.messaging().sendWithRetries(Backoff.NO_OP.INSTANCE, - MessageDelivery.ImmediateRetryScheduler.instance, - Verb.ACCORD_FETCH_MIN_EPOCH_REQ, req, + Backoff backoff = Backoff.fromConfig(context, DatabaseDescriptor.getAccord().minEpochSyncRetry); + return context.messaging().sendWithRetries(backoff, + context.optionalTasks()::schedule, + Verb.ACCORD_FETCH_MIN_EPOCH_REQ, + FetchMinEpoch.instance, Iterators.cycle(to), - 
RetryPredicate.times(DatabaseDescriptor.getAccord().minEpochSyncRetry.maxAttempts.value), + RetryPredicate.ALWAYS_RETRY, RetryErrorMessage.EMPTY) .map(m -> m.payload.minEpoch); } public static class Response { - public static final IVersionedSerializer serializer = new IVersionedSerializer() + public static final IVersionedSerializer serializer = new IVersionedSerializer<>() { @Override public void serialize(Response t, DataOutputPlus out, int version) throws IOException diff --git a/src/java/org/apache/cassandra/service/accord/FetchTopology.java b/src/java/org/apache/cassandra/service/accord/FetchTopology.java index d40d2654ea25..f6c1a518c1f3 100644 --- a/src/java/org/apache/cassandra/service/accord/FetchTopology.java +++ b/src/java/org/apache/cassandra/service/accord/FetchTopology.java @@ -21,7 +21,11 @@ import java.io.IOException; import java.util.Collection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import accord.topology.Topology; +import org.apache.cassandra.exceptions.RequestFailure; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -37,6 +41,7 @@ public class FetchTopology { + private static final Logger log = LoggerFactory.getLogger(FetchTopology.class); private final long epoch; public static final IVersionedSerializer serializer = new IVersionedSerializer<>() @@ -67,12 +72,10 @@ public FetchTopology(long epoch) public static class Response { - private static Response UNKNOWN = new Response(-1, null) { - public String toString() - { - return "UNKNOWN_TOPOLOGY{}"; - } - }; + private static Response unkonwn(long epoch) + { + throw new IllegalStateException("Unknown topology: " + epoch); + } // TODO (required): messaging version after version patch public static final IVersionedSerializer serializer = new IVersionedSerializer<>() @@ -80,11 +83,6 @@ public String toString() @Override public void serialize(Response t, DataOutputPlus out, int version) throws IOException { - if (t == UNKNOWN) - { - out.writeLong(-1); - return; - } out.writeLong(t.epoch); TopologySerializers.topology.serialize(t.topology, out, version); } @@ -93,8 +91,6 @@ public void serialize(Response t, DataOutputPlus out, int version) throws IOExce public Response deserialize(DataInputPlus in, int version) throws IOException { long epoch = in.readLong(); - if (epoch == -1) - return UNKNOWN; Topology topology = TopologySerializers.topology.deserialize(in, version); return new Response(epoch, topology); } @@ -102,9 +98,6 @@ public Response deserialize(DataInputPlus in, int version) throws IOException @Override public long serializedSize(Response t, int version) { - if (t == UNKNOWN) - return Long.BYTES; - return Long.BYTES + TopologySerializers.topology.serializedSize(t.topology, version); } }; @@ -123,18 +116,20 @@ public Response(long epoch, Topology topology) long epoch = message.payload.epoch; Topology topology = AccordService.instance().topology().maybeGlobalForEpoch(epoch); if (topology == null) - MessagingService.instance().respond(Response.UNKNOWN, message); + MessagingService.instance().respond(Response.unkonwn(epoch), message); else MessagingService.instance().respond(new Response(epoch, topology), message); }; + private static final Logger logger = LoggerFactory.getLogger(FetchTopology.class); + public static Future fetch(SharedContext context, Collection peers, long epoch) { FetchTopology req = new FetchTopology(epoch); - return 
context.messaging().sendWithRetries(Verb.ACCORD_FETCH_TOPOLOGY_REQ, req, MessagingUtils.tryAliveFirst(SharedContext.Global.instance, peers), - // If the epoch is already discovered, no need to retry - (attempt, from, failure) -> AccordService.instance().currentEpoch() < epoch, - MessageDelivery.RetryErrorMessage.EMPTY) + return context.messaging().sendWithRetries(Verb.ACCORD_FETCH_TOPOLOGY_REQ, req, + MessagingUtils.tryAliveFirst(SharedContext.Global.instance, peers), + (attempt, from, failure) -> true, + MessageDelivery.RetryErrorMessage.EMPTY) .map(m -> m.payload.topology); } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java b/src/java/org/apache/cassandra/service/accord/IAccordService.java index 14fb94b6b453..865473d76c48 100644 --- a/src/java/org/apache/cassandra/service/accord/IAccordService.java +++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java @@ -18,7 +18,6 @@ package org.apache.cassandra.service.accord; -import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.List; @@ -180,7 +179,7 @@ public CompactionInfo(Int2ObjectHashMap redundantBefores, Int2O List debugTxnBlockedGraph(TxnId txnId); @Nullable - Long minEpoch(Collection ranges); + Long minEpoch(); void tryMarkRemoved(Topology topology, Node.Id node); void awaitTableDrop(TableId id); @@ -326,7 +325,7 @@ public List debugTxnBlockedGraph(TxnId txnId) @Nullable @Override - public Long minEpoch(Collection ranges) + public Long minEpoch() { return null; } @@ -513,9 +512,9 @@ public List debugTxnBlockedGraph(TxnId txnId) @Nullable @Override - public Long minEpoch(Collection ranges) + public Long minEpoch() { - return delegate.minEpoch(ranges); + return delegate.minEpoch(); } @Override diff --git a/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java b/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java index 7bfe09b15f66..a8b573c08a49 100644 --- a/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java +++ b/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.service.accord; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -28,6 +27,7 @@ import java.util.stream.Stream; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.junit.Assert; import org.junit.Test; @@ -36,18 +36,13 @@ import accord.utils.RandomSource; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.RetrySpec; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; -import org.apache.cassandra.exceptions.RequestFailure; import org.apache.cassandra.io.IVersionedSerializers; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageDelivery; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.SimulatedMessageDelivery.Action; -import org.apache.cassandra.service.accord.api.AccordRoutingKey.RoutingKeyKind; -import org.apache.cassandra.utils.AccordGenerators; -import org.apache.cassandra.utils.CassandraGenerators; import org.apache.cassandra.utils.SimulatedMiniCluster; import org.apache.cassandra.utils.SimulatedMiniCluster.Node; import org.apache.cassandra.utils.concurrent.Future; @@ -55,7 +50,6 @@ import static accord.utils.Property.qt; 
import static org.apache.cassandra.net.MessagingService.Version.VERSION_51; -import static org.apache.cassandra.utils.AccordGenerators.fromQT; import static org.assertj.core.api.Assertions.assertThat; public class FetchMinEpochTest @@ -74,24 +68,6 @@ private static void boundedRetries(int retries) DatabaseDescriptor.getAccord().minEpochSyncRetry.maxAttempts = new RetrySpec.MaxAttempt(retries); } - @Test - public void requestSerde() - { - DataOutputBuffer output = new DataOutputBuffer(); - Gen gen = fromQT(CassandraGenerators.partitioners()) - .map(CassandraGenerators::simplify) - .flatMap(partitioner -> - Gens.lists(AccordGenerators.range(partitioner) - .map(r -> (TokenRange) r)) - .ofSizeBetween(0, 10) - .map(FetchMinEpoch::new)); - qt().forAll(gen).check(req -> { - maybeSetPartitioner(req); - for (MessagingService.Version version : SUPPORTED) - IVersionedSerializers.testSerde(output, FetchMinEpoch.serializer, req, version.value); - }); - } - @Test public void responseSerde() { @@ -115,12 +91,12 @@ public void fetchOneNodeAlwaysFails() Node from = cluster.createNodeAndJoin(); Node to = cluster.createNodeAndJoin(); - Future f = FetchMinEpoch.fetch(from, to.broadcastAddressAndPort(), Collections.emptySet()); + Future f = FetchMinEpoch.fetch(from, to.broadcastAddressAndPort()); assertThat(f).isNotDone(); cluster.processAll(); assertThat(f).isDone(); - MessageDelivery.FailedResponseException maxRetries = getFailedResponseException(f); - Assertions.assertThat(maxRetries.failure).isEqualTo(RequestFailure.TIMEOUT); + MessageDelivery.MaxRetriesException maxRetries = getMaxRetriesException(f); + Assertions.assertThat(maxRetries.attempts).isEqualTo(expectedMaxAttempts); }); } @@ -139,7 +115,7 @@ public void fetchOneNode() } Node to = cluster.createNodeAndJoin(); - Future f = FetchMinEpoch.fetch(from, to.broadcastAddressAndPort(), Collections.emptySet()); + Future f = FetchMinEpoch.fetch(from, to.broadcastAddressAndPort()); assertThat(f).isNotDone(); cluster.processAll(); assertThat(f).isDone(); @@ -161,10 +137,10 @@ public void fetchManyNodesAllNodesFail() Node to3 = cluster.createNodeAndJoin(); Node to4 = cluster.createNodeAndJoin(); - Future f = FetchMinEpoch.fetch(from, ImmutableMap.of(to1.broadcastAddressAndPort(), Collections.emptySet(), - to2.broadcastAddressAndPort(), Collections.emptySet(), - to3.broadcastAddressAndPort(), Collections.emptySet(), - to4.broadcastAddressAndPort(), Collections.emptySet())); + Future f = FetchMinEpoch.fetch(from, ImmutableSet.of(to1.broadcastAddressAndPort(), + to2.broadcastAddressAndPort(), + to3.broadcastAddressAndPort(), + to4.broadcastAddressAndPort())); assertThat(f).isNotDone(); cluster.processAll(); assertThat(f).isDone(); @@ -201,10 +177,10 @@ public void fetchManyNodes() to4.broadcastAddressAndPort(), actionGen(rs, maxRetries)); from.messagingActions((self, msg, to) -> nodeToActions.get(to).get()); - Future f = FetchMinEpoch.fetch(from, ImmutableMap.of(to1.broadcastAddressAndPort(), Collections.emptySet(), - to2.broadcastAddressAndPort(), Collections.emptySet(), - to3.broadcastAddressAndPort(), Collections.emptySet(), - to4.broadcastAddressAndPort(), Collections.emptySet())); + Future f = FetchMinEpoch.fetch(from, ImmutableSet.of(to1.broadcastAddressAndPort(), + to2.broadcastAddressAndPort(), + to3.broadcastAddressAndPort(), + to4.broadcastAddressAndPort())); assertThat(f).isNotDone(); cluster.processAll(); assertThat(f).isDone(); @@ -235,34 +211,10 @@ public Action next(RandomSource rng) return safeActionGen.asSupplier(actionSource); } - private 
static void maybeSetPartitioner(FetchMinEpoch req) - { - IPartitioner partitioner = null; - for (TokenRange r : req.ranges) - { - IPartitioner rangePartitioner = null; - if (r.start().kindOfRoutingKey() != RoutingKeyKind.SENTINEL) - rangePartitioner = r.start().token().getPartitioner(); - if (rangePartitioner == null && r.end().kindOfRoutingKey() != RoutingKeyKind.SENTINEL) - rangePartitioner = r.end().token().getPartitioner(); - if (rangePartitioner == null) - continue; - if (partitioner == null) - { - partitioner = rangePartitioner; - } - else - { - Assertions.assertThat(rangePartitioner).isEqualTo(partitioner); - } - } - if (partitioner != null) - DatabaseDescriptor.setPartitionerUnsafe(partitioner); - } - private static MessageDelivery.FailedResponseException getFailedResponseException(Future f) throws InterruptedException, ExecutionException + private static MessageDelivery.MaxRetriesException getMaxRetriesException(Future f) throws InterruptedException, ExecutionException { - MessageDelivery.FailedResponseException exception; + MessageDelivery.MaxRetriesException maxRetries; try { f.get(); @@ -271,21 +223,21 @@ private static MessageDelivery.FailedResponseException getFailedResponseExceptio } catch (ExecutionException e) { - if (e.getCause() instanceof MessageDelivery.FailedResponseException) + if (e.getCause() instanceof MessageDelivery.MaxRetriesException) { - exception = (MessageDelivery.FailedResponseException) e.getCause(); + maxRetries = (MessageDelivery.MaxRetriesException) e.getCause(); } else { throw e; } } - return exception; + return maxRetries; } - private static MessageDelivery.MaxRetriesException getMaxRetriesException(Future f) throws InterruptedException, ExecutionException + private static MessageDelivery.FailedResponseException getFailedResponseException(Future f) throws InterruptedException, ExecutionException { - MessageDelivery.MaxRetriesException maxRetries; + MessageDelivery.FailedResponseException exception; try { f.get(); @@ -294,15 +246,15 @@ private static MessageDelivery.MaxRetriesException getMaxRetriesException(Future } catch (ExecutionException e) { - if (e.getCause() instanceof MessageDelivery.MaxRetriesException) + if (e.getCause() instanceof MessageDelivery.FailedResponseException) { - maxRetries = (MessageDelivery.MaxRetriesException) e.getCause(); + exception = (MessageDelivery.FailedResponseException) e.getCause(); } else { throw e; } } - return maxRetries; + return exception; } } \ No newline at end of file From 6be92d29286b9bc167cd9a6ee895a251969f80f7 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Wed, 29 Jan 2025 19:53:59 +0100 Subject: [PATCH 2/5] Address Ariel's and David's comments --- .gitmodules | 4 +- modules/accord | 2 +- .../apache/cassandra/config/AccordSpec.java | 14 +++--- .../apache/cassandra/net/MessageDelivery.java | 8 --- .../apache/cassandra/net/MessagingUtils.java | 16 +++++- .../accord/AccordConfigurationService.java | 25 +++++++--- .../service/accord/AccordService.java | 49 ++++++++++++------- .../service/accord/FetchTopology.java | 42 +++++++++------- .../service/accord/FetchMinEpochTest.java | 23 --------- 9 files changed, 96 insertions(+), 87 deletions(-) diff --git a/.gitmodules b/.gitmodules index 616dacf610a7..a5685a541e16 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "modules/accord"] path = modules/accord - url = https://github.com/apache/cassandra-accord.git - branch = trunk + url = https://github.com/ifesdjeen/cassandra-accord.git + branch = CASSANDRA-20245 diff --git a/modules/accord 
b/modules/accord index c7a789b1f424..380d741c8d75 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit c7a789b1f424771a4befab6bcb91edd4ab5d7198 +Subproject commit 380d741c8d75f6d5c14e0a627c129762bcec9385 diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java index 720a2db7722d..e6f954cb26d2 100644 --- a/src/java/org/apache/cassandra/config/AccordSpec.java +++ b/src/java/org/apache/cassandra/config/AccordSpec.java @@ -194,15 +194,17 @@ public enum TransactionalRangeMigration public boolean ephemeralReadEnabled = true; public boolean state_cache_listener_jfr_enabled = true; public final JournalSpec journal = new JournalSpec(); - public final MinEpochRetrySpec minEpochSyncRetry = new MinEpochRetrySpec(); - - public static class MinEpochRetrySpec extends RetrySpec - { - public MinEpochRetrySpec() + public final RetrySpec minEpochSyncRetry = new RetrySpec() { { maxAttempts = new MaxAttempt(3); } - } + }; + + public final RetrySpec fetchRetry = new RetrySpec() { + { + maxAttempts = new MaxAttempt(100); + } + }; public static class JournalSpec implements Params { diff --git a/src/java/org/apache/cassandra/net/MessageDelivery.java b/src/java/org/apache/cassandra/net/MessageDelivery.java index 9dc374750f60..f9439dbb30d2 100644 --- a/src/java/org/apache/cassandra/net/MessageDelivery.java +++ b/src/java/org/apache/cassandra/net/MessageDelivery.java @@ -99,14 +99,6 @@ public default Future> sendWithRetries(Backoff backoff, return promise; } - public default Future> sendWithRetries(Verb verb, REQ request, - Iterator candidates, - RetryPredicate shouldRetry, - RetryErrorMessage errorMessage) - { - return sendWithRetries(Backoff.NO_OP.INSTANCE, ImmediateRetryScheduler.instance, verb, request, candidates, shouldRetry, errorMessage); - } - public default void sendWithRetries(Backoff backoff, RetryScheduler retryThreads, Verb verb, REQ request, Iterator candidates, diff --git a/src/java/org/apache/cassandra/net/MessagingUtils.java b/src/java/org/apache/cassandra/net/MessagingUtils.java index 2190eaf3a655..11735f8d7f66 100644 --- a/src/java/org/apache/cassandra/net/MessagingUtils.java +++ b/src/java/org/apache/cassandra/net/MessagingUtils.java @@ -18,22 +18,31 @@ package org.apache.cassandra.net; +import java.util.Collection; import java.util.Iterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.SharedContext; public class MessagingUtils { + private static final Logger logger = LoggerFactory.getLogger(MessagingUtils.class); + /** * Candidate iterator that would try all endpoints known to be alive first, and then try all endpoints * in a round-robin manner. + *

+ * Calls onIteration every time after exhausting the peers. */ - public static Iterator tryAliveFirst(SharedContext context, Iterable peers) + public static Iterator tryAliveFirst(SharedContext context, Collection peers, String verb) { return new Iterator<>() { boolean firstRun = true; + int attempt = 0; Iterator iter = peers.iterator(); boolean isEmpty = !iter.hasNext(); @@ -58,10 +67,13 @@ public InetAddressAndPort next() // After that, cycle through all nodes if (!iter.hasNext()) + { + logger.warn("Exhausted iterator on {} cycling through the set of peers: {} attempt #{}", verb, peers, attempt++); iter = peers.iterator(); + } return iter.next(); } }; } -} +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java index a7673b6f1247..6aa4888e3a15 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java @@ -416,9 +416,6 @@ void maybeReportMetadata(ClusterMetadata metadata) synchronized (epochs) { long maxEpoch = epochs.maxEpoch(); - if (maxEpoch >= epoch) - return; - if (maxEpoch == 0) { getOrCreateEpochState(epoch); // touch epoch state so subsequent calls see it @@ -426,7 +423,10 @@ void maybeReportMetadata(ClusterMetadata metadata) return; } } - getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() -> reportMetadata(metadata)); + + // Create a -1 epoch iif we know this epoch may actually exist + if (metadata.epoch.getEpoch() > minEpoch()) + getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() -> reportMetadata(metadata)); } @Override @@ -436,14 +436,25 @@ protected void fetchTopologyInternal(long epoch) Stage.ACCORD_MIGRATION.execute(() -> { if (ClusterMetadata.current().epoch.getEpoch() < epoch) ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(epoch)); + + // In most cases, after fetching log from CMS, we will be caught up to the required epoch. + // This TCM will also notify Accord via reportMetadata, so we do not need to fetch topologies. + // If metadata has reported has skipped one or more eopchs, and is _ahead_ of the requested epoch, + // we need to fetch topologies from peers to fill in the gap. + ClusterMetadata metadata = ClusterMetadata.current(); + if (metadata.epoch.getEpoch() == epoch) + return; + try { - Set peers = new HashSet<>(ClusterMetadata.current().directory.allJoinedEndpoints()); + Set peers = new HashSet<>(metadata.directory.allJoinedEndpoints()); peers.remove(FBUtilities.getBroadcastAddressAndPort()); if (peers.isEmpty()) return; - Topology topology; - while ((topology = FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get()) == null) {} + + // TODO (required): fetch only _missing_ topologies. 
+ Topology topology = FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get(); + Invariants.require(topology.epoch() == epoch); reportTopology(topology); } catch (InterruptedException e) diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index bd30d105cecc..6b9c42dd9f56 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -376,7 +376,7 @@ public synchronized void startup() try { // Fetch topologies up to current - List topologies = fetchTopologies(0, metadata); + List topologies = fetchTopologies(null, metadata); for (Topology topology : topologies) configService.reportTopology(topology); @@ -384,17 +384,31 @@ public synchronized void startup() ClusterMetadata next = ClusterMetadata.current(); // if metadata was updated before we were able to add a listener, fetch remaining topologies - if (metadata != next) + if (next.epoch.isAfter(metadata.epoch)) { - topologies = fetchTopologies(metadata.epoch.getEpoch(), next); + topologies = fetchTopologies(metadata.epoch.getEpoch() + 1, next); for (Topology topology : topologies) configService.reportTopology(topology); } - epochReady(metadata.epoch).get(); + int attempt = 0; + int waitSeconds = 5; + while (true) + { + try + { + epochReady(metadata.epoch).get(5, SECONDS); + break; + } + catch (TimeoutException e) + { + logger.warn("Epoch {} is not ready after waiting for {} seconds", metadata.epoch, (++attempt) * waitSeconds); + } + } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new UncheckedInterruptedException(e); } catch (ExecutionException e) @@ -417,13 +431,10 @@ public synchronized void startup() /** * Queries peers to discover min epoch, and then fetches all topologies between min and current epochs */ - private List fetchTopologies(long minEpoch, ClusterMetadata metadata) throws ExecutionException, InterruptedException + private List fetchTopologies(Long minEpoch, ClusterMetadata metadata) throws ExecutionException, InterruptedException { - if (configService.maxEpoch() >= metadata.epoch.getEpoch()) - { - logger.info("Accord epoch {} matches TCM. 
All topologies are known locally", metadata.epoch); + if (minEpoch != null && minEpoch == metadata.epoch.getEpoch()) return Collections.singletonList(AccordTopology.createAccordTopology(metadata)); - } Set peers = new HashSet<>(); peers.addAll(metadata.directory.allAddresses()); @@ -431,14 +442,17 @@ private List fetchTopologies(long minEpoch, ClusterMetadata metadata) // No peers: single node cluster or first node to boot if (peers.isEmpty()) - return Collections.singletonList(AccordTopology.createAccordTopology(metadata));; + return Collections.singletonList(AccordTopology.createAccordTopology(metadata)); // Bootstrap, fetch min epoch - if (minEpoch == 0) + if (minEpoch == null) { - long fetched = findMinEpoch(SharedContext.Global.instance, peers); + Long fetched = findMinEpoch(SharedContext.Global.instance, peers); + if (fetched != null) + logger.info("Discovered min epoch of {} by querying {}", fetched, peers); + // No other node has advanced epoch just yet - if (fetched == 0) + if (fetched == null || fetched == metadata.epoch.getEpoch()) return Collections.singletonList(AccordTopology.createAccordTopology(metadata)); minEpoch = fetched; @@ -454,7 +468,7 @@ private List fetchTopologies(long minEpoch, ClusterMetadata metadata) } List> futures = new ArrayList<>(); - logger.info("Discovered min epoch of {}. Proceeding to fetch epochs up to {}.", minEpoch, maxEpoch); + logger.info("Fetching topologies for epochs [{}, {}].", minEpoch, maxEpoch); for (long epoch = minEpoch; epoch <= maxEpoch; epoch++) futures.add(FetchTopology.fetch(SharedContext.Global.instance, peers, epoch)); @@ -468,14 +482,11 @@ private List fetchTopologies(long minEpoch, ClusterMetadata metadata) } @VisibleForTesting - static long findMinEpoch(SharedContext context, Set peers) + static Long findMinEpoch(SharedContext context, Set peers) { try { - Long result = FetchMinEpoch.fetch(context, peers).get(); - if (result == null) - return 0L; - return result.longValue(); + return FetchMinEpoch.fetch(context, peers).get(); } catch (InterruptedException e) { diff --git a/src/java/org/apache/cassandra/service/accord/FetchTopology.java b/src/java/org/apache/cassandra/service/accord/FetchTopology.java index f6c1a518c1f3..08af6cb1fc8a 100644 --- a/src/java/org/apache/cassandra/service/accord/FetchTopology.java +++ b/src/java/org/apache/cassandra/service/accord/FetchTopology.java @@ -21,11 +21,8 @@ import java.io.IOException; import java.util.Collection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import accord.topology.Topology; -import org.apache.cassandra.exceptions.RequestFailure; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -37,11 +34,18 @@ import org.apache.cassandra.net.Verb; import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.service.accord.serializers.TopologySerializers; +import org.apache.cassandra.utils.Backoff; import org.apache.cassandra.utils.concurrent.Future; public class FetchTopology { - private static final Logger log = LoggerFactory.getLogger(FetchTopology.class); + public String toString() + { + return "FetchTopology{" + + "epoch=" + epoch + + '}'; + } + private final long epoch; public static final IVersionedSerializer serializer = new IVersionedSerializer<>() @@ -72,11 +76,6 @@ public FetchTopology(long epoch) public static class Response { - private static Response unkonwn(long epoch) - { - throw 
new IllegalStateException("Unknown topology: " + epoch); - } - // TODO (required): messaging version after version patch public static final IVersionedSerializer serializer = new IVersionedSerializer<>() { @@ -115,20 +114,25 @@ public Response(long epoch, Topology topology) public static final IVerbHandler handler = message -> { long epoch = message.payload.epoch; Topology topology = AccordService.instance().topology().maybeGlobalForEpoch(epoch); - if (topology == null) - MessagingService.instance().respond(Response.unkonwn(epoch), message); - else + if (topology != null) MessagingService.instance().respond(new Response(epoch, topology), message); + else + throw new IllegalStateException("Unknown topology: " + epoch); }; - private static final Logger logger = LoggerFactory.getLogger(FetchTopology.class); - public static Future fetch(SharedContext context, Collection peers, long epoch) { - FetchTopology req = new FetchTopology(epoch); - return context.messaging().sendWithRetries(Verb.ACCORD_FETCH_TOPOLOGY_REQ, req, - MessagingUtils.tryAliveFirst(SharedContext.Global.instance, peers), - (attempt, from, failure) -> true, + FetchTopology request = new FetchTopology(epoch); + Backoff backoff = Backoff.fromConfig(context, DatabaseDescriptor.getAccord().fetchRetry); + return context.messaging().sendWithRetries(backoff, + context.optionalTasks()::schedule, + Verb.ACCORD_FETCH_TOPOLOGY_REQ, + request, + MessagingUtils.tryAliveFirst(SharedContext.Global.instance, peers, Verb.ACCORD_FETCH_TOPOLOGY_REQ.name()), + (attempt, from, failure) -> { + System.out.println("Got " + failure + " from " + from + " while fetching " + request); + return true; + }, MessageDelivery.RetryErrorMessage.EMPTY) .map(m -> m.payload.topology); } diff --git a/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java b/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java index a8b573c08a49..f06df7e5e337 100644 --- a/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java +++ b/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java @@ -234,27 +234,4 @@ private static MessageDelivery.MaxRetriesException getMaxRetriesException(Future } return maxRetries; } - - private static MessageDelivery.FailedResponseException getFailedResponseException(Future f) throws InterruptedException, ExecutionException - { - MessageDelivery.FailedResponseException exception; - try - { - f.get(); - Assert.fail("Future should have failed"); - throw new AssertionError("Unreachable"); - } - catch (ExecutionException e) - { - if (e.getCause() instanceof MessageDelivery.FailedResponseException) - { - exception = (MessageDelivery.FailedResponseException) e.getCause(); - } - else - { - throw e; - } - } - return exception; - } } \ No newline at end of file From 5bdd1fb88b5fd3f02d387e64f37bab5c39af003b Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Fri, 31 Jan 2025 11:33:15 +0100 Subject: [PATCH 3/5] Address David's comments --- .../org/apache/cassandra/config/AccordSpec.java | 15 +++++++++++---- .../cassandra/exceptions/RequestFailure.java | 2 ++ .../exceptions/RequestFailureReason.java | 1 + .../apache/cassandra/net/MessageDelivery.java | 3 ++- .../accord/AccordConfigurationService.java | 4 ++-- .../cassandra/service/accord/AccordService.java | 2 +- .../cassandra/service/accord/FetchMinEpoch.java | 14 +++++++------- .../cassandra/service/accord/FetchTopology.java | 16 ++++++++-------- .../config/DatabaseDescriptorRefTest.java | 1 + 9 files changed, 35 insertions(+), 23 deletions(-) diff --git 
a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java index e6f954cb26d2..566a0d16ef7b 100644 --- a/src/java/org/apache/cassandra/config/AccordSpec.java +++ b/src/java/org/apache/cassandra/config/AccordSpec.java @@ -194,17 +194,24 @@ public enum TransactionalRangeMigration public boolean ephemeralReadEnabled = true; public boolean state_cache_listener_jfr_enabled = true; public final JournalSpec journal = new JournalSpec(); - public final RetrySpec minEpochSyncRetry = new RetrySpec() { + public final RetrySpec minEpochSyncRetry = new MinEpochRetrySpec(); + public final RetrySpec fetchRetry = new FetchRetrySpec(); + + public static class MinEpochRetrySpec extends RetrySpec + { + public MinEpochRetrySpec() { maxAttempts = new MaxAttempt(3); } - }; + } - public final RetrySpec fetchRetry = new RetrySpec() { + public static class FetchRetrySpec extends RetrySpec + { + public FetchRetrySpec() { maxAttempts = new MaxAttempt(100); } - }; + } public static class JournalSpec implements Params { diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailure.java b/src/java/org/apache/cassandra/exceptions/RequestFailure.java index 312102c01201..6946b812aa34 100644 --- a/src/java/org/apache/cassandra/exceptions/RequestFailure.java +++ b/src/java/org/apache/cassandra/exceptions/RequestFailure.java @@ -43,6 +43,7 @@ public class RequestFailure { public static final RequestFailure UNKNOWN = new RequestFailure(RequestFailureReason.UNKNOWN); + public static final RequestFailure UNKNOWN_TOPOLOGY = new RequestFailure(RequestFailureReason.UNKNOWN_TOPOLOGY); public static final RequestFailure READ_TOO_MANY_TOMBSTONES = new RequestFailure(RequestFailureReason.READ_TOO_MANY_TOMBSTONES); public static final RequestFailure TIMEOUT = new RequestFailure(RequestFailureReason.TIMEOUT); public static final RequestFailure INCOMPATIBLE_SCHEMA = new RequestFailure(RequestFailureReason.INCOMPATIBLE_SCHEMA); @@ -134,6 +135,7 @@ public static RequestFailure forReason(RequestFailureReason reason) { default: throw new IllegalStateException("Unhandled request failure reason " + reason); case UNKNOWN: return UNKNOWN; + case UNKNOWN_TOPOLOGY: return UNKNOWN_TOPOLOGY; case READ_TOO_MANY_TOMBSTONES: return READ_TOO_MANY_TOMBSTONES; case TIMEOUT: return TIMEOUT; case INCOMPATIBLE_SCHEMA: return INCOMPATIBLE_SCHEMA; diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java index b3b9bddfc448..917c6c753d46 100644 --- a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java +++ b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java @@ -42,6 +42,7 @@ public enum RequestFailureReason READ_TOO_MANY_INDEXES (10), RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM (11), BOOTING (12), + UNKNOWN_TOPOLOGY (13) ; static diff --git a/src/java/org/apache/cassandra/net/MessageDelivery.java b/src/java/org/apache/cassandra/net/MessageDelivery.java index f9439dbb30d2..c57366d45058 100644 --- a/src/java/org/apache/cassandra/net/MessageDelivery.java +++ b/src/java/org/apache/cassandra/net/MessageDelivery.java @@ -139,7 +139,8 @@ interface RetryErrorMessage } private static void sendWithRetries(MessageDelivery messaging, - Backoff backoff, RetryScheduler retryThreads, + Backoff backoff, + RetryScheduler retryThreads, Verb verb, REQ request, Iterator candidates, OnResult onResult, diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java index 6aa4888e3a15..2452d5055a3e 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java @@ -213,7 +213,7 @@ public EpochDiskState truncateTopologyUntil(long epoch, EpochDiskState diskState } } - // TODO: should not be public + //TODO (required): should not be public public final ChangeListener listener = new MetadataChangeListener(); private class MetadataChangeListener implements ChangeListener { @@ -439,7 +439,7 @@ protected void fetchTopologyInternal(long epoch) // In most cases, after fetching log from CMS, we will be caught up to the required epoch. // This TCM will also notify Accord via reportMetadata, so we do not need to fetch topologies. - // If metadata has reported has skipped one or more eopchs, and is _ahead_ of the requested epoch, + // If metadata has reported has skipped one or more epochs, and is _ahead_ of the requested epoch, // we need to fetch topologies from peers to fill in the gap. ClusterMetadata metadata = ClusterMetadata.current(); if (metadata.epoch.getEpoch() == epoch) diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 6b9c42dd9f56..cce9b8832bf8 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -397,7 +397,7 @@ public synchronized void startup() { try { - epochReady(metadata.epoch).get(5, SECONDS); + epochReady(metadata.epoch).get(waitSeconds, SECONDS); break; } catch (TimeoutException e) diff --git a/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java b/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java index 9febcb060953..3c6e18c3d635 100644 --- a/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java +++ b/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java @@ -111,13 +111,13 @@ public static Future fetch(SharedContext context, Set static Future fetch(SharedContext context, InetAddressAndPort to) { Backoff backoff = Backoff.fromConfig(context, DatabaseDescriptor.getAccord().minEpochSyncRetry); - return context.messaging().sendWithRetries(backoff, - context.optionalTasks()::schedule, - Verb.ACCORD_FETCH_MIN_EPOCH_REQ, - FetchMinEpoch.instance, - Iterators.cycle(to), - RetryPredicate.ALWAYS_RETRY, - RetryErrorMessage.EMPTY) + return context.messaging().sendWithRetries(backoff, + context.optionalTasks()::schedule, + Verb.ACCORD_FETCH_MIN_EPOCH_REQ, + FetchMinEpoch.instance, + Iterators.cycle(to), + RetryPredicate.ALWAYS_RETRY, + RetryErrorMessage.EMPTY) .map(m -> m.payload.minEpoch); } diff --git a/src/java/org/apache/cassandra/service/accord/FetchTopology.java b/src/java/org/apache/cassandra/service/accord/FetchTopology.java index 08af6cb1fc8a..b5d20b63a648 100644 --- a/src/java/org/apache/cassandra/service/accord/FetchTopology.java +++ b/src/java/org/apache/cassandra/service/accord/FetchTopology.java @@ -23,6 +23,8 @@ import accord.topology.Topology; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.exceptions.RequestFailure; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -82,14 +84,14 @@ public static class Response @Override public void 
serialize(Response t, DataOutputPlus out, int version) throws IOException { - out.writeLong(t.epoch); + out.writeUnsignedVInt(t.epoch); TopologySerializers.topology.serialize(t.topology, out, version); } @Override public Response deserialize(DataInputPlus in, int version) throws IOException { - long epoch = in.readLong(); + long epoch = in.readUnsignedVInt(); Topology topology = TopologySerializers.topology.deserialize(in, version); return new Response(epoch, topology); } @@ -97,7 +99,8 @@ public Response deserialize(DataInputPlus in, int version) throws IOException @Override public long serializedSize(Response t, int version) { - return Long.BYTES + TopologySerializers.topology.serializedSize(t.topology, version); + return TypeSizes.sizeofUnsignedVInt(t.epoch) + + TopologySerializers.topology.serializedSize(t.topology, version); } }; @@ -117,7 +120,7 @@ public Response(long epoch, Topology topology) if (topology != null) MessagingService.instance().respond(new Response(epoch, topology), message); else - throw new IllegalStateException("Unknown topology: " + epoch); + MessagingService.instance().respondWithFailure(RequestFailure.UNKNOWN_TOPOLOGY, message); }; public static Future fetch(SharedContext context, Collection peers, long epoch) @@ -129,10 +132,7 @@ public static Future fetch(SharedContext context, Collection { - System.out.println("Got " + failure + " from " + from + " while fetching " + request); - return true; - }, + (attempt, from, failure) -> true, MessageDelivery.RetryErrorMessage.EMPTY) .map(m -> m.payload.topology); } diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java index ceb8d911ecb0..1d1d1742bea6 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java @@ -80,6 +80,7 @@ public class DatabaseDescriptorRefTest "org.apache.cassandra.config.AccordSpec", "org.apache.cassandra.config.AccordSpec$JournalSpec", "org.apache.cassandra.config.AccordSpec$MinEpochRetrySpec", + "org.apache.cassandra.config.AccordSpec$FetchRetrySpec", "org.apache.cassandra.config.AccordSpec$TransactionalRangeMigration", "org.apache.cassandra.config.AccordSpec$QueueShardModel", "org.apache.cassandra.config.AccordSpec$QueueSubmissionModel", From 9543c6003b7ad74af02a031136bdd565b8efccc7 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Fri, 31 Jan 2025 22:20:32 +0100 Subject: [PATCH 4/5] Document a change to configuration service --- .../accord/AccordConfigurationService.java | 29 +++++++++++++++---- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java index 2452d5055a3e..9d8b41bcb141 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java @@ -415,8 +415,29 @@ void maybeReportMetadata(ClusterMetadata metadata) long epoch = metadata.epoch.getEpoch(); synchronized (epochs) { - long maxEpoch = epochs.maxEpoch(); - if (maxEpoch == 0) + // On first boot, we have 2 options: + // + // - we can start listening to TCM _before_ we replay topologies + // - we can start listening to TCM _after_ we replay topologies + // + // If we start listening to TCM _before_ we replay topologies from other nodes, + // we may end up in a situation where 
TCM reports metadata that would create an + // `epoch - 1` epoch state that is not associated with any topologies, and + // therefore should not be listened upon. + // + // If we start listening to TCM _after_ we replay topologies, we may end up in a + // situation where TCM reports metadata that is 1 (or more) epochs _ahead_ of the + // last known epoch. Previous implementations were using TCM peer catch up, which + // could have resulted in gaps. + // + // Current protocol solves both problems by _first_ replaying topologies from peers, + // then subscribing to TCM _and_, if there are still any gaps, filling them again. + // However, it still has a slight chance of creating an `epoch - 1` epoch state + // not associated with any topologies, which under "right" circumstances could + // have been waited upon with `epochReady`. This check precludes creation of this + // epoch: by the time this code can be called, remote topology replay is already + // done, so TCM listener will only report epochs that are _at least_ min epoch. if (epochs.maxEpoch() == 0 || epochs.minEpoch() == metadata.epoch.getEpoch()) { getOrCreateEpochState(epoch); // touch epoch state so subsequent calls see it reportMetadata(metadata); @@ -424,9 +445,7 @@ void maybeReportMetadata(ClusterMetadata metadata) } } - // Create a -1 epoch iif we know this epoch may actually exist - if (metadata.epoch.getEpoch() > minEpoch()) - getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() -> reportMetadata(metadata)); + getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() -> reportMetadata(metadata)); } @Override From ce7ba8ee4788d1bc2cae347cbaf61e003ebb234f Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Sat, 1 Feb 2025 16:51:04 +0100 Subject: [PATCH 5/5] Minor: fix CASTest --- .../org/apache/cassandra/service/accord/FetchTopology.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/service/accord/FetchTopology.java b/src/java/org/apache/cassandra/service/accord/FetchTopology.java index b5d20b63a648..dc8df4f4548c 100644 --- a/src/java/org/apache/cassandra/service/accord/FetchTopology.java +++ b/src/java/org/apache/cassandra/service/accord/FetchTopology.java @@ -116,8 +116,9 @@ public Response(long epoch, Topology topology) public static final IVerbHandler handler = message -> { long epoch = message.payload.epoch; - Topology topology = AccordService.instance().topology().maybeGlobalForEpoch(epoch); - if (topology != null) + + Topology topology; + if (AccordService.isSetup() && (topology = AccordService.instance().topology().maybeGlobalForEpoch(epoch)) != null) MessagingService.instance().respond(new Response(epoch, topology), message); else MessagingService.instance().respondWithFailure(RequestFailure.UNKNOWN_TOPOLOGY, message);
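
The core startup change in this series is an ordering guarantee: replay topologies from peers up to the current TCM epoch first, only then attach the TCM log listener, and finally fetch any epochs that were published in between. The following is a minimal, self-contained sketch of that ordering; MetadataLog, ConfigService and the epoch-as-long model are illustrative stand-ins for the real Cassandra/Accord classes, not the actual API.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

final class StartupReplaySketch
{
    /** Stand-in for the TCM log: publishes monotonically increasing epochs to its listeners. */
    static final class MetadataLog
    {
        private final AtomicLong current = new AtomicLong(0);
        private final List<LongConsumer> listeners = new CopyOnWriteArrayList<>();

        long currentEpoch() { return current.get(); }
        void addListener(LongConsumer listener) { listeners.add(listener); }
        void advance() { long epoch = current.incrementAndGet(); listeners.forEach(l -> l.accept(epoch)); }
    }

    /** Stand-in for the Accord configuration service: records each reported epoch at most once. */
    static final class ConfigService
    {
        final List<Long> reported = new ArrayList<>();
        long maxEpoch() { return reported.isEmpty() ? 0 : reported.get(reported.size() - 1); }
        synchronized void reportEpoch(long epoch)
        {
            if (epoch > maxEpoch()) reported.add(epoch); // ignore stale or duplicate reports
        }
    }

    static void startup(MetadataLog log, ConfigService config, long minEpochFromPeers)
    {
        long snapshot = log.currentEpoch();

        // 1. Replay everything from the discovered min epoch up to the snapshot we took.
        for (long epoch = Math.max(1, minEpochFromPeers); epoch <= snapshot; epoch++)
            config.reportEpoch(epoch);

        // 2. Only now subscribe, so listener callbacks never interleave with the replay above.
        log.addListener(config::reportEpoch);

        // 3. If the log advanced between the snapshot and the subscription, fill the gap.
        long latest = log.currentEpoch();
        for (long epoch = snapshot + 1; epoch <= latest; epoch++)
            config.reportEpoch(epoch);
    }

    public static void main(String[] args)
    {
        MetadataLog log = new MetadataLog();
        ConfigService config = new ConfigService();
        for (int i = 0; i < 5; i++) log.advance(); // epochs 1..5 exist before this node starts
        startup(log, config, 2);                   // peers reported a min epoch of 2
        log.advance();                             // epoch 6 arrives through the listener
        System.out.println(config.reported);       // prints [2, 3, 4, 5, 6]
    }
}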
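
Retries in FetchMinEpoch and FetchTopology draw their candidates from MessagingUtils.tryAliveFirst, which prefers peers the failure detector currently considers alive and afterwards cycles through every peer indefinitely. Below is a rough, self-contained sketch of that candidate ordering, with a plain Predicate standing in for the failure detector; it illustrates the idea rather than reproducing the actual implementation.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

final class TryAliveFirstSketch
{
    static <T> Iterator<T> tryAliveFirst(List<T> peers, Predicate<T> isAlive)
    {
        return new Iterator<>()
        {
            boolean firstPass = true;
            Iterator<T> iter = peers.iterator();

            @Override
            public boolean hasNext()
            {
                return !peers.isEmpty();
            }

            @Override
            public T next()
            {
                // First pass: hand out only peers the liveness predicate accepts.
                while (firstPass && iter.hasNext())
                {
                    T candidate = iter.next();
                    if (isAlive.test(candidate)) return candidate;
                }
                firstPass = false;
                // Subsequent passes: round-robin over every peer, alive or not.
                if (!iter.hasNext()) iter = peers.iterator();
                return iter.next();
            }
        };
    }

    public static void main(String[] args)
    {
        List<String> peers = new ArrayList<>(List.of("a", "b", "c"));
        Iterator<String> candidates = tryAliveFirst(peers, p -> !p.equals("b")); // "b" is down
        for (int i = 0; i < 6; i++) System.out.print(candidates.next() + " ");   // a c a b c a
    }
}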
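
Patches 2 and 3 also replace the single hard timeout on epochReady at startup with an open-ended wait that logs a warning every few seconds instead of failing. The sketch below reproduces that loop in isolation; the CompletableFuture is a stand-in for the real epoch-readiness future.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class EpochReadyWaitSketch
{
    static void awaitReady(Future<Void> epochReady, long epoch) throws InterruptedException, ExecutionException
    {
        int attempt = 0;
        final int waitSeconds = 5;
        while (true)
        {
            try
            {
                epochReady.get(waitSeconds, TimeUnit.SECONDS);
                return;
            }
            catch (TimeoutException e)
            {
                // Keep waiting, but surface progress so a stuck startup is visible in the logs.
                System.out.printf("Epoch %d is not ready after waiting for %d seconds%n", epoch, (++attempt) * waitSeconds);
            }
        }
    }

    public static void main(String[] args) throws Exception
    {
        CompletableFuture<Void> ready = new CompletableFuture<>();
        new Thread(() ->
        {
            try { Thread.sleep(7_000); } catch (InterruptedException ignored) {}
            ready.complete(null);
        }).start();
        awaitReady(ready, 42); // logs one warning around the 5 second mark, then returns once the future completes
    }
}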