Skip to content

Commit

Permalink
[CT-700] separate indexer and grpc streaming events (#1209)
Browse files Browse the repository at this point in the history
* [CT-700] separate indexer and grpc streaming events

* fix tests

* comments

* update
  • Loading branch information
jayy04 authored Mar 20, 2024
1 parent 45c91d7 commit 75937e1
Show file tree
Hide file tree
Showing 17 changed files with 241 additions and 32 deletions.
5 changes: 2 additions & 3 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,9 +1005,8 @@ func New(
clobFlags := clobflags.GetClobFlagValuesFromOptions(appOpts)
logger.Info("Parsed CLOB flags", "Flags", clobFlags)

memClob := clobmodulememclob.NewMemClobPriceTimePriority(
app.IndexerEventManager.Enabled() || app.GrpcStreamingManager.Enabled(),
)
memClob := clobmodulememclob.NewMemClobPriceTimePriority(app.IndexerEventManager.Enabled())
memClob.SetGenerateOrderbookUpdates(app.GrpcStreamingManager.Enabled())

app.ClobKeeper = clobmodulekeeper.NewKeeper(
appCodec,
Expand Down
5 changes: 5 additions & 0 deletions protocol/mocks/ClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 60 additions & 0 deletions protocol/mocks/MemClob.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions protocol/mocks/MemClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 11 additions & 9 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,21 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
// Send updates to subscribers.
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
updatesToSend := make([]ocutypes.OffChainUpdateV1, 0)
for _, clobPairId := range subscription.clobPairIds {
if updates, ok := v1updates[clobPairId]; ok {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updates,
Snapshot: snapshot,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
break
}
updatesToSend = append(updatesToSend, updates...)
}
}

if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updatesToSend,
Snapshot: snapshot,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
}
}

// Clean up subscriptions that have been closed.
Expand Down
6 changes: 6 additions & 0 deletions protocol/testutil/memclob/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,9 @@ func (f *FakeMemClobKeeper) ValidateSubaccountEquityTierLimitForStatefulOrder(
func (f *FakeMemClobKeeper) Logger(ctx sdk.Context) log.Logger {
return ctx.Logger()
}

func (f *FakeMemClobKeeper) SendOrderbookUpdates(
offchainUpdates *types.OffchainUpdates,
snapshot bool,
) {
}
12 changes: 12 additions & 0 deletions protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,18 @@ func PrepareCheckState(
offchainUpdates,
)

// For orders that are filled in the last block, send an orderbook update to the grpc streams.
if keeper.GetGrpcStreamingManager().Enabled() {
allUpdates := types.NewOffchainUpdates()
for _, orderId := range processProposerMatchesEvents.OrderIdsFilledInLastBlock {
if _, exists := keeper.MemClob.GetOrder(ctx, orderId); exists {
orderbookUpdate := keeper.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId)
allUpdates.Append(orderbookUpdate)
}
}
keeper.SendOrderbookUpdates(allUpdates, false)
}

// 3. Place all stateful order placements included in the last block on the memclob.
// Note telemetry is measured outside of the function call because `PlaceStatefulOrdersFromLastBlock`
// is called within `PlaceConditionalOrdersTriggeredInLastBlock`.
Expand Down
14 changes: 13 additions & 1 deletion protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,5 +233,17 @@ func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) {
allUpdates.Append(update)
}

streamingManager.SendOrderbookUpdates(allUpdates, true)
k.SendOrderbookUpdates(allUpdates, true)
}

// SendOrderbookUpdates sends the offchain updates to the gRPC streaming manager.
func (k Keeper) SendOrderbookUpdates(
offchainUpdates *types.OffchainUpdates,
snapshot bool,
) {
if len(offchainUpdates.Messages) == 0 {
return
}

k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, snapshot)
}
2 changes: 0 additions & 2 deletions protocol/x/clob/keeper/orders.go
Original file line number Diff line number Diff line change
Expand Up @@ -1310,8 +1310,6 @@ func (k Keeper) SendOffchainMessages(
}
k.GetIndexerEventManager().SendOffchainData(update)
}

