Skip to content

Commit

Permalink
doc(sagas): add more docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Reasno committed Mar 10, 2021
1 parent 300eb2a commit 6fb1392
Show file tree
Hide file tree
Showing 24 changed files with 221 additions and 99 deletions.
3 changes: 0 additions & 3 deletions dtransaction/doc.go

This file was deleted.

45 changes: 0 additions & 45 deletions dtransaction/sagas/saga.go

This file was deleted.

2 changes: 1 addition & 1 deletion dtransaction/correlation_id.go → dtx/correlation_id.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dtransaction
package dtx

type correlationIDType string

Expand Down
43 changes: 43 additions & 0 deletions dtx/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Package dtx contains common utilities in the context of distributed transaction.
Context Passing
It is curial for all parties in the distributed transaction to share an
transaction id. This package provides utility to pass this id across services.
HTTPToContext() http.RequestFunc
ContextToHTTP() http.RequestFunc
GRPCToContext() grpc.ServerRequestFunc
ContextToGRPC() grpc.ClientRequestFunc
Idempotency
Certain operations will be retried by the client more than once. A middleware is
provided for the server to shield against repeated request in the same
transaction.
func MakeIdempotence(s Oncer) endpoint.Middleware
Lock
Certain resource in transaction cannot be concurrently accessed. A middleware is
provided to lock such resources.
func MakeLock(l Locker) endpoint.Middleware
Allow Null Compensation and Prevent Resource Suspension
Transaction participants may receive the compensation
order before performing normal operations due to network exceptions. In this
case, null compensation is required.
If the forward operation arrives later than the compensating operation due to
network exceptions, the forward operation must be discarded. Otherwise, resource
suspension occurs.
func MakeAttempt(s Sequencer) endpoint.Middleware
func MakeCancel(s Sequencer) endpoint.Middleware
*/
package dtx
20 changes: 14 additions & 6 deletions dtransaction/middleware.go → dtx/middleware.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dtransaction
package dtx

import (
"context"
Expand Down Expand Up @@ -34,7 +34,7 @@ func MakeIdempotence(s Oncer) endpoint.Middleware {
}
}

// Locker is an interface for distributed lock.
// Locker is an interface for the distributed lock.
type Locker interface {
// Lock should return true only when it successfully grabs the lock.
Lock(ctx context.Context, key string) bool
Expand All @@ -58,17 +58,21 @@ func MakeLock(l Locker) endpoint.Middleware {
}
}

