Skip to content

Commit

Permalink
Fix the broker routing when segment is deleted (apache#7817)
Browse files Browse the repository at this point in the history
# Issue
When a segment is deleted, or moved from one server to another server, it follows these steps:
1. Ideal state is modified by the controller
2. Controller sends messages to servers to offload and delete the segment
3. Servers deletes the segment and update their current state
4. Controller gathers current states of the servers and updates the external view
5. Broker watches external view and updates the routing to not query the servers

Between step 3 to 5, broker will still query the server for the deleted segment, but the segment is already offloaded from the server.

# Solution
Broker should also watch on ideal state change, and update the routing when the segment is deleted or moved in the ideal state. If a segment/instance is in the external view but not in the ideal state, broker should not query the segment/instance as it will be dropped soon in the external view.
  • Loading branch information
Jackie-Jiang authored and kriti-sc committed Dec 12, 2021
1 parent 63590df commit b7fd066
Show file tree
Hide file tree
Showing 18 changed files with 344 additions and 254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
protected String _hostname;
protected int _port;
protected String _instanceId;
protected final List<ClusterChangeHandler> _idealStateChangeHandlers = new ArrayList<>();
protected final List<ClusterChangeHandler> _externalViewChangeHandlers = new ArrayList<>();
protected final List<ClusterChangeHandler> _instanceConfigChangeHandlers = new ArrayList<>();
protected final List<ClusterChangeHandler> _liveInstanceChangeHandlers = new ArrayList<>();
Expand Down Expand Up @@ -141,6 +142,15 @@ private void setupHelixSystemProperties() {
_brokerConf.getProperty(Helix.CONFIG_OF_BROKER_FLAPPING_TIME_WINDOW_MS, Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
}

/**
* Adds an ideal state change handler to handle Helix ideal state change callbacks.
* <p>NOTE: all change handlers will be run in a single thread, so any slow change handler can block other change
* handlers from running. For slow change handler, make it asynchronous.
*/
public void addIdealStateChangeHandler(ClusterChangeHandler idealStateChangeHandler) {
_idealStateChangeHandlers.add(idealStateChangeHandler);
}

/**
* Adds an external view change handler to handle Helix external view change callbacks.
* <p>NOTE: all change handlers will be run in a single thread, so any slow change handler can block other change
Expand Down Expand Up @@ -241,6 +251,10 @@ public void start()
_brokerAdminApplication.start(_listenerConfigs);

LOGGER.info("Initializing cluster change mediator");
for (ClusterChangeHandler idealStateChangeHandler : _idealStateChangeHandlers) {
idealStateChangeHandler.init(_spectatorHelixManager);
}
_idealStateChangeHandlers.add(_routingManager);
for (ClusterChangeHandler externalViewChangeHandler : _externalViewChangeHandlers) {
externalViewChangeHandler.init(_spectatorHelixManager);
}
Expand All @@ -255,13 +269,15 @@ public void start()
liveInstanceChangeHandler.init(_spectatorHelixManager);
}
Map<ChangeType, List<ClusterChangeHandler>> clusterChangeHandlersMap = new HashMap<>();
clusterChangeHandlersMap.put(ChangeType.IDEAL_STATE, _idealStateChangeHandlers);
clusterChangeHandlersMap.put(ChangeType.EXTERNAL_VIEW, _externalViewChangeHandlers);
clusterChangeHandlersMap.put(ChangeType.INSTANCE_CONFIG, _instanceConfigChangeHandlers);
if (!_liveInstanceChangeHandlers.isEmpty()) {
clusterChangeHandlersMap.put(ChangeType.LIVE_INSTANCE, _liveInstanceChangeHandlers);
}
_clusterChangeMediator = new ClusterChangeMediator(clusterChangeHandlersMap, _brokerMetrics);
_clusterChangeMediator.start();
_spectatorHelixManager.addIdealStateChangeListener(_clusterChangeMediator);
_spectatorHelixManager.addExternalViewChangeListener(_clusterChangeMediator);
_spectatorHelixManager.addInstanceConfigChangeListener(_clusterChangeMediator);
if (!_liveInstanceChangeHandlers.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import org.apache.helix.NotificationContext;
import org.apache.helix.api.listeners.BatchMode;
import org.apache.helix.api.listeners.ExternalViewChangeListener;
import org.apache.helix.api.listeners.IdealStateChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.pinot.common.metrics.BrokerMeter;
Expand All @@ -51,7 +53,8 @@
@BatchMode(enabled = false)
@PreFetch(enabled = false)
public class ClusterChangeMediator
implements ExternalViewChangeListener, InstanceConfigChangeListener, LiveInstanceChangeListener {
implements IdealStateChangeListener, ExternalViewChangeListener, InstanceConfigChangeListener,
LiveInstanceChangeListener {
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterChangeMediator.class);

// If no change got for 1 hour, proactively check changes
Expand Down Expand Up @@ -168,6 +171,15 @@ public synchronized void stop() {
}
}

@Override
public void onIdealStateChange(List<IdealState> idealStateList, NotificationContext changeContext)
throws InterruptedException {
// Ideal state list should be empty because Helix pre-fetch is disabled
assert idealStateList.isEmpty();

enqueueChange(ChangeType.IDEAL_STATE);
}

@Override
public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) {
// External view list should be empty because Helix pre-fetch is disabled
Expand Down
Loading

0 comments on commit b7fd066

Please sign in to comment.