Skip to content

Commit

Permalink
multiple peer support
Browse files Browse the repository at this point in the history
Signed-off-by: SamYuan1990 <[email protected]>
  • Loading branch information
SamYuan1990 authored and guoger committed Nov 6, 2020
1 parent dc97cff commit 16c2492
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 48 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Clone this repo and run `go build` at root dir. This is a go module project so y
Modify `config.json` according to your network. This is a sample:
```json
{
"peer_addr": "peer0.org1.example.com:7051",
"peer_addrs": ["peer0.org1.example.com:7051","peer0.org2.example.com:9051"],
"orderer_addr": "orderer.example.com:7050",
"channel": "mychannel",
"chaincode": "mycc",
Expand All @@ -50,13 +50,13 @@ Modify `config.json` according to your network. This is a sample:
"mspid": "Org1MSP",
"private_key": "wallet/priv.key",
"sign_cert": "wallet/sign.crt",
"tls_ca_certs": ["wallet/pca.crt","wallet/oca.crt"],
"tls_ca_certs": ["wallet/pca1.crt","wallet/pca2.crt","wallet/oca.crt"],
"num_of_conn": 20,
"client_per_conn": 40
}
```

`peer_addr`: peer address in IP:Port format. It does not support sending traffic to multiple peers, yet. You may need to add peer name, i.e. `peer0.org1.example.com` to your `/etc/hosts`
`peer_addrs`: peer address in IP:Port format. You may need to add peer name, i.e. `peer0.org1.example.com,peer0.org2.example.com` to your `/etc/hosts`

`orderer_addr`: orderer address in IP:Port format. It does not support sending traffic to multiple orderers, yet. You may need to add orderer name, i.e. `orderer.example.com` to your `/etc/hosts`

