Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry Pick add l2blockend to DS (#3751) #3752

Merged
merged 1 commit into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions proto/src/proto/datastream/v1/datastream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ message L2Block {
Debug debug = 14;
}

message L2BlockEnd {
uint64 number = 1;
}

message Transaction {
uint64 l2block_number = 1;
uint64 index = 2;
Expand Down Expand Up @@ -79,6 +83,7 @@ enum EntryType {
ENTRY_TYPE_TRANSACTION = 3;
ENTRY_TYPE_BATCH_END = 4;
ENTRY_TYPE_UPDATE_GER = 5;
ENTRY_TYPE_L2_BLOCK_END = 6;
}

enum BatchType {
Expand Down
24 changes: 21 additions & 3 deletions sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (s *Sequencer) Start(ctx context.Context) {
}

if s.streamServer != nil {
go s.sendDataToStreamer(s.cfg.StreamServer.ChainID)
go s.sendDataToStreamer(s.cfg.StreamServer.ChainID, s.cfg.StreamServer.Version)
}

s.workerReadyTxsCond = newTimeoutCond(&sync.Mutex{})
Expand Down Expand Up @@ -129,7 +129,7 @@ func (s *Sequencer) checkStateInconsistency(ctx context.Context) {
}

func (s *Sequencer) updateDataStreamerFile(ctx context.Context, chainID uint64) {
err := state.GenerateDataStreamFile(ctx, s.streamServer, s.stateIntf, true, nil, chainID, s.cfg.StreamServer.UpgradeEtrogBatchNumber)
err := state.GenerateDataStreamFile(ctx, s.streamServer, s.stateIntf, true, nil, chainID, s.cfg.StreamServer.UpgradeEtrogBatchNumber, s.cfg.StreamServer.Version)
if err != nil {
log.Fatalf("failed to generate data streamer file, error: %v", err)
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func (s *Sequencer) addTxToWorker(ctx context.Context, tx pool.Transaction) erro
}

// sendDataToStreamer sends data to the data stream server
func (s *Sequencer) sendDataToStreamer(chainID uint64) {
func (s *Sequencer) sendDataToStreamer(chainID uint64, version uint8) {
var err error
for {
// Read error from previous iteration
Expand Down Expand Up @@ -369,6 +369,24 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
}
}

if version >= state.DSVersion4 {
streamL2BlockEnd := &datastream.L2BlockEnd{
Number: l2Block.L2BlockNumber,
}

marshalledL2BlockEnd, err := proto.Marshal(streamL2BlockEnd)
if err != nil {
log.Errorf("failed to marshal l2block %d, error: %v", l2Block.L2BlockNumber, err)
continue
}

_, err = s.streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_L2_BLOCK_END), marshalledL2BlockEnd)
if err != nil {
log.Errorf("failed to add stream entry for l2blockEnd %d, error: %v", l2Block.L2BlockNumber, err)
continue
}
}

err = s.streamServer.CommitAtomicOp()
if err != nil {
log.Errorf("failed to commit atomic op for l2block %d, error: %v ", l2Block.L2BlockNumber, err)
Expand Down
75 changes: 74 additions & 1 deletion state/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ const (
SystemSC = "0x000000000000000000000000000000005ca1ab1e"
// posConstant is the constant used to compute the position of the intermediate state root
posConstant = 1
// DSVersion3 is the first protobuf version
DSVersion3 uint8 = 3
// DSVersion4 is the second protobuf version, includes l2BlockEnd
DSVersion4 uint8 = 4
)

// DSBatch represents a data stream batch
Expand Down Expand Up @@ -87,7 +91,7 @@ type DSState interface {
}

// GenerateDataStreamFile generates or resumes a data stream file
func GenerateDataStreamFile(ctx context.Context, streamServer *datastreamer.StreamServer, stateDB DSState, readWIPBatch bool, imStateRoots *map[uint64][]byte, chainID uint64, upgradeEtrogBatchNumber uint64) error {
func GenerateDataStreamFile(ctx context.Context, streamServer *datastreamer.StreamServer, stateDB DSState, readWIPBatch bool, imStateRoots *map[uint64][]byte, chainID uint64, upgradeEtrogBatchNumber uint64, version uint8) error {
header := streamServer.GetHeader()

var currentBatchNumber uint64 = 0
Expand Down Expand Up @@ -177,6 +181,22 @@ func GenerateDataStreamFile(ctx context.Context, streamServer *datastreamer.Stre
return err
}

if version >= DSVersion4 {
genesisBlockEnd := &datastream.L2BlockEnd{
Number: genesisL2Block.L2BlockNumber,
}

marshalledGenesisBlockEnd, err := proto.Marshal(genesisBlockEnd)
if err != nil {
return err
}

_, err = streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_L2_BLOCK_END), marshalledGenesisBlockEnd)
if err != nil {
return err
}
}

genesisBatchEnd := &datastream.BatchEnd{
Number: genesisL2Block.BatchNumber,
LocalExitRoot: common.Hash{}.Bytes(),
Expand Down Expand Up @@ -249,6 +269,43 @@ func GenerateDataStreamFile(ctx context.Context, streamServer *datastreamer.Stre
currentBatchNumber = l2Block.BatchNumber
previousTimestamp = l2Block.Timestamp
lastAddedL2BlockNumber = currentL2BlockNumber

case datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_L2_BLOCK_END):
log.Info("Latest entry type is L2BlockEnd")

l2BlockEnd := &datastream.L2BlockEnd{}
if err := proto.Unmarshal(latestEntry.Data, l2BlockEnd); err != nil {
return err
}

currentL2BlockNumber := l2BlockEnd.Number

// Getting the l2 block is needed in order to get the batch number and the timestamp
bookMark := &datastream.BookMark{
Type: datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK,
Value: currentL2BlockNumber,
}

marshalledBookMark, err := proto.Marshal(bookMark)
if err != nil {
return err
}

l2BlockEntry, err := streamServer.GetFirstEventAfterBookmark(marshalledBookMark)
if err != nil {
return err
}

l2Block := &datastream.L2Block{}

if err := proto.Unmarshal(l2BlockEntry.Data, l2Block); err != nil {
return err
}

currentBatchNumber = l2Block.BatchNumber
previousTimestamp = l2Block.Timestamp
lastAddedL2BlockNumber = currentL2BlockNumber

case datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_TRANSACTION):
log.Info("Latest entry type is Transaction")

Expand Down Expand Up @@ -607,6 +664,22 @@ func GenerateDataStreamFile(ctx context.Context, streamServer *datastreamer.Stre
}

currentGER = l2Block.GlobalExitRoot

if version >= DSVersion4 {
streamL2BlockEnd := &datastream.L2BlockEnd{
Number: l2Block.L2BlockNumber,
}

marshalledL2BlockEnd, err := proto.Marshal(streamL2BlockEnd)
if err != nil {
return err
}

_, err = streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_L2_BLOCK_END), marshalledL2BlockEnd)
if err != nil {
return err
}
}
}
}

Expand Down
Loading
Loading