Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CEP-45: Initial read path integration and reconciliation implementation #3882

Open
wants to merge 34 commits into
base: cep-45-mutation-tracking
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
7436dce
schema configuration
bdeggleston Dec 6, 2024
1f37619
add mutation id
bdeggleston Dec 4, 2024
9826f59
add messaging service serialization version
bdeggleston Dec 20, 2024
8741de0
add mutation id to mutation
bdeggleston Dec 20, 2024
9f03ddd
add mutation tracker
bdeggleston Feb 12, 2025
acdb28c
initial write path integration
bdeggleston Feb 12, 2025
260a999
partial mutation summarization stuff
bdeggleston Feb 12, 2025
e164629
initial mutation summarization
bdeggleston Dec 20, 2024
45afac8
refactor ReadResponse
bdeggleston Dec 20, 2024
4671982
refactor ResponseResolver
bdeggleston Dec 10, 2024
45e8402
refactor ReadRepair
bdeggleston Dec 20, 2024
e8b32a3
add LoggedReadResponse
bdeggleston Dec 20, 2024
3a42008
add LoggedResolver
bdeggleston Dec 20, 2024
baa498b
add LoggedReadReconciliation
bdeggleston Dec 20, 2024
1565658
integrate LoggedReadResponse into ReadCommand
bdeggleston Dec 11, 2024
61df4ee
add support for read path mutation summarization
bdeggleston Feb 12, 2025
0b05b54
integrate mutation summarization into read path
bdeggleston Dec 20, 2024
c2d8c18
integrate logged request type selection into read path
bdeggleston Dec 11, 2024
8f36ecc
add tracker reconciliation support
bdeggleston Dec 18, 2024
8ddeaf6
implement read reconciliation
bdeggleston Feb 12, 2025
f4979d7
disable repaired status tracking for logged reads
bdeggleston Jan 6, 2025
1f5b80d
add read result mutation augmentation
bdeggleston Jan 6, 2025
5000124
addl ReadCommand integration
bdeggleston Jan 8, 2025
15e4143
addl integrate mutation summarization into read path
bdeggleston Jan 8, 2025
7e5b33d
add support for range summaries
bdeggleston Jan 8, 2025
e6a7222
add support for range queries
bdeggleston Jan 8, 2025
b8f7bdb
addl range summarization
bdeggleston Jan 9, 2025
58a5ed3
more range read work
bdeggleston Jan 9, 2025
8500d46
more range read work
bdeggleston Jan 9, 2025
68eeb3f
Fix range read test flakes
aratno Jan 22, 2025
23533f4
Fix timeout for point read vs. range
aratno Jan 22, 2025
99dd226
adding pending reads/writes
bdeggleston Jan 23, 2025
c84a333
additional pending read/write work
bdeggleston Feb 6, 2025
baa73a1
remove accumulating mutation summarizer
bdeggleston Feb 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/java/org/apache/cassandra/batchlog/BatchlogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
import org.apache.cassandra.db.MutationId;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.transport.Dispatcher;
Expand Down Expand Up @@ -126,10 +127,10 @@ public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedExcep

public static void remove(TimeUUID id)
{
new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
id.toBytes(),
FBUtilities.timestampMicros(),
FBUtilities.nowInSeconds()))
new Mutation(MutationId.none(), PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
id.toBytes(),
FBUtilities.timestampMicros(),
FBUtilities.nowInSeconds()))
.apply();
}

Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,8 @@ public static class SSTableConfig

public boolean dynamic_data_masking_enabled = false;

public boolean mutation_tracking_enabled = false;

