Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-700] separate indexer and grpc streaming events #1209

Merged
merged 4 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,9 +994,8 @@ func New(
clobFlags := clobflags.GetClobFlagValuesFromOptions(appOpts)
logger.Info("Parsed CLOB flags", "Flags", clobFlags)

memClob := clobmodulememclob.NewMemClobPriceTimePriority(
app.IndexerEventManager.Enabled() || app.GrpcStreamingManager.Enabled(),
)
memClob := clobmodulememclob.NewMemClobPriceTimePriority(app.IndexerEventManager.Enabled())
memClob.SetGenerateOrderbookUpdates(app.GrpcStreamingManager.Enabled())
Comment on lines +997 to +998
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably also get rid to the app.IndexerEventManager.Enabled() in the builder method and use the Set... method instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hard to add arguments to the constructor method since we have a ton of tests that need to be modified


app.ClobKeeper = clobmodulekeeper.NewKeeper(
appCodec,
Comment on lines 994 to 1001
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 NOTE
This review was outside the diff hunks, and no overlapping diff hunk was found. Original lines [726-726]

The gRPC server is initialized without specifying SSL/TLS credentials, which could expose the server to security vulnerabilities such as man-in-the-middle attacks. It's crucial to secure the gRPC connection by including credentials derived from an SSL certificate. You can create credentials using credentials.NewServerTLSFromFile("cert.pem", "cert.key") and include them in the server options.

- app.Server = daemonserver.NewServer(logger, grpc.NewServer(), &daemontypes.FileHandlerImpl{}, daemonFlags.Shared.SocketAddress)
+ creds, err := credentials.NewServerTLSFromFile("cert.pem", "cert.key")
+ if err != nil {
+     panic(err)
+ }
+ app.Server = daemonserver.NewServer(logger, grpc.NewServer(grpc.Creds(creds)), &daemontypes.FileHandlerImpl{}, daemonFlags.Shared.SocketAddress)

Expand Down
5 changes: 5 additions & 0 deletions protocol/mocks/ClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 60 additions & 0 deletions protocol/mocks/MemClob.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions protocol/mocks/MemClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 11 additions & 9 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,21 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
// Send updates to subscribers.
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
updatesToSend := make([]ocutypes.OffChainUpdateV1, 0)
for _, clobPairId := range subscription.clobPairIds {
if updates, ok := v1updates[clobPairId]; ok {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updates,
Snapshot: snapshot,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
break
}
updatesToSend = append(updatesToSend, updates...)
}
}

if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updatesToSend,
Snapshot: snapshot,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
}
Comment on lines +113 to +120
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

send one snapshot for all updates instead of one snapshot / clob pair

}

// Clean up subscriptions that have been closed.
Expand Down
6 changes: 6 additions & 0 deletions protocol/testutil/memclob/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,9 @@ func (f *FakeMemClobKeeper) ValidateSubaccountEquityTierLimitForStatefulOrder(
func (f *FakeMemClobKeeper) Logger(ctx sdk.Context) log.Logger {
return ctx.Logger()
}

func (f *FakeMemClobKeeper) SendOrderbookUpdates(
offchainUpdates *types.OffchainUpdates,
snapshot bool,
) {
}
12 changes: 12 additions & 0 deletions protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,18 @@ func PrepareCheckState(
offchainUpdates,
)

// For orders that are filled in the last block, send an orderbook update to the grpc streams.
if keeper.GetGrpcStreamingManager().Enabled() {
allUpdates := types.NewOffchainUpdates()
for _, orderId := range processProposerMatchesEvents.OrderIdsFilledInLastBlock {
if _, exists := keeper.MemClob.GetOrder(ctx, orderId); exists {
orderbookUpdate := keeper.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId)
allUpdates.Append(orderbookUpdate)
}
}
keeper.SendOrderbookUpdates(allUpdates, false)
}
Comment on lines +169 to +179
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes from lines 169 to 179 introduce a loop to send orderbook updates for orders filled in the last block. Consider the following improvements:

  • Add error handling for the retrieval of orderbook updates in case GetOrderbookUpdatesForOrderUpdate fails.
  • Evaluate the performance impact of this loop, especially in scenarios with a high volume of orders. Consider batching updates or other optimization strategies if necessary.


