From 15d8847d6dc6c2269912f1478bb00d923f421a11 Mon Sep 17 00:00:00 2001 From: Jay Yu <103467857+jayy04@users.noreply.github.com> Date: Sat, 2 Mar 2024 11:19:29 -0500 Subject: [PATCH 1/3] [CT-644] instantiate grpc stream manager --- protocol/app/app.go | 27 ++++++++++++- .../streaming/grpc/grpc_streaming_manager.go | 38 +++++++++++++++++++ .../streaming/grpc/noop_streaming_manager.go | 36 ++++++++++++++++++ protocol/streaming/grpc/types/manager.go | 19 ++++++++++ protocol/testutil/keeper/clob.go | 2 + protocol/x/clob/keeper/keeper.go | 27 ++++++++----- 6 files changed, 138 insertions(+), 11 deletions(-) create mode 100644 protocol/streaming/grpc/grpc_streaming_manager.go create mode 100644 protocol/streaming/grpc/noop_streaming_manager.go create mode 100644 protocol/streaming/grpc/types/manager.go diff --git a/protocol/app/app.go b/protocol/app/app.go index 57626186ec..2ac4d17519 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -198,6 +198,10 @@ import ( "github.com/dydxprotocol/v4-chain/protocol/indexer" "github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager" "github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender" + + // Grpc Streaming + streaming "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc" + streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" ) var ( @@ -298,8 +302,9 @@ type App struct { // module configurator configurator module.Configurator - IndexerEventManager indexer_manager.IndexerEventManager - Server *daemonserver.Server + IndexerEventManager indexer_manager.IndexerEventManager + GrpcStreamingManager streamingtypes.GrpcStreamingManager + Server *daemonserver.Server // startDaemons encapsulates the logic that starts all daemons and daemon services. This function contains a // closure of all relevant data structures that are shared with various keepers. Daemon services startup is @@ -679,6 +684,9 @@ func New( tkeys[indexer_manager.TransientStoreKey], indexerFlags.SendOffchainData, ) + + app.GrpcStreamingManager = getGrpcStreamingManagerFromOptions(appFlags, appOpts, logger) + timeProvider := &timelib.TimeProviderImpl{} app.EpochsKeeper = *epochsmodulekeeper.NewKeeper( @@ -976,6 +984,7 @@ func New( app.StatsKeeper, app.RewardsKeeper, app.IndexerEventManager, + app.GrpcStreamingManager, txConfig.TxDecoder(), clobFlags, rate_limit.NewPanicRateLimiter[*clobmoduletypes.MsgPlaceOrder](), @@ -1740,3 +1749,17 @@ func getIndexerFromOptions( } return indexerMessageSender, indexerFlags } + +// getGrpcStreamingManagerFromOptions returns an instance of a streamingtypes.GrpcStreamingManager from the specified +// options. This function will default to returning a no-op instance. +func getGrpcStreamingManagerFromOptions( + appFlags flags.Flags, + appOpts servertypes.AppOptions, + logger log.Logger, +) (manager streamingtypes.GrpcStreamingManager) { + // TODO(CT-625): add command line flags for full node streaming. + if appFlags.NonValidatingFullNode { + return streaming.NewGrpcStreamingManager() + } + return streaming.NewNoopGrpcStreamingManager() +} diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go new file mode 100644 index 0000000000..e3f40a23bb --- /dev/null +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -0,0 +1,38 @@ +package grpc + +import ( + "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" + clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" +) + +var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil) + +// GrpcStreamingManager is an implementation for managing gRPC streaming subscriptions. +type GrpcStreamingManagerImpl struct { +} + +func NewGrpcStreamingManager() *GrpcStreamingManagerImpl { + return &GrpcStreamingManagerImpl{} +} + +func (sm *GrpcStreamingManagerImpl) Enabled() bool { + return true +} + +// Subscribe subscribes to the orderbook updates stream. +func (sm *GrpcStreamingManagerImpl) Subscribe( + req clobtypes.StreamOrderbookUpdatesRequest, + srv clobtypes.Query_StreamOrderbookUpdatesServer, +) ( + finished chan<- bool, + err error, +) { + return nil, nil +} + +// SendMessages groups messages by their clob pair ids and +// sends messages to the subscribers. +func (sm *GrpcStreamingManagerImpl) SendMessages( + msg *clobtypes.OffchainUpdateMessage, +) { +} diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go new file mode 100644 index 0000000000..1fa8dc2e17 --- /dev/null +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -0,0 +1,36 @@ +package grpc + +import ( + "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" + clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" +) + +var _ types.GrpcStreamingManager = (*NoopGrpcStreamingManager)(nil) + +// GrpcStreamingManager is an implementation for managing gRPC streaming subscriptions. +type NoopGrpcStreamingManager struct{} + +func NewNoopGrpcStreamingManager() *NoopGrpcStreamingManager { + return &NoopGrpcStreamingManager{} +} + +func (sm *NoopGrpcStreamingManager) Enabled() bool { + return false +} + +func (sm *NoopGrpcStreamingManager) Subscribe( + req clobtypes.StreamOrderbookUpdatesRequest, + srv clobtypes.Query_StreamOrderbookUpdatesServer, +) ( + finished chan<- bool, + err error, +) { + return nil, nil +} + +// SendMessages groups messages by their clob pair ids and +// sends messages to the subscribers. +func (sm *NoopGrpcStreamingManager) SendMessages( + msg *clobtypes.OffchainUpdateMessage, +) { +} diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go new file mode 100644 index 0000000000..393918a1d8 --- /dev/null +++ b/protocol/streaming/grpc/types/manager.go @@ -0,0 +1,19 @@ +package types + +import ( + clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" +) + +type GrpcStreamingManager interface { + Enabled() bool + + // L3+ Orderbook updates. + Subscribe( + req clobtypes.StreamOrderbookUpdatesRequest, + srv clobtypes.Query_StreamOrderbookUpdatesServer, + ) ( + finished chan<- bool, + err error, + ) + SendMessages(*clobtypes.OffchainUpdateMessage) +} diff --git a/protocol/testutil/keeper/clob.go b/protocol/testutil/keeper/clob.go index 30db24e498..506ebe114a 100644 --- a/protocol/testutil/keeper/clob.go +++ b/protocol/testutil/keeper/clob.go @@ -14,6 +14,7 @@ import ( "github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager" "github.com/dydxprotocol/v4-chain/protocol/lib" "github.com/dydxprotocol/v4-chain/protocol/mocks" + streaming "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc" clobtest "github.com/dydxprotocol/v4-chain/protocol/testutil/clob" "github.com/dydxprotocol/v4-chain/protocol/testutil/constants" asskeeper "github.com/dydxprotocol/v4-chain/protocol/x/assets/keeper" @@ -214,6 +215,7 @@ func createClobKeeper( statsKeeper, rewardsKeeper, indexerEventManager, + streaming.NewNoopGrpcStreamingManager(), constants.TestEncodingCfg.TxConfig.TxDecoder(), flags.GetDefaultClobFlags(), rate_limit.NewNoOpRateLimiter[*types.MsgPlaceOrder](), diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index e79827f1f1..13af8452b5 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -14,6 +14,7 @@ import ( "github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager" "github.com/dydxprotocol/v4-chain/protocol/lib" "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" + streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" flags "github.com/dydxprotocol/v4-chain/protocol/x/clob/flags" "github.com/dydxprotocol/v4-chain/protocol/x/clob/rate_limit" "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" @@ -31,16 +32,18 @@ type ( UntriggeredConditionalOrders map[types.ClobPairId]*UntriggeredConditionalOrders PerpetualIdToClobPairId map[uint32][]types.ClobPairId - subaccountsKeeper types.SubaccountsKeeper - assetsKeeper types.AssetsKeeper - bankKeeper types.BankKeeper - blockTimeKeeper types.BlockTimeKeeper - feeTiersKeeper types.FeeTiersKeeper - perpetualsKeeper types.PerpetualsKeeper - pricesKeeper types.PricesKeeper - statsKeeper types.StatsKeeper - rewardsKeeper types.RewardsKeeper + subaccountsKeeper types.SubaccountsKeeper + assetsKeeper types.AssetsKeeper + bankKeeper types.BankKeeper + blockTimeKeeper types.BlockTimeKeeper + feeTiersKeeper types.FeeTiersKeeper + perpetualsKeeper types.PerpetualsKeeper + pricesKeeper types.PricesKeeper + statsKeeper types.StatsKeeper + rewardsKeeper types.RewardsKeeper + indexerEventManager indexer_manager.IndexerEventManager + streamingManager streamingtypes.GrpcStreamingManager memStoreInitialized *atomic.Bool @@ -82,6 +85,7 @@ func NewKeeper( statsKeeper types.StatsKeeper, rewardsKeeper types.RewardsKeeper, indexerEventManager indexer_manager.IndexerEventManager, + grpcStreamingManager streamingtypes.GrpcStreamingManager, txDecoder sdk.TxDecoder, clobFlags flags.ClobFlags, placeOrderRateLimiter rate_limit.RateLimiter[*types.MsgPlaceOrder], @@ -107,6 +111,7 @@ func NewKeeper( statsKeeper: statsKeeper, rewardsKeeper: rewardsKeeper, indexerEventManager: indexerEventManager, + streamingManager: grpcStreamingManager, memStoreInitialized: &atomic.Bool{}, txDecoder: txDecoder, mevTelemetryConfig: MevTelemetryConfig{ @@ -136,6 +141,10 @@ func (k Keeper) GetIndexerEventManager() indexer_manager.IndexerEventManager { return k.indexerEventManager } +func (k Keeper) GetGrpcStreamingManager() streamingtypes.GrpcStreamingManager { + return k.streamingManager +} + func (k Keeper) Logger(ctx sdk.Context) log.Logger { return ctx.Logger().With( log.ModuleKey, "x/clob", From fe41e9770a8d3cb676688b3e7d65fc0c4c573b09 Mon Sep 17 00:00:00 2001 From: Jay Yu <103467857+jayy04@users.noreply.github.com> Date: Sat, 2 Mar 2024 11:34:31 -0500 Subject: [PATCH 2/3] update type --- protocol/streaming/grpc/grpc_streaming_manager.go | 8 ++++---- protocol/streaming/grpc/noop_streaming_manager.go | 7 ++----- protocol/streaming/grpc/types/manager.go | 2 +- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index e3f40a23bb..17ad1cddda 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -7,7 +7,7 @@ import ( var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil) -// GrpcStreamingManager is an implementation for managing gRPC streaming subscriptions. +// GrpcStreamingManagerImpl is an implementation for managing gRPC streaming subscriptions. type GrpcStreamingManagerImpl struct { } @@ -30,9 +30,9 @@ func (sm *GrpcStreamingManagerImpl) Subscribe( return nil, nil } -// SendMessages groups messages by their clob pair ids and +// SendOrderbookUpdates groups updates by their clob pair ids and // sends messages to the subscribers. -func (sm *GrpcStreamingManagerImpl) SendMessages( - msg *clobtypes.OffchainUpdateMessage, +func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( + updates *clobtypes.OffchainUpdates, ) { } diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index 1fa8dc2e17..5ba4e22808 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -7,7 +7,6 @@ import ( var _ types.GrpcStreamingManager = (*NoopGrpcStreamingManager)(nil) -// GrpcStreamingManager is an implementation for managing gRPC streaming subscriptions. type NoopGrpcStreamingManager struct{} func NewNoopGrpcStreamingManager() *NoopGrpcStreamingManager { @@ -28,9 +27,7 @@ func (sm *NoopGrpcStreamingManager) Subscribe( return nil, nil } -// SendMessages groups messages by their clob pair ids and -// sends messages to the subscribers. -func (sm *NoopGrpcStreamingManager) SendMessages( - msg *clobtypes.OffchainUpdateMessage, +func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( + updates *clobtypes.OffchainUpdates, ) { } diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index 393918a1d8..0cbfa00515 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -15,5 +15,5 @@ type GrpcStreamingManager interface { finished chan<- bool, err error, ) - SendMessages(*clobtypes.OffchainUpdateMessage) + SendOrderbookUpdates(*clobtypes.OffchainUpdates) } From 0d405465b93a46a4aa00d73b271af0064840ae76 Mon Sep 17 00:00:00 2001 From: Jay Yu <103467857+jayy04@users.noreply.github.com> Date: Sat, 2 Mar 2024 12:15:56 -0500 Subject: [PATCH 3/3] update channel type --- protocol/streaming/grpc/grpc_streaming_manager.go | 2 +- protocol/streaming/grpc/noop_streaming_manager.go | 2 +- protocol/streaming/grpc/types/manager.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 17ad1cddda..6b9250145f 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -24,7 +24,7 @@ func (sm *GrpcStreamingManagerImpl) Subscribe( req clobtypes.StreamOrderbookUpdatesRequest, srv clobtypes.Query_StreamOrderbookUpdatesServer, ) ( - finished chan<- bool, + finished chan bool, err error, ) { return nil, nil diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index 5ba4e22808..b4670ba66f 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -21,7 +21,7 @@ func (sm *NoopGrpcStreamingManager) Subscribe( req clobtypes.StreamOrderbookUpdatesRequest, srv clobtypes.Query_StreamOrderbookUpdatesServer, ) ( - finished chan<- bool, + finished chan bool, err error, ) { return nil, nil diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index 0cbfa00515..7bf043c41d 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -12,7 +12,7 @@ type GrpcStreamingManager interface { req clobtypes.StreamOrderbookUpdatesRequest, srv clobtypes.Query_StreamOrderbookUpdatesServer, ) ( - finished chan<- bool, + finished chan bool, err error, ) SendOrderbookUpdates(*clobtypes.OffchainUpdates)