From 2c134ecb731b92fb85e15266261880c9081100da Mon Sep 17 00:00:00 2001 From: lmcnatt <85642387+lucymcnatt@users.noreply.github.com> Date: Thu, 23 Jan 2025 15:54:01 -0500 Subject: [PATCH 1/2] [AN-333] Prevent infinite DRS download retries (#7679) --- .../downloaders/GcsUriDownloader.scala | 50 +++++++++---------- .../downloaders/GcsUriDownloaderSpec.scala | 24 +++++++++ 2 files changed, 47 insertions(+), 27 deletions(-) diff --git a/cromwell-drs-localizer/src/main/scala/drs/localizer/downloaders/GcsUriDownloader.scala b/cromwell-drs-localizer/src/main/scala/drs/localizer/downloaders/GcsUriDownloader.scala index 74f5bc64621..04856f069c4 100644 --- a/cromwell-drs-localizer/src/main/scala/drs/localizer/downloaders/GcsUriDownloader.scala +++ b/cromwell-drs-localizer/src/main/scala/drs/localizer/downloaders/GcsUriDownloader.scala @@ -57,33 +57,29 @@ case class GcsUriDownloader(gcsUrl: String, downloadAttempt: Int = 0 ): IO[DownloadResult] = { - def maybeRetryForDownloadFailure(t: Throwable): IO[DownloadResult] = - if (downloadAttempt < downloadRetries) { - backoff foreach { b => Thread.sleep(b.backoffMillis) } - logger.warn(s"Attempting download retry $downloadAttempt of $downloadRetries for a GCS url", t) - downloadWithRetries(downloadRetries, - backoff map { - _.next - }, - downloadAttempt + 1 - ) - } else { - IO.raiseError(new RuntimeException(s"Exhausted $downloadRetries resolution retries to download GCS file", t)) - } - - runDownloadCommand.redeemWith( - recover = maybeRetryForDownloadFailure, - bind = { - case s: DownloadSuccess.type => - IO.pure(s) - case _: RecognizedRetryableDownloadFailure => - downloadWithRetries(downloadRetries, backoff, downloadAttempt + 1) - case _: UnrecognizedRetryableDownloadFailure => - downloadWithRetries(downloadRetries, backoff, downloadAttempt + 1) - case _ => - downloadWithRetries(downloadRetries, backoff, downloadAttempt + 1) - } - ) + // Necessary function to handle the throwable when trying to recover a failed download + def handleDownloadFailure(t: Throwable): IO[DownloadResult] = + downloadWithRetries(downloadRetries, backoff, downloadAttempt + 1) + + if (downloadAttempt < downloadRetries) { + backoff foreach { b => Thread.sleep(b.backoffMillis) } + logger.info(s"Attempting download attempt $downloadAttempt of $downloadRetries for a GCS url") + runDownloadCommand.redeemWith( + recover = handleDownloadFailure, + bind = { + case s: DownloadSuccess.type => + IO.pure(s) + case _: RecognizedRetryableDownloadFailure => + downloadWithRetries(downloadRetries, backoff, downloadAttempt + 1) + case _: UnrecognizedRetryableDownloadFailure => + downloadWithRetries(downloadRetries, backoff, downloadAttempt + 1) + case _ => + downloadWithRetries(downloadRetries, backoff, downloadAttempt + 1) + } + ) + } else { + IO.raiseError(new RuntimeException(s"Exhausted $downloadRetries resolution retries to download GCS file")) + } } /** diff --git a/cromwell-drs-localizer/src/test/scala/drs/localizer/downloaders/GcsUriDownloaderSpec.scala b/cromwell-drs-localizer/src/test/scala/drs/localizer/downloaders/GcsUriDownloaderSpec.scala index 07eb5ede181..3428ea50a62 100644 --- a/cromwell-drs-localizer/src/test/scala/drs/localizer/downloaders/GcsUriDownloaderSpec.scala +++ b/cromwell-drs-localizer/src/test/scala/drs/localizer/downloaders/GcsUriDownloaderSpec.scala @@ -1,6 +1,7 @@ package drs.localizer.downloaders import common.assertion.CromwellTimeoutSpec +import org.mockito.Mockito.{spy, times, verify} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -96,4 +97,27 @@ class GcsUriDownloaderSpec extends AnyFlatSpec with CromwellTimeoutSpec with Mat downloader.generateDownloadScript(gcsUrl, Option(fakeSAJsonPath)) shouldBe expectedDownloadScript } + + it should "fail to download GCS URL after 5 attempts" in { + val gcsUrl = "gs://foo/bar.bam" + val downloader = spy( + new GcsUriDownloader( + gcsUrl = gcsUrl, + downloadLoc = fakeDownloadLocation, + requesterPaysProjectIdOption = Option(fakeRequesterPaysId), + serviceAccountJson = None + ) + ) + + val result = downloader.downloadWithRetries(5, None).attempt.unsafeRunSync() + + result.isLeft shouldBe true + // attempts to download the 1st time and the 5th time, but doesn't attempt a 6th + verify(downloader, times(1)).downloadWithRetries(5, None, 1) + verify(downloader, times(1)).downloadWithRetries(5, None, 5) + verify(downloader, times(0)).downloadWithRetries(5, None, 6) + // attempts the actual download command 5 times + verify(downloader, times(5)).runDownloadCommand + + } } From 5a7f7f8700c1982ca3e1b93ae061a57d98f65a39 Mon Sep 17 00:00:00 2001 From: LizBaldo Date: Tue, 4 Feb 2025 16:21:03 -0500 Subject: [PATCH 2/2] [AN-393] Add waiting for quota to quota messages (#7686) Co-authored-by: Adam Nichols --- .../cromwell/backend/async/KnownJobFailureException.scala | 4 +++- .../main/resources/standardTestCases/quota_fail_retry.test | 1 + .../backend/google/pipelines/common/errors/package.scala | 3 ++- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/backend/src/main/scala/cromwell/backend/async/KnownJobFailureException.scala b/backend/src/main/scala/cromwell/backend/async/KnownJobFailureException.scala index e106f06fcd2..23ad85e957d 100644 --- a/backend/src/main/scala/cromwell/backend/async/KnownJobFailureException.scala +++ b/backend/src/main/scala/cromwell/backend/async/KnownJobFailureException.scala @@ -5,7 +5,9 @@ import common.exception.ThrowableAggregation import cromwell.core.path.Path import wom.expression.{NoIoFunctionSet, WomExpression} -abstract class KnownJobFailureException extends Exception { +import scala.util.control.NoStackTrace + +abstract class KnownJobFailureException extends Exception with NoStackTrace { def stderrPath: Option[Path] } diff --git a/centaur/src/main/resources/standardTestCases/quota_fail_retry.test b/centaur/src/main/resources/standardTestCases/quota_fail_retry.test index db6d29e1d8f..7203a9958ae 100644 --- a/centaur/src/main/resources/standardTestCases/quota_fail_retry.test +++ b/centaur/src/main/resources/standardTestCases/quota_fail_retry.test @@ -1,3 +1,4 @@ +ignore: true # GCP seems to have fixed the quota handling bug, which makes this test unnecessary name: quota_fail_retry testFormat: workflowfailure # In PAPI v2 there seems to be a quota exhaustion message in a reasonably timely manner, while in GCP Batch the job diff --git a/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/errors/package.scala b/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/errors/package.scala index ade5f84e00f..2a90f588803 100644 --- a/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/errors/package.scala +++ b/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/errors/package.scala @@ -7,7 +7,8 @@ package object errors { "usage too high", "no available zones", "resource_exhausted", - "quota too low" + "quota too low", + "waiting for quota" ) def isQuotaMessage(msg: String): Boolean =