// 3. Place all stateful order placements included in the last block on the memclob.
// Note telemetry is measured outside of the function call because `PlaceStatefulOrdersFromLastBlock`
// is called within `PlaceConditionalOrdersTriggeredInLastBlock`.
Expand Down
14 changes: 13 additions & 1 deletion protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,5 +233,17 @@ func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) {
allUpdates.Append(update)
}

streamingManager.SendOrderbookUpdates(allUpdates, true)
k.SendOrderbookUpdates(allUpdates, true)
}

// SendOrderbookUpdates sends the offchain updates to the gRPC streaming manager.
func (k Keeper) SendOrderbookUpdates(
offchainUpdates *types.OffchainUpdates,
snapshot bool,
) {
if len(offchainUpdates.Messages) == 0 {
return
}

k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, snapshot)
}
2 changes: 0 additions & 2 deletions protocol/x/clob/keeper/orders.go
Original file line number Diff line number Diff line change
Expand Up @@ -1310,8 +1310,6 @@ func (k Keeper) SendOffchainMessages(
}
k.GetIndexerEventManager().SendOffchainData(update)
}

k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, false)
Comment on lines -1313 to -1314
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decouple grpc events and indexer events

}

// getFillQuoteQuantums returns the total fillAmount price in quote quantums based on the maker subticks.
Expand Down
35 changes: 31 additions & 4 deletions protocol/x/clob/memclob/memclob.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type MemClobPriceTimePriority struct {

// ---- Fields for determining if off-chain update messages should be generated ----
generateOffchainUpdates bool

// ---- Fields for determining if orderbook updates should be generated ----
generateOrderbookUpdates bool
}

type OrderWithRemovalReason struct {
Expand All @@ -58,10 +61,11 @@ func NewMemClobPriceTimePriority(
generateOffchainUpdates bool,
) *MemClobPriceTimePriority {
return &MemClobPriceTimePriority{
openOrders: newMemclobOpenOrders(),
cancels: newMemclobCancels(),
operationsToPropose: *types.NewOperationsToPropose(),
generateOffchainUpdates: generateOffchainUpdates,
openOrders: newMemclobOpenOrders(),
cancels: newMemclobCancels(),
operationsToPropose: *types.NewOperationsToPropose(),
generateOffchainUpdates: generateOffchainUpdates,
generateOrderbookUpdates: false,
}
}

Expand All @@ -73,6 +77,11 @@ func (m *MemClobPriceTimePriority) SetClobKeeper(clobKeeper types.MemClobKeeper)
m.clobKeeper = clobKeeper
}

// SetGenerateOffchainUpdates sets the `generateOffchainUpdates` field of the MemClob.
func (m *MemClobPriceTimePriority) SetGenerateOrderbookUpdates(generateOrderbookUpdates bool) {
m.generateOrderbookUpdates = generateOrderbookUpdates
}

// CancelOrder removes a Short-Term order by `OrderId` (if it exists) from all order-related data structures
// in the memclob. This method manages only Short-Term cancellations. For stateful cancellations, see
// `msg_server_cancel_orders.go`.
Expand Down Expand Up @@ -1510,6 +1519,12 @@ func (m *MemClobPriceTimePriority) mustAddOrderToOrderbook(
}

m.openOrders.mustAddOrderToOrderbook(ctx, newOrder, forceToFrontOfLevel)

if m.generateOrderbookUpdates {
// Send an orderbook update to grpc streams.
orderbookUpdate := m.GetOrderbookUpdatesForOrderPlacement(ctx, newOrder)
m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false)
}
}

// mustPerformTakerOrderMatching performs matching using the provided taker order while the order
Expand Down Expand Up @@ -1944,6 +1959,12 @@ func (m *MemClobPriceTimePriority) mustRemoveOrder(
!m.operationsToPropose.IsOrderPlacementInOperationsQueue(order) {
m.operationsToPropose.RemoveShortTermOrderTxBytes(order)
}

if m.generateOrderbookUpdates {
// Send an orderbook update to grpc streams.
orderbookUpdate := m.GetOrderbookUpdatesForOrderRemoval(ctx, order.OrderId)
m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false)
}
}

// mustUpdateOrderbookStateWithMatchedMakerOrder updates the orderbook with a matched maker order.
Expand All @@ -1961,6 +1982,12 @@ func (m *MemClobPriceTimePriority) mustUpdateOrderbookStateWithMatchedMakerOrder
panic("Total filled size of maker order greater than the order size")
}

// Send an orderbook update for the order's new total filled amount.
if m.generateOrderbookUpdates {
orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, makerOrder.OrderId)
m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false)
}

