Skip to content

Commit

Permalink
Remove special routing handling for multiple consuming segments (#11371)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Aug 17, 2023
1 parent 088599f commit 575398d
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 342 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ Map<String, Long> getNewSegmentPushTimeMapFromZK(IdealState idealState, External
newSegmentPushTimeMap.put(segmentZKMetadata.getSegmentName(), pushTimeMillis);
}
}
LOGGER.info("Got {} new segments: {} for table: {} by reading ZK metadata, current time: {}",
newSegmentPushTimeMap.size(), newSegmentPushTimeMap, _tableNameWithType, nowMillis);
return newSegmentPushTimeMap;
}

Expand Down Expand Up @@ -294,9 +296,9 @@ void refreshSegmentStates() {
for (SegmentInstanceCandidate candidate : candidates) {
candidateInstances.add(candidate.getInstance());
}
LOGGER.warn(
"Failed to find servers hosting segment: {} for table: {} (all candidate instances: {} are disabled, "
+ "counting segment as unavailable)", segment, _tableNameWithType, candidateInstances);
LOGGER.warn("Failed to find servers hosting old segment: {} for table: {} "
+ "(all candidate instances: {} are disabled, counting segment as unavailable)", segment,
_tableNameWithType, candidateInstances);
unavailableSegments.add(segment);
_brokerMetrics.addMeteredTableValue(_tableNameWithType, BrokerMeter.NO_SERVING_HOST_FOR_SEGMENT, 1);
}
Expand All @@ -314,8 +316,16 @@ void refreshSegmentStates() {
}
if (!enabledCandidates.isEmpty()) {
instanceCandidatesMap.put(segment, enabledCandidates);
} else {
// Do not count new segment as unavailable
List<String> candidateInstances = new ArrayList<>(candidates.size());
for (SegmentInstanceCandidate candidate : candidates) {
candidateInstances.add(candidate.getInstance());
}
LOGGER.info("Failed to find servers hosting new segment: {} for table: {} "
+ "(all candidate instances: {} are disabled, but not counting new segment as unavailable)", segment,
_tableNameWithType, candidateInstances);
}
// Do not count new segment as unavailable
}

_segmentStates = new SegmentStates(instanceCandidatesMap, unavailableSegments);
Expand Down Expand Up @@ -377,6 +387,8 @@ Map<String, Long> getNewSegmentPushTimeMapFromExistingStates(IdealState idealSta
}
}
}
LOGGER.info("Got {} new segments: {} for table: {} by processing existing states, current time: {}",
newSegmentPushTimeMap.size(), newSegmentPushTimeMap, _tableNameWithType, nowMillis);
return newSegmentPushTimeMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.HashUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand Down Expand Up @@ -66,6 +68,7 @@
* </pre>
*/
public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSelector {
private static final Logger LOGGER = LoggerFactory.getLogger(StrictReplicaGroupInstanceSelector.class);

public StrictReplicaGroupInstanceSelector(String tableNameWithType, ZkHelixPropertyStore<ZNRecord> propertyStore,
BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock) {
Expand Down Expand Up @@ -122,12 +125,19 @@ void updateSegmentMaps(IdealState idealState, ExternalView externalView, Set<Str
Map<Set<String>, Set<String>> unavailableInstancesMap = new HashMap<>();
for (Map.Entry<String, Set<String>> entry : oldSegmentToOnlineInstancesMap.entrySet()) {
String segment = entry.getKey();
Set<String> instancesInIdealState = idealStateAssignment.get(segment).keySet();
Set<String> onlineInstances = entry.getValue();
Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment);
Set<String> instancesInIdealState = idealStateInstanceStateMap.keySet();
Set<String> unavailableInstances =
unavailableInstancesMap.computeIfAbsent(instancesInIdealState, k -> new HashSet<>());
for (String instance : instancesInIdealState) {
if (!entry.getValue().contains(instance)) {
unavailableInstances.add(instance);
if (!onlineInstances.contains(instance)) {
if (unavailableInstances.add(instance)) {
LOGGER.warn(
"Found unavailable instance: {} in instance group: {} for segment: {}, table: {} (IS: {}, EV: {})",
instance, instancesInIdealState, segment, _tableNameWithType, idealStateInstanceStateMap,
externalViewAssignment.get(segment));
}
}
}
}
Expand All @@ -138,8 +148,7 @@ void updateSegmentMaps(IdealState idealState, ExternalView externalView, Set<Str
// NOTE: onlineInstances is either a TreeSet or an EmptySet (sorted)
Set<String> onlineInstances = entry.getValue();
Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment);
Set<String> unavailableInstances =
unavailableInstancesMap.getOrDefault(idealStateInstanceStateMap.keySet(), Collections.emptySet());
Set<String> unavailableInstances = unavailableInstancesMap.get(idealStateInstanceStateMap.keySet());
List<SegmentInstanceCandidate> candidates = new ArrayList<>(onlineInstances.size());
for (String instance : onlineInstances) {
if (!unavailableInstances.contains(instance)) {
Expand All @@ -156,9 +165,8 @@ void updateSegmentMaps(IdealState idealState, ExternalView externalView, Set<Str
Set<String> unavailableInstances =
unavailableInstancesMap.getOrDefault(idealStateInstanceStateMap.keySet(), Collections.emptySet());
List<SegmentInstanceCandidate> candidates = new ArrayList<>(idealStateInstanceStateMap.size());
for (Map.Entry<String, String> instanceStateEntry : convertToSortedMap(idealStateInstanceStateMap).entrySet()) {
String instance = instanceStateEntry.getKey();
if (!unavailableInstances.contains(instance) && isOnlineForRouting(instanceStateEntry.getValue())) {
for (String instance : convertToSortedMap(idealStateInstanceStateMap).keySet()) {
if (!unavailableInstances.contains(instance)) {
candidates.add(new SegmentInstanceCandidate(instance, onlineInstances.contains(instance)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,16 @@
import org.apache.pinot.common.request.BrokerRequest;


/**
* Segment selector for offline table.
*/
public class OfflineSegmentSelector implements SegmentSelector {
public class DefaultSegmentSelector implements SegmentSelector {
private volatile Set<String> _segments;

@Override
public void init(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) {
onAssignmentChange(idealState, externalView, onlineSegments);
_segments = Collections.unmodifiableSet(onlineSegments);
}

@Override
public void onAssignmentChange(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) {
// TODO: for new added segments, before all replicas are up, consider not selecting them to avoid causing
// hotspot servers

_segments = Collections.unmodifiableSet(onlineSegments);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,6 @@
/**
* The segment selector selects the segments for the query. The segments selected should cover the whole dataset (table)
* without overlap.
* <p>Segment selector examples:
* <ul>
* <li>
* For real-time table, when HLC and LLC segments coexist (during LLC migration), select only HLC segments or LLC
* segments
* </li>
* <li>For HLC real-time table, select segments in one group</li>
* <li>
* For table with segment merge/rollup enabled, select the merged segments over the original segments with the same
* data
* </li>
* </ul>
*/
public interface SegmentSelector {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,13 @@
package org.apache.pinot.broker.routing.segmentselector;

import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;


public class SegmentSelectorFactory {
private SegmentSelectorFactory() {
}

public static SegmentSelector getSegmentSelector(TableConfig tableConfig) {
if (tableConfig.getTableType() == TableType.OFFLINE) {
return new OfflineSegmentSelector();
} else {
return new RealtimeSegmentSelector();
}
return new DefaultSegmentSelector();
}
}
Loading

0 comments on commit 575398d

Please sign in to comment.