diff --git a/CHANGELOG_UNRELEASED.md b/CHANGELOG_UNRELEASED.md index b14627293e..bec4f4ac3c 100644 --- a/CHANGELOG_UNRELEASED.md +++ b/CHANGELOG_UNRELEASED.md @@ -20,6 +20,7 @@ ### Emitter & Flusher +- (feat) [\#2241](https://github.com/bandprotocol/bandchain/pull/2241) Implement handle MsgDeposit for emitter and flusher. - (impv) [\#2240](https://github.com/bandprotocol/bandchain/pull/2240) Emit Kafka msg for staking genesis state. - (feat) [\#2238](https://github.com/bandprotocol/bandchain/pull/2238) Implement handle MsgSubmitProposal for emitter and flusher. diff --git a/chain/emitter/gov.go b/chain/emitter/gov.go index 05de025a47..d4ef30a759 100644 --- a/chain/emitter/gov.go +++ b/chain/emitter/gov.go @@ -6,15 +6,27 @@ import ( "github.com/cosmos/cosmos-sdk/x/gov/types" ) -func (app *App) emitSetDeposit(txHash []byte, id uint64, depositor sdk.AccAddress, amount sdk.Coins) { +func (app *App) emitSetDeposit(txHash []byte, id uint64, depositor sdk.AccAddress) { + deposit, _ := app.GovKeeper.GetDeposit(app.DeliverContext, id, depositor) app.Write("SET_DEPOSIT", JsDict{ "proposal_id": id, "depositor": depositor, - "amount": amount.String(), + "amount": deposit.Amount.String(), "tx_hash": txHash, }) } +func (app *App) emitUpdateProposalAfterDeposit(id uint64) { + proposal, _ := app.GovKeeper.GetProposal(app.DeliverContext, id) + app.Write("UPDATE_PROPOSAL", JsDict{ + "id": id, + "status": int(proposal.Status), + "total_deposit": proposal.TotalDeposit.String(), + "voting_time": proposal.VotingStartTime.UnixNano(), + "voting_end_time": proposal.VotingEndTime.UnixNano(), + }) +} + // handleMsgSubmitProposal implements emitter handler for MsgSubmitProposal. func (app *App) handleMsgSubmitProposal( txHash []byte, msg gov.MsgSubmitProposal, evMap EvMap, extra JsDict, @@ -35,5 +47,13 @@ func (app *App) handleMsgSubmitProposal( "voting_time": proposal.VotingStartTime.UnixNano(), "voting_end_time": proposal.VotingEndTime.UnixNano(), }) - app.emitSetDeposit(txHash, proposalId, msg.Proposer, msg.InitialDeposit) + app.emitSetDeposit(txHash, proposalId, msg.Proposer) +} + +// handleMsgDeposit implements emitter handler for MsgDeposit. +func (app *App) handleMsgDeposit( + txHash []byte, msg gov.MsgDeposit, evMap EvMap, extra JsDict, +) { + app.emitSetDeposit(txHash, msg.ProposalID, msg.Depositor) + app.emitUpdateProposalAfterDeposit(msg.ProposalID) } diff --git a/chain/emitter/handler.go b/chain/emitter/handler.go index b48a479b4a..3f652ba6a0 100644 --- a/chain/emitter/handler.go +++ b/chain/emitter/handler.go @@ -71,6 +71,8 @@ func (app *App) handleMsg(txHash []byte, msg sdk.Msg, log sdk.ABCIMessageLog, ex app.handleMsgUnjail(txHash, msg, evMap, extra) case gov.MsgSubmitProposal: app.handleMsgSubmitProposal(txHash, msg, evMap, extra) + case gov.MsgDeposit: + app.handleMsgDeposit(txHash, msg, evMap, extra) } } diff --git a/flusher/flusher/handler.py b/flusher/flusher/handler.py index c5cd66d6f0..01dd9c4419 100644 --- a/flusher/flusher/handler.py +++ b/flusher/flusher/handler.py @@ -191,3 +191,9 @@ def handle_set_deposit(self, msg): .values(**msg) .on_conflict_do_update(constraint="deposits_pkey", set_=msg) ) + + def handle_update_proposal(self, msg): + condition = True + for col in proposals.primary_key.columns.values(): + condition = (col == msg[col.name]) & condition + self.conn.execute(proposals.update().where(condition).values(**msg))