// AtomicTransactioner is an interface that shields against the disordering of
// Sequencer is an interface that shields against the disordering of
// attempt and cancel in a transactional context.
type AtomicTransactioner interface {
type Sequencer interface {
MarkCancelledCheckAttempted(context.Context, string) bool
MarkAttemptedCheckCancelled(context.Context, string) bool
}

// MakeAttempt returns a middleware that wraps around an attempt endpoint. If the
// this segment of the distributed transaction is already cancelled, the next
// endpoint will never be executed.
func MakeAttempt(s AtomicTransactioner) endpoint.Middleware {
//
// If the forward operation arrives later than the compensating operation due to
// network exceptions, the forward operation must be discarded. Otherwise,
// resource suspension occurs.
func MakeAttempt(s Sequencer) endpoint.Middleware {
return func(e endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
correlationID, ok := ctx.Value(CorrelationID).(string)
Expand All @@ -86,7 +90,11 @@ func MakeAttempt(s AtomicTransactioner) endpoint.Middleware {
// MakeCancel returns a middleware that wraps around the cancellation endpoint.
// It guarantees if this segment of the distributed transaction is never attempted,
// the cancellation endpoint will not be executed.
func MakeCancel(s AtomicTransactioner) endpoint.Middleware {
//
// Transaction participants may receive the compensation order before performing
// normal operations due to network exceptions. In this case, null compensation
// is required.
func MakeCancel(s Sequencer) endpoint.Middleware {
return func(e endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
correlationID, ok := ctx.Value(CorrelationID).(string)
Expand Down
2 changes: 1 addition & 1 deletion dtransaction/middleware_test.go → dtx/middleware_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dtransaction
package dtx

import (
"context"
Expand Down
6 changes: 3 additions & 3 deletions dtransaction/redis_store.go → dtx/redis_store.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dtransaction
package dtx

import (
"context"
Expand All @@ -8,7 +8,7 @@ import (
"github.com/go-redis/redis/v8"
)

// RedisStore is an implementation of Oncer, Locker and AtomicTransactioner.
// RedisStore is an implementation of Oncer, Locker and Sequencer.
type RedisStore struct {
keyer contract.Keyer
client redis.UniversalClient
Expand Down Expand Up @@ -41,7 +41,7 @@ return 0
}

// Lock grabs the lock for the given key. It returns true if the lock is
// successfully held. If the lock is not available, this method will block until
// successfully acquired. If the lock is not available, this method will block until
// the lock is released or the context expired. In latter case, false is
// returned.
func (r RedisStore) Lock(ctx context.Context, key string) bool {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build integration

package dtransaction
package dtx

import (
"context"
Expand Down
25 changes: 22 additions & 3 deletions dtransaction/sagas/dependency.go → dtx/sagas/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/DoNewsCode/core/config"
"github.com/DoNewsCode/core/contract"
"github.com/DoNewsCode/core/di"
"github.com/go-kit/kit/endpoint"
Expand All @@ -20,11 +21,10 @@ Providers returns a set of dependency providers.
[]*Step `group:"saga"`
Provide:
*Registry
recoverInterval
SagaEndpoints
*/
func Providers() di.Deps {
return []interface{}{provide}
return []interface{}{provide, provideConfig}
}

// in is the injection parameter for saga module.
Expand Down Expand Up @@ -56,7 +56,7 @@ func provide(in in) out {
if in.Store == nil {
in.Store = NewInProcessStore()
}
timeoutSec := in.Conf.Float64("sagas.defaultSagaTimeoutSecond")
timeoutSec := in.Conf.Float64("sagas.sagaTimeoutSecond")
if timeoutSec == 0 {
timeoutSec = 600
}
Expand Down Expand Up @@ -97,3 +97,22 @@ func (m out) ProvideRunGroup(group *run.Group) {
ticker.Stop()
})
}

type configOut struct {
Config []config.ExportedConfig
}

func provideConfig() configOut {
return configOut{Config: []config.ExportedConfig{
{
Owner: "sagas",
Data: map[string]interface{}{
"sagas": map[string]interface{}{
"sagaTimeoutSecond": "600",
"recoverIntervalSecond": "60",
},
},
Comment: "The saga config",
},
}}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/DoNewsCode/core"
"github.com/DoNewsCode/core/di"
"github.com/ghodss/yaml"
"github.com/oklog/run"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -46,6 +47,12 @@ func TestNew(t *testing.T) {
})
}

func TestExportedConfigs(t *testing.T) {
conf := provideConfig()
_, err := yaml.Marshal(conf)
assert.NoError(t, err)
}

func timeout(duration time.Duration, g *run.Group) {
ctx, cancel := context.WithTimeout(context.Background(), duration)
g.Add(func() error {
Expand Down
91 changes: 91 additions & 0 deletions dtx/sagas/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
Package sagas implements the orchestration based saga pattern.
See https://microservices.io/patterns/data/saga.html
Introduction
A saga is a sequence of local transactions. Each local transaction updates the
database and publishes a message or event to trigger the next local
transaction in the saga. If a local transaction fails because it violates a
business rule then the saga executes a series of compensating transactions
that undo the changes that were made by the preceding local transactions.
Usage
The saga is managed by sagas.Registry. Each saga step has an forward operation
and a rollback counterpart. They must be registered beforehand by calling
Registry.AddStep. A new endpoint will be returned to the caller. Use the
returned endpoint to perform transactional operation.
store := sagas.NewInProcessStore()
registry := sagas.NewRegistry(store)
addOrder := registry.AddStep(&sagas.Step{
Name: "Add Order",
Do: func(ctx context.Context, request interface{}) (response interface{}, err error) {
resp, err := orderEndpoint(ctx, request.(OrderRequest))
if err != nil {
return nil, err
}
return resp, nil
},
Undo: func(ctx context.Context, req interface{}) (response interface{}, err error) {
return orderCancelEndpoint(ctx, req)
},
})
makePayment := registry.AddStep(&sagas.Step{
Name: "Make Payment",
Do: func(ctx context.Context, request interface{}) (response interface{}, err error) {
resp, err := paymentEndpoint(ctx, request.(PaymentRequest))
if err != nil {
return nil, err
}
return resp, nil
},
Undo: func(ctx context.Context, req interface{}) (response interface{}, err error) {
return paymentCancelEndpoint(ctx)
},
})
Initiate the transaction by calling registry.StartTX. Pass the context returned
to the transaction branches. You can rollback or commit at your will. If the
TX.Rollback is called, the previously registered rollback operations will be
applied automatically, on condition that the forward operation is indeed
executed within the transaction.
tx, ctx := registry.StartTX(context.Background())
resp, err := addOrder(ctx, OrderRequest{Sku: "1"})
if err != nil {
tx.Rollback(ctx)
}
resp, err = makePayment(ctx, PaymentRequest{})
if err != nil {
tx.Rollback(ctx)
}
tx.Commit(ctx)
Integration
The package leader exports configuration in this format:
saga:
sagaTimeoutSecond: 600
recoverIntervalSecond: 60
To use package sagas with package core:
var c *core.C = core.Default()
c.Provide(sagas.Providers)
c.Invoke(func(registry *sagas.Registry) {
tx, ctx := registry.StartTX(context.Background())
resp, err := addOrder(ctx, OrderRequest{Sku: "1"})
if err != nil {
tx.Rollback(ctx)
}
resp, err = makePayment(ctx, PaymentRequest{})
if err != nil {
tx.Rollback(ctx)
}
tx.Commit(ctx)
})
*/
package sagas
Loading

0 comments on commit 6fb1392

Please sign in to comment.