Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

track progress from within segment processor framework #9457

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig;
Expand All @@ -43,17 +44,22 @@ public class SegmentProcessorConfig {
private final MergeType _mergeType;
private final Map<String, AggregationFunctionType> _aggregationTypes;
private final SegmentConfig _segmentConfig;
private final Consumer<Object> _progressObserver;

/**
 * Creates the immutable processor config. Private: instances are built via {@link Builder}.
 *
 * @param tableConfig       table config of the segments being processed
 * @param schema            schema of the segments being processed
 * @param timeHandlerConfig config for the time handler
 * @param partitionerConfigs configs for the partitioners
 * @param mergeType         merge type (e.g. CONCAT/ROLLUP)
 * @param aggregationTypes  per-column aggregation function types
 * @param segmentConfig     config for the generated segments
 * @param progressObserver  optional observer that receives human-readable progress updates;
 *                          may be {@code null}
 */
private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, TimeHandlerConfig timeHandlerConfig,
    List<PartitionerConfig> partitionerConfigs, MergeType mergeType,
    Map<String, AggregationFunctionType> aggregationTypes, SegmentConfig segmentConfig,
    Consumer<Object> progressObserver) {
  _tableConfig = tableConfig;
  _schema = schema;
  _timeHandlerConfig = timeHandlerConfig;
  _partitionerConfigs = partitionerConfigs;
  _mergeType = mergeType;
  _aggregationTypes = aggregationTypes;
  _segmentConfig = segmentConfig;
  // Fall back to a no-op observer so consumers never need a null check before accept().
  _progressObserver = (progressObserver != null) ? progressObserver : p -> {
    // Do nothing.
  };
}

/**
Expand Down Expand Up @@ -105,6 +111,10 @@ public SegmentConfig getSegmentConfig() {
return _segmentConfig;
}

/**
 * Returns the progress observer for this processor run. Never {@code null}: the constructor
 * substitutes a no-op consumer when none was configured, so callers may invoke
 * {@code accept()} unconditionally.
 */
public Consumer<Object> getProgressObserver() {
  return _progressObserver;
}