Expand All @@ -74,7 +74,7 @@ crypto-config/peerOrganizations/org1.example.com/users/[email protected]/ms
crypto-config/peerOrganizations/org1.example.com/users/[email protected]/msp/signcerts/[email protected]
```

`tls_ca_certs`: this contains TLS CA certificates of peer and orderer. If tls is disabled, leave this blank. Otherwise, it can be `crypto-config/peerOrganizations/org1.example.com/tlsca/tlsca.org1.example.com-cert.pem` from peer and `crypto-config/ordererOrganizations/example.com/tlsca/tlsca.example.com-cert.pem` from orderer
`tls_ca_certs`: this contains TLS CA certificates of peer and orderer. If tls is disabled, leave this blank. Otherwise, it can be [peer0 tls, peer1 tls ... ordere rtls]. `crypto-config/peerOrganizations/org1.example.com/tlsca/tlsca.org1.example.com-cert.pem` from peer and `crypto-config/ordererOrganizations/example.com/tlsca/tlsca.example.com-cert.pem` from orderer

`channel`: channel name

Expand Down
5 changes: 3 additions & 2 deletions config.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
{
"peer_addr": "peer0.org1.example.com:7051",
"peer_addrs": ["peer0.org1.example.com:7051","peer0.org2.example.com:9051"],
"orderer_addr": "orderer.example.com:7050",
"channel": "mychannel",
"chaincode": "mycc",
"args": ["put", "key", "value"],
"version": "",
"args": ["query","a"],
"mspid": "Org1MSP",
"private_key": "/path/to/private.key",
"sign_cert": "/path/to/sign.cert",
Expand Down
14 changes: 14 additions & 0 deletions config_sample.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"peer_addrs": ["peer0.org1.example.com:7051","peer0.org2.example.com:9051"],
"orderer_addr": "orderer.example.com:7050",
"channel": "mychannel",
"chaincode": "mycc",
"version": "",
"args": ["query","a"],
"mspid": "Org1MSP",
"private_key": "./crypto-config/peerOrganizations/org1.example.com/users/[email protected]/msp/keystore/4d75bedcf454389483dfe0efbf12602b64d8c0e2903451c688b1d5d9ba38e655_sk",
"sign_cert": "./crypto-config/peerOrganizations/org1.example.com/users/[email protected]/msp/signcerts/[email protected]",
"tls_ca_certs": ["./crypto-config/peerOrganizations/org1.example.com/peers/peer0.org1.example.com/msp/tlscacerts/tlsca.org1.example.com-cert.pem", "./crypto-config/peerOrganizations/org2.example.com/peers/peer0.org2.example.com/msp/tlscacerts/tlsca.org2.example.com-cert.pem","./crypto-config/ordererOrganizations/example.com/msp/tlscacerts/tlsca.example.com-cert.pem"],
"num_of_conn": 10,
"client_per_conn": 10
}
16 changes: 11 additions & 5 deletions infra/assembler.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package infra

import (
"sync"

"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
)

type Elecments struct {
Proposal *peer.Proposal
SignedProp *peer.SignedProposal
Response *peer.ProposalResponse
Responses []*peer.ProposalResponse
lock sync.Mutex
Envelope *common.Envelope
}

Expand All @@ -17,7 +20,7 @@ type Assembler struct {
}

func (a *Assembler) assemble(e *Elecments) *Elecments {
env, err := CreateSignedTx(e.Proposal, a.Signer, e.Response)
env, err := CreateSignedTx(e.Proposal, a.Signer, e.Responses)
if err != nil {
panic(err)
}
Expand All @@ -33,15 +36,18 @@ func (a *Assembler) sign(e *Elecments) *Elecments {
}

e.SignedProp = sprop

return e
}

func (a *Assembler) StartSigner(raw, signed chan *Elecments, done <-chan struct{}) {
func (a *Assembler) StartSigner(raw chan *Elecments, signed []chan *Elecments, done <-chan struct{}) {
for {
select {
case r := <-raw:
signed <- a.sign(r)

t := a.sign(r)
for _, v := range signed {
v <- t
}
case <-done:
return
}
Expand Down
2 changes: 1 addition & 1 deletion infra/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type Config struct {
PeerAddr string `json:"peer_addr"`
PeerAddrs []string `json:"peer_addrs"`
OrdererAddr string `json:"orderer_addr"`
Channel string `json:"channel"`
Chaincode string `json:"chaincode"`
Expand Down
18 changes: 5 additions & 13 deletions infra/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func SignProposal(prop *peer.Proposal, signer *Crypto) (*peer.SignedProposal, er
return &peer.SignedProposal{ProposalBytes: propBytes, Signature: sig}, nil
}

func CreateSignedTx(proposal *peer.Proposal, signer *Crypto, resps ...*peer.ProposalResponse) (*common.Envelope, error) {
func CreateSignedTx(proposal *peer.Proposal, signer *Crypto, resps []*peer.ProposalResponse) (*common.Envelope, error) {
if len(resps) == 0 {
return nil, errors.New("at least one proposal response is required")
}
Expand Down Expand Up @@ -92,6 +92,8 @@ func CreateSignedTx(proposal *peer.Proposal, signer *Crypto, resps ...*peer.Prop
return nil, err
}

endorsements := make([]*peer.Endorsement, 0)

// ensure that all actions are bitwise equal and that they are successful
var a1 []byte
for n, r := range resps {
Expand All @@ -100,22 +102,14 @@ func CreateSignedTx(proposal *peer.Proposal, signer *Crypto, resps ...*peer.Prop
if r.Response.Status < 200 || r.Response.Status >= 400 {
return nil, errors.Errorf("proposal response was not successful, error code %d, msg %s", r.Response.Status, r.Response.Message)
}
continue
}

if bytes.Compare(a1, r.Payload) != 0 {
return nil, errors.New("ProposalResponsePayloads do not match")
}
endorsements = append(endorsements, r.Endorsement)
}

// fill endorsements
endorsements := make([]*peer.Endorsement, len(resps))
for n, r := range resps {
endorsements[n] = r.Endorsement
}

// create ChaincodeEndorsedAction
cea := &peer.ChaincodeEndorsedAction{ProposalResponsePayload: resps[0].Payload, Endorsements: endorsements}
cea := &peer.ChaincodeEndorsedAction{ProposalResponsePayload: a1, Endorsements: endorsements}

// obtain the bytes of the proposal payload that will go to the transaction
propPayloadBytes, err := utils.GetBytesProposalPayloadForTx(pPayl, hdrExt.PayloadVisibility)
Expand All @@ -135,7 +129,6 @@ func CreateSignedTx(proposal *peer.Proposal, signer *Crypto, resps ...*peer.Prop
taas := make([]*peer.TransactionAction, 1)
taas[0] = taa
tx := &peer.Transaction{Actions: taas}

// serialize the tx
txBytes, err := utils.GetBytesTransaction(tx)
if err != nil {
Expand All @@ -154,7 +147,6 @@ func CreateSignedTx(proposal *peer.Proposal, signer *Crypto, resps ...*peer.Prop
if err != nil {
return nil, err
}

// here's the envelope
return &common.Envelope{Payload: paylBytes, Signature: sig}, nil
}
Expand Down
48 changes: 29 additions & 19 deletions infra/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,37 @@ import (
)

type Proposers struct {
workers []*Proposer

workers [][]*Proposer
//one proposer per connection per peer
client int
index uint64
}

func CreateProposers(conn, client int, addr string, crypto *Crypto) *Proposers {
ps := make([]*Proposer, conn)
for i := 0; i < conn; i++ {
ps[i] = CreateProposer(addr, crypto)
func CreateProposers(conn, client int, addrs []string, crypto *Crypto) *Proposers {
var ps [][]*Proposer
//one proposer per connection per peer
for _, addr := range addrs {
row := make([]*Proposer, conn)
for j := 0; j < conn; j++ {
row[j] = CreateProposer(addr, crypto)
}
ps = append(ps, row)
}

return &Proposers{workers: ps, client: client}
}

func (ps *Proposers) Start(signed, processed chan *Elecments, done <-chan struct{}) {
func (ps *Proposers) Start(signed []chan *Elecments, processed chan *Elecments, done <-chan struct{}, config Config) {
fmt.Printf("Start sending transactions...\n\n")
for _, p := range ps.workers {
for i := 0; i < ps.client; i++ {
go p.Start(signed, processed, done)
for i := 0; i < len(config.PeerAddrs); i++ {
for j := 0; j < config.NumOfConn; j++ {
go ps.workers[i][j].Start(signed[i], processed, done, len(config.PeerAddrs))
}
}
}

type Proposer struct {
e peer.EndorserClient
e peer.EndorserClient
addr string
}

func CreateProposer(addr string, crypto *Crypto) *Proposer {
Expand All @@ -45,22 +50,27 @@ func CreateProposer(addr string, crypto *Crypto) *Proposer {
panic(err)
}

return &Proposer{e: endorser}
return &Proposer{e: endorser, addr: addr}
}

func (p *Proposer) Start(signed, processed chan *Elecments, done <-chan struct{}) {
func (p *Proposer) Start(signed, processed chan *Elecments, done <-chan struct{}, threshold int) {
for {
select {
case s := <-signed:
//send sign proposal to peer for endorsement
r, err := p.e.ProcessProposal(context.Background(), s.SignedProp)
if err != nil || r.Response.Status < 200 || r.Response.Status >= 400 {
fmt.Printf("Err processing proposal: %s, status: %d\n", err, r.Response.Status)
fmt.Printf("Err processing proposal: %s, status: %d, addr: %s \n", err, r.Response.Status, p.addr)
fmt.Println(r)
continue
}

s.Response = r
processed <- s

s.lock.Lock()
//collect for endorsement
s.Responses = append(s.Responses, r)
if len(s.Responses) >= threshold {
processed <- s
}
s.lock.Unlock()
case <-done:
return
}
Expand Down
13 changes: 9 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,29 @@ func main() {
crypto := config.LoadCrypto()

raw := make(chan *infra.Elecments, 100)
signed := make(chan *infra.Elecments, 10)
signed := make([]chan *infra.Elecments, len(config.PeerAddrs))
processed := make(chan *infra.Elecments, 10)
envs := make(chan *infra.Elecments, 10)
done := make(chan struct{})

assember := &infra.Assembler{Signer: crypto}

for i := 0; i < len(config.PeerAddrs); i++ {
signed[i] = make(chan *infra.Elecments, 10)
}

for i := 0; i < 5; i++ {
go assember.StartSigner(raw, signed, done)
go assember.StartIntegrator(processed, envs, done)
}

proposor := infra.CreateProposers(config.NumOfConn, config.ClientPerConn, config.PeerAddr, crypto)
proposor.Start(signed, processed, done)
proposor := infra.CreateProposers(config.NumOfConn, config.ClientPerConn, config.PeerAddrs, crypto)
proposor.Start(signed, processed, done, config)

broadcaster := infra.CreateBroadcasters(config.NumOfConn, config.OrdererAddr, crypto)
broadcaster.Start(envs, done)

observer := infra.CreateObserver(config.PeerAddr, config.Channel, crypto)
observer := infra.CreateObserver(config.PeerAddrs[0], config.Channel, crypto)

start := time.Now()
go observer.Start(N, start)
Expand Down

0 comments on commit 16c2492

Please sign in to comment.