Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the ability to control latency by time and point to point #178

Merged
merged 1 commit into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions sim/cmd/f3sim/f3sim.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"flag"
"fmt"
"log"
"os"
"time"

"github.com/filecoin-project/go-f3/gpbft"
Expand Down Expand Up @@ -37,15 +39,18 @@ func main() {
Delta: time.Duration(*graniteDelta * float64(time.Second)),
DeltaBackOffExponent: *deltaBackOffExponent,
}
sm := sim.NewSimulation(simConfig, graniteConfig, *traceLevel)
sm, err := sim.NewSimulation(simConfig, graniteConfig, *traceLevel)
if err != nil {
log.Panicf("failed to instantiate simulation: %v\n", err)
}

// Same chain for everyone.
candidate := sm.Base(0).Extend(sm.TipGen.Sample())
sm.SetChains(sim.ChainCount{Count: *participantCount, Chain: candidate})

err := sm.Run(1, *maxRounds)
if err != nil {
if err := sm.Run(1, *maxRounds); err != nil {
sm.PrintResults()
os.Exit(1)
}
}
}
32 changes: 12 additions & 20 deletions sim/latency/latency.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,20 @@
package latency

import (
"math"
"math/rand"
"time"

"github.com/filecoin-project/go-f3/gpbft"
)

// A model for network latency.
// Model represents a latency model of cross participant communication. The model
// offers the ability for implementation of varying latency across a simulation,
// as well as specialised latency across specific participants.
//
// See LogNormal.
type Model interface {
Sample() time.Duration
}

type LogNormal struct {
rng *rand.Rand
mean time.Duration
}

func NewLogNormal(seed int64, mean time.Duration) *LogNormal {
rng := rand.New(rand.NewSource(seed))
return &LogNormal{rng: rng, mean: mean}
}

func (l *LogNormal) Sample() time.Duration {
norm := l.rng.NormFloat64()
lognorm := math.Exp(norm)
return time.Duration(lognorm * float64(l.mean))
// Sample returns an artificial latency at time t for communications from a
// participant to another participant.
//
// See: gpbft.Host, gpbft.Clock.
Sample(t time.Time, from, to gpbft.ActorID) time.Duration
}
39 changes: 39 additions & 0 deletions sim/latency/log_normal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package latency

import (
"errors"
"math"
"math/rand"
"time"

"github.com/filecoin-project/go-f3/gpbft"
)

var _ Model = (*LogNormal)(nil)

// LogNormal represents a log normal latency distribution with a configurable
// mean latency. This latency model does not specialise based on host clock time
// nor participants.
type LogNormal struct {
rng *rand.Rand
mean time.Duration
}

// NewLogNormal instantiates a new latency model of log normal latency
// distribution with the given mean.
func NewLogNormal(seed int64, mean time.Duration) (*LogNormal, error) {
if mean < 0 {
return nil, errors.New("mean duration cannot be negative")
}
return &LogNormal{rng: rand.New(rand.NewSource(seed)), mean: mean}, nil
}

// Sample returns latency samples that correspond to the log normal distribution
// with the configured mean. The samples returned disregard time and
// participants, i.e. all the samples returned correspond to a fixed log normal
// distribution.
func (l *LogNormal) Sample(time.Time, gpbft.ActorID, gpbft.ActorID) time.Duration {
norm := l.rng.NormFloat64()
lognorm := math.Exp(norm)
return time.Duration(lognorm * float64(l.mean))
}
39 changes: 39 additions & 0 deletions sim/latency/zipf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package latency

import (
"errors"
"fmt"
"math/rand"
"time"

"github.com/filecoin-project/go-f3/gpbft"
)

var _ Model = (*Zipf)(nil)

// Zipf represents a log normal latency distribution with a configurable
// max latency. This latency model does not specialise based on host clock time
// nor participants.
type Zipf struct {
dist *rand.Zipf
}

// NewZipf instantiates a new latency model of ZipF latency distribution with the
// given max.
func NewZipf(seed int64, s, v float64, max time.Duration) (*Zipf, error) {
if max < 0 {
return nil, errors.New("max duration cannot be negative")
}
dist := rand.NewZipf(rand.New(rand.NewSource(seed)), s, v, uint64(max))
if dist == nil {
return nil, fmt.Errorf("zipf parameters are out of band: s=%f, v=%f", s, v)
}
return &Zipf{dist: dist}, nil
}

