diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java index 94624580a481..0eee468dab7c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig; import org.apache.pinot.core.segment.processing.timehandler.TimeHandler; import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig; @@ -43,10 +44,12 @@ public class SegmentProcessorConfig { private final MergeType _mergeType; private final Map _aggregationTypes; private final SegmentConfig _segmentConfig; + private final Consumer _progressObserver; private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, TimeHandlerConfig timeHandlerConfig, List partitionerConfigs, MergeType mergeType, - Map aggregationTypes, SegmentConfig segmentConfig) { + Map aggregationTypes, SegmentConfig segmentConfig, + Consumer progressObserver) { _tableConfig = tableConfig; _schema = schema; _timeHandlerConfig = timeHandlerConfig; @@ -54,6 +57,9 @@ private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, TimeHandl _mergeType = mergeType; _aggregationTypes = aggregationTypes; _segmentConfig = segmentConfig; + _progressObserver = (progressObserver != null) ? progressObserver : p -> { + // Do nothing. + }; } /** @@ -105,6 +111,10 @@ public SegmentConfig getSegmentConfig() { return _segmentConfig; } + public Consumer getProgressObserver() { + return _progressObserver; + } + @Override public String toString() { return "SegmentProcessorConfig{" + "_tableConfig=" + _tableConfig + ", _schema=" + _schema + ", _timeHandlerConfig=" @@ -123,6 +133,7 @@ public static class Builder { private MergeType _mergeType; private Map _aggregationTypes; private SegmentConfig _segmentConfig; + private Consumer _progressObserver; public Builder setTableConfig(TableConfig tableConfig) { _tableConfig = tableConfig; @@ -159,6 +170,11 @@ public Builder setSegmentConfig(SegmentConfig segmentConfig) { return this; } + public Builder setProgressObserver(Consumer progressObserver) { + _progressObserver = progressObserver; + return this; + } + public SegmentProcessorConfig build() { Preconditions.checkState(_tableConfig != null, "Must provide table config in SegmentProcessorConfig"); Preconditions.checkState(_schema != null, "Must provide schema in SegmentProcessorConfig"); @@ -179,7 +195,7 @@ public SegmentProcessorConfig build() { _segmentConfig = new SegmentConfig.Builder().build(); } return new SegmentProcessorConfig(_tableConfig, _schema, _timeHandlerConfig, _partitionerConfigs, _mergeType, - _aggregationTypes, _segmentConfig); + _aggregationTypes, _segmentConfig, _progressObserver); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java index e86b45691587..42aea2a6f5e0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.apache.commons.io.FileUtils; import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager; import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader; @@ -103,8 +104,13 @@ public List process() // Reduce phase LOGGER.info("Beginning reduce phase on partitions: {}", partitionToFileManagerMap.keySet()); + Consumer observer = _segmentProcessorConfig.getProgressObserver(); + int totalCount = partitionToFileManagerMap.keySet().size(); + int count = 1; for (Map.Entry entry : partitionToFileManagerMap.entrySet()) { String partitionId = entry.getKey(); + observer.accept(String + .format("Doing reduce phase on data from partition: %s (%d out of %d)", partitionId, count++, totalCount)); GenericRowFileManager fileManager = entry.getValue(); Reducer reducer = ReducerFactory.getReducer(partitionId, fileManager, _segmentProcessorConfig, _reducerOutputDir); entry.setValue(reducer.reduce()); @@ -146,6 +152,9 @@ public List process() int endRowId = Math.min(startRowId + maxNumRecordsPerSegment, numRows); LOGGER.info("Start creating segment of sequenceId: {} with row range: {} to {}", sequenceId, startRowId, endRowId); + observer.accept(String.format( + "Creating segment of sequentId: %d with data from partition: %s and row range: [%d, %d) out of [0, %d)", + sequenceId, partitionId, startRowId, endRowId, numRows)); generatorConfig.setSequenceId(sequenceId); GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId); SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java index 076d689b7f11..f20b8631f65a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.Pair; @@ -60,6 +61,7 @@ public class SegmentMapper { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class); private final List _recordReaders; + private final SegmentProcessorConfig _processorConfig; private final File _mapperOutputDir; private final List _fieldSpecs; @@ -75,6 +77,7 @@ public class SegmentMapper { public SegmentMapper(List recordReaders, SegmentProcessorConfig processorConfig, File mapperOutputDir) { _recordReaders = recordReaders; + _processorConfig = processorConfig; _mapperOutputDir = mapperOutputDir; TableConfig tableConfig = processorConfig.getTableConfig(); @@ -105,8 +108,12 @@ public SegmentMapper(List recordReaders, SegmentProcessorConfig pr */ public Map map() throws Exception { + Consumer observer = _processorConfig.getProgressObserver(); + int totalCount = _recordReaders.size(); + int count = 1; GenericRow reuse = new GenericRow(); for (RecordReader recordReader : _recordReaders) { + observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count++, totalCount)); while (recordReader.hasNext()) { reuse = recordReader.next(reuse); diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java index b20029f7a286..f37e3a6f55d6 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java @@ -18,8 +18,6 @@ */ package org.apache.pinot.minion.event; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.ArrayList; import java.util.Deque; import java.util.LinkedList; @@ -27,6 +25,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.utils.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,7 +94,7 @@ public synchronized void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig) { @Override public synchronized void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception e) { long endTs = System.currentTimeMillis(); - addStatus(endTs, "Task failed in " + (endTs - _startTs) + "ms with error: " + makeStringFromException(e)); + addStatus(endTs, "Task failed in " + (endTs - _startTs) + "ms with error: " + StringUtil.getStackTraceAsString(e)); super.notifyTaskError(pinotTaskConfig, e); } @@ -106,14 +105,6 @@ private void addStatus(long ts, String progress) { } } - private static String makeStringFromException(Exception exp) { - StringWriter expStr = new StringWriter(); - try (PrintWriter pw = new PrintWriter(expStr)) { - exp.printStackTrace(pw); - } - return expStr.toString(); - } - public static class StatusEntry { private final long _ts; private final String _status; diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java index 8668b6e3a6ba..7d95d73dbcd8 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java @@ -46,6 +46,7 @@ import org.apache.pinot.minion.executor.PinotTaskExecutor; import org.apache.pinot.minion.executor.PinotTaskExecutorFactory; import org.apache.pinot.minion.executor.TaskExecutorFactoryRegistry; +import org.apache.pinot.spi.utils.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -107,9 +108,8 @@ public TaskResult run() { private TaskResult runInternal() { PinotTaskConfig pinotTaskConfig = PinotTaskConfig.fromHelixTaskConfig(_taskConfig); if (StringUtils.isBlank(pinotTaskConfig.getConfigs().get(MinionConstants.AUTH_TOKEN))) { - pinotTaskConfig.getConfigs() - .put(MinionConstants.AUTH_TOKEN, - AuthProviderUtils.toStaticToken(MinionContext.getInstance().getTaskAuthProvider())); + pinotTaskConfig.getConfigs().put(MinionConstants.AUTH_TOKEN, + AuthProviderUtils.toStaticToken(MinionContext.getInstance().getTaskAuthProvider())); } _eventObserver.notifyTaskStart(pinotTaskConfig); @@ -127,17 +127,17 @@ private TaskResult runInternal() { _eventObserver.notifyTaskCancelled(pinotTaskConfig); _minionMetrics.addMeteredTableValue(taskType, MinionMeter.NUMBER_TASKS_CANCELLED, 1L); LOGGER.info("Task: {} got cancelled", _taskConfig.getId(), e); - return new TaskResult(TaskResult.Status.CANCELED, e.toString()); + return new TaskResult(TaskResult.Status.CANCELED, StringUtil.getStackTraceAsString(e)); } catch (FatalException e) { _eventObserver.notifyTaskError(pinotTaskConfig, e); _minionMetrics.addMeteredTableValue(taskType, MinionMeter.NUMBER_TASKS_FATAL_FAILED, 1L); LOGGER.error("Caught fatal exception while executing task: {}", _taskConfig.getId(), e); - return new TaskResult(TaskResult.Status.FATAL_FAILED, e.toString()); + return new TaskResult(TaskResult.Status.FATAL_FAILED, StringUtil.getStackTraceAsString(e)); } catch (Exception e) { _eventObserver.notifyTaskError(pinotTaskConfig, e); _minionMetrics.addMeteredTableValue(taskType, MinionMeter.NUMBER_TASKS_FAILED, 1L); LOGGER.error("Caught exception while executing task: {}", _taskConfig.getId(), e); - return new TaskResult(TaskResult.Status.FAILED, e.toString()); + return new TaskResult(TaskResult.Status.FAILED, StringUtil.getStackTraceAsString(e)); } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java index 932831609931..5b64c5842026 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java @@ -164,15 +164,15 @@ public List executeTask(PinotTaskConfig pinotTaskConfig List inputSegmentDirs = new ArrayList<>(); for (int i = 0; i < downloadURLs.length; i++) { // Download the segment file - _eventObserver.notifyProgress(_pinotTaskConfig, - String.format("Downloading segment from: %s, %d/%d", downloadURLs[i], (i + 1), downloadURLs.length)); + _eventObserver.notifyProgress(_pinotTaskConfig, String + .format("Downloading segment from: %s (%d out of %d)", downloadURLs[i], (i + 1), downloadURLs.length)); File tarredSegmentFile = new File(tempDataDir, "tarredSegmentFile_" + i); LOGGER.info("Downloading segment from {} to {}", downloadURLs[i], tarredSegmentFile.getAbsolutePath()); SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadURLs[i], tarredSegmentFile, crypterName); // Un-tar the segment file - _eventObserver.notifyProgress(_pinotTaskConfig, - String.format("Decompressing segment from: %s, %d/%d", downloadURLs[i], (i + 1), downloadURLs.length)); + _eventObserver.notifyProgress(_pinotTaskConfig, String + .format("Decompressing segment from: %s (%d out of %d)", downloadURLs[i], (i + 1), downloadURLs.length)); File segmentDir = new File(tempDataDir, "segmentDir_" + i); File indexDir = TarGzCompressionUtils.untar(tarredSegmentFile, segmentDir).get(0); inputSegmentDirs.add(indexDir); @@ -196,7 +196,7 @@ public List executeTask(PinotTaskConfig pinotTaskConfig for (SegmentConversionResult segmentConversionResult : segmentConversionResults) { // Tar the converted segment _eventObserver.notifyProgress(_pinotTaskConfig, String - .format("Compressing segment: %s, %d/%d", segmentConversionResult.getSegmentName(), count++, + .format("Compressing segment: %s (%d out of %d)", segmentConversionResult.getSegmentName(), count++, numOutputSegments)); File convertedSegmentDir = segmentConversionResult.getFile(); File convertedSegmentTarFile = new File(convertedTarredSegmentDir, @@ -233,7 +233,7 @@ public List executeTask(PinotTaskConfig pinotTaskConfig SegmentConversionResult segmentConversionResult = segmentConversionResults.get(i); String resultSegmentName = segmentConversionResult.getSegmentName(); _eventObserver.notifyProgress(_pinotTaskConfig, - String.format("Uploading segment: %s, %d/%d", resultSegmentName, (i + 1), numOutputSegments)); + String.format("Uploading segment: %s (%d out of %d)", resultSegmentName, (i + 1), numOutputSegments)); // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java index 36b5eddb600b..049859c1a996 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java @@ -86,13 +86,16 @@ protected List convert(PinotTaskConfig pinotTaskConfig, // Segment config segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs)); + // Progress observer + segmentProcessorConfigBuilder.setProgressObserver(p -> _eventObserver.notifyProgress(_pinotTaskConfig, p)); + SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build(); List recordReaders = new ArrayList<>(numInputSegments); int count = 1; for (File segmentDir : segmentDirs) { _eventObserver.notifyProgress(_pinotTaskConfig, - String.format("Creating RecordReader for: %s, %d/%d", segmentDir, count++, numInputSegments)); + String.format("Creating RecordReader for: %s (%d out of %d)", segmentDir, count++, numInputSegments)); PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader(); // NOTE: Do not fill null field with default value to be consistent with other record readers recordReader.init(segmentDir, null, null, true); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java index fffb4b7bce9a..502fa1cc7629 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java @@ -152,13 +152,16 @@ protected List convert(PinotTaskConfig pinotTaskConfig, // Segment config segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs)); + // Progress observer + segmentProcessorConfigBuilder.setProgressObserver(p -> _eventObserver.notifyProgress(_pinotTaskConfig, p)); + SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build(); List recordReaders = new ArrayList<>(numInputSegments); int count = 1; for (File segmentDir : segmentDirs) { _eventObserver.notifyProgress(_pinotTaskConfig, - String.format("Creating RecordReader for: %s, %d/%d", segmentDir, count++, numInputSegments)); + String.format("Creating RecordReader for: %s (%d out of %d)", segmentDir, count++, numInputSegments)); PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader(); // NOTE: Do not fill null field with default value to be consistent with other record readers recordReader.init(segmentDir, null, null, true); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/StringUtil.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/StringUtil.java index d7d2d25875e6..c4c664992476 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/StringUtil.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/StringUtil.java @@ -18,6 +18,8 @@ */ package org.apache.pinot.spi.utils; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.ArrayList; import java.util.List; import org.apache.commons.lang.ArrayUtils; @@ -85,4 +87,15 @@ public static String sanitizeStringValue(String value, int maxLength) { } return value.substring(0, Math.min(index, maxLength)); } + + /** + * Get the exception full stack track as String. + */ + public static String getStackTraceAsString(Exception exp) { + StringWriter expStr = new StringWriter(); + try (PrintWriter pw = new PrintWriter(expStr)) { + exp.printStackTrace(pw); + } + return expStr.toString(); + } }