k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, false)
}

// getFillQuoteQuantums returns the total fillAmount price in quote quantums based on the maker subticks.
Expand Down
35 changes: 31 additions & 4 deletions protocol/x/clob/memclob/memclob.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type MemClobPriceTimePriority struct {

// ---- Fields for determining if off-chain update messages should be generated ----
generateOffchainUpdates bool

// ---- Fields for determining if orderbook updates should be generated ----
generateOrderbookUpdates bool
}

type OrderWithRemovalReason struct {
Expand All @@ -58,10 +61,11 @@ func NewMemClobPriceTimePriority(
generateOffchainUpdates bool,
) *MemClobPriceTimePriority {
return &MemClobPriceTimePriority{
openOrders: newMemclobOpenOrders(),
cancels: newMemclobCancels(),
operationsToPropose: *types.NewOperationsToPropose(),
generateOffchainUpdates: generateOffchainUpdates,
openOrders: newMemclobOpenOrders(),
cancels: newMemclobCancels(),
operationsToPropose: *types.NewOperationsToPropose(),
generateOffchainUpdates: generateOffchainUpdates,
generateOrderbookUpdates: false,
}
}

Expand All @@ -73,6 +77,11 @@ func (m *MemClobPriceTimePriority) SetClobKeeper(clobKeeper types.MemClobKeeper)
m.clobKeeper = clobKeeper
}

// SetGenerateOffchainUpdates sets the `generateOffchainUpdates` field of the MemClob.
func (m *MemClobPriceTimePriority) SetGenerateOrderbookUpdates(generateOrderbookUpdates bool) {
m.generateOrderbookUpdates = generateOrderbookUpdates
}

// CancelOrder removes a Short-Term order by `OrderId` (if it exists) from all order-related data structures
// in the memclob. This method manages only Short-Term cancellations. For stateful cancellations, see
// `msg_server_cancel_orders.go`.
Expand Down Expand Up @@ -1510,6 +1519,12 @@ func (m *MemClobPriceTimePriority) mustAddOrderToOrderbook(
}

m.openOrders.mustAddOrderToOrderbook(ctx, newOrder, forceToFrontOfLevel)

if m.generateOrderbookUpdates {
// Send an orderbook update to grpc streams.
orderbookUpdate := m.GetOrderbookUpdatesForOrderPlacement(ctx, newOrder)
m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false)
}
}

// mustPerformTakerOrderMatching performs matching using the provided taker order while the order
Expand Down Expand Up @@ -1944,6 +1959,12 @@ func (m *MemClobPriceTimePriority) mustRemoveOrder(
!m.operationsToPropose.IsOrderPlacementInOperationsQueue(order) {
m.operationsToPropose.RemoveShortTermOrderTxBytes(order)
}

if m.generateOrderbookUpdates {
// Send an orderbook update to grpc streams.
orderbookUpdate := m.GetOrderbookUpdatesForOrderRemoval(ctx, order.OrderId)
m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false)
}
}

// mustUpdateOrderbookStateWithMatchedMakerOrder updates the orderbook with a matched maker order.
Expand All @@ -1961,6 +1982,12 @@ func (m *MemClobPriceTimePriority) mustUpdateOrderbookStateWithMatchedMakerOrder
panic("Total filled size of maker order greater than the order size")
}

// Send an orderbook update for the order's new total filled amount.
if m.generateOrderbookUpdates {
orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, makerOrder.OrderId)
m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false)
}

