Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-723] add block number + stage to grpc updates #1252

Merged
merged 3 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,15 @@ export interface StreamOrderbookUpdatesResponse {
*/

snapshot: boolean;
/**
* ---Additional fields used to debug issues---
* Block height of the updates.
*/

blockHeight: number;
/** Exec mode of the updates. */

execMode: number;
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
Expand All @@ -250,6 +259,15 @@ export interface StreamOrderbookUpdatesResponseSDKType {
*/

snapshot: boolean;
/**
* ---Additional fields used to debug issues---
* Block height of the updates.
*/

block_height: number;
/** Exec mode of the updates. */

exec_mode: number;
}

function createBaseQueryGetClobPairRequest(): QueryGetClobPairRequest {
Expand Down Expand Up @@ -904,7 +922,9 @@ export const StreamOrderbookUpdatesRequest = {
function createBaseStreamOrderbookUpdatesResponse(): StreamOrderbookUpdatesResponse {
return {
updates: [],
snapshot: false
snapshot: false,
blockHeight: 0,
execMode: 0
};
}

Expand All @@ -918,6 +938,14 @@ export const StreamOrderbookUpdatesResponse = {
writer.uint32(16).bool(message.snapshot);
}

if (message.blockHeight !== 0) {
writer.uint32(24).uint32(message.blockHeight);
}

if (message.execMode !== 0) {
writer.uint32(32).uint32(message.execMode);
}

return writer;
},

Expand All @@ -938,6 +966,14 @@ export const StreamOrderbookUpdatesResponse = {
message.snapshot = reader.bool();
break;

case 3:
message.blockHeight = reader.uint32();
break;

case 4:
message.execMode = reader.uint32();
break;

default:
reader.skipType(tag & 7);
break;
Expand All @@ -951,6 +987,8 @@ export const StreamOrderbookUpdatesResponse = {
const message = createBaseStreamOrderbookUpdatesResponse();
message.updates = object.updates?.map(e => OffChainUpdateV1.fromPartial(e)) || [];
message.snapshot = object.snapshot ?? false;
message.blockHeight = object.blockHeight ?? 0;
message.execMode = object.execMode ?? 0;
return message;
}

Expand Down
7 changes: 7 additions & 0 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,11 @@ message StreamOrderbookUpdatesResponse {
// Note that if the snapshot is true, then all previous entries should be
// discarded and the orderbook should be resynced.
bool snapshot = 2;

// ---Additional fields used to debug issues---
// Block height of the updates.
uint32 block_height = 3;

// Exec mode of the updates.
uint32 exec_mode = 4;
}
6 changes: 6 additions & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1683,6 +1683,8 @@ func (app *App) PreBlocker(ctx sdk.Context, _ *abci.RequestFinalizeBlock) (*sdk.

// BeginBlocker application updates every begin block
func (app *App) BeginBlocker(ctx sdk.Context) (sdk.BeginBlock, error) {
ctx = ctx.WithExecMode(lib.ExecModeBeginBlock)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wanna do

	ctx = log.AddPersistentTagsToLogger(ctx,
		log.Handler, XXXXXX,
		log.BlockHeight, ctx.BlockHeight(),
	)

while you are at it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the strings not the ints though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean to move this from x/clob's begin blocker to here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can just add some tags to the app-level app.go file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

going to punt on this for now so that I can merge. let's do this in a separate PR


// Update the proposer address in the logger for the panic logging middleware.
proposerAddr := sdk.ConsAddress(ctx.BlockHeader().ProposerAddress)
middleware.Logger = ctx.Logger().With("proposer_cons_addr", proposerAddr.String())
Comment on lines 1683 to 1690
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 NOTE
This review was outside the diff hunks, and no overlapping diff hunk was found. Original lines [738-738]

The gRPC server is initialized without specifying credentials, which could potentially expose the server to unencrypted connections. It's recommended to secure the gRPC server by using SSL/TLS credentials.

- app.Server = daemonserver.NewServer(logger, grpc.NewServer(), &daemontypes.FileHandlerImpl{}, daemonFlags.Shared.SocketAddress)
+ creds, err := credentials.NewServerTLSFromFile("cert.pem", "cert.key")
+ if err != nil {
+     log.Fatalf("Failed to generate credentials %v", err)
+ }
+ app.Server = daemonserver.NewServer(logger, grpc.NewServer(grpc.Creds(creds)), &daemontypes.FileHandlerImpl{}, daemonFlags.Shared.SocketAddress)

Expand All @@ -1693,6 +1695,8 @@ func (app *App) BeginBlocker(ctx sdk.Context) (sdk.BeginBlock, error) {

// EndBlocker application updates every end block
func (app *App) EndBlocker(ctx sdk.Context) (sdk.EndBlock, error) {
ctx = ctx.WithExecMode(lib.ExecModeEndBlock)

// Reset the logger for middleware.
// Note that the middleware is only used by `CheckTx` and `DeliverTx`, and not `EndBlocker`.
// Panics from `EndBlocker` will not be logged by the middleware and will lead to consensus failures.
Expand All @@ -1716,6 +1720,8 @@ func (app *App) Precommitter(ctx sdk.Context) {

// PrepareCheckStater application updates after commit and before any check state is invoked.
func (app *App) PrepareCheckStater(ctx sdk.Context) {
ctx = ctx.WithExecMode(lib.ExecModePrepareCheckState)

if err := app.ModuleManager.PrepareCheckState(ctx); err != nil {
panic(err)
}
Expand Down
7 changes: 7 additions & 0 deletions protocol/lib/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/lib/log"
)

// Custom exec modes
const (
ExecModeBeginBlock = 100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using ints for these values?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are meant to augment these

ExecModeEndBlock = 101
ExecModePrepareCheckState = 102
)

type TxHash string

func GetTxHash(tx []byte) TxHash {
Expand Down
6 changes: 3 additions & 3 deletions protocol/mocks/ClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions protocol/mocks/MemClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpc
import (
"sync"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
"github.com/dydxprotocol/v4-chain/protocol/lib"
Expand Down Expand Up @@ -76,6 +77,8 @@ func (sm *GrpcStreamingManagerImpl) Subscribe(
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
blockHeight uint32,
execMode sdk.ExecMode,
) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
Expand Down Expand Up @@ -113,8 +116,10 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
if len(updatesToSend) > 0 {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updatesToSend,
Snapshot: snapshot,
Updates: updatesToSend,
Snapshot: snapshot,
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
); err != nil {
idsToRemove = append(idsToRemove, id)
Expand Down
3 changes: 3 additions & 0 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package grpc

import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)
Expand Down Expand Up @@ -29,6 +30,8 @@ func (sm *NoopGrpcStreamingManager) Subscribe(
func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
snapshot bool,
blockHeight uint32,
execMode sdk.ExecMode,
) {
}

Expand Down
3 changes: 3 additions & 0 deletions protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package types

import (
sdk "github.com/cosmos/cosmos-sdk/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

Expand All @@ -18,5 +19,7 @@ type GrpcStreamingManager interface {
SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
blockHeight uint32,
execMode sdk.ExecMode,
)
}
1 change: 1 addition & 0 deletions protocol/testutil/memclob/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ func (f *FakeMemClobKeeper) Logger(ctx sdk.Context) log.Logger {
}

func (f *FakeMemClobKeeper) SendOrderbookUpdates(
ctx sdk.Context,
offchainUpdates *types.OffchainUpdates,
snapshot bool,
) {
Expand Down
2 changes: 1 addition & 1 deletion protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func PrepareCheckState(
allUpdates.Append(orderbookUpdate)
}
}
keeper.SendOrderbookUpdates(allUpdates, false)
keeper.SendOrderbookUpdates(ctx, allUpdates, false)
}

// 3. Place all stateful order placements included in the last block on the memclob.
Expand Down
10 changes: 8 additions & 2 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,17 +233,23 @@ func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) {
allUpdates.Append(update)
}

k.SendOrderbookUpdates(allUpdates, true)
k.SendOrderbookUpdates(ctx, allUpdates, true)
}

// SendOrderbookUpdates sends the offchain updates to the gRPC streaming manager.
func (k Keeper) SendOrderbookUpdates(
ctx sdk.Context,
offchainUpdates *types.OffchainUpdates,
snapshot bool,
) {
if len(offchainUpdates.Messages) == 0 {
return
}

k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, snapshot)
k.GetGrpcStreamingManager().SendOrderbookUpdates(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't really feel strongly at all but why don't we pass in the ctx instead of the blockheight and exec mode

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can abstract away the context from the streaming manager. logic to get block height/exec mode doesn't really belong there.

let me know if you have a preference

offchainUpdates,
snapshot,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
}
2 changes: 1 addition & 1 deletion protocol/x/clob/keeper/order_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,6 @@ func (k Keeper) PruneStateFillAmountsForShortTermOrders(
allUpdates.Append(orderbookUpdate)
}
}
k.SendOrderbookUpdates(allUpdates, false)
k.SendOrderbookUpdates(ctx, allUpdates, false)
}
}
6 changes: 3 additions & 3 deletions protocol/x/clob/memclob/memclob.go
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,7 @@ func (m *MemClobPriceTimePriority) mustAddOrderToOrderbook(
if m.generateOrderbookUpdates {
// Send an orderbook update to grpc streams.
orderbookUpdate := m.GetOrderbookUpdatesForOrderPlacement(ctx, newOrder)
m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false)
m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate, false)
}
}

Expand Down Expand Up @@ -1963,7 +1963,7 @@ func (m *MemClobPriceTimePriority) mustRemoveOrder(
if m.generateOrderbookUpdates {
// Send an orderbook update to grpc streams.
orderbookUpdate := m.GetOrderbookUpdatesForOrderRemoval(ctx, order.OrderId)
m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false)
m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate, false)
}
}

Expand All @@ -1985,7 +1985,7 @@ func (m *MemClobPriceTimePriority) mustUpdateOrderbookStateWithMatchedMakerOrder
// Send an orderbook update for the order's new total filled amount.
if m.generateOrderbookUpdates {
orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, makerOrder.OrderId)
m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false)
m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate, false)
}

// If the order is fully filled, remove it from the orderbook.
Expand Down
1 change: 1 addition & 0 deletions protocol/x/clob/types/clob_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ type ClobKeeper interface {
// Gprc streaming
InitializeNewGrpcStreams(ctx sdk.Context)
SendOrderbookUpdates(
ctx sdk.Context,
offchainUpdates *OffchainUpdates,
snapshot bool,
)
Expand Down
1 change: 1 addition & 0 deletions protocol/x/clob/types/mem_clob_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ type MemClobKeeper interface {
ctx sdk.Context,
) log.Logger
SendOrderbookUpdates(
ctx sdk.Context,
offchainUpdates *OffchainUpdates,
snapshot bool,
)
Expand Down
Loading
Loading