Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always use smallest offset for new partitionGroups #8053

Merged
merged 1 commit into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
String segmentName =
setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
numPartitionGroups, numReplicas, newPartitionGroupMetadataList, false);
numPartitionGroups, numReplicas, newPartitionGroupMetadataList);

updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
instancePartitionsMap);
Expand Down Expand Up @@ -875,8 +875,12 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevel
if (idealState.isEnabled()) {
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
getPartitionGroupConsumptionStatusList(idealState, streamConfig);
// Read the smallest offset when a new partition is detected
OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
streamConfig.setOffsetCriteria(originalOffsetCriteria);
return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList);
} else {
LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName);
Expand Down Expand Up @@ -1196,7 +1200,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
String newSegmentName =
setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
numPartitions, numReplicas, newPartitionGroupMetadataList, true);
numPartitions, numReplicas, newPartitionGroupMetadataList);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
instancePartitionsMap);
}
Expand All @@ -1206,14 +1210,11 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
}

private StreamPartitionMsgOffset getPartitionGroupSmallestOffset(StreamConfig streamConfig, int partitionGroupId) {
Map<String, String> streamConfigMapWithSmallestOffsetCriteria = new HashMap<>(streamConfig.getStreamConfigsMap());
streamConfigMapWithSmallestOffsetCriteria.put(StreamConfigProperties
.constructStreamProperty(streamConfig.getType(), StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA),
OffsetCriteria.SMALLEST_OFFSET_CRITERIA.getOffsetString());
StreamConfig smallestOffsetCriteriaStreamConfig =
new StreamConfig(streamConfig.getTableNameWithType(), streamConfigMapWithSmallestOffsetCriteria);
OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata =
getNewPartitionGroupMetadataList(smallestOffsetCriteriaStreamConfig, Collections.emptyList());
getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
streamConfig.setOffsetCriteria(originalOffsetCriteria);
StreamPartitionMsgOffset partitionStartOffset = null;
for (PartitionGroupMetadata info : smallestOffsetCriteriaPartitionGroupMetadata) {
if (info.getPartitionGroupId() == partitionGroupId) {
Expand All @@ -1235,25 +1236,18 @@ private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName,
*/
private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs, InstancePartitions instancePartitions,
int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList,
boolean isLiveTable) {
int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList) {
String realtimeTableName = tableConfig.getTableName();
int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
StreamPartitionMsgOffset startOffset;
if (isLiveTable) {
startOffset = getPartitionGroupSmallestOffset(streamConfig, partitionGroupId);
} else {
startOffset = partitionGroupMetadata.getStartOffset();
}
String startOffset = partitionGroupMetadata.getStartOffset().toString();
LOGGER.info("Setting up new partition group: {} for table: {}", partitionGroupId, realtimeTableName);

String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
LLCSegmentName newLLCSegmentName =
new LLCSegmentName(rawTableName, partitionGroupId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
String newSegmentName = newLLCSegmentName.getSegmentName();

CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null,
startOffset.toString(), 0);
CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset, 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas,
partitionGroupMetadataList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public enum ConsumerType {
private final String _tableNameWithType;
private final List<ConsumerType> _consumerTypes = new ArrayList<>();
private final String _consumerFactoryClassName;
private final OffsetCriteria _offsetCriteria;
private final String _decoderClass;
private final Map<String, String> _decoderProperties = new HashMap<>();

Expand All @@ -79,6 +78,9 @@ public enum ConsumerType {

private final Map<String, String> _streamConfigMap = new HashMap<>();

// Allow overriding it to use different offset criteria
private OffsetCriteria _offsetCriteria;

/**
* Initializes a StreamConfig using the map of stream configs from the table config
*/
Expand Down Expand Up @@ -284,6 +286,10 @@ public OffsetCriteria getOffsetCriteria() {
return _offsetCriteria;
}

public void setOffsetCriteria(OffsetCriteria offsetCriteria) {
_offsetCriteria = offsetCriteria;
}

public String getDecoderClass() {
return _decoderClass;
}
Expand Down