Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-446] Remove refcount due to outdated #449

Merged
merged 1 commit into from
Jan 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ public class ForwardOutputDesc<T> implements IOutputDesc, Serializable {
private final int edgeId;
// Partition number.
private final int numPartitions;
// Data ref count, for data disposal.
private final int refCount;
// Name of the output edge.
private final String edgeName;
// Data exchange mode.
Expand All @@ -47,7 +45,6 @@ public ForwardOutputDesc(
int vertexId,
int edgeId,
int numPartitions,
int refCount,
String edgeName,
DataExchangeMode dataExchangeMode,
List<Integer> targetTaskIndices,
Expand All @@ -56,7 +53,6 @@ public ForwardOutputDesc(
this.vertexId = vertexId;
this.edgeId = edgeId;
this.numPartitions = numPartitions;
this.refCount = refCount;
this.edgeName = edgeName;
this.dataExchangeMode = dataExchangeMode;
this.targetTaskIndices = targetTaskIndices;
Expand All @@ -76,10 +72,6 @@ public int getNumPartitions() {
return this.numPartitions;
}

public int getRefCount() {
return this.refCount;
}

public String getEdgeName() {
return this.edgeName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ public void update(UpdateEmitterRequest request) {
.setTaskName(taskArgs.getTaskName())
.setChannelNum(forwardOutputDesc.getTargetTaskIndices().size())
.setEncoder(encoder)
.setDataExchangeMode(forwardOutputDesc.getDataExchangeMode())
.setRefCount(forwardOutputDesc.getRefCount());
.setDataExchangeMode(forwardOutputDesc.getDataExchangeMode());
pipeRecordWriter.init(writerContext);

AtomicBoolean flag = new AtomicBoolean(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,4 @@ public interface IWriterContext extends Serializable {

DataExchangeMode getDataExchangeMode();

int getRefCount();

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public void init(IWriterContext writerContext) {
}

@Override
protected PipelineSlice newSlice(String taskLogTag, SliceId sliceId, int refCount) {
return new PipelineSlice(taskLogTag, sliceId, refCount);
protected PipelineSlice newSlice(String taskLogTag, SliceId sliceId) {
return new PipelineSlice(taskLogTag, sliceId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void init(IWriterContext writerContext) {
this.maxBufferSize = this.shuffleConfig.getFlushBufferSizeBytes();

this.buffers = this.buildBufferBuilder(this.targetChannels);
this.resultSlices = this.buildResultSlices(this.targetChannels, writerContext.getRefCount());
this.resultSlices = this.buildResultSlices(this.targetChannels);
this.recordSerializer = this.getRecordSerializer();
}

Expand All @@ -89,20 +89,20 @@ private BufferBuilder[] buildBufferBuilder(int channels) {
return buffers;
}

protected IPipelineSlice[] buildResultSlices(int channels, int refCount) {
protected IPipelineSlice[] buildResultSlices(int channels) {
IPipelineSlice[] slices = new IPipelineSlice[channels];
WriterId writerId = new WriterId(this.pipelineId, this.edgeId, this.taskIndex);
SliceManager sliceManager = ShuffleManager.getInstance().getSliceManager();
for (int i = 0; i < channels; i++) {
SliceId sliceId = new SliceId(writerId, i);
IPipelineSlice slice = this.newSlice(this.taskLogTag, sliceId, refCount);
IPipelineSlice slice = this.newSlice(this.taskLogTag, sliceId);
slices[i] = slice;
sliceManager.register(sliceId, slice);
}
return slices;
}

protected abstract IPipelineSlice newSlice(String taskLogTag, SliceId sliceId, int refCount);
protected abstract IPipelineSlice newSlice(String taskLogTag, SliceId sliceId);

@SuppressWarnings("unchecked")
private IRecordSerializer<T> getRecordSerializer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public SpillableShardWriter(ShuffleAddress shuffleAddress) {
}

@Override
protected IPipelineSlice newSlice(String taskLogTag, SliceId sliceId, int refCount) {
return new SpillablePipelineSlice(taskLogTag, sliceId, refCount);
protected IPipelineSlice newSlice(String taskLogTag, SliceId sliceId) {
return new SpillablePipelineSlice(taskLogTag, sliceId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public class WriterContext implements IWriterContext {
private DataExchangeMode dataExchangeMode;
private int targetChannels;
private ShuffleConfig config;
private int refCount;
private IEncoder<?> encoder;

public WriterContext(long pipelineId, String pipelineName) {
Expand Down Expand Up @@ -82,11 +81,6 @@ public WriterContext setEncoder(IEncoder<?> encoder) {
return this;
}

public WriterContext setRefCount(int refCount) {
this.refCount = refCount;
return this;
}

@Override
public PipelineInfo getPipelineInfo() {
return pipelineInfo;
Expand Down Expand Up @@ -137,11 +131,6 @@ public DataExchangeMode getDataExchangeMode() {
return this.dataExchangeMode;
}

@Override
public int getRefCount() {
return this.refCount;
}

public static WriterContextBuilder newBuilder() {
return new WriterContextBuilder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public WriteNextMessageIfPossibleListener(PipeBuffer pipeBuffer) {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
try {
if (buffer != null && buffer.isDisposable()) {
if (buffer != null) {
buffer.release();
}
if (future.isSuccess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
public abstract class AbstractBuffer implements OutBuffer {

private final ShuffleMemoryTracker memoryTracker;
protected int refCount;

public AbstractBuffer(boolean enableMemoryTrack) {
this.memoryTracker = enableMemoryTrack
Expand All @@ -30,16 +29,6 @@ public AbstractBuffer(ShuffleMemoryTracker memoryTracker) {
this.memoryTracker = memoryTracker;
}

@Override
public void setRefCount(int refCount) {
this.refCount = refCount;
}

@Override
public boolean isDisposable() {
return this.refCount <= 0;
}

protected void requireMemory(long dataSize) {
if (this.memoryTracker != null) {
memoryTracker.requireMemory(dataSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,6 @@ public interface OutBuffer {
*/
void write(OutputStream outputStream) throws IOException;

/**
* Set ref count, the number of consumer which handle this buffer.
*
* @param refCount ref count.
*/
void setRefCount(int refCount);

/**
* Check if this buffer disposable.
*
* @return if disposable.
*/
boolean isDisposable();

/**
* Release this buffer.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,14 @@ public abstract class AbstractSlice implements IPipelineSlice {

protected final SliceId sliceId;
protected final String taskLogTag;
protected int refCount;
protected int totalBufferCount;
protected ArrayDeque<PipeBuffer> buffers;
protected PipelineSliceReader sliceReader;
protected volatile boolean isReleased;

public AbstractSlice(String taskLogTag, SliceId sliceId, int refCount) {
public AbstractSlice(String taskLogTag, SliceId sliceId) {
this.sliceId = sliceId;
this.taskLogTag = taskLogTag;
this.refCount = refCount;
this.totalBufferCount = 0;
this.buffers = new ArrayDeque<>();
}
Expand All @@ -61,23 +59,17 @@ public PipelineSliceReader createSliceReader(long startBatchId, PipelineSliceLis
throw new GeaflowRuntimeException("slice is already created:" + sliceId);
}

refCount--;
LOGGER.info("creating reader for {} {} with startBatch:{} refCount:{}",
taskLogTag, sliceId, startBatchId, refCount);
LOGGER.info("creating reader for {} {} with startBatch:{}",
taskLogTag, sliceId, startBatchId);

// multiple repeatable readers can exist at the same time.
if (refCount >= 1) {
sliceReader = new RepeatableSliceReader(this, startBatchId, listener);
} else {
sliceReader = new DisposableSliceReader(this, startBatchId, listener);
}
sliceReader = new DisposableSliceReader(this, startBatchId, listener);
return sliceReader;
}
}

@Override
public boolean canRelease() {
return refCount == 0 && !hasNext();
return !hasNext();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import com.antgroup.geaflow.shuffle.message.SliceId;
import com.antgroup.geaflow.shuffle.pipeline.buffer.PipeBuffer;
import java.util.Iterator;

public interface IPipelineSlice {

Expand Down Expand Up @@ -60,9 +59,4 @@ public interface IPipelineSlice {
*/
PipeBuffer next();

/**
* Get slice buffer iterator.
*/
Iterator<PipeBuffer> getBufferIterator();

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.antgroup.geaflow.shuffle.message.SliceId;
import com.antgroup.geaflow.shuffle.pipeline.buffer.PipeBuffer;
import com.google.common.base.Preconditions;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,11 +27,7 @@ public class PipelineSlice extends AbstractSlice {
private boolean flushRequested;

public PipelineSlice(String taskLogTag, SliceId sliceId) {
super(taskLogTag, sliceId, 1);
}

public PipelineSlice(String taskLogTag, SliceId sliceId, int refCount) {
super(taskLogTag, sliceId, refCount);
super(taskLogTag, sliceId);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -87,12 +82,6 @@ private void notifyDataAvailable(long batchId) {
}
}

public void setRefCount(int refCount) {
synchronized (buffers) {
this.refCount = refCount;
}
}

// ------------------------------------------------------------------------
// Consume
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -133,11 +122,6 @@ private void updateFlushRequested(boolean flushRequested) {
this.flushRequested = flushRequested;
}

@Override
public Iterator<PipeBuffer> getBufferIterator() {
return buffers.iterator();
}

@Override
public void release() {
final PipelineSliceReader reader;
Expand Down

This file was deleted.

Loading
Loading