// If the order is fully filled, remove it from the orderbook.
// Note we shouldn't remove Short-Term order hashes from `ShortTermOrderTxBytes` here since
// the order was matched.
Expand Down
54 changes: 48 additions & 6 deletions protocol/x/clob/memclob/memclob_grpc_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package memclob
import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
indexersharedtypes "github.com/dydxprotocol/v4-chain/protocol/indexer/shared/types"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)
Expand All @@ -27,7 +29,7 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot(
level.LevelOrders.Front.Each(
func(order types.ClobOrder) {
offchainUpdates.Append(
m.GetOffchainUpdatesForOrder(ctx, order.Order),
m.GetOrderbookUpdatesForOrderPlacement(ctx, order.Order),
)
},
)
Expand All @@ -44,7 +46,7 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot(
level.LevelOrders.Front.Each(
func(order types.ClobOrder) {
offchainUpdates.Append(
m.GetOffchainUpdatesForOrder(ctx, order.Order),
m.GetOrderbookUpdatesForOrderPlacement(ctx, order.Order),
)
},
)
Expand All @@ -54,10 +56,10 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot(
return offchainUpdates
}

// GetOffchainUpdatesForOrder returns a place order offchain message and
// a update order offchain message used to construct an order for
// the orderbook snapshot grpc stream.
func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder(
// GetOrderbookUpdatesForOrderPlacement returns a place order offchain message and
// a update order offchain message used to add an order for
// the orderbook grpc stream.
func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderPlacement(
ctx sdk.Context,
order types.Order,
) (offchainUpdates *types.OffchainUpdates) {
Expand Down Expand Up @@ -86,3 +88,43 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder(

return offchainUpdates
}

// GetOrderbookUpdatesForOrderRemoval returns a remove order offchain message
// used to remove an order for the orderbook grpc stream.
func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderRemoval(
Copy link
Contributor

@jonfung-dydx jonfung-dydx Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In MemclobPriceTimepriority object we store a bool that indicates if we should be emitting offchain events.

	// ---- Fields for determining if off-chain update messages should be generated ----
	generateOffchainUpdates bool

Can we do the same for orderbook updates? It would be nice to have all these no-op if this isn't a full node with streaming turned on.

ctx sdk.Context,
orderId types.OrderId,
) (offchainUpdates *types.OffchainUpdates) {
offchainUpdates = types.NewOffchainUpdates()
if message, success := off_chain_updates.CreateOrderRemoveMessageWithReason(
ctx,
orderId,
indexersharedtypes.OrderRemovalReason_ORDER_REMOVAL_REASON_UNSPECIFIED,
ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_BEST_EFFORT_CANCELED,
); success {
offchainUpdates.AddRemoveMessage(orderId, message)
}
return offchainUpdates
}

// GetOrderbookUpdatesForOrderUpdate returns an update order offchain message
// used to update an order for the orderbook grpc stream.
func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderUpdate(
ctx sdk.Context,
orderId types.OrderId,
) (offchainUpdates *types.OffchainUpdates) {
offchainUpdates = types.NewOffchainUpdates()

// Get the current fill amount of the order.
fillAmount := m.GetOrderFilledAmount(ctx, orderId)

// Generate an update message updating the total filled amount of order.
if message, success := off_chain_updates.CreateOrderUpdateMessage(
ctx,
orderId,
fillAmount,
); success {
offchainUpdates.AddUpdateMessage(orderId, message)
}
return offchainUpdates
}
14 changes: 8 additions & 6 deletions protocol/x/clob/memclob/memclob_grpc_streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(false, satypes.BaseQuantums(0), uint32(0))
clobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return()

memclob := NewMemClobPriceTimePriority(false)
memclob.SetClobKeeper(clobKeeper)
Expand All @@ -44,9 +45,9 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) {

expected := types.NewOffchainUpdates()
// Buy orders are in descending order.
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[2]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[0]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[1]))

require.Equal(t, expected, offchainUpdates)
}
Expand All @@ -60,6 +61,7 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(false, satypes.BaseQuantums(0), uint32(0))
clobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return()

memclob := NewMemClobPriceTimePriority(false)
memclob.SetClobKeeper(clobKeeper)
Expand All @@ -83,9 +85,9 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) {

expected := types.NewOffchainUpdates()
// Sell orders are in ascending order.
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2]))
expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[1]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[2]))
expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[0]))

require.Equal(t, expected, offchainUpdates)
}
Loading
Loading