// Sample returns latency samples that correspond to this ZipF numerical
// distribution. The samples returned disregard time and participants, i.e. the
// distribution does not vary over time nor for specific participants.
func (l *Zipf) Sample(time.Time, gpbft.ActorID, gpbft.ActorID) time.Duration {
return time.Duration(l.dist.Uint64())
}
10 changes: 5 additions & 5 deletions sim/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ func (n *Network) NetworkName() gpbft.NetworkName {

func (n *Network) Broadcast(msg *gpbft.GMessage) {
n.log(TraceSent, "P%d ↗ %v", msg.Sender, msg)
for _, k := range n.participantIDs {
if k != msg.Sender {
latency := n.latency.Sample()
for _, dest := range n.participantIDs {
if dest != msg.Sender {
latencySample := n.latency.Sample(n.Time(), msg.Sender, dest)
n.queue.Insert(
messageInFlight{
source: msg.Sender,
dest: k,
dest: dest,
payload: *msg,
deliverAt: n.clock.Add(latency),
deliverAt: n.clock.Add(latencySample),
})
}
}
Expand Down
9 changes: 6 additions & 3 deletions sim/sim.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ type Simulation struct {
TipGen *TipGen
}

func NewSimulation(simConfig Config, graniteConfig gpbft.GraniteConfig, traceLevel int) *Simulation {
func NewSimulation(simConfig Config, graniteConfig gpbft.GraniteConfig, traceLevel int) (*Simulation, error) {
// Create a network to deliver messages.
lat := latency.NewLogNormal(simConfig.LatencySeed, simConfig.LatencyMean)
lat, err := latency.NewLogNormal(simConfig.LatencySeed, simConfig.LatencyMean)
if err != nil {
return nil, err
}
sb := simConfig.SigningBacked

if sb == nil {
Expand Down Expand Up @@ -69,7 +72,7 @@ func NewSimulation(simConfig Config, graniteConfig gpbft.GraniteConfig, traceLev
Adversary: nil,
Decisions: decisions,
TipGen: tipGen,
}
}, nil
}

func (s *Simulation) Base(instance uint64) gpbft.ECChain {
Expand Down
3 changes: 2 additions & 1 deletion test/absent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
func TestAbsent(t *testing.T) {
t.Parallel()
repeatInParallel(t, ASYNC_ITERS, func(t *testing.T, repetition int) {
sm := sim.NewSimulation(AsyncConfig(3, repetition), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(AsyncConfig(3, repetition), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
// Adversary has 1/4 of power.
sm.SetAdversary(adversary.NewAbsent(99, sm.HostFor(99)), 1)

Expand Down
5 changes: 3 additions & 2 deletions test/decide_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
)

func TestImmediateDecide(t *testing.T) {
sm := sim.NewSimulation(AsyncConfig(1, 0), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(AsyncConfig(1, 0), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)

// Create adversarial node
value := sm.Base(0).Extend(sm.TipGen.Sample())
Expand All @@ -22,7 +23,7 @@ func TestImmediateDecide(t *testing.T) {
// The honest node starts with a different chain (on the same base).
sm.SetChains(sim.ChainCount{Count: 1, Chain: sm.Base(0).Extend(sm.TipGen.Sample())})
adv.Begin()
err := sm.Run(1, MAX_ROUNDS)
err = sm.Run(1, MAX_ROUNDS)
if err != nil {
fmt.Printf("%s", sm.Describe())
sm.PrintResults()
Expand Down
36 changes: 24 additions & 12 deletions test/honest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
///// Tests for a single instance with no adversaries.

func TestSingleton(t *testing.T) {
sm := sim.NewSimulation(SyncConfig(1), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(SyncConfig(1), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
a := sm.Base(0).Extend(sm.TipGen.Sample())
sm.SetChains(sim.ChainCount{Count: 1, Chain: a})

Expand All @@ -37,7 +38,8 @@ func TestSyncPair(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
sm := sim.NewSimulation(test.config, GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(test.config, GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
a := sm.Base(0).Extend(sm.TipGen.Sample())
sm.SetChains(sim.ChainCount{Count: len(sm.Participants), Chain: a})

Expand All @@ -50,7 +52,8 @@ func TestSyncPair(t *testing.T) {
func TestASyncPair(t *testing.T) {
t.Parallel()
repeatInParallel(t, ASYNC_ITERS, func(t *testing.T, repetition int) {
sm := sim.NewSimulation(AsyncConfig(2, repetition), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(AsyncConfig(2, repetition), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
a := sm.Base(0).Extend(sm.TipGen.Sample())
sm.SetChains(sim.ChainCount{Count: len(sm.Participants), Chain: a})

Expand All @@ -77,7 +80,8 @@ func TestSyncPairDisagree(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
sm := sim.NewSimulation(test.config, GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(test.config, GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
a := sm.Base(0).Extend(sm.TipGen.Sample())
b := sm.Base(0).Extend(sm.TipGen.Sample())
sm.SetChains(sim.ChainCount{Count: 1, Chain: a}, sim.ChainCount{Count: 1, Chain: b})
Expand All @@ -91,7 +95,8 @@ func TestSyncPairDisagree(t *testing.T) {

func TestAsyncPairDisagree(t *testing.T) {
repeatInParallel(t, ASYNC_ITERS, func(t *testing.T, repetition int) {
sm := sim.NewSimulation(AsyncConfig(2, repetition), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(AsyncConfig(2, repetition), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
a := sm.Base(0).Extend(sm.TipGen.Sample())
b := sm.Base(0).Extend(sm.TipGen.Sample())
sm.SetChains(sim.ChainCount{Count: 1, Chain: a}, sim.ChainCount{Count: 1, Chain: b})
Expand All @@ -105,7 +110,8 @@ func TestAsyncPairDisagree(t *testing.T) {
func TestSyncAgreement(t *testing.T) {
repeatInParallel(t, 50, func(t *testing.T, repetition int) {
honestCount := 3 + repetition
sm := sim.NewSimulation(SyncConfig(honestCount), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(SyncConfig(honestCount), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
a := sm.Base(0).Extend(sm.TipGen.Sample())
sm.SetChains(sim.ChainCount{Count: len(sm.Participants), Chain: a})
require.NoErrorf(t, sm.Run(1, MAX_ROUNDS), "%s", sm.Describe())
Expand All @@ -125,7 +131,8 @@ func TestAsyncAgreement(t *testing.T) {
honestCount := n
t.Run(fmt.Sprintf("honest count %d", honestCount), func(t *testing.T) {
repeatInParallel(t, ASYNC_ITERS, func(t *testing.T, repetition int) {
sm := sim.NewSimulation(AsyncConfig(honestCount, repetition), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(AsyncConfig(honestCount, repetition), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
a := sm.Base(0).Extend(sm.TipGen.Sample())
sm.SetChains(sim.ChainCount{Count: len(sm.Participants), Chain: a})

Expand All @@ -140,7 +147,8 @@ func TestSyncHalves(t *testing.T) {
t.Parallel()
repeatInParallel(t, 15, func(t *testing.T, repetition int) {
honestCount := repetition*2 + 2
sm := sim.NewSimulation(SyncConfig(honestCount), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(SyncConfig(honestCount), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
a := sm.Base(0).Extend(sm.TipGen.Sample())
b := sm.Base(0).Extend(sm.TipGen.Sample())
sm.SetChains(sim.ChainCount{Count: honestCount / 2, Chain: a}, sim.ChainCount{Count: honestCount / 2, Chain: b})
Expand All @@ -159,7 +167,8 @@ func TestSyncHalvesBLS(t *testing.T) {
t.Parallel()
repeatInParallel(t, 3, func(t *testing.T, repetition int) {
honestCount := repetition*2 + 2
sm := sim.NewSimulation(SyncConfig(honestCount).UseBLS(), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(SyncConfig(honestCount).UseBLS(), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
a := sm.Base(0).Extend(sm.TipGen.Sample())
b := sm.Base(0).Extend(sm.TipGen.Sample())
sm.SetChains(sim.ChainCount{Count: honestCount / 2, Chain: a}, sim.ChainCount{Count: honestCount / 2, Chain: b})
Expand All @@ -180,7 +189,8 @@ func TestAsyncHalves(t *testing.T) {
honestCount := n
t.Run(fmt.Sprintf("honest count %d", honestCount), func(t *testing.T) {
repeatInParallel(t, ASYNC_ITERS, func(t *testing.T, repetition int) {
sm := sim.NewSimulation(AsyncConfig(honestCount, repetition), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(AsyncConfig(honestCount, repetition), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
a := sm.Base(0).Extend(sm.TipGen.Sample())
b := sm.Base(0).Extend(sm.TipGen.Sample())
sm.SetChains(sim.ChainCount{Count: honestCount / 2, Chain: a}, sim.ChainCount{Count: honestCount / 2, Chain: b})
Expand All @@ -200,7 +210,8 @@ func TestRequireStrongQuorumToProgress(t *testing.T) {

t.Parallel()
repeatInParallel(t, ASYNC_ITERS, func(t *testing.T, repetition int) {
sm := sim.NewSimulation(AsyncConfig(30, repetition), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(AsyncConfig(30, repetition), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
a := sm.Base(0).Extend(sm.TipGen.Sample())
b := sm.Base(0).Extend(sm.TipGen.Sample())
// No strict > quorum.
Expand All @@ -215,7 +226,8 @@ func TestRequireStrongQuorumToProgress(t *testing.T) {
func TestLongestCommonPrefix(t *testing.T) {
// This test uses a synchronous configuration to ensure timely message delivery.
// If async, it is possible to decide the base chain if QUALITY messages are delayed.
sm := sim.NewSimulation(SyncConfig(4), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(SyncConfig(4), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
ab := sm.Base(0).Extend(sm.TipGen.Sample())
abc := ab.Extend(sm.TipGen.Sample())
abd := ab.Extend(sm.TipGen.Sample())
Expand Down
15 changes: 10 additions & 5 deletions test/multi_instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
const INSTANCE_COUNT = 4000

func TestMultiSingleton(t *testing.T) {
sm := sim.NewSimulation(SyncConfig(1), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(SyncConfig(1), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
a := sm.Base(0).Extend(sm.TipGen.Sample())
sm.SetChains(sim.ChainCount{Count: 1, Chain: a})

Expand All @@ -22,7 +23,8 @@ func TestMultiSingleton(t *testing.T) {
}

func TestMultiSyncPair(t *testing.T) {
sm := sim.NewSimulation(SyncConfig(2), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(SyncConfig(2), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
a := sm.Base(0).Extend(sm.TipGen.Sample())
sm.SetChains(sim.ChainCount{Count: len(sm.Participants), Chain: a})

Expand All @@ -32,7 +34,8 @@ func TestMultiSyncPair(t *testing.T) {
}

func TestMultiASyncPair(t *testing.T) {
sm := sim.NewSimulation(AsyncConfig(2, 0), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(AsyncConfig(2, 0), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
a := sm.Base(0).Extend(sm.TipGen.Sample())
sm.SetChains(sim.ChainCount{Count: len(sm.Participants), Chain: a})

Expand All @@ -52,7 +55,8 @@ func TestMultiSyncAgreement(t *testing.T) {
t.Parallel()
repeatInParallel(t, 9, func(t *testing.T, repetition int) {
honestCount := repetition + 3
sm := sim.NewSimulation(SyncConfig(honestCount), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(SyncConfig(honestCount), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
a := sm.Base(0).Extend(sm.TipGen.Sample())
// All nodes start with the same chain and will observe the same extensions of that chain
// in subsequent instances.
Expand All @@ -72,7 +76,8 @@ func TestMultiAsyncAgreement(t *testing.T) {
t.Parallel()
repeatInParallel(t, 9, func(t *testing.T, repetition int) {
honestCount := repetition + 3
sm := sim.NewSimulation(AsyncConfig(honestCount, 0), GraniteConfig(), sim.TraceNone)
sm, err := sim.NewSimulation(AsyncConfig(honestCount, 0), GraniteConfig(), sim.TraceNone)
require.NoError(t, err)
sm.SetChains(sim.ChainCount{Count: honestCount, Chain: sm.Base(0).Extend(sm.TipGen.Sample())})

require.NoErrorf(t, sm.Run(INSTANCE_COUNT, MAX_ROUNDS), "%s", sm.Describe())
Expand Down
Loading
Loading