// If the order is fully filled, remove it from the orderbook.
// Note we shouldn't remove Short-Term order hashes from `ShortTermOrderTxBytes` here since
// the order was matched.
Expand Down
54 changes: 48 additions & 6 deletions protocol/x/clob/memclob/memclob_grpc_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package memclob
import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
indexersharedtypes "github.com/dydxprotocol/v4-chain/protocol/indexer/shared/types"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)
Expand All @@ -27,7 +29,7 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot(
level.LevelOrders.Front.Each(
func(order types.ClobOrder) {
offchainUpdates.Append(
m.GetOffchainUpdatesForOrder(ctx, order.Order),
m.GetOrderbookUpdatesForOrderPlacement(ctx, order.Order),
)
},
)
Expand All @@ -44,7 +46,7 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot(
level.LevelOrders.Front.Each(
func(order types.ClobOrder) {
offchainUpdates.Append(
m.GetOffchainUpdatesForOrder(ctx, order.Order),
m.GetOrderbookUpdatesForOrderPlacement(ctx, order.Order),
)
},
)
Expand All @@ -54,10 +56,10 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot(
return offchainUpdates
}

// GetOffchainUpdatesForOrder returns a place order offchain message and
// a update order offchain message used to construct an order for
// the orderbook snapshot grpc stream.
func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder(
// GetOrderbookUpdatesForOrderPlacement returns a place order offchain message and
// a update order offchain message used to add an order for
// the orderbook grpc stream.
func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderPlacement(
ctx sdk.Context,
order types.Order,
) (offchainUpdates *types.OffchainUpdates) {
Expand Down Expand Up @@ -86,3 +88,43 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder(

return offchainUpdates
}

// GetOrderbookUpdatesForOrderRemoval returns a remove order offchain message
// used to remove an order for the orderbook grpc stream.
func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderRemoval(
ctx sdk.Context,
orderId types.OrderId,
) (offchainUpdates *types.OffchainUpdates) {
offchainUpdates = types.NewOffchainUpdates()
if message, success := off_chain_updates.CreateOrderRemoveMessageWithReason(
ctx,
orderId,
indexersharedtypes.OrderRemovalReason_ORDER_REMOVAL_REASON_UNSPECIFIED,
ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_BEST_EFFORT_CANCELED,
); success {
offchainUpdates.AddRemoveMessage(orderId, message)
}
return offchainUpdates
}

// GetOrderbookUpdatesForOrderUpdate returns an update order offchain message
// used to update an order for the orderbook grpc stream.
func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderUpdate(
ctx sdk.Context,
orderId types.OrderId,
) (offchainUpdates *types.OffchainUpdates) {
offchainUpdates = types.NewOffchainUpdates()

// Get the current fill amount of the order.
fillAmount := m.GetOrderFilledAmount(ctx, orderId)

// Generate an update message updating the total filled amount of order.
if message, success := off_chain_updates.CreateOrderUpdateMessage(
ctx,
orderId,
fillAmount,
); success {
offchainUpdates.AddUpdateMessage(orderId, message)
}
return offchainUpdates
}
14 changes: 8 additions & 6 deletions protocol/x/clob/memclob/memclob_grpc_streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(false, satypes.BaseQuantums(0), uint32(0))
clobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return()

memclob := NewMemClobPriceTimePriority(false)
memclob.SetClobKeeper(clobKeeper)
Expand All @@ -44,9 +45,9 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) {

expected := types.NewOffchainUpdates()
// Buy orders are in descending order.
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[2]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[0]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[1]))

require.Equal(t, expected, offchainUpdates)
}
Expand All @@ -60,6 +61,7 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(false, satypes.BaseQuantums(0), uint32(0))
clobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return()

memclob := NewMemClobPriceTimePriority(false)
memclob.SetClobKeeper(clobKeeper)
Expand All @@ -83,9 +85,9 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) {

expected := types.NewOffchainUpdates()
// Sell orders are in ascending order.
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[1]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[2]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[0]))

require.Equal(t, expected, offchainUpdates)
}
Loading

0 comments on commit 75937e1

Please sign in to comment.