Skip to content

Commit

Permalink
Skip sending ShardActiveRequest checks in stateless nodes (elastic#12…
Browse files Browse the repository at this point in the history
…1387)

Stateless nodes rely on an external blob store to persist data, therefore
it's not necessary to go through such checks when a shard store
should be deleted.

Closes ES-10577
  • Loading branch information
fcofdez authored Feb 6, 2025
1 parent eb6a49b commit 0015d56
Showing 1 changed file with 34 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ private void deleteShardIfExistElseWhere(
long clusterStateVersion,
IndexShardRoutingTable indexShardRoutingTable
) {
if (DiscoveryNode.isStateless(clusterService.getSettings())) {
deleteShardStoreOnApplierThread(indexShardRoutingTable.shardId(), clusterStateVersion);
return;
}

List<Tuple<DiscoveryNode, ShardActiveRequest>> requests = new ArrayList<>(indexShardRoutingTable.size());
String indexUUID = indexShardRoutingTable.shardId().getIndex().getUUID();
for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
Expand Down Expand Up @@ -320,34 +325,37 @@ private void allNodesResponded() {
return;
}

clusterService.getClusterApplierService()
.runOnApplierThread("indices_store ([" + shardId + "] active fully on other nodes)", Priority.HIGH, currentState -> {
if (clusterStateVersion != currentState.getVersion()) {
logger.trace(
"not deleting shard {}, the update task state version[{}] is not equal to cluster state before "
+ "shard active api call [{}]",
shardId,
currentState.getVersion(),
clusterStateVersion
);
return;
}
try {
indicesService.deleteShardStore("no longer used", shardId, currentState);
} catch (Exception ex) {
logger.debug(() -> format("%s failed to delete unallocated shard, ignoring", shardId), ex);
}
}, new ActionListener<>() {
@Override
public void onResponse(Void unused) {}

@Override
public void onFailure(Exception e) {
logger.error(() -> format("%s unexpected error during deletion of unallocated shard", shardId), e);
}
});
deleteShardStoreOnApplierThread(shardId, clusterStateVersion);
}
}

private void deleteShardStoreOnApplierThread(ShardId shardId, long clusterStateVersion) {
clusterService.getClusterApplierService()
.runOnApplierThread("indices_store ([" + shardId + "] active fully on other nodes)", Priority.HIGH, currentState -> {
if (clusterStateVersion != currentState.getVersion()) {
logger.trace(
"not deleting shard {}, the update task state version[{}] is not equal to cluster state before "
+ "shard active api call [{}]",
shardId,
currentState.getVersion(),
clusterStateVersion
);
return;
}
try {
indicesService.deleteShardStore("no longer used", shardId, currentState);
} catch (Exception ex) {
logger.debug(() -> format("%s failed to delete unallocated shard, ignoring", shardId), ex);
}
}, new ActionListener<>() {
@Override
public void onResponse(Void unused) {}

@Override
public void onFailure(Exception e) {
logger.error(() -> format("%s unexpected error during deletion of unallocated shard", shardId), e);
}
});
}

private class ShardActiveRequestHandler implements TransportRequestHandler<ShardActiveRequest> {
Expand Down

0 comments on commit 0015d56

Please sign in to comment.