diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 4a7f9600ffa42..b28b201ca8280 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -158,10 +158,10 @@ public Collection createComponents( return emptyList(); } - CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(); - this.restoreSourceService.set(restoreSourceService); CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings()); this.ccrSettings.set(ccrSettings); + CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(threadPool, ccrSettings); + this.restoreSourceService.set(restoreSourceService); return Arrays.asList( ccrLicenseChecker, restoreSourceService, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index fe0eb7853e3ce..26089ab46952d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -43,6 +43,14 @@ public final class CcrSettings { Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB), Setting.Property.Dynamic, Setting.Property.NodeScope); + /** + * The leader must open resources for a ccr recovery. If there is no activity for this interval of time, + * the leader will close the restore session. + */ + public static final Setting INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING = + Setting.timeSetting("ccr.indices.recovery.recovery_activity_timeout", TimeValue.timeValueSeconds(60), + Setting.Property.Dynamic, Setting.Property.NodeScope); + /** * The settings defined by CCR. * @@ -53,22 +61,33 @@ static List> getSettings() { XPackSettings.CCR_ENABLED_SETTING, CCR_FOLLOWING_INDEX_SETTING, RECOVERY_MAX_BYTES_PER_SECOND, + INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT); } private final CombinedRateLimiter ccrRateLimiter; + private volatile TimeValue recoveryActivityTimeout; public CcrSettings(Settings settings, ClusterSettings clusterSettings) { + this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings); this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings)); clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout); } private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) { ccrRateLimiter.setMBPerSec(maxBytesPerSec); } + private void setRecoveryActivityTimeout(TimeValue recoveryActivityTimeout) { + this.recoveryActivityTimeout = recoveryActivityTimeout; + } + public CombinedRateLimiter getRateLimiter() { return ccrRateLimiter; } + public TimeValue getRecoveryActivityTimeout() { + return recoveryActivityTimeout; + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index 785600dd5f8fc..1c7f9f95adbbe 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.KeyedLock; @@ -28,6 +29,9 @@ import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ccr.CcrSettings; import java.io.Closeable; import java.io.IOException; @@ -45,8 +49,14 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen private final Map onGoingRestores = ConcurrentCollections.newConcurrentMap(); private final Map> sessionsForShard = new HashMap<>(); - private final CopyOnWriteArrayList> openSessionListeners = new CopyOnWriteArrayList<>(); private final CopyOnWriteArrayList> closeSessionListeners = new CopyOnWriteArrayList<>(); + private final ThreadPool threadPool; + private final CcrSettings ccrSettings; + + public CcrRestoreSourceService(ThreadPool threadPool, CcrSettings ccrSettings) { + this.threadPool = threadPool; + this.ccrSettings = ccrSettings; + } @Override public synchronized void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { @@ -81,26 +91,10 @@ protected synchronized void doClose() throws IOException { // TODO: The listeners are for testing. Once end-to-end file restore is implemented and can be tested, // these should be removed. - public void addOpenSessionListener(Consumer listener) { - openSessionListeners.add(listener); - } - public void addCloseSessionListener(Consumer listener) { closeSessionListeners.add(listener); } - // default visibility for testing - synchronized HashSet getSessionsForShard(IndexShard indexShard) { - return sessionsForShard.get(indexShard); - } - - // default visibility for testing - synchronized RestoreSession getOngoingRestore(String sessionUUID) { - return onGoingRestores.get(sessionUUID); - } - - // TODO: Add a local timeout for the session. This timeout might might be for the entire session to be - // complete. Or it could be for session to have been touched. public synchronized Store.MetadataSnapshot openSession(String sessionUUID, IndexShard indexShard) throws IOException { boolean success = false; RestoreSession restore = null; @@ -113,9 +107,8 @@ public synchronized Store.MetadataSnapshot openSession(String sessionUUID, Index if (indexShard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(indexShard.shardId(), "cannot open ccr restore session if shard closed"); } - restore = new RestoreSession(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit()); + restore = new RestoreSession(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit(), scheduleTimeout(sessionUUID)); onGoingRestores.put(sessionUUID, restore); - openSessionListeners.forEach(c -> c.accept(sessionUUID)); HashSet sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>()); sessions.add(sessionUUID); } @@ -133,34 +126,60 @@ public synchronized Store.MetadataSnapshot openSession(String sessionUUID, Index } public void closeSession(String sessionUUID) { + internalCloseSession(sessionUUID, true); + } + + public synchronized SessionReader getSessionReader(String sessionUUID) { + RestoreSession restore = onGoingRestores.get(sessionUUID); + if (restore == null) { + logger.debug("could not get session [{}] because session not found", sessionUUID); + throw new IllegalArgumentException("session [" + sessionUUID + "] not found"); + } + restore.idle = false; + return new SessionReader(restore); + } + + private void internalCloseSession(String sessionUUID, boolean throwIfSessionMissing) { final RestoreSession restore; synchronized (this) { - closeSessionListeners.forEach(c -> c.accept(sessionUUID)); restore = onGoingRestores.remove(sessionUUID); if (restore == null) { - logger.debug("could not close session [{}] because session not found", sessionUUID); - throw new IllegalArgumentException("session [" + sessionUUID + "] not found"); + if (throwIfSessionMissing) { + logger.debug("could not close session [{}] because session not found", sessionUUID); + throw new IllegalArgumentException("session [" + sessionUUID + "] not found"); + } else { + return; + } } HashSet sessions = sessionsForShard.get(restore.indexShard); assert sessions != null : "No session UUIDs for shard even though one [" + sessionUUID + "] is active in ongoing restores"; if (sessions != null) { boolean removed = sessions.remove(sessionUUID); - assert removed : "No session found for UUID [" + sessionUUID +"]"; + assert removed : "No session found for UUID [" + sessionUUID + "]"; if (sessions.isEmpty()) { sessionsForShard.remove(restore.indexShard); } } } + closeSessionListeners.forEach(c -> c.accept(sessionUUID)); restore.decRef(); + } - public synchronized SessionReader getSessionReader(String sessionUUID) { - RestoreSession restore = onGoingRestores.get(sessionUUID); - if (restore == null) { - logger.debug("could not get session [{}] because session not found", sessionUUID); - throw new IllegalArgumentException("session [" + sessionUUID + "] not found"); + private Scheduler.Cancellable scheduleTimeout(String sessionUUID) { + TimeValue idleTimeout = ccrSettings.getRecoveryActivityTimeout(); + return threadPool.scheduleWithFixedDelay(() -> maybeTimeout(sessionUUID), idleTimeout, ThreadPool.Names.GENERIC); + } + + private void maybeTimeout(String sessionUUID) { + RestoreSession restoreSession = onGoingRestores.get(sessionUUID); + if (restoreSession != null) { + if (restoreSession.idle) { + internalCloseSession(sessionUUID, false); + } else { + restoreSession.idle = true; + } } - return new SessionReader(restore); } private static class RestoreSession extends AbstractRefCounted { @@ -168,14 +187,18 @@ private static class RestoreSession extends AbstractRefCounted { private final String sessionUUID; private final IndexShard indexShard; private final Engine.IndexCommitRef commitRef; + private final Scheduler.Cancellable timeoutTask; private final KeyedLock keyedLock = new KeyedLock<>(); private final Map cachedInputs = new ConcurrentHashMap<>(); + private volatile boolean idle = false; - private RestoreSession(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef) { + private RestoreSession(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef, + Scheduler.Cancellable timeoutTask) { super("restore-session"); this.sessionUUID = sessionUUID; this.indexShard = indexShard; this.commitRef = commitRef; + this.timeoutTask = timeoutTask; } private Store.MetadataSnapshot getMetaData() throws IOException { @@ -223,6 +246,7 @@ private long readFileBytes(String fileName, BytesReference reference) throws IOE protected void closeInternal() { logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard.shardId()); assert keyedLock.hasLockedKeys() == false : "Should not hold any file locks when closing"; + timeoutTask.cancel(); IOUtils.closeWhileHandlingException(cachedInputs.values()); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index c0b7863edf25a..5f352788d9597 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -8,28 +8,41 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; +import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.store.StoreFileMetaData; +import org.elasticsearch.xpack.ccr.CcrSettings; import org.junit.Before; import java.io.IOException; import java.util.ArrayList; -import java.util.HashSet; +import java.util.Set; + +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; public class CcrRestoreSourceServiceTests extends IndexShardTestCase { private CcrRestoreSourceService restoreSourceService; + private DeterministicTaskQueue taskQueue; @Before public void setUp() throws Exception { super.setUp(); - restoreSourceService = new CcrRestoreSourceService(); + Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(); + taskQueue = new DeterministicTaskQueue(settings, random()); + Set> registeredSettings = Sets.newHashSet(CcrSettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, + CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, registeredSettings); + restoreSourceService = new CcrRestoreSourceService(taskQueue.getThreadPool(), new CcrSettings(Settings.EMPTY, clusterSettings)); } public void testOpenSession() throws IOException { @@ -39,22 +52,21 @@ public void testOpenSession() throws IOException { final String sessionUUID2 = UUIDs.randomBase64UUID(); final String sessionUUID3 = UUIDs.randomBase64UUID(); - assertNull(restoreSourceService.getSessionsForShard(indexShard1)); + restoreSourceService.openSession(sessionUUID1, indexShard1); + restoreSourceService.openSession(sessionUUID2, indexShard1); + + try (CcrRestoreSourceService.SessionReader reader1 = restoreSourceService.getSessionReader(sessionUUID1); + CcrRestoreSourceService.SessionReader reader2 = restoreSourceService.getSessionReader(sessionUUID2)) { + // Would throw exception if missing + } - assertNotNull(restoreSourceService.openSession(sessionUUID1, indexShard1)); - HashSet sessionsForShard = restoreSourceService.getSessionsForShard(indexShard1); - assertEquals(1, sessionsForShard.size()); - assertTrue(sessionsForShard.contains(sessionUUID1)); - assertNotNull(restoreSourceService.openSession(sessionUUID2, indexShard1)); - sessionsForShard = restoreSourceService.getSessionsForShard(indexShard1); - assertEquals(2, sessionsForShard.size()); - assertTrue(sessionsForShard.contains(sessionUUID2)); + restoreSourceService.openSession(sessionUUID3, indexShard2); - assertNull(restoreSourceService.getSessionsForShard(indexShard2)); - assertNotNull(restoreSourceService.openSession(sessionUUID3, indexShard2)); - sessionsForShard = restoreSourceService.getSessionsForShard(indexShard2); - assertEquals(1, sessionsForShard.size()); - assertTrue(sessionsForShard.contains(sessionUUID3)); + try (CcrRestoreSourceService.SessionReader reader1 = restoreSourceService.getSessionReader(sessionUUID1); + CcrRestoreSourceService.SessionReader reader2 = restoreSourceService.getSessionReader(sessionUUID2); + CcrRestoreSourceService.SessionReader reader3 = restoreSourceService.getSessionReader(sessionUUID3)) { + // Would throw exception if missing + } restoreSourceService.closeSession(sessionUUID1); restoreSourceService.closeSession(sessionUUID2); @@ -68,7 +80,6 @@ public void testCannotOpenSessionForClosedShard() throws IOException { closeShards(indexShard); String sessionUUID = UUIDs.randomBase64UUID(); expectThrows(IllegalIndexShardStateException.class, () -> restoreSourceService.openSession(sessionUUID, indexShard)); - assertNull(restoreSourceService.getOngoingRestore(sessionUUID)); } public void testCloseSession() throws IOException { @@ -82,25 +93,26 @@ public void testCloseSession() throws IOException { restoreSourceService.openSession(sessionUUID2, indexShard1); restoreSourceService.openSession(sessionUUID3, indexShard2); - assertEquals(2, restoreSourceService.getSessionsForShard(indexShard1).size()); - assertEquals(1, restoreSourceService.getSessionsForShard(indexShard2).size()); - assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID1)); - assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID2)); - assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID3)); + try (CcrRestoreSourceService.SessionReader reader1 = restoreSourceService.getSessionReader(sessionUUID1); + CcrRestoreSourceService.SessionReader reader2 = restoreSourceService.getSessionReader(sessionUUID2); + CcrRestoreSourceService.SessionReader reader3 = restoreSourceService.getSessionReader(sessionUUID3)) { + // Would throw exception if missing + } + + assertTrue(taskQueue.hasDeferredTasks()); restoreSourceService.closeSession(sessionUUID1); - assertEquals(1, restoreSourceService.getSessionsForShard(indexShard1).size()); - assertNull(restoreSourceService.getOngoingRestore(sessionUUID1)); - assertFalse(restoreSourceService.getSessionsForShard(indexShard1).contains(sessionUUID1)); - assertTrue(restoreSourceService.getSessionsForShard(indexShard1).contains(sessionUUID2)); + expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID1)); restoreSourceService.closeSession(sessionUUID2); - assertNull(restoreSourceService.getSessionsForShard(indexShard1)); - assertNull(restoreSourceService.getOngoingRestore(sessionUUID2)); + expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID2)); restoreSourceService.closeSession(sessionUUID3); - assertNull(restoreSourceService.getSessionsForShard(indexShard2)); - assertNull(restoreSourceService.getOngoingRestore(sessionUUID3)); + expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID3)); + + taskQueue.runAllTasks(); + // The tasks will not be rescheduled as the sessions are closed. + assertFalse(taskQueue.hasDeferredTasks()); closeShards(indexShard1, indexShard2); } @@ -116,14 +128,20 @@ public void testCloseShardListenerFunctionality() throws IOException { restoreSourceService.openSession(sessionUUID2, indexShard1); restoreSourceService.openSession(sessionUUID3, indexShard2); - assertEquals(2, restoreSourceService.getSessionsForShard(indexShard1).size()); - assertEquals(1, restoreSourceService.getSessionsForShard(indexShard2).size()); + try (CcrRestoreSourceService.SessionReader reader1 = restoreSourceService.getSessionReader(sessionUUID1); + CcrRestoreSourceService.SessionReader reader2 = restoreSourceService.getSessionReader(sessionUUID2); + CcrRestoreSourceService.SessionReader reader3 = restoreSourceService.getSessionReader(sessionUUID3)) { + // Would throw exception if missing + } restoreSourceService.afterIndexShardClosed(indexShard1.shardId(), indexShard1, Settings.EMPTY); - assertNull(restoreSourceService.getSessionsForShard(indexShard1)); - assertNull(restoreSourceService.getOngoingRestore(sessionUUID1)); - assertNull(restoreSourceService.getOngoingRestore(sessionUUID2)); + expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID1)); + expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID2)); + + try (CcrRestoreSourceService.SessionReader reader3 = restoreSourceService.getSessionReader(sessionUUID3)) { + // Would throw exception if missing + } restoreSourceService.closeSession(sessionUUID3); closeShards(indexShard1, indexShard2); @@ -167,24 +185,59 @@ public void testGetSessionDoesNotLeakFileIfClosed() throws IOException { indexDoc(indexShard, "_doc", Integer.toString(i)); flushShard(indexShard, true); } - final String sessionUUID1 = UUIDs.randomBase64UUID(); + final String sessionUUID = UUIDs.randomBase64UUID(); - restoreSourceService.openSession(sessionUUID1, indexShard); + restoreSourceService.openSession(sessionUUID, indexShard); ArrayList files = new ArrayList<>(); indexShard.snapshotStoreMetadata().forEach(files::add); - try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID1)) { + try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) { sessionReader.readFileBytes(files.get(0).name(), new BytesArray(new byte[10])); } // Request a second file to ensure that original file is not leaked - try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID1)) { + try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) { sessionReader.readFileBytes(files.get(1).name(), new BytesArray(new byte[10])); } - restoreSourceService.closeSession(sessionUUID1); + restoreSourceService.closeSession(sessionUUID); closeShards(indexShard); // Exception will be thrown if file is not closed. } + + public void testSessionCanTimeout() throws Exception { + IndexShard indexShard = newStartedShard(true); + + final String sessionUUID = UUIDs.randomBase64UUID(); + + restoreSourceService.openSession(sessionUUID, indexShard); + + // Session starts as not idle. First task will mark it as idle + assertTrue(taskQueue.hasDeferredTasks()); + taskQueue.advanceTime(); + taskQueue.runAllRunnableTasks(); + // Task is still scheduled + assertTrue(taskQueue.hasDeferredTasks()); + + // Accessing session marks it as not-idle + try (CcrRestoreSourceService.SessionReader reader = restoreSourceService.getSessionReader(sessionUUID)) { + // Check session exists + } + + assertTrue(taskQueue.hasDeferredTasks()); + taskQueue.advanceTime(); + taskQueue.runAllRunnableTasks(); + // Task is still scheduled + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.advanceTime(); + taskQueue.runAllRunnableTasks(); + // Task is cancelled when the session times out + assertFalse(taskQueue.hasDeferredTasks()); + + expectThrows(IllegalArgumentException.class, () -> restoreSourceService.getSessionReader(sessionUUID)); + + closeShards(indexShard); + } }