Skip to content

Commit

Permalink
Merge pull request #6 from vadiminshakov/refactor-wal
Browse files Browse the repository at this point in the history
Refactor wal
  • Loading branch information
vadiminshakov authored Sep 26, 2024
2 parents 1b066b3 + f783403 commit 28ec5b9
Show file tree
Hide file tree
Showing 20 changed files with 278 additions and 1,099 deletions.
7 changes: 3 additions & 4 deletions core/cohort/cohort.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ import (

type Cohort interface {
Propose(ctx context.Context, req *dto.ProposeRequest) (*dto.CohortResponse, error)
Precommit(ctx context.Context, index uint64, votes []*dto.Vote) (*dto.CohortResponse, error)
Precommit(ctx context.Context, index uint64) (*dto.CohortResponse, error)
Commit(ctx context.Context, in *dto.CommitRequest) (*dto.CohortResponse, error)
Height() uint64
}

type CohortImpl struct {
committer *commitalgo.Committer
commitType Mode
height uint64
}

func NewCohort(
Expand All @@ -37,12 +36,12 @@ func (c *CohortImpl) Propose(ctx context.Context, req *dto.ProposeRequest) (*dto
return c.committer.Propose(ctx, req)
}

func (s *CohortImpl) Precommit(ctx context.Context, index uint64, votes []*dto.Vote) (*dto.CohortResponse, error) {
func (s *CohortImpl) Precommit(ctx context.Context, index uint64) (*dto.CohortResponse, error) {
if s.commitType != THREE_PHASE {
return nil, errors.New("precommit is allowed for 3PC mode only")
}

return s.committer.Precommit(ctx, index, votes)
return s.committer.Precommit(ctx, index)
}

func (c *CohortImpl) Commit(ctx context.Context, in *dto.CommitRequest) (resp *dto.CohortResponse, err error) {
Expand Down
157 changes: 58 additions & 99 deletions core/cohort/commitalgo/commitalgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,76 +6,31 @@ import (
log "github.com/sirupsen/logrus"
"github.com/vadiminshakov/committer/core/dto"
"github.com/vadiminshakov/committer/io/db"
"github.com/vadiminshakov/committer/voteslog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"sync"
"sync/atomic"
"time"
)

type wal interface {
Set(index uint64, key string, value []byte) error
Get(index uint64) (string, []byte, bool)
Close() error
}

type Committer struct {
noAutoCommit map[uint64]struct{}
db db.Repository
vlog voteslog.Log
wal wal
proposeHook func(req *dto.ProposeRequest) bool
precommitHook func(height uint64) bool
commitHook func(req *dto.CommitRequest) bool
precommitDone pendingPrecommit
state *stateMachine
height uint64
timeout uint64
}

type pendingPrecommit struct {
done map[uint64]chan struct{}
mu sync.RWMutex
}

func newPendingPrecommit() pendingPrecommit {
return pendingPrecommit{
done: make(map[uint64]chan struct{}),
}
}

func (p *pendingPrecommit) add(height uint64) {
p.mu.Lock()
p.done[height] = make(chan struct{})
p.mu.Unlock()
}

func (p *pendingPrecommit) remove(height uint64) {
p.mu.RLock()
ch := p.done[height]
p.mu.RUnlock()
close(ch)

p.mu.Lock()
delete(p.done, height)
p.mu.Unlock()
}

func (p *pendingPrecommit) wait(height uint64) chan struct{} {
p.mu.RLock()
waitch := p.done[height]
p.mu.RUnlock()

return waitch
}

func (p *pendingPrecommit) signalToChan(height uint64) {
p.mu.RLock()
ch := p.done[height]
p.mu.RUnlock()

select {
case ch <- struct{}{}:
default:

}
}

func NewCommitter(d db.Repository, vlog voteslog.Log,
func NewCommitter(d db.Repository, commitType string, wal wal,
proposeHook func(req *dto.ProposeRequest) bool,
commitHook func(req *dto.CommitRequest) bool,
timeout uint64) *Committer {
Expand All @@ -84,92 +39,93 @@ func NewCommitter(d db.Repository, vlog voteslog.Log,
precommitHook: nil,
commitHook: commitHook,
db: d,
vlog: vlog,
wal: wal,
noAutoCommit: make(map[uint64]struct{}),
timeout: timeout,
precommitDone: newPendingPrecommit(),
state: newStateMachine(mode(commitType)),
}
}

func (c *Committer) Height() uint64 {
return c.height
}

func (c *Committer) Propose(_ context.Context, req *dto.ProposeRequest) (*dto.CohortResponse, error) {
var response *dto.CohortResponse
func (c *Committer) Propose(ctx context.Context, req *dto.ProposeRequest) (*dto.CohortResponse, error) {
if err := c.state.Transition(proposeStage); err != nil {
return nil, err
}

if atomic.LoadUint64(&c.height) > req.Height {
return &dto.CohortResponse{ResponseType: dto.ResponseTypeNack, Height: atomic.LoadUint64(&c.height)}, nil
}

if c.proposeHook(req) {
log.Infof("received: %s=%s\n", req.Key, string(req.Value))
c.vlog.Set(req.Height, req.Key, req.Value)
response = &dto.CohortResponse{ResponseType: dto.ResponseTypeAck, Height: req.Height}
} else {
response = &dto.CohortResponse{ResponseType: dto.ResponseTypeNack, Height: req.Height}
if !c.proposeHook(req) {
return &dto.CohortResponse{ResponseType: dto.ResponseTypeNack, Height: req.Height}, nil
}

return response, nil
}
log.Infof("received: %s=%s\n", req.Key, string(req.Value))
c.wal.Set(req.Height, req.Key, req.Value)

func (c *Committer) Precommit(ctx context.Context, index uint64, votes []*dto.Vote) (*dto.CohortResponse, error) {
c.precommitDone.add(index)
if c.state.mode == twophase {
return &dto.CohortResponse{ResponseType: dto.ResponseTypeAck, Height: req.Height}, nil
}

go func(ctx context.Context) {
deadline := time.After(time.Duration(c.timeout) * time.Millisecond)
ForLoop:
for {
select {
case <-c.precommitDone.wait(index):
break ForLoop
case <-deadline:
c.precommitDone.remove(index)
if _, ok := c.noAutoCommit[index]; ok {
if c.state.currentState == commitStage {
return
}
md := metadata.Pairs("mode", "autocommit")
ctx := metadata.NewOutgoingContext(context.Background(), md)
if !isAllNodesAccepted(votes) {
break ForLoop
}
c.Commit(ctx, &dto.CommitRequest{Height: index})
log.Warn("committed without coordinator after timeout")
break ForLoop

c.wal.Set(req.Height, "skip", nil)

log.Warn("skip proposed message after timeout")
}
}
}(ctx)
for _, v := range votes {
if !v.IsAccepted {
log.Printf("Node %s is not accepted proposal with index %d\n", v.Node, index)
return &dto.CohortResponse{ResponseType: dto.ResponseTypeNack}, nil
}
}

return &dto.CohortResponse{ResponseType: dto.ResponseTypeAck}, nil
return &dto.CohortResponse{ResponseType: dto.ResponseTypeAck, Height: req.Height}, nil
}

func isAllNodesAccepted(votes []*dto.Vote) bool {
for _, v := range votes {
if !v.IsAccepted {
return false
}
func (c *Committer) Precommit(ctx context.Context, index uint64) (*dto.CohortResponse, error) {
if err := c.state.Transition(precommitStage); err != nil {
return nil, err
}

return true
}
go func(ctx context.Context) {
deadline := time.After(time.Duration(c.timeout) * time.Millisecond)
for {
select {
case <-deadline:
if c.state.currentState == commitStage {
return
}

func (c *Committer) Commit(ctx context.Context, req *dto.CommitRequest) (*dto.CohortResponse, error) {
c.precommitDone.signalToChan(atomic.LoadUint64(&c.height))
c.Commit(ctx, &dto.CommitRequest{Height: index})
c.state.currentState = proposeStage
log.Warn("committed without coordinator after timeout")
}
}
}(ctx)

c.noAutoCommit[req.Height] = struct{}{}
return &dto.CohortResponse{ResponseType: dto.ResponseTypeAck}, nil
}

func (c *Committer) Commit(ctx context.Context, req *dto.CommitRequest) (*dto.CohortResponse, error) {
var response *dto.CohortResponse
if req.Height < atomic.LoadUint64(&c.height) {
return nil, status.Errorf(codes.AlreadyExists, "stale commit proposed by coordinator (got %d, but actual height is %d)", req.Height, c.height)
}

if err := c.state.Transition(commitStage); err != nil {
return nil, err
}

if c.commitHook(req) {
log.Printf("Committing on height: %d\n", req.Height)
key, value, ok := c.vlog.Get(req.Height)
key, value, ok := c.wal.Get(req.Height)
if !ok {
return &dto.CohortResponse{ResponseType: dto.ResponseTypeNack}, fmt.Errorf("no value in node cache on the index %d", req.Height)
}
Expand All @@ -186,5 +142,8 @@ func (c *Committer) Commit(ctx context.Context, req *dto.CommitRequest) (*dto.Co
fmt.Println("ack cohort", atomic.LoadUint64(&c.height))
atomic.AddUint64(&c.height, 1)
}

c.state.currentState = proposeStage

return response, nil
}
64 changes: 64 additions & 0 deletions core/cohort/commitalgo/fsm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package commitalgo

import (
"errors"
)

type mode string

const (
twophase = "two-phase"
threephase = "three-phase"
)
const (
proposeStage = "propose"
precommitStage = "precommit"
commitStage = "commit"
)

type stateMachine struct {
currentState string
mode mode
transitions map[string]map[string]struct{}
}

var twoPhaseTransitions = map[string]map[string]struct{}{
proposeStage: {
proposeStage: struct{}{},
commitStage: struct{}{},
},
}

var threePhaseTransitions = map[string]map[string]struct{}{
proposeStage: {
proposeStage: struct{}{},
precommitStage: struct{}{},
},
precommitStage: {
commitStage: struct{}{},
},
}

func newStateMachine(mode mode) *stateMachine {
tr := twoPhaseTransitions
if mode == threephase {
tr = threePhaseTransitions
}

return &stateMachine{
currentState: proposeStage,
mode: mode,
transitions: tr,
}
}

func (sm *stateMachine) Transition(nextState string) error {
if allowedStates, ok := sm.transitions[sm.currentState]; ok {
if _, ok = allowedStates[nextState]; ok {
sm.currentState = nextState
return nil
}
}

return errors.New("invalid state transition")
}
Loading

0 comments on commit 28ec5b9

Please sign in to comment.