Skip to content

Commit

Permalink
ledger-api-health: Use the Scala health status values everywhere. (#1…
Browse files Browse the repository at this point in the history
…0640)

The methods are intended to be used by Java code, where Scala
`case object` values can be unidiomatic.

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
SamirTalwar authored Aug 20, 2021
1 parent 5b837ec commit 733590d
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.codahale.metrics.MetricRegistry
import com.daml.dec.DirectExecutionContext
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.health.{HealthStatus, Healthy}
import com.daml.ledger.configuration.LedgerId
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.kvutils.api.{
Expand Down Expand Up @@ -64,7 +64,7 @@ object Main {
dataSource
}

override def currentHealth(): HealthStatus = HealthStatus.healthy
override def currentHealth(): HealthStatus = Healthy

override def ledgerId(): LedgerId = IndexerBenchmark.LedgerId
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.codahale.metrics.{MetricRegistry, Snapshot}
import com.daml.dec.DirectExecutionContext
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.health.{HealthStatus, Healthy}
import com.daml.ledger.configuration.{Configuration, LedgerInitialConditions, LedgerTimeModel}
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.v1.{ReadService, Update}
Expand Down Expand Up @@ -198,7 +198,7 @@ class IndexerBenchmark() {
assert(beginAfter.isEmpty, s"beginAfter is $beginAfter")
Source.fromIterator(() => updates)
}
override def currentHealth(): HealthStatus = HealthStatus.healthy
override def currentHealth(): HealthStatus = Healthy
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ sealed abstract class HealthStatus extends Product with Serializable {
}
}

case object Healthy extends HealthStatus

case object Unhealthy extends HealthStatus

// These methods are intended to be used by Java code, where the Scala values can be unidiomatic.
// You are encouraged to use the Scala values above when writing Scala code.
object HealthStatus {
val healthy: HealthStatus = Healthy

val unhealthy: HealthStatus = Unhealthy
}

case object Healthy extends HealthStatus

case object Unhealthy extends HealthStatus
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicReference

import akka.actor.Scheduler
import akka.pattern.after
import com.daml.ledger.api.health.{HealthStatus, ReportsHealth}
import com.daml.ledger.api.health.{HealthStatus, Healthy, ReportsHealth, Unhealthy}
import com.daml.ledger.resources.{Resource, ResourceContext}
import com.daml.logging.{ContextualizedLogger, LoggingContext}

