Skip to content

Commit

Permalink
Track command - use types for error handling instead of grpc statuses…
Browse files Browse the repository at this point in the history
… [KVL-1005] (#10503)

* Track command response using an Either instead of passing the completion with the grpc code.

This makes the result of command tracking clearer. We no longer rely on the gRPC status to determine whether there was an error, and instead use types for that.

CHANGELOG_BEGIN
akka-bindings: `LedgerClientBinding.commands` now returns a flow of `Either[CompletionFailure, CompletionSuccess]` instead of `Completion` for clearer error handling. For backwards compatibility, the new return type can be turned back into a `Completion` using `CompletionResponse.toCompletion`.
CHANGELOG_END

* Fix formatting

* Code review changes

- remove usages of Symbol in tests
- clean curly braces

* Remove change added from another PR

* Fix import

* Fix import

* Fix retry flow and extract one more match case

* Un-nest matches to a single level for simplicity

* fix typo

Co-authored-by: Samir Talwar <[email protected]>

* Be consistent in assertions and prefer `inside` for pattern matching

* Inline CompletionResponse to use the full type

* Use simpler matcher

* Formatting

* Add a way to convert an `Either[CompletionFailure, CompletionSuccess]` back to a `Completion` for backwards compatibility. This simplifies updates for systems that are tightly coupled to `Completion`

* Add test for converting to/from CompletionResponse

* Remove unnecessary brackets

* Add missing header

* Use checked exceptions to preserve backwards compatibility

* Fix unapply

Co-authored-by: Samir Talwar <[email protected]>
  • Loading branch information
nicu-da and SamirTalwar authored Aug 12, 2021
1 parent 93c25f3 commit ee34d0f
Show file tree
Hide file tree
Showing 21 changed files with 508 additions and 241 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import com.daml.api.util.TimeProvider
import com.daml.ledger.api.refinements.ApiTypes.{ApplicationId, LedgerId, Party}
import com.daml.ledger.api.refinements.{CompositeCommand, CompositeCommandAdapter}
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.api.v1.event.Event
import com.daml.ledger.api.v1.ledger_identity_service.{
GetLedgerIdentityRequest,
Expand All @@ -25,6 +24,10 @@ import com.daml.ledger.client.binding.DomainTransactionMapper.DecoderType
import com.daml.ledger.client.binding.retrying.{CommandRetryFlow, RetryInfo}
import com.daml.ledger.client.binding.util.Slf4JLogger
import com.daml.ledger.client.configuration.LedgerClientConfiguration
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
CompletionFailure,
CompletionSuccess,
}
import com.daml.util.Ctx
import io.grpc.ManagedChannel
import io.grpc.netty.NegotiationType.TLS
Expand Down Expand Up @@ -84,7 +87,8 @@ class LedgerClientBinding(
.via(DomainTransactionMapper(decoder))
}

type CommandTrackingFlow[C] = Flow[Ctx[C, CompositeCommand], Ctx[C, Completion], NotUsed]
type CommandTrackingFlow[C] =
Flow[Ctx[C, CompositeCommand], Ctx[C, Either[CompletionFailure, CompletionSuccess]], NotUsed]

private val compositeCommandAdapter = new CompositeCommandAdapter(
LedgerId(ledgerClient.ledgerId.unwrap),
Expand Down Expand Up @@ -115,7 +119,8 @@ class LedgerClientBinding(
retryInfo.request
}

type CommandsFlow[C] = Flow[Ctx[C, CompositeCommand], Ctx[C, Completion], NotUsed]
type CommandsFlow[C] =
Flow[Ctx[C, CompositeCommand], Ctx[C, Either[CompletionFailure, CompletionSuccess]], NotUsed]

def commands[C](party: Party)(implicit ec: ExecutionContext): Future[CommandsFlow[C]] = {
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,25 @@ import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition}
import com.daml.api.util.TimeProvider
import com.daml.ledger.api.refinements.ApiTypes.Party
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.client.services.commands.CommandClient
import com.daml.ledger.client.services.commands.tracker.CompletionResponse
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
CompletionFailure,
CompletionSuccess,
}
import com.daml.util.Ctx
import com.google.rpc.Code
import com.google.rpc.status.Status
import scalaz.syntax.tag._

import scala.concurrent.{ExecutionContext, Future}

object CommandRetryFlow {

type In[C] = Ctx[C, SubmitRequest]
type Out[C] = Ctx[C, Completion]
type Out[C] = Ctx[C, Either[CompletionFailure, CompletionSuccess]]
type SubmissionFlowType[C] = Flow[In[C], Out[C], NotUsed]
type CreateRetryFn[C] = (RetryInfo[C], Completion) => SubmitRequest
type CreateRetryFn[C] =
(RetryInfo[C], Either[CompletionFailure, CompletionSuccess]) => SubmitRequest

private val RETRY_PORT = 0
private val PROPAGATE_PORT = 1
Expand Down Expand Up @@ -70,25 +74,34 @@ object CommandRetryFlow {
{
case Ctx(
RetryInfo(request, nrOfRetries, firstSubmissionTime, _),
Completion(_, Some(status: Status), _),
Left(CompletionResponse.NotOkResponse(_, status)),
_,
) =>
if (status.code == Code.OK_VALUE) {
PROPAGATE_PORT
} else if (
(firstSubmissionTime plus maxRetryTime) isBefore timeProvider.getCurrentTime
) {
RetryLogger.logStopRetrying(request, status, nrOfRetries, firstSubmissionTime)
) if RETRYABLE_ERROR_CODES.contains(status.code) =>
if ((firstSubmissionTime plus maxRetryTime) isBefore timeProvider.getCurrentTime) {
RetryLogger.logStopRetrying(
request,
status,
nrOfRetries,
firstSubmissionTime,
)
PROPAGATE_PORT
} else if (RETRYABLE_ERROR_CODES.contains(status.code)) {
} else {
RetryLogger.logNonFatal(request, status, nrOfRetries)
RETRY_PORT
} else {
RetryLogger.logFatal(request, status, nrOfRetries)
PROPAGATE_PORT
}
case Ctx(_, Completion(commandId, _, _), _) =>
case Ctx(
RetryInfo(request, nrOfRetries, _, _),
Left(CompletionResponse.NotOkResponse(_, status)),
_,
) =>
RetryLogger.logFatal(request, status, nrOfRetries)
PROPAGATE_PORT
case Ctx(_, Left(CompletionResponse.TimeoutResponse(_)), _) =>
PROPAGATE_PORT
case Ctx(_, Left(CompletionResponse.NoStatusInResponse(commandId)), _) =>
statusNotFoundError(commandId)
case Ctx(_, Right(_), _) =>
PROPAGATE_PORT
},
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,25 @@ import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.ledger.api.v1.commands.Commands
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.client.binding.retrying.CommandRetryFlow.{In, Out, SubmissionFlowType}
import com.daml.ledger.client.services.commands.tracker.CompletionResponse
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
CompletionFailure,
CompletionSuccess,
NotOkResponse,
}
import com.daml.ledger.client.testing.AkkaTest
import com.daml.util.Ctx
import com.google.protobuf.duration.{Duration => protoDuration}
import com.google.rpc.Code
import com.google.rpc.status.Status
import org.scalatest.Inside
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec

import scala.annotation.nowarn
import scala.concurrent.Future

class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest {
class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest with Inside {

/** Uses the status received in the context for the first time,
* then replies OK status as the ledger effective time is stepped.
Expand All @@ -32,14 +39,11 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest {
.map {
case Ctx(context @ RetryInfo(_, _, _, status), SubmitRequest(Some(commands)), _) =>
if (commands.deduplicationTime.get.nanos == 0) {
Ctx(context, Completion(commands.commandId, Some(status)))
Ctx(context, CompletionResponse(Completion(commands.commandId, Some(status))))
} else {
Ctx(
context,
Completion(
commands.commandId,
Some(status.copy(code = Code.OK_VALUE)),
),
Right(CompletionResponse.CompletionSuccess(commands.commandId, "", status)),
)
}
case x =>
Expand All @@ -49,8 +53,11 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest {
private val timeProvider = TimeProvider.Constant(Instant.ofEpochSecond(60))
private val maxRetryTime = Duration.ofSeconds(30)

@nowarn("msg=parameter value completion .* is never used") // matches createGraph signature
private def createRetry(retryInfo: RetryInfo[Status], completion: Completion) = {
@nowarn("msg=parameter value response .* is never used") // matches createGraph signature
private def createRetry(
retryInfo: RetryInfo[Status],
response: Either[CompletionFailure, CompletionSuccess],
) = {
val commands = retryInfo.request.commands.get
val dedupTime = commands.deduplicationTime.get
val newDedupTime = dedupTime.copy(nanos = dedupTime.nanos + 1)
Expand Down Expand Up @@ -87,11 +94,11 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest {

CommandRetryFlow.getClass.getSimpleName should {

"propagete OK status" in {
"propagate OK status" in {
submitRequest(Code.OK_VALUE, Instant.ofEpochSecond(45)) map { result =>
result.size shouldBe 1
result.head.context.nrOfRetries shouldBe 0
result.head.value.status.get.code shouldBe Code.OK_VALUE
inside(result) { case Seq(Ctx(context, Right(_), _)) =>
context.nrOfRetries shouldBe 0
}
}
}

Expand All @@ -101,39 +108,42 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest {
.values()
.toList
.filterNot(c =>
c == Code.UNRECOGNIZED || CommandRetryFlow.RETRYABLE_ERROR_CODES.contains(c.getNumber)
c == Code.UNRECOGNIZED || CommandRetryFlow.RETRYABLE_ERROR_CODES
.contains(c.getNumber) || c == Code.OK
)
val failedSubmissions = codesToFail.map { code =>
submitRequest(code.getNumber, Instant.ofEpochSecond(45)) map { result =>
result.size shouldBe 1
result.head.context.nrOfRetries shouldBe 0
result.head.value.status.get.code shouldBe code.getNumber
inside(result) { case Seq(Ctx(context, Left(NotOkResponse(_, grpcStatus)), _)) =>
context.nrOfRetries shouldBe 0
grpcStatus.code shouldBe code.getNumber
}
}
}
Future.sequence(failedSubmissions).map(_ => succeed)
}

"retry RESOURCE_EXHAUSTED status" in {
submitRequest(Code.RESOURCE_EXHAUSTED_VALUE, Instant.ofEpochSecond(45)) map { result =>
result.size shouldBe 1
result.head.context.nrOfRetries shouldBe 1
result.head.value.status.get.code shouldBe Code.OK_VALUE
inside(result) { case Seq(Ctx(context, Right(_), _)) =>
context.nrOfRetries shouldBe 1
}
}
}

"retry UNAVAILABLE status" in {
submitRequest(Code.UNAVAILABLE_VALUE, Instant.ofEpochSecond(45)) map { result =>
result.size shouldBe 1
result.head.context.nrOfRetries shouldBe 1
result.head.value.status.get.code shouldBe Code.OK_VALUE
inside(result) { case Seq(Ctx(context, Right(_), _)) =>
context.nrOfRetries shouldBe 1
}
}
}

"stop retrying after maxRetryTime" in {
submitRequest(Code.RESOURCE_EXHAUSTED_VALUE, Instant.ofEpochSecond(15)) map { result =>
result.size shouldBe 1
result.head.context.nrOfRetries shouldBe 0
result.head.value.status.get.code shouldBe Code.RESOURCE_EXHAUSTED_VALUE
inside(result) { case Seq(Ctx(context, Left(NotOkResponse(_, grpcStatus)), _)) =>
context.nrOfRetries shouldBe 0
grpcStatus.code shouldBe Code.RESOURCE_EXHAUSTED_VALUE.intValue
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions ledger/ledger-api-client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ da_scala_library(
"//libs-scala/grpc-utils",
"//libs-scala/ports",
"//libs-scala/resources",
"@maven//:com_google_api_grpc_proto_google_common_protos",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:io_grpc_grpc_netty",
"@maven//:io_netty_netty_handler",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import java.util.concurrent.TimeUnit
import akka.NotUsed
import akka.stream.scaladsl.{Sink, Source}
import com.daml.api.util.TimeProvider
import com.daml.dec.DirectExecutionContext
import com.daml.ledger.api.domain
import com.daml.ledger.api.testing.utils.{
IsStatusException,
Expand All @@ -21,27 +22,30 @@ import com.daml.ledger.api.v1.command_submission_service.{
SubmitRequest,
}
import com.daml.ledger.api.v1.commands.{Command, CreateCommand, ExerciseCommand}
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset.LedgerBoundary.LEDGER_BEGIN
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset.Value.Boundary
import com.daml.ledger.api.v1.testing.time_service.TimeServiceGrpc
import com.daml.ledger.api.v1.value.{Record, RecordField}
import com.daml.ledger.client.configuration.CommandClientConfiguration
import com.daml.ledger.client.services.commands.tracker.CompletionResponse.{
CompletionFailure,
CompletionSuccess,
NotOkResponse,
}
import com.daml.ledger.client.services.commands.{CommandClient, CompletionStreamElement}
import com.daml.ledger.client.services.testing.time.StaticTime
import com.daml.platform.common.LedgerIdMode
import com.daml.dec.DirectExecutionContext
import com.daml.platform.participant.util.ValueConversions._
import com.daml.platform.sandbox.config.SandboxConfig
import com.daml.platform.sandbox.services.{SandboxFixture, TestCommands}
import com.daml.util.Ctx
import com.google.rpc.code.Code
import io.grpc.{Status, StatusRuntimeException}
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._
import org.scalatest._
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._
import org.scalatest.wordspec.AsyncWordSpec
import scalaz.syntax.tag._

Expand All @@ -56,7 +60,8 @@ final class CommandClientIT
with SandboxFixture
with Matchers
with SuiteResourceManagementAroundAll
with TryValues {
with TryValues
with Inside {

private val defaultCommandClientConfiguration =
CommandClientConfiguration(
Expand Down Expand Up @@ -144,17 +149,21 @@ final class CommandClientIT
.runWith(Sink.seq)
.map(_.last) // one element is guaranteed

private def submitCommand(req: SubmitRequest): Future[Completion] =
private def submitCommand(
req: SubmitRequest
): Future[Either[CompletionFailure, CompletionSuccess]] =
commandClient().flatMap(_.trackSingleCommand(req))

private def assertCommandFailsWithCode(
submitRequest: SubmitRequest,
expectedErrorCode: Code,
expectedMessageSubString: String,
): Future[Assertion] =
submitCommand(submitRequest).map { completion =>
completion.getStatus.code should be(expectedErrorCode.value)
completion.getStatus.message should include(expectedMessageSubString)
submitCommand(submitRequest).map { result =>
inside(result) { case Left(NotOkResponse(_, grpcStatus)) =>
grpcStatus.code should be(expectedErrorCode.value)
grpcStatus.message should include(expectedMessageSubString)
}
}(DirectExecutionContext)

/** Reads a set of command IDs expected in the given client after the given checkpoint.
Expand Down
Loading

0 comments on commit ee34d0f

Please sign in to comment.