Skip to content
This repository has been archived by the owner on Nov 17, 2024. It is now read-only.

Commit

Permalink
Merge pull request #57 from caskdata/feature/tx-checkpoints
Browse files Browse the repository at this point in the history
TEPHRA-96 Transaction checkpoints: support multiple write pointers per tx
  • Loading branch information
Nitin Motgi committed May 22, 2015
2 parents 561f1cb + 63ab1c2 commit 8615f0d
Show file tree
Hide file tree
Showing 41 changed files with 2,734 additions and 392 deletions.
158 changes: 145 additions & 13 deletions tephra-api/src/main/java/co/cask/tephra/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,54 +25,132 @@
// are available
public class Transaction {
private final long readPointer;
private final long txId;
private final long writePointer;
private final long[] invalids;
private final long[] inProgress;
private final long firstShortInProgress;
private final TransactionType type;
private final long[] checkpointWritePointers;

private VisibilityLevel visibilityLevel = VisibilityLevel.SNAPSHOT;

private static final long[] NO_EXCLUDES = { };
public static final long NO_TX_IN_PROGRESS = Long.MAX_VALUE;

public static final Transaction ALL_VISIBLE_LATEST =
new Transaction(Long.MAX_VALUE, Long.MAX_VALUE, NO_EXCLUDES, NO_EXCLUDES, NO_TX_IN_PROGRESS, TransactionType.SHORT);

/**
* Defines the possible visibility levels for read operations.
*
* <p>
* <ul>
* <li><code>SNAPSHOT</code> - uses the transaction's read snapshot, plus includes all write pointers from the
* current transaction</li>
* <li><code>SNAPSHOT_EXCLUDE_CURRENT</code> - uses the transaction's read snapshot, plus includes all write
* pointers from the current transaction, <strong>except</strong> the current write pointer
* (see {@link #getWritePointer()})</li>
* </ul>
* </p>
*
* <p>The default value used is {@code SNAPSHOT}.</p>
*
* @see #setVisibility(VisibilityLevel)
*/
public enum VisibilityLevel {
SNAPSHOT,
SNAPSHOT_EXCLUDE_CURRENT
}

/**
* Creates a new short transaction.
* @param readPointer read pointer for transaction
* @param txId unique identifier for the transaction
* @param invalids list of invalid transactions to exclude while reading
* @param inProgress list of in-progress transactions to exclude while reading
* @param firstShortInProgress earliest in-progress short transaction
*/
public Transaction(long readPointer, long txId, long[] invalids, long[] inProgress,
long firstShortInProgress) {
this(readPointer, txId, invalids, inProgress, firstShortInProgress, TransactionType.SHORT);
}

/**
* Creates a new transaction.
* @param readPointer read pointer for transaction
* @param writePointer write pointer for transaction
* @param txId unique identifier for the transaction
* @param invalids list of invalid transactions to exclude while reading
* @param inProgress list of in-progress transactions to exclude while reading
* @param firstShortInProgress earliest in-progress short transaction
* @param type transaction type
*/
public Transaction(long readPointer, long writePointer, long[] invalids, long[] inProgress,
public Transaction(long readPointer, long txId, long[] invalids, long[] inProgress,
long firstShortInProgress, TransactionType type) {
this(readPointer, txId, txId, invalids, inProgress, firstShortInProgress, type, new long[0],
VisibilityLevel.SNAPSHOT);
}

/**
* Creates a new transaction.
* @param readPointer read pointer for transaction
* @param txId unique identifier for the transaction
* @param writePointer the current pointer to be used for any writes.
* For new transactions, this will be the same as {@code txId}. For checkpointed
* transactions, this will be the most recent write pointer issued.
* @param invalids list of invalid transactions to exclude while reading
* @param inProgress list of in-progress transactions to exclude while reading
* @param firstShortInProgress earliest in-progress short transaction
* @param type transaction type
* @param checkpointPointers the list of writer pointers added from checkpoints on the transaction
* @param visibilityLevel the visibility level to use for transactional reads
*/
public Transaction(long readPointer, long txId, long writePointer, long[] invalids, long[] inProgress,
long firstShortInProgress, TransactionType type, long[] checkpointPointers,
VisibilityLevel visibilityLevel) {
this.readPointer = readPointer;
this.txId = txId;
this.writePointer = writePointer;
this.invalids = invalids;
this.inProgress = inProgress;
this.firstShortInProgress = firstShortInProgress;
this.type = type;
this.checkpointWritePointers = checkpointPointers;
this.visibilityLevel = visibilityLevel;
}

/**
* Creates a new short transaction.
* @param readPointer read pointer for transaction
* @param writePointer write pointer for transaction
* @param invalids list of invalid transactions to exclude while reading
* @param inProgress list of in-progress transactions to exclude while reading
* @param firstShortInProgress earliest in-progress short transaction
* Creates a new transaction for a checkpoint operation, copying all members from the original transaction,
* with the updated checkpoint write pointers.
*
* @param toCopy the original transaction containing the state to copy
* @param writePointer the new write pointer to use for the transaction
* @param checkpointPointers the list of write pointers added from checkpoints on the transaction
*/
public Transaction(long readPointer, long writePointer, long[] invalids, long[] inProgress,
long firstShortInProgress) {
this(readPointer, writePointer, invalids, inProgress, firstShortInProgress, TransactionType.SHORT);
public Transaction(Transaction toCopy, long writePointer, long[] checkpointPointers) {
this(toCopy.getReadPointer(), toCopy.getTransactionId(), writePointer, toCopy.getInvalids(),
toCopy.getInProgress(), toCopy.getFirstShortInProgress(), toCopy.getType(), checkpointPointers,
toCopy.getVisibilityLevel());
}

public long getReadPointer() {
return readPointer;
}

/**
* Returns the initial write pointer assigned to the transaction. This will remain the same for the life of the
* transaction, and uniquely identifies it with the transaction service. This value should be provided
* to identify the transaction when calling any transaction lifecycle methods on the transaction service.
*/
public long getTransactionId() {
return txId;
}

/**
* Returns the write pointer to be used in persisting any changes. After a checkpoint is performed, this will differ
* from {@link #getTransactionId()}. This method should always be used when setting the timestamp for writes
* in order to ensure that the correct value is used.
*/
public long getWritePointer() {
return writePointer;
}
Expand Down Expand Up @@ -101,25 +179,67 @@ public long getVisibilityUpperBound() {
// NOTE: in some cases when we do not provide visibility guarantee, we set readPointer to MAX value, but
// at same time we don't want that to case cleanup everything as this is used for tx janitor + ttl to see
// what can be cleaned up. When non-tx mode is implemented better, we should not need this check
return inProgress.length == 0 ? Math.min(writePointer - 1, readPointer) : inProgress[0] - 1;
return inProgress.length == 0 ? Math.min(txId - 1, readPointer) : inProgress[0] - 1;
}

public long getFirstShortInProgress() {
return firstShortInProgress;
}

/**
* Returns true if the given version corresponds to a transaction that was in-progress at the time this transaction
* started.
*/
public boolean isInProgress(long version) {
return Arrays.binarySearch(inProgress, version) >= 0;
}

/**
* Returns true if the given version is present in one of the arrays of excluded versions (in-progress and
* invalid transactions).
*/
public boolean isExcluded(long version) {
return Arrays.binarySearch(inProgress, version) >= 0
|| Arrays.binarySearch(invalids, version) >= 0;
}

/**
* Returns true if the the given version corresponds to one of the checkpoint versions in the current
* transaction.
*/
public boolean isCheckpoint(long version) {
return Arrays.binarySearch(checkpointWritePointers, version) >= 0;
}

/**
* Returns whether or not the given version should be visible to the current transaction. A version will be visible
* if it was successfully committed prior to the current transaction starting, or was written by the current
* transaction (using either the current write pointer or the write pointer from a prior checkpoint).
*
* @param version the data version to check for visibility
* @return true if the version is visible, false if it should be hidden (filtered)
*
* @see #setVisibility(VisibilityLevel) to control whether the current write pointer is visible.
*/
public boolean isVisible(long version) {
// either it was committed before or the change belongs to current tx
return (version <= getReadPointer() && !isExcluded(version)) || writePointer == version;
return (version <= getReadPointer() && !isExcluded(version)) ||
((txId == version || isCheckpoint(version)) &&
(visibilityLevel == VisibilityLevel.SNAPSHOT || writePointer != version));
}

/**
* Sets the visibility level for read operations.
*/
public void setVisibility(VisibilityLevel level) {
this.visibilityLevel = level;
}

/**
* Returns the currently set visibility level.
*/
public VisibilityLevel getVisibilityLevel() {
return visibilityLevel;
}

public boolean hasExcludes() {
Expand All @@ -131,17 +251,29 @@ public int excludesSize() {
return invalids.length + inProgress.length;
}

/**
* Returns any prior write pointers used in the current transaction. A new write pointer is issued when the
* {@code TransactionContext.checkpoint(Transaction)} operation is called, and the prior write pointer is added
* to the array of checkpoint write pointers.
` */
public long[] getCheckpointWritePointers() {
return checkpointWritePointers;
}

@Override
public String toString() {
return new StringBuilder(100)
.append(Transaction.class.getSimpleName())
.append('{')
.append("readPointer: ").append(readPointer)
.append(", transactionId: ").append(txId)
.append(", writePointer: ").append(writePointer)
.append(", invalids: ").append(Arrays.toString(invalids))
.append(", inProgress: ").append(Arrays.toString(inProgress))
.append(", firstShortInProgress: ").append(firstShortInProgress)
.append(", type: ").append(type)
.append(", checkpointWritePointers: ").append(Arrays.toString(checkpointWritePointers))
.append(", visibilityLevel: ").append(visibilityLevel)
.append('}')
.toString();
}
Expand Down
13 changes: 12 additions & 1 deletion tephra-api/src/main/java/co/cask/tephra/TransactionAware.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,23 @@
// todo: add onCommitted() - so that e.g. hbase table can do *actual* deletes at this point
public interface TransactionAware {
/**
* Called when new transaction has started.
* Called when new transaction has started. This may reset any state which has been left behind by the previous
* transaction.
*
* @param tx transaction info
*/
// todo: rename to onTxStart()
void startTx(Transaction tx);

/**
* Called when the state of the current transaction has been updated. This should replace any reference to the
* current {@link Transaction} held by this {@code TransactionAware}, but should <strong>not</strong> reset
* any state (such as the write change sets) that is currently maintained.
*
* @param tx the updated transaction
*/
void updateTx(Transaction tx);

/**
* @return changes made by current transaction to be used for conflicts detection before commit.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ public void startTx(Transaction tx) {
}
}

@Override
public void updateTx(Transaction tx) {
for (TransactionAware txAware : this) {
txAware.updateTx(tx);
}
}

@Override
public Collection<byte[]> getTxChanges() {
List<byte[]> changes = new ArrayList<byte[]>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@

import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Bytes;
import com.google.common.primitives.UnsignedBytes;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

Expand All @@ -35,7 +37,8 @@
*/
public abstract class AbstractTransactionAwareTable implements TransactionAware {
protected final TransactionCodec txCodec;
protected final Set<ActionChange> changeSet;
// map of write pointers to change set assocaited with each
protected final Map<Long, Set<ActionChange>> changeSets;
protected final TxConstants.ConflictDetection conflictLevel;
protected Transaction tx;
protected boolean allowNonTransactional;
Expand All @@ -44,7 +47,7 @@ public AbstractTransactionAwareTable(TxConstants.ConflictDetection conflictLevel
this.conflictLevel = conflictLevel;
this.allowNonTransactional = allowNonTransactional;
this.txCodec = new TransactionCodec();
this.changeSet = new HashSet<ActionChange>();
this.changeSets = Maps.newHashMap();
}

/**
Expand All @@ -68,15 +71,22 @@ public void startTx(Transaction tx) {
this.tx = tx;
}

@Override
public void updateTx(Transaction tx) {
this.tx = tx;
}

@Override
public Collection<byte[]> getTxChanges() {
if (conflictLevel == TxConstants.ConflictDetection.NONE) {
return Collections.emptyList();
}

Collection<byte[]> txChanges = new TreeSet<byte[]>(UnsignedBytes.lexicographicalComparator());
for (ActionChange change : changeSet) {
txChanges.add(getChangeKey(change.getRow(), change.getFamily(), change.getQualifier()));
for (Set<ActionChange> changeSet : changeSets.values()) {
for (ActionChange change : changeSet) {
txChanges.add(getChangeKey(change.getRow(), change.getFamily(), change.getQualifier()));
}
}
return txChanges;
}
Expand Down Expand Up @@ -111,7 +121,7 @@ public boolean commitTx() throws Exception {
@Override
public void postTxCommit() {
tx = null;
changeSet.clear();
changeSets.clear();
}

@Override
Expand All @@ -137,6 +147,12 @@ public boolean rollbackTx() throws Exception {
protected abstract boolean doRollback() throws Exception;

protected void addToChangeSet(byte[] row, byte[] family, byte[] qualifier) {
long currentWritePointer = tx.getWritePointer();
Set<ActionChange> changeSet = changeSets.get(currentWritePointer);
if (changeSet == null) {
changeSet = Sets.newHashSet();
changeSets.put(currentWritePointer, changeSet);
}
switch (conflictLevel) {
case ROW:
case NONE:
Expand Down
Loading

0 comments on commit 8615f0d

Please sign in to comment.