Expand Down Expand Up @@ -47,7 +47,7 @@ private[indexer] final class RecoveringIndexer(

val firstSubscription = subscribe().map(handle => {
logger.info("Started Indexer Server")
updateHealthStatus(HealthStatus.healthy)
updateHealthStatus(Healthy)
handle
})
subscription.set(firstSubscription)
Expand Down Expand Up @@ -85,7 +85,7 @@ private[indexer] final class RecoveringIndexer(
if (subscription.compareAndSet(oldSubscription, newSubscription)) {
resubscribeOnFailure(newSubscription)
newSubscription.asFuture.map { _ =>
updateHealthStatus(HealthStatus.healthy)
updateHealthStatus(Healthy)
logger.info("Restarted Indexer Server")
}
} else { // we must have stopped the server during the restart
Expand Down Expand Up @@ -143,14 +143,14 @@ private[indexer] final class RecoveringIndexer(
.release()
.flatMap(_ => complete.future)
.map(_ => {
updateHealthStatus(HealthStatus.unhealthy)
updateHealthStatus(Unhealthy)
logger.info("Stopped Indexer Server")
})
})
}

private def reportErrorState(errorMessage: String, exception: Throwable): Unit = {
updateHealthStatus(HealthStatus.unhealthy)
updateHealthStatus(Unhealthy)
logger.error(errorMessage, exception)
}
}
Expand All @@ -159,7 +159,7 @@ private[indexer] object RecoveringIndexer {
def apply(scheduler: Scheduler, executionContext: ExecutionContext, restartDelay: FiniteDuration)(
implicit loggingContext: LoggingContext
): RecoveringIndexer = {
val healthStatusRef = new AtomicReference[HealthStatus](HealthStatus.unhealthy)
val healthStatusRef = new AtomicReference[HealthStatus](Unhealthy)

val healthReporter: ReportsHealth = () => healthStatusRef.get()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package com.daml.platform.indexer

import akka.stream.Materializer
import com.daml.ledger.api.health.{HealthStatus, ReportsHealth}
import com.daml.ledger.api.health.{Healthy, ReportsHealth}
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
Expand Down Expand Up @@ -74,7 +74,7 @@ final class StandaloneIndexerServer(
.fromFuture(indexerFactory.validateAndWaitOnly())
.map[ReportsHealth] { _ =>
logger.debug("Waiting for the indexer to validate the schema migrations.")
() => HealthStatus.healthy
() => Healthy
}
case IndexerStartupMode.MigrateOnEmptySchemaAndStart =>
Resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.util.concurrent.{Executors, TimeUnit}
import akka.NotUsed
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{KillSwitch, KillSwitches, Materializer, UniqueKillSwitch}
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.health.Healthy
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
Expand All @@ -23,10 +23,10 @@ import com.daml.platform.indexer.parallel.AsyncSupport._
import com.daml.platform.indexer.{IndexFeedHandle, Indexer}
import com.daml.platform.store.appendonlydao.DbDispatcher
import com.daml.platform.store.appendonlydao.events.{CompressionStrategy, LfValueTranslation}
import com.daml.platform.store.{EventSequentialId, backend}
import com.daml.platform.store.backend.DataSourceStorageBackend.DataSourceConfig
import com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd
import com.daml.platform.store.backend.{DbDto, StorageBackend}
import com.daml.platform.store.{EventSequentialId, backend}
import com.google.common.util.concurrent.ThreadFactoryBuilder

import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -132,7 +132,7 @@ object ParallelIndexerFactory {
.keepAlive( // TODO ha: remove as stable. This keepAlive approach was introduced for safety with async commit. This is still needed until HA is mandatory for Postgres to ensure safety with async commit. This will not needed anymore if HA is enabled by default, since the Ha mutual exclusion implementation with advisory locks makes impossible to let a db-shutdown go undetected.
keepAliveMaxIdleDuration,
() =>
if (dbDispatcher.currentHealth() == HealthStatus.healthy) {
if (dbDispatcher.currentHealth() == Healthy) {
logger.debug("Indexer keep-alive: database connectivity OK")
} else {
logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import java.util.concurrent.atomic.AtomicReference
import akka.actor.ActorSystem
import akka.pattern.after
import ch.qos.logback.classic.Level
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.health.HealthStatus.{healthy, unhealthy}
import com.daml.ledger.api.health.{HealthStatus, Healthy, Unhealthy}
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner, TestResourceContext}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.indexer.RecoveringIndexerSpec._
Expand Down Expand Up @@ -75,7 +74,7 @@ final class RecoveringIndexerSpec
)
testIndexer.openSubscriptions shouldBe mutable.Set.empty
// When the indexer is shutdown, its status should be unhealthy/not serving
healthReporter.currentHealth() shouldBe unhealthy
healthReporter.currentHealth() shouldBe Unhealthy
}
}

Expand Down Expand Up @@ -110,7 +109,7 @@ final class RecoveringIndexerSpec
)
testIndexer.openSubscriptions shouldBe mutable.Set.empty
// When the indexer is shutdown, its status should be unhealthy/not serving
healthReporter.currentHealth() shouldBe HealthStatus.unhealthy
healthReporter.currentHealth() shouldBe Unhealthy
}
}

Expand Down Expand Up @@ -141,7 +140,7 @@ final class RecoveringIndexerSpec
Level.INFO -> "Stopped Indexer Server",
)
testIndexer.openSubscriptions shouldBe mutable.Set.empty
healthReporter.currentHealth() shouldBe HealthStatus.unhealthy
healthReporter.currentHealth() shouldBe Unhealthy
}
}

Expand Down Expand Up @@ -236,14 +235,14 @@ final class RecoveringIndexerSpec
testIndexer.openSubscriptions shouldBe mutable.Set.empty

