Skip to content

Commit 4319f2b

Browse files
authored
Remove unnecessary refresh listeners from NRTReplicationReaderManager. (opensearch-project#8859)
* Remove unnecessary refresh listeners from NRTReplicationReaderManager. This change removes RefreshListeners used by InternalEngine to provide waitFor functionality. These listeners were previously registered onto NRT replicas only to be force released on the next refresh cycle without actually refreshing the reader. This change also removes the unnecessary blocking refresh from NRTReaderManager because we no longer have conflicting refresh invocations from scheduledRefresh. Signed-off-by: Marc Handalian <[email protected]> * Reduce the amount of docs ingested with testPrimaryRelocation and testPrimaryRelocationWithSegRepFailure. These tests were ingesting 100-1k docs and randomly selecting a refresh policy. Wtih the IMMEDIATE refresh policy a blocking refresh is performed that increase the time required for the primary to block operations for relocation. On my machine this change reduces the test time with max docs from 1m to 5-6s. Signed-off-by: Marc Handalian <[email protected]> --------- Signed-off-by: Marc Handalian <[email protected]>
1 parent 4fd6877 commit 4319f2b

File tree

5 files changed

+17
-19
lines changed

5 files changed

+17
-19
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public void testPrimaryRelocation() throws Exception {
6060
createIndex(1);
6161
final String replica = internalCluster().startNode();
6262
ensureGreen(INDEX_NAME);
63-
final int initialDocCount = scaledRandomIntBetween(100, 1000);
63+
final int initialDocCount = scaledRandomIntBetween(10, 100);
6464
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());
6565
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
6666
for (int i = 0; i < initialDocCount; i++) {
@@ -137,7 +137,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
137137
createIndex(1);
138138
final String replica = internalCluster().startNode();
139139
ensureGreen(INDEX_NAME);
140-
final int initialDocCount = scaledRandomIntBetween(100, 1000);
140+
final int initialDocCount = scaledRandomIntBetween(10, 100);
141141
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());
142142
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
143143
for (int i = 0; i < initialDocCount; i++) {

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

+6-15
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,10 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
7777
this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
7878
this.readerManager = readerManager;
7979
this.readerManager.addListener(completionStatsCache);
80-
for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) {
81-
this.readerManager.addListener(listener);
82-
}
80+
// NRT Replicas do not have a concept of Internal vs External reader managers.
81+
// We also do not want to wire up refresh listeners for waitFor & pending refresh location.
82+
// which are the current external listeners set from IndexShard.
83+
// Only wire up the internal listeners.
8384
for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) {
8485
this.readerManager.addListener(listener);
8586
}
@@ -322,22 +323,12 @@ public List<Segment> segments(boolean verbose) {
322323

323324
@Override
324325
public void refresh(String source) throws EngineException {
325-
maybeRefresh(source);
326+
// Refresh on this engine should only ever happen in the reader after new segments arrive.
326327
}
327328

328329
@Override
329330
public boolean maybeRefresh(String source) throws EngineException {
330-
ensureOpen();
331-
try {
332-
return readerManager.maybeRefresh();
333-
} catch (IOException e) {
334-
try {
335-
failEngine("refresh failed source[" + source + "]", e);
336-
} catch (Exception inner) {
337-
e.addSuppressed(inner);
338-
}
339-
throw new RefreshFailedEngineException(shardId, e);
340-
}
331+
return false;
341332
}
342333

343334
@Override

server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public void updateSegments(SegmentInfos infos) throws IOException {
103103
// is always increased.
104104
infos.updateGeneration(currentInfos);
105105
currentInfos = infos;
106-
maybeRefreshBlocking();
106+
maybeRefresh();
107107
}
108108

109109
public SegmentInfos getSegmentInfos() {

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -4423,7 +4423,8 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
44234423
readAllowed = isReadAllowed();
44244424
}
44254425
}
4426-
if (readAllowed) {
4426+
// NRT Replicas will not accept refresh listeners.
4427+
if (readAllowed && isSegmentReplicationAllowed() == false) {
44274428
refreshListeners.addOrNotify(location, listener);
44284429
} else {
44294430
// we're not yet ready fo ready for reads, just ignore refresh cycles

server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java

+6
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException {
120120
closeShards(indexShard);
121121
}
122122

123+
public void testNRTReplicasDoNotAcceptRefreshListeners() throws IOException {
124+
final IndexShard indexShard = newStartedShard(false, settings, new NRTReplicationEngineFactory());
125+
indexShard.addRefreshListener(mock(Translog.Location.class), Assert::assertFalse);
126+
closeShards(indexShard);
127+
}
128+
123129
public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception {
124130
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
125131
shards.startAll();

0 commit comments

Comments
 (0)