From 4b1cca9635eadde282dc6746d47ccc4e69e7dfce Mon Sep 17 00:00:00 2001 From: Jay Yu <103467857+jayy04@users.noreply.github.com> Date: Wed, 20 Mar 2024 08:48:20 -0400 Subject: [PATCH 1/4] [CT-700] separate indexer and grpc streaming events --- protocol/mocks/ClobKeeper.go | 5 ++ protocol/mocks/MemClobKeeper.go | 5 ++ .../streaming/grpc/grpc_streaming_manager.go | 20 ++++---- protocol/testutil/memclob/keeper.go | 6 +++ protocol/x/clob/keeper/keeper.go | 10 +++- protocol/x/clob/keeper/orders.go | 2 - protocol/x/clob/memclob/memclob.go | 27 +++++++++++ .../x/clob/memclob/memclob_grpc_streaming.go | 48 +++++++++++++++++-- .../memclob/memclob_grpc_streaming_test.go | 14 +++--- protocol/x/clob/types/clob_keeper.go | 5 ++ protocol/x/clob/types/mem_clob_keeper.go | 4 ++ 11 files changed, 124 insertions(+), 22 deletions(-) diff --git a/protocol/mocks/ClobKeeper.go b/protocol/mocks/ClobKeeper.go index fc5a177f70..1b2fd17fa4 100644 --- a/protocol/mocks/ClobKeeper.go +++ b/protocol/mocks/ClobKeeper.go @@ -1130,6 +1130,11 @@ func (_m *ClobKeeper) RemoveOrderFillAmount(ctx types.Context, orderId clobtypes _m.Called(ctx, orderId) } +// SendOrderbookUpdates provides a mock function with given fields: offchainUpdates, snapshot +func (_m *ClobKeeper) SendOrderbookUpdates(offchainUpdates *clobtypes.OffchainUpdates, snapshot bool) { + _m.Called(offchainUpdates, snapshot) +} + // SetLongTermOrderPlacement provides a mock function with given fields: ctx, order, blockHeight func (_m *ClobKeeper) SetLongTermOrderPlacement(ctx types.Context, order clobtypes.Order, blockHeight uint32) { _m.Called(ctx, order, blockHeight) diff --git a/protocol/mocks/MemClobKeeper.go b/protocol/mocks/MemClobKeeper.go index 641f107d19..e9b6d3456a 100644 --- a/protocol/mocks/MemClobKeeper.go +++ b/protocol/mocks/MemClobKeeper.go @@ -434,6 +434,11 @@ func (_m *MemClobKeeper) ReplayPlaceOrder(ctx types.Context, msg *clobtypes.MsgP return r0, r1, r2, r3 } +// SendOrderbookUpdates provides a mock function with given fields: offchainUpdates, snapshot +func (_m *MemClobKeeper) SendOrderbookUpdates(offchainUpdates *clobtypes.OffchainUpdates, snapshot bool) { + _m.Called(offchainUpdates, snapshot) +} + // SetLongTermOrderPlacement provides a mock function with given fields: ctx, order, blockHeight func (_m *MemClobKeeper) SetLongTermOrderPlacement(ctx types.Context, order clobtypes.Order, blockHeight uint32) { _m.Called(ctx, order, blockHeight) diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 95ae0a984e..8db6daca6b 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -103,19 +103,21 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( // Send updates to subscribers. idsToRemove := make([]uint32, 0) for id, subscription := range sm.orderbookSubscriptions { + updatesToSend := make([]ocutypes.OffChainUpdateV1, 0) for _, clobPairId := range subscription.clobPairIds { if updates, ok := v1updates[clobPairId]; ok { - if err := subscription.srv.Send( - &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: updates, - Snapshot: snapshot, - }, - ); err != nil { - idsToRemove = append(idsToRemove, id) - break - } + updatesToSend = append(updatesToSend, updates...) } } + + if err := subscription.srv.Send( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: updatesToSend, + Snapshot: snapshot, + }, + ); err != nil { + idsToRemove = append(idsToRemove, id) + } } // Clean up subscriptions that have been closed. diff --git a/protocol/testutil/memclob/keeper.go b/protocol/testutil/memclob/keeper.go index 83b368608f..6ba8856285 100644 --- a/protocol/testutil/memclob/keeper.go +++ b/protocol/testutil/memclob/keeper.go @@ -511,3 +511,9 @@ func (f *FakeMemClobKeeper) ValidateSubaccountEquityTierLimitForStatefulOrder( func (f *FakeMemClobKeeper) Logger(ctx sdk.Context) log.Logger { return ctx.Logger() } + +func (f *FakeMemClobKeeper) SendOrderbookUpdates( + offchainUpdates *types.OffchainUpdates, + snapshot bool, +) { +} diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 300cc2a066..612c59fadd 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -233,5 +233,13 @@ func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) { allUpdates.Append(update) } - streamingManager.SendOrderbookUpdates(allUpdates, true) + k.SendOrderbookUpdates(allUpdates, true) +} + +// SendOrderbookUpdates sends the offchain updates to the gRPC streaming manager. +func (k Keeper) SendOrderbookUpdates( + offchainUpdates *types.OffchainUpdates, + snapshot bool, +) { + k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, snapshot) } diff --git a/protocol/x/clob/keeper/orders.go b/protocol/x/clob/keeper/orders.go index f65aff2fce..a2c7773557 100644 --- a/protocol/x/clob/keeper/orders.go +++ b/protocol/x/clob/keeper/orders.go @@ -1310,8 +1310,6 @@ func (k Keeper) SendOffchainMessages( } k.GetIndexerEventManager().SendOffchainData(update) } - - k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, false) } // getFillQuoteQuantums returns the total fillAmount price in quote quantums based on the maker subticks. diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index 8f17780a85..7cbcc4b471 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -1216,6 +1216,18 @@ func (m *MemClobPriceTimePriority) PurgeInvalidMemclobState( m.RemoveOrderIfFilled(ctx, orderId) } + // For order matches in the last block, send an orderbook update + // to the grpc streams. + // Note that fully filled orders are removed from the orderbook in `RemoveOrderIfFilled` above. + allUpdates := types.NewOffchainUpdates() + for _, orderId := range filledOrderIds { + if m.openOrders.hasOrder(ctx, orderId) { + orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) + allUpdates.Append(orderbookUpdate) + } + } + m.clobKeeper.SendOrderbookUpdates(allUpdates, false) + // Remove all canceled stateful order IDs from the memclob if they exist. // If the slice has non-stateful order IDs or contains duplicates, panic. if lib.ContainsDuplicates(canceledStatefulOrderIds) { @@ -1510,6 +1522,10 @@ func (m *MemClobPriceTimePriority) mustAddOrderToOrderbook( } m.openOrders.mustAddOrderToOrderbook(ctx, newOrder, forceToFrontOfLevel) + + // Send an orderbook update to grpc streams. + orderbookUpdate := m.GetOrderbookUpdatesForOrderPlacement(ctx, newOrder) + m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) } // mustPerformTakerOrderMatching performs matching using the provided taker order while the order @@ -1944,6 +1960,10 @@ func (m *MemClobPriceTimePriority) mustRemoveOrder( !m.operationsToPropose.IsOrderPlacementInOperationsQueue(order) { m.operationsToPropose.RemoveShortTermOrderTxBytes(order) } + + // Send an orderbook update to grpc streams. + orderbookUpdate := m.GetOrderbookUpdatesForOrderRemoval(ctx, order.OrderId) + m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) } // mustUpdateOrderbookStateWithMatchedMakerOrder updates the orderbook with a matched maker order. @@ -1969,6 +1989,13 @@ func (m *MemClobPriceTimePriority) mustUpdateOrderbookStateWithMatchedMakerOrder m.mustRemoveOrder(ctx, makerOrderId) } + // If the order was fully filled, an orderbook update for removal was already sent in `mustRemoveOrder`. + // If the order was partially filled, send an orderbook update for the order's new total filled amount. + if newTotalFilledAmount < makerOrderBaseQuantums { + orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, makerOrder.OrderId) + m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) + } + if m.generateOffchainUpdates { // Send an off-chain update message to the indexer to update the total filled size of the maker // order. diff --git a/protocol/x/clob/memclob/memclob_grpc_streaming.go b/protocol/x/clob/memclob/memclob_grpc_streaming.go index a1c86d4d0c..7eee3594b7 100644 --- a/protocol/x/clob/memclob/memclob_grpc_streaming.go +++ b/protocol/x/clob/memclob/memclob_grpc_streaming.go @@ -3,6 +3,8 @@ package memclob import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates" + ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" + indexersharedtypes "github.com/dydxprotocol/v4-chain/protocol/indexer/shared/types" "github.com/dydxprotocol/v4-chain/protocol/lib" "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ) @@ -27,7 +29,7 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot( level.LevelOrders.Front.Each( func(order types.ClobOrder) { offchainUpdates.Append( - m.GetOffchainUpdatesForOrder(ctx, order.Order), + m.GetOrderbookUpdatesForOrderPlacement(ctx, order.Order), ) }, ) @@ -44,7 +46,7 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot( level.LevelOrders.Front.Each( func(order types.ClobOrder) { offchainUpdates.Append( - m.GetOffchainUpdatesForOrder(ctx, order.Order), + m.GetOrderbookUpdatesForOrderPlacement(ctx, order.Order), ) }, ) @@ -54,10 +56,10 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot( return offchainUpdates } -// GetOffchainUpdatesForOrder returns a place order offchain message and +// GetOrderbookUpdatesForOrderPlacement returns a place order offchain message and // a update order offchain message used to construct an order for // the orderbook snapshot grpc stream. -func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder( +func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderPlacement( ctx sdk.Context, order types.Order, ) (offchainUpdates *types.OffchainUpdates) { @@ -86,3 +88,41 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder( return offchainUpdates } + +// GetOrderbookUpdatesForOrderRemoval returns a place order offchain message and +func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderRemoval( + ctx sdk.Context, + orderId types.OrderId, +) (offchainUpdates *types.OffchainUpdates) { + offchainUpdates = types.NewOffchainUpdates() + if message, success := off_chain_updates.CreateOrderRemoveMessageWithReason( + ctx, + orderId, + indexersharedtypes.OrderRemovalReason_ORDER_REMOVAL_REASON_UNSPECIFIED, + ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_BEST_EFFORT_CANCELED, + ); success { + offchainUpdates.AddRemoveMessage(orderId, message) + } + return offchainUpdates +} + +// GetOrderbookUpdatesForOrderRemoval returns a place order offchain message and +func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderUpdate( + ctx sdk.Context, + orderId types.OrderId, +) (offchainUpdates *types.OffchainUpdates) { + offchainUpdates = types.NewOffchainUpdates() + + // Get the current fill amount of the order. + fillAmount := m.GetOrderFilledAmount(ctx, orderId) + + // Generate an update message updating the total filled amount of order. + if message, success := off_chain_updates.CreateOrderUpdateMessage( + ctx, + orderId, + fillAmount, + ); success { + offchainUpdates.AddUpdateMessage(orderId, message) + } + return offchainUpdates +} diff --git a/protocol/x/clob/memclob/memclob_grpc_streaming_test.go b/protocol/x/clob/memclob/memclob_grpc_streaming_test.go index e5bd2bf3f2..5a865b25f9 100644 --- a/protocol/x/clob/memclob/memclob_grpc_streaming_test.go +++ b/protocol/x/clob/memclob/memclob_grpc_streaming_test.go @@ -21,6 +21,7 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) { mock.Anything, mock.Anything, ).Return(false, satypes.BaseQuantums(0), uint32(0)) + clobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return() memclob := NewMemClobPriceTimePriority(false) memclob.SetClobKeeper(clobKeeper) @@ -44,9 +45,9 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) { expected := types.NewOffchainUpdates() // Buy orders are in descending order. - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2])) - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0])) - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[2])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[0])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[1])) require.Equal(t, expected, offchainUpdates) } @@ -60,6 +61,7 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) { mock.Anything, mock.Anything, ).Return(false, satypes.BaseQuantums(0), uint32(0)) + clobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return() memclob := NewMemClobPriceTimePriority(false) memclob.SetClobKeeper(clobKeeper) @@ -83,9 +85,9 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) { expected := types.NewOffchainUpdates() // Sell orders are in ascending order. - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1])) - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2])) - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[1])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[2])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[0])) require.Equal(t, expected, offchainUpdates) } diff --git a/protocol/x/clob/types/clob_keeper.go b/protocol/x/clob/types/clob_keeper.go index 474d799b03..d96b046ad5 100644 --- a/protocol/x/clob/types/clob_keeper.go +++ b/protocol/x/clob/types/clob_keeper.go @@ -144,5 +144,10 @@ type ClobKeeper interface { clobPair ClobPair, ) error UpdateLiquidationsConfig(ctx sdk.Context, config LiquidationsConfig) error + // Gprc streaming InitializeNewGrpcStreams(ctx sdk.Context) + SendOrderbookUpdates( + offchainUpdates *OffchainUpdates, + snapshot bool, + ) } diff --git a/protocol/x/clob/types/mem_clob_keeper.go b/protocol/x/clob/types/mem_clob_keeper.go index 298e701cb9..7ca02f2569 100644 --- a/protocol/x/clob/types/mem_clob_keeper.go +++ b/protocol/x/clob/types/mem_clob_keeper.go @@ -115,4 +115,8 @@ type MemClobKeeper interface { Logger( ctx sdk.Context, ) log.Logger + SendOrderbookUpdates( + offchainUpdates *OffchainUpdates, + snapshot bool, + ) } From 225a3835b29fa13b623760c1602998e5aa8ec409 Mon Sep 17 00:00:00 2001 From: Jay Yu <103467857+jayy04@users.noreply.github.com> Date: Wed, 20 Mar 2024 09:35:42 -0400 Subject: [PATCH 2/4] fix tests --- ...emclob_purge_invalid_memclob_state_test.go | 19 ++++++++++++++++++- .../clob/memclob/memclob_remove_order_test.go | 1 + 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go b/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go index 2f6e78a68e..b15df32321 100644 --- a/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go +++ b/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go @@ -249,10 +249,11 @@ func TestPurgeInvalidMemclobState(t *testing.T) { // Setup memclob state. ctx, _, _ := sdktest.NewSdkContextWithMultistore() ctx = ctx.WithIsCheckTx(true) - mockMemClobKeeper := &mocks.MemClobKeeper{} memclob := NewMemClobPriceTimePriority(true) + mockMemClobKeeper := &mocks.MemClobKeeper{} memclob.SetClobKeeper(mockMemClobKeeper) mockMemClobKeeper.On("Logger", mock.Anything).Return(log.NewNopLogger()).Maybe() + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() for _, operation := range tc.placedOperations { switch operation.Operation.(type) { @@ -339,6 +340,10 @@ func TestPurgeInvalidMemclobState_PanicsWhenCalledWithDuplicateCanceledStatefulO ctx, _, _ := sdktest.NewSdkContextWithMultistore() ctx = ctx.WithIsCheckTx(true) memclob := NewMemClobPriceTimePriority(true) + mockMemClobKeeper := &mocks.MemClobKeeper{} + memclob.SetClobKeeper(mockMemClobKeeper) + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() + canceledStatefulOrderIds := []types.OrderId{ constants.LongTermOrder_Alice_Num0_Id0_Clob0_Buy5_Price10_GTBT15.OrderId, constants.LongTermOrder_Alice_Num0_Id0_Clob0_Buy5_Price10_GTBT15.OrderId, @@ -368,6 +373,10 @@ func TestPurgeInvalidMemclobState_PanicsWhenNonStatefulOrderIsCanceled(t *testin ctx, _, _ := sdktest.NewSdkContextWithMultistore() ctx = ctx.WithIsCheckTx(true) memclob := NewMemClobPriceTimePriority(true) + mockMemClobKeeper := &mocks.MemClobKeeper{} + memclob.SetClobKeeper(mockMemClobKeeper) + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() + shortTermOrderId := constants.Order_Alice_Num0_Id0_Clob2_Buy5_Price10_GTB15.OrderId require.PanicsWithValue( @@ -395,6 +404,10 @@ func TestPurgeInvalidMemclobState_PanicsWhenCalledWithDuplicateExpiredStatefulOr ctx = ctx.WithIsCheckTx(true) memclob := NewMemClobPriceTimePriority(true) + mockMemClobKeeper := &mocks.MemClobKeeper{} + memclob.SetClobKeeper(mockMemClobKeeper) + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() + expiredStatefulOrderIds := []types.OrderId{ constants.LongTermOrder_Alice_Num0_Id0_Clob0_Buy5_Price10_GTBT15.OrderId, constants.LongTermOrder_Alice_Num0_Id0_Clob0_Buy5_Price10_GTBT15.OrderId, @@ -425,6 +438,10 @@ func TestPurgeInvalidMemclobState_PanicsWhenCalledWithShortTermExpiredStatefulOr ctx = ctx.WithIsCheckTx(true) memclob := NewMemClobPriceTimePriority(true) + mockMemClobKeeper := &mocks.MemClobKeeper{} + memclob.SetClobKeeper(mockMemClobKeeper) + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() + shortTermOrderId := constants.Order_Alice_Num0_Id0_Clob2_Buy5_Price10_GTB15.OrderId require.PanicsWithValue( diff --git a/protocol/x/clob/memclob/memclob_remove_order_test.go b/protocol/x/clob/memclob/memclob_remove_order_test.go index 7d3aa4e0b9..15c1d88803 100644 --- a/protocol/x/clob/memclob/memclob_remove_order_test.go +++ b/protocol/x/clob/memclob/memclob_remove_order_test.go @@ -330,6 +330,7 @@ func TestRemoveOrderIfFilled(t *testing.T) { memClobKeeper.On("AddOrderToOrderbookCollatCheck", mock.Anything, mock.Anything, mock.Anything). Return(true, make(map[satypes.SubaccountId]satypes.UpdateResult)) memClobKeeper.On("ValidateSubaccountEquityTierLimitForNewOrder", mock.Anything, mock.Anything).Return(nil) + memClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() // Set initial fill amount to `0` for all orders. initialCall := memClobKeeper.On("GetOrderFillAmount", mock.Anything, mock.Anything). From 9cc7ba84e609f7361c27c5d39f63b08901acb2ca Mon Sep 17 00:00:00 2001 From: Jay Yu <103467857+jayy04@users.noreply.github.com> Date: Wed, 20 Mar 2024 14:26:34 -0400 Subject: [PATCH 3/4] comments --- protocol/app/app.go | 5 +- protocol/mocks/MemClob.go | 60 +++++++++++++++++++ protocol/x/clob/abci.go | 12 ++++ protocol/x/clob/memclob/memclob.go | 51 ++++++++-------- .../x/clob/memclob/memclob_grpc_streaming.go | 10 ++-- protocol/x/clob/types/memclob.go | 12 ++++ 6 files changed, 119 insertions(+), 31 deletions(-) diff --git a/protocol/app/app.go b/protocol/app/app.go index ae84165c57..be2cd02ceb 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -994,9 +994,8 @@ func New( clobFlags := clobflags.GetClobFlagValuesFromOptions(appOpts) logger.Info("Parsed CLOB flags", "Flags", clobFlags) - memClob := clobmodulememclob.NewMemClobPriceTimePriority( - app.IndexerEventManager.Enabled() || app.GrpcStreamingManager.Enabled(), - ) + memClob := clobmodulememclob.NewMemClobPriceTimePriority(app.IndexerEventManager.Enabled()) + memClob.SetGenerateOrderbookUpdates(app.GrpcStreamingManager.Enabled()) app.ClobKeeper = clobmodulekeeper.NewKeeper( appCodec, diff --git a/protocol/mocks/MemClob.go b/protocol/mocks/MemClob.go index cf2144e923..143319fa30 100644 --- a/protocol/mocks/MemClob.go +++ b/protocol/mocks/MemClob.go @@ -319,6 +319,66 @@ func (_m *MemClob) GetOrderRemainingAmount(ctx types.Context, order clobtypes.Or return r0, r1 } +// GetOrderbookUpdatesForOrderPlacement provides a mock function with given fields: ctx, order +func (_m *MemClob) GetOrderbookUpdatesForOrderPlacement(ctx types.Context, order clobtypes.Order) *clobtypes.OffchainUpdates { + ret := _m.Called(ctx, order) + + if len(ret) == 0 { + panic("no return value specified for GetOrderbookUpdatesForOrderPlacement") + } + + var r0 *clobtypes.OffchainUpdates + if rf, ok := ret.Get(0).(func(types.Context, clobtypes.Order) *clobtypes.OffchainUpdates); ok { + r0 = rf(ctx, order) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clobtypes.OffchainUpdates) + } + } + + return r0 +} + +// GetOrderbookUpdatesForOrderRemoval provides a mock function with given fields: ctx, orderId +func (_m *MemClob) GetOrderbookUpdatesForOrderRemoval(ctx types.Context, orderId clobtypes.OrderId) *clobtypes.OffchainUpdates { + ret := _m.Called(ctx, orderId) + + if len(ret) == 0 { + panic("no return value specified for GetOrderbookUpdatesForOrderRemoval") + } + + var r0 *clobtypes.OffchainUpdates + if rf, ok := ret.Get(0).(func(types.Context, clobtypes.OrderId) *clobtypes.OffchainUpdates); ok { + r0 = rf(ctx, orderId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clobtypes.OffchainUpdates) + } + } + + return r0 +} + +// GetOrderbookUpdatesForOrderUpdate provides a mock function with given fields: ctx, orderId +func (_m *MemClob) GetOrderbookUpdatesForOrderUpdate(ctx types.Context, orderId clobtypes.OrderId) *clobtypes.OffchainUpdates { + ret := _m.Called(ctx, orderId) + + if len(ret) == 0 { + panic("no return value specified for GetOrderbookUpdatesForOrderUpdate") + } + + var r0 *clobtypes.OffchainUpdates + if rf, ok := ret.Get(0).(func(types.Context, clobtypes.OrderId) *clobtypes.OffchainUpdates); ok { + r0 = rf(ctx, orderId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clobtypes.OffchainUpdates) + } + } + + return r0 +} + // GetPricePremium provides a mock function with given fields: ctx, clobPair, params func (_m *MemClob) GetPricePremium(ctx types.Context, clobPair clobtypes.ClobPair, params perpetualstypes.GetPricePremiumParams) (int32, error) { ret := _m.Called(ctx, clobPair, params) diff --git a/protocol/x/clob/abci.go b/protocol/x/clob/abci.go index a2201e3035..27f4492589 100644 --- a/protocol/x/clob/abci.go +++ b/protocol/x/clob/abci.go @@ -166,6 +166,18 @@ func PrepareCheckState( offchainUpdates, ) + // For orders that are filled in the last block, send an orderbook update to the grpc streams. + if keeper.GetGrpcStreamingManager().Enabled() { + allUpdates := types.NewOffchainUpdates() + for _, orderId := range processProposerMatchesEvents.OrderIdsFilledInLastBlock { + if _, exists := keeper.MemClob.GetOrder(ctx, orderId); exists { + orderbookUpdate := keeper.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) + allUpdates.Append(orderbookUpdate) + } + } + keeper.SendOrderbookUpdates(allUpdates, false) + } + // 3. Place all stateful order placements included in the last block on the memclob. // Note telemetry is measured outside of the function call because `PlaceStatefulOrdersFromLastBlock` // is called within `PlaceConditionalOrdersTriggeredInLastBlock`. diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index 7cbcc4b471..7206169d19 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -47,6 +47,9 @@ type MemClobPriceTimePriority struct { // ---- Fields for determining if off-chain update messages should be generated ---- generateOffchainUpdates bool + + // ---- Fields for determining if orderbook updates should be generated ---- + generateOrderbookUpdates bool } type OrderWithRemovalReason struct { @@ -58,10 +61,11 @@ func NewMemClobPriceTimePriority( generateOffchainUpdates bool, ) *MemClobPriceTimePriority { return &MemClobPriceTimePriority{ - openOrders: newMemclobOpenOrders(), - cancels: newMemclobCancels(), - operationsToPropose: *types.NewOperationsToPropose(), - generateOffchainUpdates: generateOffchainUpdates, + openOrders: newMemclobOpenOrders(), + cancels: newMemclobCancels(), + operationsToPropose: *types.NewOperationsToPropose(), + generateOffchainUpdates: generateOffchainUpdates, + generateOrderbookUpdates: false, } } @@ -73,6 +77,11 @@ func (m *MemClobPriceTimePriority) SetClobKeeper(clobKeeper types.MemClobKeeper) m.clobKeeper = clobKeeper } +// SetGenerateOffchainUpdates sets the `generateOffchainUpdates` field of the MemClob. +func (m *MemClobPriceTimePriority) SetGenerateOrderbookUpdates(generateOrderbookUpdates bool) { + m.generateOrderbookUpdates = generateOrderbookUpdates +} + // CancelOrder removes a Short-Term order by `OrderId` (if it exists) from all order-related data structures // in the memclob. This method manages only Short-Term cancellations. For stateful cancellations, see // `msg_server_cancel_orders.go`. @@ -1216,18 +1225,6 @@ func (m *MemClobPriceTimePriority) PurgeInvalidMemclobState( m.RemoveOrderIfFilled(ctx, orderId) } - // For order matches in the last block, send an orderbook update - // to the grpc streams. - // Note that fully filled orders are removed from the orderbook in `RemoveOrderIfFilled` above. - allUpdates := types.NewOffchainUpdates() - for _, orderId := range filledOrderIds { - if m.openOrders.hasOrder(ctx, orderId) { - orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) - allUpdates.Append(orderbookUpdate) - } - } - m.clobKeeper.SendOrderbookUpdates(allUpdates, false) - // Remove all canceled stateful order IDs from the memclob if they exist. // If the slice has non-stateful order IDs or contains duplicates, panic. if lib.ContainsDuplicates(canceledStatefulOrderIds) { @@ -1523,9 +1520,11 @@ func (m *MemClobPriceTimePriority) mustAddOrderToOrderbook( m.openOrders.mustAddOrderToOrderbook(ctx, newOrder, forceToFrontOfLevel) - // Send an orderbook update to grpc streams. - orderbookUpdate := m.GetOrderbookUpdatesForOrderPlacement(ctx, newOrder) - m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) + if m.generateOrderbookUpdates { + // Send an orderbook update to grpc streams. + orderbookUpdate := m.GetOrderbookUpdatesForOrderPlacement(ctx, newOrder) + m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) + } } // mustPerformTakerOrderMatching performs matching using the provided taker order while the order @@ -1961,9 +1960,11 @@ func (m *MemClobPriceTimePriority) mustRemoveOrder( m.operationsToPropose.RemoveShortTermOrderTxBytes(order) } - // Send an orderbook update to grpc streams. - orderbookUpdate := m.GetOrderbookUpdatesForOrderRemoval(ctx, order.OrderId) - m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) + if m.generateOrderbookUpdates { + // Send an orderbook update to grpc streams. + orderbookUpdate := m.GetOrderbookUpdatesForOrderRemoval(ctx, order.OrderId) + m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) + } } // mustUpdateOrderbookStateWithMatchedMakerOrder updates the orderbook with a matched maker order. @@ -1992,8 +1993,10 @@ func (m *MemClobPriceTimePriority) mustUpdateOrderbookStateWithMatchedMakerOrder // If the order was fully filled, an orderbook update for removal was already sent in `mustRemoveOrder`. // If the order was partially filled, send an orderbook update for the order's new total filled amount. if newTotalFilledAmount < makerOrderBaseQuantums { - orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, makerOrder.OrderId) - m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) + if m.generateOrderbookUpdates { + orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, makerOrder.OrderId) + m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) + } } if m.generateOffchainUpdates { diff --git a/protocol/x/clob/memclob/memclob_grpc_streaming.go b/protocol/x/clob/memclob/memclob_grpc_streaming.go index 7eee3594b7..4bce8ec772 100644 --- a/protocol/x/clob/memclob/memclob_grpc_streaming.go +++ b/protocol/x/clob/memclob/memclob_grpc_streaming.go @@ -57,8 +57,8 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot( } // GetOrderbookUpdatesForOrderPlacement returns a place order offchain message and -// a update order offchain message used to construct an order for -// the orderbook snapshot grpc stream. +// a update order offchain message used to add an order for +// the orderbook grpc stream. func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderPlacement( ctx sdk.Context, order types.Order, @@ -89,7 +89,8 @@ func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderPlacement( return offchainUpdates } -// GetOrderbookUpdatesForOrderRemoval returns a place order offchain message and +// GetOrderbookUpdatesForOrderRemoval returns a remove order offchain message +// used to remove an order for the orderbook grpc stream. func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderRemoval( ctx sdk.Context, orderId types.OrderId, @@ -106,7 +107,8 @@ func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderRemoval( return offchainUpdates } -// GetOrderbookUpdatesForOrderRemoval returns a place order offchain message and +// GetOrderbookUpdatesForOrderUpdate returns an update order offchain message +// used to update an order for the orderbook grpc stream. func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderUpdate( ctx sdk.Context, orderId types.OrderId, diff --git a/protocol/x/clob/types/memclob.go b/protocol/x/clob/types/memclob.go index feb4ab6f49..3196320e39 100644 --- a/protocol/x/clob/types/memclob.go +++ b/protocol/x/clob/types/memclob.go @@ -137,4 +137,16 @@ type MemClob interface { ctx sdk.Context, clobPairId ClobPairId, ) (offchainUpdates *OffchainUpdates) + GetOrderbookUpdatesForOrderPlacement( + ctx sdk.Context, + order Order, + ) (offchainUpdates *OffchainUpdates) + GetOrderbookUpdatesForOrderRemoval( + ctx sdk.Context, + orderId OrderId, + ) (offchainUpdates *OffchainUpdates) + GetOrderbookUpdatesForOrderUpdate( + ctx sdk.Context, + orderId OrderId, + ) (offchainUpdates *OffchainUpdates) } From 7b5297cbd4a377d2560e48da24f2032252c6decc Mon Sep 17 00:00:00 2001 From: Jay Yu <103467857+jayy04@users.noreply.github.com> Date: Wed, 20 Mar 2024 18:55:26 -0400 Subject: [PATCH 4/4] update --- protocol/x/clob/keeper/keeper.go | 4 ++++ protocol/x/clob/memclob/memclob.go | 15 ++++++--------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 612c59fadd..e6e34619ac 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -241,5 +241,9 @@ func (k Keeper) SendOrderbookUpdates( offchainUpdates *types.OffchainUpdates, snapshot bool, ) { + if len(offchainUpdates.Messages) == 0 { + return + } + k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, snapshot) } diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index 7206169d19..11579c6488 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -1982,6 +1982,12 @@ func (m *MemClobPriceTimePriority) mustUpdateOrderbookStateWithMatchedMakerOrder panic("Total filled size of maker order greater than the order size") } + // Send an orderbook update for the order's new total filled amount. + if m.generateOrderbookUpdates { + orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, makerOrder.OrderId) + m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) + } + // If the order is fully filled, remove it from the orderbook. // Note we shouldn't remove Short-Term order hashes from `ShortTermOrderTxBytes` here since // the order was matched. @@ -1990,15 +1996,6 @@ func (m *MemClobPriceTimePriority) mustUpdateOrderbookStateWithMatchedMakerOrder m.mustRemoveOrder(ctx, makerOrderId) } - // If the order was fully filled, an orderbook update for removal was already sent in `mustRemoveOrder`. - // If the order was partially filled, send an orderbook update for the order's new total filled amount. - if newTotalFilledAmount < makerOrderBaseQuantums { - if m.generateOrderbookUpdates { - orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, makerOrder.OrderId) - m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) - } - } - if m.generateOffchainUpdates { // Send an off-chain update message to the indexer to update the total filled size of the maker // order.