From 16c24927abd226de5a90486c017a88dbfd1c46bf Mon Sep 17 00:00:00 2001 From: SamYuan1990 Date: Sun, 15 Mar 2020 21:55:50 +0800 Subject: [PATCH] multiple peer support Signed-off-by: SamYuan1990 --- README.md | 8 ++++---- config.json | 5 +++-- config_sample.json | 14 ++++++++++++++ infra/assembler.go | 16 +++++++++++----- infra/config.go | 2 +- infra/proposal.go | 18 +++++------------ infra/proposer.go | 48 ++++++++++++++++++++++++++++------------------ main.go | 13 +++++++++---- 8 files changed, 76 insertions(+), 48 deletions(-) create mode 100644 config_sample.json diff --git a/README.md b/README.md index 0b40b9ae..1f358edf 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ Clone this repo and run `go build` at root dir. This is a go module project so y Modify `config.json` according to your network. This is a sample: ```json { - "peer_addr": "peer0.org1.example.com:7051", + "peer_addrs": ["peer0.org1.example.com:7051","peer0.org2.example.com:9051"], "orderer_addr": "orderer.example.com:7050", "channel": "mychannel", "chaincode": "mycc", @@ -50,13 +50,13 @@ Modify `config.json` according to your network. This is a sample: "mspid": "Org1MSP", "private_key": "wallet/priv.key", "sign_cert": "wallet/sign.crt", - "tls_ca_certs": ["wallet/pca.crt","wallet/oca.crt"], + "tls_ca_certs": ["wallet/pca1.crt","wallet/pca2.crt","wallet/oca.crt"], "num_of_conn": 20, "client_per_conn": 40 } ``` -`peer_addr`: peer address in IP:Port format. It does not support sending traffic to multiple peers, yet. You may need to add peer name, i.e. `peer0.org1.example.com` to your `/etc/hosts` +`peer_addrs`: peer address in IP:Port format. You may need to add peer name, i.e. `peer0.org1.example.com,peer0.org2.example.com` to your `/etc/hosts` `orderer_addr`: orderer address in IP:Port format. It does not support sending traffic to multiple orderers, yet. You may need to add orderer name, i.e. `orderer.example.com` to your `/etc/hosts` @@ -74,7 +74,7 @@ crypto-config/peerOrganizations/org1.example.com/users/User1@org1.example.com/ms crypto-config/peerOrganizations/org1.example.com/users/User1@org1.example.com/msp/signcerts/User1@org1.example.com-cert.pem ``` -`tls_ca_certs`: this contains TLS CA certificates of peer and orderer. If tls is disabled, leave this blank. Otherwise, it can be `crypto-config/peerOrganizations/org1.example.com/tlsca/tlsca.org1.example.com-cert.pem` from peer and `crypto-config/ordererOrganizations/example.com/tlsca/tlsca.example.com-cert.pem` from orderer +`tls_ca_certs`: this contains TLS CA certificates of peer and orderer. If tls is disabled, leave this blank. Otherwise, it can be [peer0 tls, peer1 tls ... ordere rtls]. `crypto-config/peerOrganizations/org1.example.com/tlsca/tlsca.org1.example.com-cert.pem` from peer and `crypto-config/ordererOrganizations/example.com/tlsca/tlsca.example.com-cert.pem` from orderer `channel`: channel name diff --git a/config.json b/config.json index a4f075b3..4a0ba4d3 100644 --- a/config.json +++ b/config.json @@ -1,9 +1,10 @@ { - "peer_addr": "peer0.org1.example.com:7051", + "peer_addrs": ["peer0.org1.example.com:7051","peer0.org2.example.com:9051"], "orderer_addr": "orderer.example.com:7050", "channel": "mychannel", "chaincode": "mycc", - "args": ["put", "key", "value"], + "version": "", + "args": ["query","a"], "mspid": "Org1MSP", "private_key": "/path/to/private.key", "sign_cert": "/path/to/sign.cert", diff --git a/config_sample.json b/config_sample.json new file mode 100644 index 00000000..dbe80261 --- /dev/null +++ b/config_sample.json @@ -0,0 +1,14 @@ +{ + "peer_addrs": ["peer0.org1.example.com:7051","peer0.org2.example.com:9051"], + "orderer_addr": "orderer.example.com:7050", + "channel": "mychannel", + "chaincode": "mycc", + "version": "", + "args": ["query","a"], + "mspid": "Org1MSP", + "private_key": "./crypto-config/peerOrganizations/org1.example.com/users/User1@org1.example.com/msp/keystore/4d75bedcf454389483dfe0efbf12602b64d8c0e2903451c688b1d5d9ba38e655_sk", + "sign_cert": "./crypto-config/peerOrganizations/org1.example.com/users/User1@org1.example.com/msp/signcerts/User1@org1.example.com-cert.pem", + "tls_ca_certs": ["./crypto-config/peerOrganizations/org1.example.com/peers/peer0.org1.example.com/msp/tlscacerts/tlsca.org1.example.com-cert.pem", "./crypto-config/peerOrganizations/org2.example.com/peers/peer0.org2.example.com/msp/tlscacerts/tlsca.org2.example.com-cert.pem","./crypto-config/ordererOrganizations/example.com/msp/tlscacerts/tlsca.example.com-cert.pem"], + "num_of_conn": 10, + "client_per_conn": 10 +} diff --git a/infra/assembler.go b/infra/assembler.go index 346ffefc..bdffb35d 100644 --- a/infra/assembler.go +++ b/infra/assembler.go @@ -1,6 +1,8 @@ package infra import ( + "sync" + "github.com/hyperledger/fabric/protos/common" "github.com/hyperledger/fabric/protos/peer" ) @@ -8,7 +10,8 @@ import ( type Elecments struct { Proposal *peer.Proposal SignedProp *peer.SignedProposal - Response *peer.ProposalResponse + Responses []*peer.ProposalResponse + lock sync.Mutex Envelope *common.Envelope } @@ -17,7 +20,7 @@ type Assembler struct { } func (a *Assembler) assemble(e *Elecments) *Elecments { - env, err := CreateSignedTx(e.Proposal, a.Signer, e.Response) + env, err := CreateSignedTx(e.Proposal, a.Signer, e.Responses) if err != nil { panic(err) } @@ -33,15 +36,18 @@ func (a *Assembler) sign(e *Elecments) *Elecments { } e.SignedProp = sprop + return e } -func (a *Assembler) StartSigner(raw, signed chan *Elecments, done <-chan struct{}) { +func (a *Assembler) StartSigner(raw chan *Elecments, signed []chan *Elecments, done <-chan struct{}) { for { select { case r := <-raw: - signed <- a.sign(r) - + t := a.sign(r) + for _, v := range signed { + v <- t + } case <-done: return } diff --git a/infra/config.go b/infra/config.go index 82ecc7fb..c90fcf2d 100644 --- a/infra/config.go +++ b/infra/config.go @@ -9,7 +9,7 @@ import ( ) type Config struct { - PeerAddr string `json:"peer_addr"` + PeerAddrs []string `json:"peer_addrs"` OrdererAddr string `json:"orderer_addr"` Channel string `json:"channel"` Chaincode string `json:"chaincode"` diff --git a/infra/proposal.go b/infra/proposal.go index 7c897396..44f64efa 100644 --- a/infra/proposal.go +++ b/infra/proposal.go @@ -53,7 +53,7 @@ func SignProposal(prop *peer.Proposal, signer *Crypto) (*peer.SignedProposal, er return &peer.SignedProposal{ProposalBytes: propBytes, Signature: sig}, nil } -func CreateSignedTx(proposal *peer.Proposal, signer *Crypto, resps ...*peer.ProposalResponse) (*common.Envelope, error) { +func CreateSignedTx(proposal *peer.Proposal, signer *Crypto, resps []*peer.ProposalResponse) (*common.Envelope, error) { if len(resps) == 0 { return nil, errors.New("at least one proposal response is required") } @@ -92,6 +92,8 @@ func CreateSignedTx(proposal *peer.Proposal, signer *Crypto, resps ...*peer.Prop return nil, err } + endorsements := make([]*peer.Endorsement, 0) + // ensure that all actions are bitwise equal and that they are successful var a1 []byte for n, r := range resps { @@ -100,22 +102,14 @@ func CreateSignedTx(proposal *peer.Proposal, signer *Crypto, resps ...*peer.Prop if r.Response.Status < 200 || r.Response.Status >= 400 { return nil, errors.Errorf("proposal response was not successful, error code %d, msg %s", r.Response.Status, r.Response.Message) } - continue } - if bytes.Compare(a1, r.Payload) != 0 { return nil, errors.New("ProposalResponsePayloads do not match") } + endorsements = append(endorsements, r.Endorsement) } - - // fill endorsements - endorsements := make([]*peer.Endorsement, len(resps)) - for n, r := range resps { - endorsements[n] = r.Endorsement - } - // create ChaincodeEndorsedAction - cea := &peer.ChaincodeEndorsedAction{ProposalResponsePayload: resps[0].Payload, Endorsements: endorsements} + cea := &peer.ChaincodeEndorsedAction{ProposalResponsePayload: a1, Endorsements: endorsements} // obtain the bytes of the proposal payload that will go to the transaction propPayloadBytes, err := utils.GetBytesProposalPayloadForTx(pPayl, hdrExt.PayloadVisibility) @@ -135,7 +129,6 @@ func CreateSignedTx(proposal *peer.Proposal, signer *Crypto, resps ...*peer.Prop taas := make([]*peer.TransactionAction, 1) taas[0] = taa tx := &peer.Transaction{Actions: taas} - // serialize the tx txBytes, err := utils.GetBytesTransaction(tx) if err != nil { @@ -154,7 +147,6 @@ func CreateSignedTx(proposal *peer.Proposal, signer *Crypto, resps ...*peer.Prop if err != nil { return nil, err } - // here's the envelope return &common.Envelope{Payload: paylBytes, Signature: sig}, nil } diff --git a/infra/proposer.go b/infra/proposer.go index 0e37bd61..f59eb3cd 100644 --- a/infra/proposer.go +++ b/infra/proposer.go @@ -11,32 +11,37 @@ import ( ) type Proposers struct { - workers []*Proposer - + workers [][]*Proposer + //one proposer per connection per peer client int - index uint64 } -func CreateProposers(conn, client int, addr string, crypto *Crypto) *Proposers { - ps := make([]*Proposer, conn) - for i := 0; i < conn; i++ { - ps[i] = CreateProposer(addr, crypto) +func CreateProposers(conn, client int, addrs []string, crypto *Crypto) *Proposers { + var ps [][]*Proposer + //one proposer per connection per peer + for _, addr := range addrs { + row := make([]*Proposer, conn) + for j := 0; j < conn; j++ { + row[j] = CreateProposer(addr, crypto) + } + ps = append(ps, row) } return &Proposers{workers: ps, client: client} } -func (ps *Proposers) Start(signed, processed chan *Elecments, done <-chan struct{}) { +func (ps *Proposers) Start(signed []chan *Elecments, processed chan *Elecments, done <-chan struct{}, config Config) { fmt.Printf("Start sending transactions...\n\n") - for _, p := range ps.workers { - for i := 0; i < ps.client; i++ { - go p.Start(signed, processed, done) + for i := 0; i < len(config.PeerAddrs); i++ { + for j := 0; j < config.NumOfConn; j++ { + go ps.workers[i][j].Start(signed[i], processed, done, len(config.PeerAddrs)) } } } type Proposer struct { - e peer.EndorserClient + e peer.EndorserClient + addr string } func CreateProposer(addr string, crypto *Crypto) *Proposer { @@ -45,22 +50,27 @@ func CreateProposer(addr string, crypto *Crypto) *Proposer { panic(err) } - return &Proposer{e: endorser} + return &Proposer{e: endorser, addr: addr} } -func (p *Proposer) Start(signed, processed chan *Elecments, done <-chan struct{}) { +func (p *Proposer) Start(signed, processed chan *Elecments, done <-chan struct{}, threshold int) { for { select { case s := <-signed: + //send sign proposal to peer for endorsement r, err := p.e.ProcessProposal(context.Background(), s.SignedProp) if err != nil || r.Response.Status < 200 || r.Response.Status >= 400 { - fmt.Printf("Err processing proposal: %s, status: %d\n", err, r.Response.Status) + fmt.Printf("Err processing proposal: %s, status: %d, addr: %s \n", err, r.Response.Status, p.addr) + fmt.Println(r) continue } - - s.Response = r - processed <- s - + s.lock.Lock() + //collect for endorsement + s.Responses = append(s.Responses, r) + if len(s.Responses) >= threshold { + processed <- s + } + s.lock.Unlock() case <-done: return } diff --git a/main.go b/main.go index 8733c97f..cd24f9a3 100644 --- a/main.go +++ b/main.go @@ -23,24 +23,29 @@ func main() { crypto := config.LoadCrypto() raw := make(chan *infra.Elecments, 100) - signed := make(chan *infra.Elecments, 10) + signed := make([]chan *infra.Elecments, len(config.PeerAddrs)) processed := make(chan *infra.Elecments, 10) envs := make(chan *infra.Elecments, 10) done := make(chan struct{}) assember := &infra.Assembler{Signer: crypto} + + for i := 0; i < len(config.PeerAddrs); i++ { + signed[i] = make(chan *infra.Elecments, 10) + } + for i := 0; i < 5; i++ { go assember.StartSigner(raw, signed, done) go assember.StartIntegrator(processed, envs, done) } - proposor := infra.CreateProposers(config.NumOfConn, config.ClientPerConn, config.PeerAddr, crypto) - proposor.Start(signed, processed, done) + proposor := infra.CreateProposers(config.NumOfConn, config.ClientPerConn, config.PeerAddrs, crypto) + proposor.Start(signed, processed, done, config) broadcaster := infra.CreateBroadcasters(config.NumOfConn, config.OrdererAddr, crypto) broadcaster.Start(envs, done) - observer := infra.CreateObserver(config.PeerAddr, config.Channel, crypto) + observer := infra.CreateObserver(config.PeerAddrs[0], config.Channel, crypto) start := time.Now() go observer.Start(N, start)