/**
* Time in milliseconds after a warning will be emitted to the log and to the client that a UDF runs too long.
* (Only valid, if user_defined_functions_threads_enabled==true)
Expand Down
14 changes: 14 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -5394,6 +5394,20 @@ public static void setDynamicDataMaskingEnabled(boolean enabled)
}
}

public static boolean getMutationTrackingEnabled()
{
return conf.mutation_tracking_enabled;
}

public static void setMutationTrackingEnabled(boolean enabled)
{
if (enabled != conf.mutation_tracking_enabled)
{
logger.info("Setting mutation_tracking_enabled to {}", enabled);
conf.mutation_tracking_enabled = enabled;
}
}

public static OptionalDouble getSeverityDuringDecommission()
{
return conf.severity_during_decommission > 0 ?
Expand Down
14 changes: 7 additions & 7 deletions src/java/org/apache/cassandra/cql3/QueryProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadQuery;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.SinglePartitionReadQuery;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.AbstractType;
Expand Down Expand Up @@ -88,6 +87,7 @@
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.service.reads.IReadResponse;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.Dispatcher;
Expand Down Expand Up @@ -514,17 +514,17 @@ else if (readQuery instanceof SinglePartitionReadQuery.Group)
{
throw new IllegalArgumentException("Unable to handle; only expected ReadCommands but given " + readQuery.getClass());
}
Future<List<Message<ReadResponse>>> future = FutureCombiner.allOf(commands.stream()
.map(rc -> Message.out(rc.verb(), rc))
.map(m -> MessagingService.instance().<ReadCommand, ReadResponse>sendWithResult(m, address))
.collect(Collectors.toList()));
Future<List<Message<IReadResponse>>> future = FutureCombiner.allOf(commands.stream()
.map(rc -> Message.out(rc.verb(), rc))
.map(m -> MessagingService.instance().<ReadCommand, IReadResponse>sendWithResult(m, address))
.collect(Collectors.toList()));

ResultSetBuilder result = new ResultSetBuilder(select.getResultMetadata(), select.getSelection().newSelectors(options), false);
return future.map(list -> {
int i = 0;
for (Message<ReadResponse> m : list)
for (Message<IReadResponse> m : list)
{
ReadResponse rsp = m.payload;
IReadResponse rsp = m.payload;
try (PartitionIterator it = UnfilteredPartitionIterators.filter(rsp.makeIterator(commands.get(i++)), nowInSec))
{
while (it.hasNext())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.MutationId;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.partitions.PartitionUpdate;
Expand Down Expand Up @@ -218,12 +219,14 @@ public MutationBuilder add(PartitionUpdate.Builder updateBuilder)
public Mutation build()
{
ImmutableMap.Builder<TableId, PartitionUpdate> updates = new ImmutableMap.Builder<>();
boolean createId = false;
for (Map.Entry<TableId, PartitionUpdate.Builder> updateEntry : modifications.entrySet())
{
PartitionUpdate update = updateEntry.getValue().build();
updates.put(updateEntry.getKey(), update);
createId |= update.metadata().hasLoggedReplication();
}
return new Mutation(keyspaceName, key, updates.build(), createdAt);
return new Mutation(createId ? MutationId.createNext() : MutationId.none(), keyspaceName, key, updates.build(), createdAt);
}

public PartitionUpdate.Builder get(TableId tableId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.MutationId;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.partitions.PartitionUpdate;
Expand Down Expand Up @@ -104,9 +105,9 @@ public List<IMutation> toMutations(ClientState state)
if (metadata.isVirtual())
mutation = new VirtualMutation(builder.build());
else if (metadata.isCounter())
mutation = new CounterMutation(new Mutation(builder.build()), counterConsistencyLevel);
mutation = new CounterMutation(new Mutation(MutationId.createFor(metadata), builder.build()), counterConsistencyLevel);
else
mutation = new Mutation(builder.build());
mutation = new Mutation(MutationId.createFor(metadata), builder.build());

mutation.validateIndexedColumns(state);
mutation.validateSize(MessagingService.current_version, CommitLogSegment.ENTRY_OVERHEAD_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.KeyspaceParams.Option;
import org.apache.cassandra.schema.ReplicationType;
import org.apache.cassandra.schema.ReplicationParams;

public final class KeyspaceAttributes extends PropertyDefinitions
Expand Down Expand Up @@ -66,17 +67,25 @@ private Map<String, String> getAllReplicationOptions()
KeyspaceParams asNewKeyspaceParams()
{
boolean durableWrites = getBoolean(Option.DURABLE_WRITES.toString(), KeyspaceParams.DEFAULT_DURABLE_WRITES);
return KeyspaceParams.create(durableWrites, getAllReplicationOptions());

String rtypeName = getString(Option.REPLICATION_TYPE.toString());
ReplicationType replicationType = rtypeName != null ? ReplicationType.valueOf(rtypeName) : KeyspaceParams.DEFAULT_REPLICATION_TYPE;

return KeyspaceParams.create(durableWrites, getAllReplicationOptions(), replicationType);
}

KeyspaceParams asAlteredKeyspaceParams(KeyspaceParams previous)
{
boolean durableWrites = getBoolean(Option.DURABLE_WRITES.toString(), previous.durableWrites);
String rtypeName = getString(Option.REPLICATION_TYPE.toString());
ReplicationType replicationType = rtypeName != null ? ReplicationType.valueOf(rtypeName) : previous.replicationType;


Map<String, String> previousOptions = previous.replication.options;
ReplicationParams replication = getReplicationStrategyClass() == null
? previous.replication
: ReplicationParams.fromMapWithDefaults(getAllReplicationOptions(), previousOptions);
return new KeyspaceParams(durableWrites, replication);
return new KeyspaceParams(durableWrites, replication, replicationType);
}

public boolean hasOption(Option option)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableParams;
import org.apache.cassandra.schema.TableParams.Option;
import org.apache.cassandra.schema.TableParams.TableReplicationType;
import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;

Expand Down Expand Up @@ -151,6 +152,9 @@ private TableParams build(TableParams.Builder builder)
if (hasOption(READ_REPAIR))
builder.readRepair(ReadRepairStrategy.fromString(getString(READ_REPAIR)));

if (hasOption(REPLICATION_TYPE))
builder.replicationType(TableReplicationType.fromString(getString(REPLICATION_TYPE)));

return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.replication.MutationTracker;
import org.apache.cassandra.replication.MutationTrackingService;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.concurrent.OpOrder;
Expand All @@ -42,24 +44,30 @@ public CassandraKeyspaceWriteHandler(Keyspace keyspace)
public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws RequestExecutionException
{
OpOrder.Group group = null;
MutationTracker.PendingWrite pendingWrite = null;
try
{
group = Keyspace.writeOrder.start();
pendingWrite = MutationTrackingService.instance().startWrite(mutation);

// write the mutation to the commitlog and memtables
CommitLogPosition position = null;
if (makeDurable)
{
position = addToCommitLog(mutation);
}
return new CassandraWriteContext(group, position);
return new CassandraWriteContext(group, position, pendingWrite);
}
catch (Throwable t)
{
if (group != null)
{
group.close();
}
if (pendingWrite != null)
{
pendingWrite.close();
}
throw t;
}
}
Expand Down Expand Up @@ -105,7 +113,7 @@ private WriteContext createEmptyContext()
try
{
group = Keyspace.writeOrder.start();
return new CassandraWriteContext(group, null);
return new CassandraWriteContext(group, null, MutationTracker.PendingWrite.NOOP);
}
catch (Throwable t)
{
Expand Down
6 changes: 5 additions & 1 deletion src/java/org/apache/cassandra/db/CassandraWriteContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@
import com.google.common.base.Preconditions;

import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.replication.MutationTracker;
import org.apache.cassandra.utils.concurrent.OpOrder;

public class CassandraWriteContext implements WriteContext
{
private final OpOrder.Group opGroup;
private final CommitLogPosition position;
private final MutationTracker.PendingWrite pendingWrite;

public CassandraWriteContext(OpOrder.Group opGroup, CommitLogPosition position)
public CassandraWriteContext(OpOrder.Group opGroup, CommitLogPosition position, MutationTracker.PendingWrite pendingWrite)
{
Preconditions.checkArgument(opGroup != null);
this.opGroup = opGroup;
this.position = position;
this.pendingWrite = pendingWrite;
}

public static CassandraWriteContext fromContext(WriteContext context)
Expand All @@ -55,5 +58,6 @@ public CommitLogPosition getPosition()
public void close()
{
opGroup.close();
pendingWrite.close();
}
}
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/db/CounterMutation.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ public CounterMutation(Mutation mutation, ConsistencyLevel consistency)
this.consistency = consistency;
}

@Override
public MutationId id()
{
return mutation.id();
}

public String getKeyspaceName()
{
return mutation.getKeyspaceName();
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/db/IMutation.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public interface IMutation
{
long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();

MutationId id();
void apply();
String getKeyspaceName();
Collection<TableId> getTableIds();
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/db/Keyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.metrics.KeyspaceMetrics;
import org.apache.cassandra.repair.KeyspaceRepairManager;
import org.apache.cassandra.replication.MutationTrackingService;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
Expand Down Expand Up @@ -545,6 +546,8 @@ else if (isDeferrable)
}
try (WriteContext ctx = getWriteHandler().beginWrite(mutation, makeDurable))
{
MutationTrackingService.instance().add(mutation);

for (PartitionUpdate upd : mutation.getPartitionUpdates())
{
ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().id);
Expand Down
Loading