Improvements that significantly reduce the chances of request timeouts
Request Timeouts started plaguing the IDE due to numerous
`executionContext/***Visualization` requests. While caused by a bug,
they revealed a bigger problem in the Language Server when serving large
numbers of requests:
1) Long and short lived jobs are fighting for various locks. Lock
   contention leads to some jobs waiting for a long time until they can
   be executed. Increasing the timeout is not a solution.
2) Requests coming from the IDE are served almost instantly and handled
   by various commands. Commands can issue further jobs that serve the
   request. We apparently have, and always had, a single-threaded thread
   pool for serving such jobs, leading to immediate thread starvation.

Both reasons increase the chances of Request Timeouts.
For 2), I noticed that while we used to set the
`enso-runtime-server.jobParallelism` option descriptor key to some
machine-dependent value (most likely > 1), the value set would only be
available for instrumentation. `JobExecutionEngine`, where it is actually
used, would always get the default, i.e. a single-threaded thread pool.
Option descriptors have been misused ever since the option was introduced.
Moved that option to runtime options so that it can be set and retrieved
during normal operation.
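
The starvation described in 2) can be reproduced outside the Language
Server. The following is a minimal sketch, not code from this commit:
`JobPoolSketch` and its method are hypothetical names, and a plain
`java.util.concurrent` fixed thread pool stands in for the pool used by
`JobExecutionEngine`. With a pool size of 1, a short job queues behind a
long one; with a larger pool it runs right away.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only illustrates the effect of the pool size.
class JobPoolSketch {

  /** Returns true if a short job completes while a long job still occupies a worker. */
  static boolean shortJobRunsWhileLongJobBusy(int parallelism) {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    CountDownLatch longJobStarted = new CountDownLatch(1);
    CountDownLatch releaseLongJob = new CountDownLatch(1);
    CountDownLatch shortJobDone = new CountDownLatch(1);
    try {
      // Long-lived job: holds on to its worker thread until it is released.
      pool.submit(() -> {
        longJobStarted.countDown();
        try {
          releaseLongJob.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      longJobStarted.await();
      // Short-lived job: needs a free worker to make any progress.
      pool.submit(shortJobDone::countDown);
      return shortJobDone.await(500, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      releaseLongJob.countDown();
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("parallelism=1: " + shortJobRunsWhileLongJobBusy(1));
    System.out.println("parallelism=2: " + shortJobRunsWhileLongJobBusy(2));
  }
}
```

In the sketch the short job plays the role of a request-serving job that
times out when the only worker is busy with a long-running job.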

Adding parallelism intensified problem 1), because now we could execute
multiple jobs and they would compete for resources. It also revealed
yet another deadlock scenario, caused by an invalid order of
lock acquisition. See the `ExecuteJob` vs `UpsertVisualisationJob` order
for details.
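
As a general illustration of the lock-ordering fix (a sketch, not the
actual locking code of this commit): if every job takes the locks in the
same order, two concurrent jobs cannot end up each holding the lock the
other one needs. `LockOrderSketch`, its two locks and the
context-then-compilation order are assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a locking service with two kinds of locks.
class LockOrderSketch {
  private final ReentrantLock contextLock = new ReentrantLock();
  private final ReentrantLock compilationLock = new ReentrantLock();
  final AtomicInteger completed = new AtomicInteger();

  /** Every job goes through this helper, so the order is always context, then compilation. */
  void runWithBothLocks(Runnable body) {
    contextLock.lock();
    try {
      compilationLock.lock();
      try {
        body.run();
      } finally {
        compilationLock.unlock();
      }
    } finally {
      contextLock.unlock();
    }
  }

  /** Runs two streams of jobs concurrently; returns true if all of them finished in time. */
  static boolean runJobs(int iterations) {
    LockOrderSketch locks = new LockOrderSketch();
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      for (int i = 0; i < iterations; i++) {
        // Two different "job types", both acquiring the locks in the same order.
        pool.submit(() -> locks.runWithBothLocks(locks.completed::incrementAndGet));
        pool.submit(() -> locks.runWithBothLocks(locks.completed::incrementAndGet));
      }
      pool.shutdown();
      return pool.awaitTermination(5, TimeUnit.SECONDS)
          && locks.completed.get() == 2 * iterations;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("all jobs finished: " + runJobs(1000));
  }
}
```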

Still, a number of requests would continue to time out randomly due to
lock contention. It became apparent that
`Attach/Modify/Detach-VisualisationCmd` should not wait until a
triggered `UpsertVisualisationJob` sends a response to the client; long
and short lived jobs will always compete for resources and we cannot
guarantee that they will not time out that way. So the response is now
sent earlier, directly from the command handler.
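
The reply-first pattern can be sketched as follows. This is an
illustration under assumptions, not the command implementation:
`ReplyFirstSketch` is a hypothetical class, the `events` list stands in
for the client endpoint, and a plain executor stands in for the job
processor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical command handler: acknowledge first, do the slow work later.
class ReplyFirstSketch {
  final List<String> events = new CopyOnWriteArrayList<>();
  private final ExecutorService jobs = Executors.newSingleThreadExecutor();

  Future<?> attachVisualisation(String visualisationId) {
    // 1. The reply no longer depends on when the job gets to run.
    events.add("reply:VisualisationAttached:" + visualisationId);
    // 2. The potentially slow upsert is only scheduled here.
    return jobs.submit(() -> {
      events.add("job:upsert:" + visualisationId);
    });
  }

  /** Runs one attach request and returns the recorded events in order. */
  static List<String> demo() {
    ReplyFirstSketch handler = new ReplyFirstSketch();
    try {
      handler.attachVisualisation("vis-1").get(5, TimeUnit.SECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
      throw new IllegalStateException(e);
    } finally {
      handler.jobs.shutdown();
    }
    return new ArrayList<>(handler.events);
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The trade-off is that the client may see a success reply before the job
has had any effect, which is the scenario discussed next.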

This brings another problematic scenario:
1. `AttachVisualisationCmd` is executed, response sent to the client,
   `UpsertVisualisationJob` scheduled
2. In the meantime `ModifyVisualisationCmd` arrives and fails because it
   cannot find the visualisation, which will only be added by the
   `UpsertVisualisationJob` that has been scheduled but has not run yet.

Remedied that by also checking visualisation-related jobs that are still
in progress. This also allows cancelling jobs whose results wouldn't
be used anyway (`ModifyVisualisationCmd` sends its own
`UpsertVisualisationJob`). This is not a theoretical scenario; it
happened frequently due to IDE requests.
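
The lookup with a fallback to in-progress jobs can be sketched like this.
The job classes, the `attached` map and `resolveExpressionId` are
simplified, hypothetical stand-ins; only the shape of `jobInProgress`
(a filter that returns an optional value) follows the method added in
this commit.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical, simplified model of attached visualisations and running jobs.
class PendingJobLookupSketch {
  interface Job {}

  static final class UpsertJob implements Job {
    final String visualisationId;
    final String expressionId;

    UpsertJob(String visualisationId, String expressionId) {
      this.visualisationId = visualisationId;
      this.expressionId = expressionId;
    }
  }

  static final class ExecuteJob implements Job {
    final String contextId;

    ExecuteJob(String contextId) {
      this.contextId = contextId;
    }
  }

  /** Returns the first non-empty result of applying the filter to the running jobs. */
  static <T> Optional<T> jobInProgress(List<Job> running, Function<Job, Optional<T>> filter) {
    return running.stream().map(filter).flatMap(Optional::stream).findFirst();
  }

  /** Looks at already attached visualisations first, then at jobs still in progress. */
  static Optional<String> resolveExpressionId(
      Map<String, String> attached, List<Job> running, String visualisationId) {
    Optional<String> existing = Optional.ofNullable(attached.get(visualisationId));
    if (existing.isPresent()) {
      return existing;
    }
    return PendingJobLookupSketch.<String>jobInProgress(running, job -> {
      if (job instanceof UpsertJob
          && ((UpsertJob) job).visualisationId.equals(visualisationId)) {
        return Optional.of(((UpsertJob) job).expressionId);
      }
      return Optional.<String>empty();
    });
  }

  public static void main(String[] args) {
    List<Job> running = List.of(new ExecuteJob("ctx-1"), new UpsertJob("vis-1", "expr-1"));
    System.out.println(resolveExpressionId(Map.of(), running, "vis-1"));
    System.out.println(resolveExpressionId(Map.of(), running, "vis-9"));
  }
}
```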

This change does not fully solve the rather problematic setup of
numerous locks, which are requested by short and long lived jobs. A
better design should still be investigated. But it significantly reduces
the chances of the Request Timeouts that the IDE had to deal with.

I can no longer reproduce Request Timeouts on relatively modest
projects.

I added the possibility of logging wait times for locks to better
investigate further problems.
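
A minimal sketch of such timing instrumentation, assuming a single
`ReentrantLock` and `java.util.logging`. `TimedLockSketch` is a
hypothetical class and not the `ReentrantLocking` implementation; it only
mirrors the idea, visible in the diff, that acquiring a lock returns a
timestamp which the caller later uses to log how long the lock was kept.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical wrapper that logs how long a lock was waited for and held.
class TimedLockSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private final Logger logger;

  TimedLockSketch(Logger logger) {
    this.logger = logger;
  }

  /** Blocks until the lock is held, logs the wait time, returns the acquisition timestamp. */
  long acquire(String owner) {
    long requestedAt = System.currentTimeMillis();
    lock.lock();
    long acquiredAt = System.currentTimeMillis();
    logger.log(
        Level.FINEST,
        "Waited " + (acquiredAt - requestedAt) + " milliseconds for lock [" + owner + "]");
    return acquiredAt;
  }

  /** Releases the lock, logs how long it was kept, returns that duration in milliseconds. */
  long release(String owner, long acquiredAt) {
    lock.unlock();
    long heldFor = System.currentTimeMillis() - acquiredAt;
    logger.log(Level.FINEST, "Kept lock [" + owner + "] for " + heldFor + " milliseconds");
    return heldFor;
  }

  boolean isHeldByCurrentThread() {
    return lock.isHeldByCurrentThread();
  }

  public static void main(String[] args) {
    TimedLockSketch locking = new TimedLockSketch(Logger.getLogger("sketch"));
    long timestamp = locking.acquire("AttachVisualisationCmd");
    try {
      // Work done while holding the lock would go here.
    } finally {
      locking.release("AttachVisualisationCmd", timestamp);
    }
  }
}
```

Messages at `Level.FINEST` are dropped by the default logging
configuration, so the measurements only show up when verbose logging is
enabled.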
hubertp committed Jun 15, 2023
1 parent 372bc8f commit a5fbd17
Showing 19 changed files with 200 additions and 85 deletions.
@@ -291,7 +291,7 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: LogLevel) {
.option(RuntimeOptions.LOG_MASKING, Masking.isMaskingEnabled.toString)
.option(RuntimeOptions.EDITION_OVERRIDE, Info.currentEdition)
.option(
RuntimeServerInfo.JOB_PARALLELISM_OPTION,
RuntimeOptions.JOB_PARALLELISM,
Runtime.getRuntime.availableProcessors().toString
)
.option(RuntimeOptions.PREINITIALIZE, "js")
@@ -49,6 +49,11 @@ public class RuntimeOptions {
INTERPRETER_SEQUENTIAL_COMMAND_EXECUTION)
.build();

public static final String JOB_PARALLELISM = interpreterOptionName("jobParallelism");
public static final OptionKey<Integer> JOB_PARALLELISM_KEY = new OptionKey<>(1);
public static final OptionDescriptor JOB_PARALLELISM_DESCRIPTOR =
OptionDescriptor.newBuilder(JOB_PARALLELISM_KEY, JOB_PARALLELISM).build();

public static final String ENABLE_PROJECT_SUGGESTIONS = optionName("enableProjectSuggestions");
public static final OptionKey<Boolean> ENABLE_PROJECT_SUGGESTIONS_KEY = new OptionKey<>(true);
private static final OptionDescriptor ENABLE_PROJECT_SUGGESTIONS_DESCRIPTOR =
@@ -128,6 +133,7 @@ public class RuntimeOptions {
LANGUAGE_HOME_OVERRIDE_DESCRIPTOR,
EDITION_OVERRIDE_DESCRIPTOR,
INTERPRETER_SEQUENTIAL_COMMAND_EXECUTION_DESCRIPTOR,
JOB_PARALLELISM_DESCRIPTOR,
DISABLE_IR_CACHES_DESCRIPTOR,
PREINITIALIZE_DESCRIPTOR,
WAIT_FOR_PENDING_SERIALIZATION_JOBS_DESCRIPTOR,
@@ -8,9 +8,5 @@ public class RuntimeServerInfo {
public static final String URI = "enso://runtime-server";
public static final String INSTRUMENT_NAME = "enso-runtime-server";
public static final String ENABLE_OPTION = INSTRUMENT_NAME + ".enable";

public static final String JOB_PARALLELISM_OPTION = INSTRUMENT_NAME + ".jobParallelism";
public static final OptionKey<Integer> JOB_PARALLELISM_KEY = new OptionKey<>(1);
public static final OptionDescriptor JOB_PARALLELISM_DESCRIPTOR =
OptionDescriptor.newBuilder(JOB_PARALLELISM_KEY, JOB_PARALLELISM_OPTION).build();
public static final OptionKey<String> ENABLE_OPTION_KEY = new OptionKey<>("");
}
@@ -1,6 +1,8 @@
package org.enso.interpreter.instrument.command;

import java.util.UUID;
import java.util.logging.Level;

import org.enso.interpreter.instrument.CacheInvalidation;
import org.enso.interpreter.instrument.InstrumentFrame;
import org.enso.interpreter.instrument.execution.RuntimeContext;
@@ -41,8 +43,10 @@ public Future<BoxedUnit> execute(RuntimeContext ctx, ExecutionContext ec) {

private void setExecutionEnvironment(
Runtime$Api$ExecutionEnvironment executionEnvironment, UUID contextId, RuntimeContext ctx) {
ctx.locking().acquireContextLock(contextId);
var logger = ctx.executionService().getLogger();
var contextLockTimestamp = ctx.locking().acquireContextLock(contextId);
ctx.locking().acquireWriteCompilationLock();

try {
Stack<InstrumentFrame> stack = ctx.contextManager().getStack(contextId);
ctx.jobControlPlane().abortJobs(contextId);
@@ -55,6 +59,11 @@ private void setExecutionEnvironment(
} finally {
ctx.locking().releaseWriteCompilationLock();
ctx.locking().releaseContextLock(contextId);
logger.log(
Level.FINEST,
"Kept context lock [UpsertVisualisationJob] for "
+ (System.currentTimeMillis() - contextLockTimestamp)
+ " milliseconds");
}
}
}
@@ -3,8 +3,8 @@ package org.enso.interpreter.instrument.command
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.interpreter.instrument.job.{ExecuteJob, UpsertVisualisationJob}
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.polyglot.runtime.Runtime.Api.RequestId

import java.util.logging.Level
import scala.concurrent.{ExecutionContext, Future}

/** A command that attaches a visualisation to an expression.
@@ -13,7 +13,7 @@ import scala.concurrent.{ExecutionContext, Future}
* @param request a request for a service
*/
class AttachVisualisationCmd(
maybeRequestId: Option[RequestId],
maybeRequestId: Option[Api.RequestId],
request: Api.AttachVisualisation
) extends Command(maybeRequestId) {

@@ -22,8 +22,9 @@ class AttachVisualisationCmd(
ctx: RuntimeContext,
ec: ExecutionContext
): Future[Unit] = {
val contextId = request.visualisationConfig.executionContextId
ctx.locking.acquireContextLock(contextId)
val logger = ctx.executionService.getLogger
val contextId = request.visualisationConfig.executionContextId
val lockTimestamp = ctx.locking.acquireContextLock(contextId)
try {
if (doesContextExist) {
attachVisualisation()
@@ -32,6 +33,10 @@ class AttachVisualisationCmd(
}
} finally {
ctx.locking.releaseContextLock(contextId)
logger.log(
Level.FINEST,
s"Kept context lock [AttachVisualisationCmd] for ${System.currentTimeMillis() - lockTimestamp} milliseconds"
)
}
}

@@ -45,11 +50,13 @@ class AttachVisualisationCmd(
ctx: RuntimeContext,
ec: ExecutionContext
): Future[Unit] = {
ctx.endpoint.sendToClient(
Api.Response(maybeRequestId, Api.VisualisationAttached())
)
val maybeFutureExecutable =
ctx.jobProcessor.run(
new UpsertVisualisationJob(
maybeRequestId,
Api.VisualisationAttached(),
request.visualisationId,
request.expressionId,
request.visualisationConfig
@@ -73,4 +80,8 @@ class AttachVisualisationCmd(
}
}

override def toString: String = {
"AttachVisualizationCmd(visualizationId: " + request.visualisationId + ",expressionId=" + request.expressionId + ")"
}

}
@@ -4,6 +4,7 @@ import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.polyglot.runtime.Runtime.Api.RequestId

import java.util.logging.Level
import scala.concurrent.{ExecutionContext, Future}

/** A command that destroys the specified execution context.
@@ -34,13 +35,18 @@ class DestroyContextCmd(
}

private def removeContext()(implicit ctx: RuntimeContext): Unit = {
val logger = ctx.executionService.getLogger
ctx.jobControlPlane.abortJobs(request.contextId)
ctx.locking.acquireContextLock(request.contextId)
val lockTimestamp = ctx.locking.acquireContextLock(request.contextId)
try {
ctx.contextManager.destroy(request.contextId)
reply(Api.DestroyContextResponse(request.contextId))
} finally {
ctx.locking.releaseContextLock(request.contextId)
logger.log(
Level.FINEST,
s"Kept context lock [DestroyContextCmd] for ${System.currentTimeMillis() - lockTimestamp} milliseconds"
)
ctx.locking.removeContextLock(request.contextId)
}
}
@@ -5,6 +5,7 @@ import org.enso.interpreter.instrument.job.DetachVisualisationJob
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.polyglot.runtime.Runtime.Api.RequestId

import java.util.logging.Level
import scala.concurrent.{ExecutionContext, Future}

/** A command that detaches a visualisation from the expression.
@@ -22,7 +23,8 @@ class DetachVisualisationCmd(
ctx: RuntimeContext,
ec: ExecutionContext
): Future[Unit] = {
ctx.locking.acquireContextLock(request.contextId)
val logger = ctx.executionService.getLogger
val lockTimestamp = ctx.locking.acquireContextLock(request.contextId)
try {
if (doesContextExist) {
detachVisualization()
@@ -31,6 +33,10 @@ class DetachVisualisationCmd(
}
} finally {
ctx.locking.releaseContextLock(request.contextId)
logger.log(
Level.FINEST,
s"Kept context lock [DetachVisualisationCmd] for ${System.currentTimeMillis() - lockTimestamp} milliseconds"
)
}
}

@@ -40,16 +46,18 @@ class DetachVisualisationCmd(

private def detachVisualization()(implicit
ctx: RuntimeContext
): Future[Unit] =
): Future[Unit] = {
ctx.endpoint.sendToClient(
Api.Response(maybeRequestId, Api.VisualisationDetached())
)
ctx.jobProcessor.run(
new DetachVisualisationJob(
maybeRequestId,
request.visualisationId,
request.expressionId,
request.contextId,
Api.VisualisationDetached()
request.contextId
)
)
}

private def replyWithContextNotExistError()(implicit
ctx: RuntimeContext,
@@ -4,11 +4,13 @@ import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.interpreter.instrument.job.{
EnsureCompiledJob,
ExecuteJob,
Job,
UpsertVisualisationJob
}
import org.enso.polyglot.runtime.Runtime.Api
import org.enso.polyglot.runtime.Runtime.Api.RequestId
import org.enso.polyglot.runtime.Runtime.Api.ExpressionId

import java.util.logging.Level
import scala.concurrent.{ExecutionContext, Future}

/** A command that modifies a visualisation.
@@ -17,7 +19,7 @@ import scala.concurrent.{ExecutionContext, Future}
* @param request a request for a service
*/
class ModifyVisualisationCmd(
maybeRequestId: Option[RequestId],
maybeRequestId: Option[Api.RequestId],
request: Api.ModifyVisualisation
) extends Command(maybeRequestId) {

@@ -26,8 +28,9 @@ class ModifyVisualisationCmd(
ctx: RuntimeContext,
ec: ExecutionContext
): Future[Unit] = {
val contextId = request.visualisationConfig.executionContextId
ctx.locking.acquireContextLock(contextId)
val logger = ctx.executionService.getLogger
val contextId = request.visualisationConfig.executionContextId
val lockTimestamp = ctx.locking.acquireContextLock(contextId)
try {
if (doesContextExist) {
modifyVisualisation()
@@ -36,33 +39,50 @@ class ModifyVisualisationCmd(
}
} finally {
ctx.locking.releaseContextLock(contextId)
logger.log(
Level.FINEST,
s"Kept context lock [ModifyVisualisationCmd] for ${System.currentTimeMillis() - lockTimestamp} milliseconds"
)
}
}

private def modifyVisualisation()(implicit
ctx: RuntimeContext,
ec: ExecutionContext
): Future[Unit] = {
val maybeVisualisation = ctx.contextManager.getVisualisationById(
val exisitingVisualisation = ctx.contextManager.getVisualisationById(
request.visualisationConfig.executionContextId,
request.visualisationId
)
maybeVisualisation match {
val visualisationPresent: Option[ExpressionId] =
exisitingVisualisation.map(_.expressionId).orElse {
val jobFilter = (job: Job[_]) =>
job match {
case upsert: UpsertVisualisationJob
if upsert.getVisualizationId() == request.visualisationId =>
Some(upsert.key)
case _ => None
}
ctx.jobControlPlane.jobInProgress(jobFilter)
}
visualisationPresent match {
case None =>
Future {
ctx.endpoint.sendToClient(
Api.Response(maybeRequestId, Api.VisualisationNotFound())
)
}

case Some(visualisation) =>
case Some(expressionId) =>
ctx.endpoint.sendToClient(
Api.Response(maybeRequestId, Api.VisualisationModified())
)
val maybeFutureExecutable =
ctx.jobProcessor.run(
new UpsertVisualisationJob(
maybeRequestId,
Api.VisualisationModified(),
request.visualisationId,
visualisation.expressionId,
expressionId,
request.visualisationConfig
)
)
@@ -98,4 +118,8 @@ class ModifyVisualisationCmd(
}
}

override def toString: String = {
"ModifyVisualisationCmd(visualizationId: " + request.visualisationId + ")"
}

}
@@ -26,7 +26,9 @@ class CommandExecutionEngine(interpreterContext: InterpreterContext)
.get(RuntimeOptions.INTERPRETER_SEQUENTIAL_COMMAND_EXECUTION_KEY)
.booleanValue()

private val locking = new ReentrantLocking
private val locking = new ReentrantLocking(
interpreterContext.executionService.getLogger
)

private val executionState = new ExecutionState()

@@ -29,4 +29,8 @@ trait JobControlPlane {
* already running.
*/
def startBackgroundJobs(): Boolean

/** Finds the first in-progress job satisfying the `filter` condition
*/
def jobInProgress[T](filter: Job[_] => Option[T]): Option[T]
}
@@ -187,4 +187,11 @@ final class JobExecutionEngine(
delayedBackgroundJobsQueue.forEach(job => runBackground(job))
delayedBackgroundJobsQueue.clear()
}

override def jobInProgress[T](filter: Job[_] => Option[T]): Option[T] = {
val allJobs = runningJobsRef.get()
allJobs
.find(runningJob => filter(runningJob.job).nonEmpty)
.flatMap(runningJob => filter(runningJob.job))
}
}