healthStatusLogCapture should contain theSameElementsInOrderAs Seq(
unhealthy,
healthy,
unhealthy,
healthy,
unhealthy,
Unhealthy,
Healthy,
Unhealthy,
Healthy,
Unhealthy,
)
// When the indexer is shutdown, its status should be unhealthy/not serving
healthReporter.currentHealth() shouldBe HealthStatus.unhealthy
healthReporter.currentHealth() shouldBe Unhealthy
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import java.io.Closeable
import java.util.UUID

import akka.stream.Materializer
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.health.{HealthStatus, Unhealthy}
import com.daml.ledger.participant.state.kvutils.wire.DamlSubmissionBatch
import com.daml.ledger.participant.state.kvutils.{Envelope, Raw}
import com.daml.ledger.participant.state.v1.SubmissionResult
Expand Down Expand Up @@ -61,7 +61,7 @@ class BatchingLedgerWriter(val queue: BatchingQueue, val writer: LedgerWriter)(
if (queueHandle.isAlive)
writer.currentHealth()
else
HealthStatus.unhealthy
Unhealthy

private def commitBatch(
submissions: Seq[DamlSubmissionBatch.CorrelatedSubmission]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package com.daml.ledger.participant.state.kvutils.api

import akka.stream.Materializer
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.health.{Healthy, Unhealthy}
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.participant.state.kvutils.wire.DamlSubmissionBatch
import com.daml.ledger.participant.state.kvutils.{Envelope, Raw}
Expand Down Expand Up @@ -47,7 +47,7 @@ class BatchingLedgerWriterSpec
LoggingContext.newLoggingContext { implicit ctx =>
new BatchingLedgerWriter(queue, writer)
}
batchingWriter.currentHealth() shouldBe HealthStatus.unhealthy
batchingWriter.currentHealth() shouldBe Unhealthy
Future.successful(succeed)
}

Expand Down Expand Up @@ -87,7 +87,7 @@ class BatchingLedgerWriterSpec
any[TelemetryContext]
)
all(Seq(result1, result2, result3)) should be(SubmissionResult.Acknowledged)
batchingWriter.currentHealth() should be(HealthStatus.healthy)
batchingWriter.currentHealth() should be(Healthy)
}
}

Expand Down Expand Up @@ -131,7 +131,7 @@ object BatchingLedgerWriterSpec extends MockitoSugar with ArgumentMatchersSugar
)
.thenReturn(Future.successful(SubmissionResult.Acknowledged))
when(writer.participantId).thenReturn(Ref.ParticipantId.assertFromString("test-participant"))
when(writer.currentHealth()).thenReturn(HealthStatus.healthy)
when(writer.currentHealth()).thenReturn(Healthy)
writer
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package com.daml.ledger.participant.state.kvutils.tools.integritycheck
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.health.{HealthStatus, Healthy}
import com.daml.ledger.configuration.{LedgerId, LedgerInitialConditions}
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.kvutils.api.{
Expand Down Expand Up @@ -53,7 +53,7 @@ final class LogAppendingReadServiceFactory(
Source.fromIterator(() => recordedBlocksSnapshot.iterator)
}

override def currentHealth(): HealthStatus = HealthStatus.healthy
override def currentHealth(): HealthStatus = Healthy

override def ledgerId(): LedgerId = "FakeParticipantStateReaderLedgerId"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import akka.testkit.TestKit
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.health.{HealthStatus, Healthy}
import com.daml.ledger.configuration.LedgerInitialConditions
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.v1.Update
Expand Down Expand Up @@ -101,7 +101,7 @@ object StateUpdateExporterSpec extends MockitoSugar {
)
)

override def currentHealth(): HealthStatus = HealthStatus.healthy
override def currentHealth(): HealthStatus = Healthy
}

private def aConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.{BoundedSourceQueue, Materializer, QueueOfferResult}
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.api.health.{HealthStatus, Healthy}
import com.daml.ledger.configuration.{
Configuration,
LedgerId,
Expand Down Expand Up @@ -75,7 +75,7 @@ case class ReadWriteServiceBridge(
)
)

override def currentHealth(): HealthStatus = HealthStatus.healthy
override def currentHealth(): HealthStatus = Healthy

override def allocateParty(
hint: Option[Ref.Party],
Expand Down

0 comments on commit 733590d

Please sign in to comment.