Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix oracle service behaviour #2074

Merged
merged 2 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 62 additions & 5 deletions pkg/core/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"path"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -54,7 +55,7 @@ func getTestOracle(t *testing.T, bc *Blockchain, walletPath, pass string) (
m := make(map[uint64]*responseWithSig)
ch := make(chan *transaction.Transaction, 5)
orcCfg := getOracleConfig(t, bc, walletPath, pass)
orcCfg.ResponseHandler = saveToMapBroadcaster{m}
orcCfg.ResponseHandler = &saveToMapBroadcaster{m: m}
orcCfg.OnTransaction = saveTxToChan(ch)
orcCfg.URIValidator = func(u *url.URL) error {
if strings.HasPrefix(u.Host, "private") {
Expand Down Expand Up @@ -298,18 +299,74 @@ func TestOracleFull(t *testing.T) {
require.True(t, txes[0].HasAttribute(transaction.OracleResponseT))
}

func TestNotYetRunningOracle(t *testing.T) {
bc := initTestChain(t, nil, nil)
acc, orc, _, _ := getTestOracle(t, bc, "./testdata/oracle2.json", "two")
mp := bc.GetMemPool()
orc.OnTransaction = func(tx *transaction.Transaction) { _ = mp.Add(tx, bc) }
bc.SetOracle(orc)

cs := getOracleContractState(bc.contracts.Oracle.Hash, bc.contracts.Std.Hash)
require.NoError(t, bc.contracts.Management.PutContractState(bc.dao, cs))

go bc.Run()
bc.setNodesByRole(t, true, noderoles.Oracle, keys.PublicKeys{acc.PrivateKey().PublicKey()})

var req state.OracleRequest
var reqs = make(map[uint64]*state.OracleRequest)
for i := uint64(0); i < 3; i++ {
reqs[i] = &req
}
orc.AddRequests(reqs) // 0, 1, 2 added to pending.

var ids = []uint64{0, 1}
orc.RemoveRequests(ids) // 0, 1 removed from pending, 2 left.

reqs = make(map[uint64]*state.OracleRequest)
for i := uint64(3); i < 5; i++ {
reqs[i] = &req
}
orc.AddRequests(reqs) // 3, 4 added to pending -> 2, 3, 4 in pending.

ids = []uint64{3}
orc.RemoveRequests(ids) // 3 removed from pending -> 2, 4 in pending.

go orc.Run()
t.Cleanup(orc.Shutdown)

require.Eventually(t, func() bool { return mp.Count() == 2 },
time.Second*3, time.Millisecond*200)
txes := mp.GetVerifiedTransactions()
require.Len(t, txes, 2)
var txids []uint64
for _, tx := range txes {
for _, attr := range tx.Attributes {
if attr.Type == transaction.OracleResponseT {
resp := attr.Value.(*transaction.OracleResponse)
txids = append(txids, resp.ID)
}
}
}
require.Len(t, txids, 2)
require.Contains(t, txids, uint64(2))
require.Contains(t, txids, uint64(4))
}

type saveToMapBroadcaster struct {
m map[uint64]*responseWithSig
mtx sync.RWMutex
m map[uint64]*responseWithSig
}

func (b saveToMapBroadcaster) SendResponse(_ *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) {
func (b *saveToMapBroadcaster) SendResponse(_ *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) {
b.mtx.Lock()
defer b.mtx.Unlock()
b.m[resp.ID] = &responseWithSig{
resp: resp,
txSig: txSig,
}
}
func (saveToMapBroadcaster) Run() {}
func (saveToMapBroadcaster) Shutdown() {}
func (*saveToMapBroadcaster) Run() {}
func (*saveToMapBroadcaster) Shutdown() {}

type responseWithSig struct {
resp *transaction.OracleResponse
Expand Down
21 changes: 19 additions & 2 deletions pkg/services/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,13 @@ type (
requestCh chan request
requestMap chan map[uint64]*state.OracleRequest

// respMtx protects responses map.
respMtx sync.RWMutex
// respMtx protects responses and pending maps.
respMtx sync.RWMutex
// running is false until Run() is invoked.
running bool
// pending contains requests for not yet started service.
pending map[uint64]*state.OracleRequest
// responses contains active not completely processed requests.
responses map[uint64]*incompleteTx
// removed contains ids of requests which won't be processed further due to expiration.
removed map[uint64]bool
Expand Down Expand Up @@ -102,6 +107,7 @@ func NewOracle(cfg Config) (*Oracle, error) {

close: make(chan struct{}),
requestMap: make(chan map[uint64]*state.OracleRequest, 1),
pending: make(map[uint64]*state.OracleRequest),
responses: make(map[uint64]*incompleteTx),
removed: make(map[uint64]bool),
}
Expand Down Expand Up @@ -165,7 +171,18 @@ func (o *Oracle) Shutdown() {

// Run runs must be executed in a separate goroutine.
func (o *Oracle) Run() {
o.respMtx.Lock()
if o.running {
o.respMtx.Unlock()
return
}
o.Log.Info("starting oracle service")

o.requestMap <- o.pending // Guaranteed to not block, only AddRequests sends to it.
o.pending = nil
o.running = true
o.respMtx.Unlock()

for i := 0; i < o.MainCfg.MaxConcurrentRequests; i++ {
go o.runRequestWorker()
}
Expand Down
25 changes: 23 additions & 2 deletions pkg/services/oracle/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,14 @@ func (o *Oracle) runRequestWorker() {
func (o *Oracle) RemoveRequests(ids []uint64) {
o.respMtx.Lock()
defer o.respMtx.Unlock()
for _, id := range ids {
delete(o.responses, id)
if !o.running {
for _, id := range ids {
delete(o.pending, id)
}
} else {
for _, id := range ids {
delete(o.responses, id)
}
}
}

Expand All @@ -57,6 +63,16 @@ func (o *Oracle) AddRequests(reqs map[uint64]*state.OracleRequest) {
return
}

o.respMtx.Lock()
if !o.running {
for id, r := range reqs {
o.pending[id] = r
}
o.respMtx.Unlock()
return
}
o.respMtx.Unlock()

select {
case o.requestMap <- reqs:
default:
Expand Down Expand Up @@ -172,6 +188,7 @@ func (o *Oracle) processRequest(priv *keys.PrivateKey, req request) error {
o.Log.Debug("oracle request processed", zap.String("url", req.Req.URL), zap.Int("code", int(resp.Code)), zap.String("result", string(resp.Result)))

currentHeight := o.Chain.BlockHeight()
vubInc := o.Chain.GetConfig().MaxValidUntilBlockIncrement
_, h, err := o.Chain.GetTransaction(req.Req.OriginalTxID)
if err != nil {
if !errors.Is(err, storage.ErrKeyNotFound) {
Expand All @@ -180,10 +197,14 @@ func (o *Oracle) processRequest(priv *keys.PrivateKey, req request) error {
// The only reason tx can be not found is if it wasn't yet persisted from DAO.
h = currentHeight
}
h += vubInc // Main tx is only valid for RequestHeight + ValidUntilBlock.
tx, err := o.CreateResponseTx(int64(req.Req.GasForResponse), h, resp)
if err != nil {
return err
}
for h <= currentHeight { // Backup tx must be valid in any event.
h += vubInc
}
backupTx, err := o.CreateResponseTx(int64(req.Req.GasForResponse), h, &transaction.OracleResponse{
ID: req.ID,
Code: transaction.ConsensusUnreachable,
Expand Down
4 changes: 2 additions & 2 deletions pkg/services/oracle/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ func readResponse(rc gio.ReadCloser, limit int) ([]byte, error) {
}

// CreateResponseTx creates unsigned oracle response transaction.
func (o *Oracle) CreateResponseTx(gasForResponse int64, height uint32, resp *transaction.OracleResponse) (*transaction.Transaction, error) {
func (o *Oracle) CreateResponseTx(gasForResponse int64, vub uint32, resp *transaction.OracleResponse) (*transaction.Transaction, error) {
tx := transaction.New(o.oracleResponse, 0)
tx.Nonce = uint32(resp.ID)
tx.ValidUntilBlock = height + o.Chain.GetConfig().MaxValidUntilBlockIncrement
tx.ValidUntilBlock = vub
tx.Attributes = []transaction.Attribute{{
Type: transaction.OracleResponseT,
Value: resp,
Expand Down