@Override
public String toString() {
return "SegmentProcessorConfig{" + "_tableConfig=" + _tableConfig + ", _schema=" + _schema + ", _timeHandlerConfig="
Expand All @@ -123,6 +133,7 @@ public static class Builder {
private MergeType _mergeType;
private Map<String, AggregationFunctionType> _aggregationTypes;
private SegmentConfig _segmentConfig;
private Consumer<Object> _progressObserver;

public Builder setTableConfig(TableConfig tableConfig) {
_tableConfig = tableConfig;
Expand Down Expand Up @@ -159,6 +170,11 @@ public Builder setSegmentConfig(SegmentConfig segmentConfig) {
return this;
}

/**
 * Sets an observer that will be called with human-readable progress updates (the visible
 * call sites pass formatted {@code String} messages) during the map/reduce/segment-creation
 * phases. Optional; a null observer is replaced with a no-op at build time.
 */
public Builder setProgressObserver(Consumer<Object> progressObserver) {
  _progressObserver = progressObserver;
  return this;
}

public SegmentProcessorConfig build() {
Preconditions.checkState(_tableConfig != null, "Must provide table config in SegmentProcessorConfig");
Preconditions.checkState(_schema != null, "Must provide schema in SegmentProcessorConfig");
Expand All @@ -179,7 +195,7 @@ public SegmentProcessorConfig build() {
_segmentConfig = new SegmentConfig.Builder().build();
}
return new SegmentProcessorConfig(_tableConfig, _schema, _timeHandlerConfig, _partitionerConfigs, _mergeType,
_aggregationTypes, _segmentConfig);
_aggregationTypes, _segmentConfig, _progressObserver);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
Expand Down Expand Up @@ -103,8 +104,13 @@ public List<File> process()

// Reduce phase
LOGGER.info("Beginning reduce phase on partitions: {}", partitionToFileManagerMap.keySet());
Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
int totalCount = partitionToFileManagerMap.keySet().size();
int count = 1;
for (Map.Entry<String, GenericRowFileManager> entry : partitionToFileManagerMap.entrySet()) {
String partitionId = entry.getKey();
observer.accept(String
.format("Doing reduce phase on data from partition: %s (%d out of %d)", partitionId, count++, totalCount));
GenericRowFileManager fileManager = entry.getValue();
Reducer reducer = ReducerFactory.getReducer(partitionId, fileManager, _segmentProcessorConfig, _reducerOutputDir);
entry.setValue(reducer.reduce());
Expand Down Expand Up @@ -146,6 +152,9 @@ public List<File> process()
int endRowId = Math.min(startRowId + maxNumRecordsPerSegment, numRows);
LOGGER.info("Start creating segment of sequenceId: {} with row range: {} to {}", sequenceId, startRowId,
endRowId);
observer.accept(String.format(
"Creating segment of sequentId: %d with data from partition: %s and row range: [%d, %d) out of [0, %d)",
sequenceId, partitionId, startRowId, endRowId, numRows));
generatorConfig.setSequenceId(sequenceId);
GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId);
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class SegmentMapper {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);

private final List<RecordReader> _recordReaders;
private final SegmentProcessorConfig _processorConfig;
private final File _mapperOutputDir;

private final List<FieldSpec> _fieldSpecs;
Expand All @@ -75,6 +77,7 @@ public class SegmentMapper {

public SegmentMapper(List<RecordReader> recordReaders, SegmentProcessorConfig processorConfig, File mapperOutputDir) {
_recordReaders = recordReaders;
_processorConfig = processorConfig;
_mapperOutputDir = mapperOutputDir;

TableConfig tableConfig = processorConfig.getTableConfig();
Expand Down Expand Up @@ -105,8 +108,12 @@ public SegmentMapper(List<RecordReader> recordReaders, SegmentProcessorConfig pr
*/
public Map<String, GenericRowFileManager> map()
throws Exception {
Consumer<Object> observer = _processorConfig.getProgressObserver();
int totalCount = _recordReaders.size();
int count = 1;
GenericRow reuse = new GenericRow();
for (RecordReader recordReader : _recordReaders) {
observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count++, totalCount));
while (recordReader.hasNext()) {
reuse = recordReader.next(reuse);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
*/
package org.apache.pinot.minion.event;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -95,7 +94,7 @@ public synchronized void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig) {
@Override
public synchronized void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception e) {
  long endTs = System.currentTimeMillis();
  // Record the full stack trace, not e.toString(): toString()/getMessage() only yield the
  // first line of the trace, which is rarely enough to diagnose a task failure.
  addStatus(endTs, "Task failed in " + (endTs - _startTs) + "ms with error: " + StringUtil.getStackTraceAsString(e));
  super.notifyTaskError(pinotTaskConfig, e);
}

Expand All @@ -106,14 +105,6 @@ private void addStatus(long ts, String progress) {
}
}

/**
 * Renders the given exception's full stack trace (message, frames, causes) into a String.
 */
private static String makeStringFromException(Exception exp) {
  StringWriter buffer = new StringWriter();
  PrintWriter writer = new PrintWriter(buffer);
  exp.printStackTrace(writer);
  // Closing a PrintWriter flushes it; on a StringWriter this releases nothing else.
  writer.close();
  return buffer.toString();
}

public static class StatusEntry {
private final long _ts;
private final String _status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pinot.minion.executor.PinotTaskExecutor;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
import org.apache.pinot.minion.executor.TaskExecutorFactoryRegistry;
import org.apache.pinot.spi.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand Down Expand Up @@ -107,9 +108,8 @@ public TaskResult run() {
private TaskResult runInternal() {
PinotTaskConfig pinotTaskConfig = PinotTaskConfig.fromHelixTaskConfig(_taskConfig);
if (StringUtils.isBlank(pinotTaskConfig.getConfigs().get(MinionConstants.AUTH_TOKEN))) {
pinotTaskConfig.getConfigs()
.put(MinionConstants.AUTH_TOKEN,
AuthProviderUtils.toStaticToken(MinionContext.getInstance().getTaskAuthProvider()));
pinotTaskConfig.getConfigs().put(MinionConstants.AUTH_TOKEN,
AuthProviderUtils.toStaticToken(MinionContext.getInstance().getTaskAuthProvider()));
}

_eventObserver.notifyTaskStart(pinotTaskConfig);
Expand All @@ -127,17 +127,17 @@ private TaskResult runInternal() {
_eventObserver.notifyTaskCancelled(pinotTaskConfig);
_minionMetrics.addMeteredTableValue(taskType, MinionMeter.NUMBER_TASKS_CANCELLED, 1L);
LOGGER.info("Task: {} got cancelled", _taskConfig.getId(), e);
return new TaskResult(TaskResult.Status.CANCELED, e.toString());
return new TaskResult(TaskResult.Status.CANCELED, StringUtil.getStackTraceAsString(e));
} catch (FatalException e) {
_eventObserver.notifyTaskError(pinotTaskConfig, e);
_minionMetrics.addMeteredTableValue(taskType, MinionMeter.NUMBER_TASKS_FATAL_FAILED, 1L);
LOGGER.error("Caught fatal exception while executing task: {}", _taskConfig.getId(), e);
return new TaskResult(TaskResult.Status.FATAL_FAILED, e.toString());
return new TaskResult(TaskResult.Status.FATAL_FAILED, StringUtil.getStackTraceAsString(e));
} catch (Exception e) {
_eventObserver.notifyTaskError(pinotTaskConfig, e);
_minionMetrics.addMeteredTableValue(taskType, MinionMeter.NUMBER_TASKS_FAILED, 1L);
LOGGER.error("Caught exception while executing task: {}", _taskConfig.getId(), e);
return new TaskResult(TaskResult.Status.FAILED, e.toString());
return new TaskResult(TaskResult.Status.FAILED, StringUtil.getStackTraceAsString(e));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e.toString(), like e.getMessage(), returns only the error message (essentially just the first line of the stack trace), so the full trace must be captured explicitly.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,15 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
List<File> inputSegmentDirs = new ArrayList<>();
for (int i = 0; i < downloadURLs.length; i++) {
// Download the segment file
_eventObserver.notifyProgress(_pinotTaskConfig,
String.format("Downloading segment from: %s, %d/%d", downloadURLs[i], (i + 1), downloadURLs.length));
_eventObserver.notifyProgress(_pinotTaskConfig, String
.format("Downloading segment from: %s (%d out of %d)", downloadURLs[i], (i + 1), downloadURLs.length));
File tarredSegmentFile = new File(tempDataDir, "tarredSegmentFile_" + i);
LOGGER.info("Downloading segment from {} to {}", downloadURLs[i], tarredSegmentFile.getAbsolutePath());
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadURLs[i], tarredSegmentFile, crypterName);

// Un-tar the segment file
_eventObserver.notifyProgress(_pinotTaskConfig,
String.format("Decompressing segment from: %s, %d/%d", downloadURLs[i], (i + 1), downloadURLs.length));
_eventObserver.notifyProgress(_pinotTaskConfig, String
.format("Decompressing segment from: %s (%d out of %d)", downloadURLs[i], (i + 1), downloadURLs.length));
File segmentDir = new File(tempDataDir, "segmentDir_" + i);
File indexDir = TarGzCompressionUtils.untar(tarredSegmentFile, segmentDir).get(0);
inputSegmentDirs.add(indexDir);
Expand All @@ -196,7 +196,7 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
for (SegmentConversionResult segmentConversionResult : segmentConversionResults) {
// Tar the converted segment
_eventObserver.notifyProgress(_pinotTaskConfig, String
.format("Compressing segment: %s, %d/%d", segmentConversionResult.getSegmentName(), count++,
.format("Compressing segment: %s (%d out of %d)", segmentConversionResult.getSegmentName(), count++,
numOutputSegments));
File convertedSegmentDir = segmentConversionResult.getFile();
File convertedSegmentTarFile = new File(convertedTarredSegmentDir,
Expand Down Expand Up @@ -233,7 +233,7 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
SegmentConversionResult segmentConversionResult = segmentConversionResults.get(i);
String resultSegmentName = segmentConversionResult.getSegmentName();
_eventObserver.notifyProgress(_pinotTaskConfig,
String.format("Uploading segment: %s, %d/%d", resultSegmentName, (i + 1), numOutputSegments));
String.format("Uploading segment: %s (%d out of %d)", resultSegmentName, (i + 1), numOutputSegments));

// Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,16 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
// Segment config
segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));

// Progress observer
segmentProcessorConfigBuilder.setProgressObserver(p -> _eventObserver.notifyProgress(_pinotTaskConfig, p));

SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();

List<RecordReader> recordReaders = new ArrayList<>(numInputSegments);
int count = 1;
for (File segmentDir : segmentDirs) {
_eventObserver.notifyProgress(_pinotTaskConfig,
String.format("Creating RecordReader for: %s, %d/%d", segmentDir, count++, numInputSegments));
String.format("Creating RecordReader for: %s (%d out of %d)", segmentDir, count++, numInputSegments));
PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
// NOTE: Do not fill null field with default value to be consistent with other record readers
recordReader.init(segmentDir, null, null, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,16 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
// Segment config
segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));

// Progress observer
segmentProcessorConfigBuilder.setProgressObserver(p -> _eventObserver.notifyProgress(_pinotTaskConfig, p));

SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();

List<RecordReader> recordReaders = new ArrayList<>(numInputSegments);
int count = 1;
for (File segmentDir : segmentDirs) {
_eventObserver.notifyProgress(_pinotTaskConfig,
String.format("Creating RecordReader for: %s, %d/%d", segmentDir, count++, numInputSegments));
String.format("Creating RecordReader for: %s (%d out of %d)", segmentDir, count++, numInputSegments));
PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
// NOTE: Do not fill null field with default value to be consistent with other record readers
recordReader.init(segmentDir, null, null, true);
Expand Down
13 changes: 13 additions & 0 deletions pinot-spi/src/main/java/org/apache/pinot/spi/utils/StringUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pinot.spi.utils;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.ArrayUtils;
Expand Down Expand Up @@ -85,4 +87,15 @@ public static String sanitizeStringValue(String value, int maxLength) {
}
return value.substring(0, Math.min(index, maxLength));
}

/**
 * Returns the exception's full stack trace as a String, exactly as produced by
 * {@link Exception#printStackTrace(java.io.PrintWriter)}: the {@code toString()} header line
 * followed by every stack frame, including causes and suppressed exceptions.
 *
 * @param exp the exception to render; must not be {@code null}
 * @return the complete stack trace text
 */
public static String getStackTraceAsString(Exception exp) {
  StringWriter expStr = new StringWriter();
  // try-with-resources closes (and therefore flushes) the PrintWriter before toString().
  try (PrintWriter pw = new PrintWriter(expStr)) {
    exp.printStackTrace(pw);
  }
  return expStr.toString();
}
}