diff --git a/state/datastream.go b/state/datastream.go index f4f16965de..49da04783a 100644 --- a/state/datastream.go +++ b/state/datastream.go @@ -315,26 +315,43 @@ func GenerateDataStreamFile(ctx context.Context, streamServer *datastreamer.Stre } currentL2BlockNumber := transaction.L2BlockNumber - currentBatchNumber = transaction.L2BlockNumber lastAddedL2BlockNumber = currentL2BlockNumber - // Get Previous l2block timestamp - bookMark := &datastream.BookMark{ + // Get current batch number + bookMarkCurrentL2Block := &datastream.BookMark{ Type: datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK, - Value: currentL2BlockNumber - 1, + Value: currentL2BlockNumber, } - marshalledBookMark, err := proto.Marshal(bookMark) + marshalledBookMarkCurrentL2Block, err := proto.Marshal(bookMarkCurrentL2Block) if err != nil { return err } - prevL2BlockEntryNumber, err := streamServer.GetBookmark(marshalledBookMark) + currentL2BlockEntry, err := streamServer.GetFirstEventAfterBookmark(marshalledBookMarkCurrentL2Block) if err != nil { return err } - prevL2BlockEntry, err := streamServer.GetEntry(prevL2BlockEntryNumber) + currentL2Block := &datastream.L2Block{} + if err := proto.Unmarshal(currentL2BlockEntry.Data, currentL2Block); err != nil { + return err + } + + currentBatchNumber = currentL2Block.BatchNumber + + // Get Previous l2block timestamp + bookMarkPrevL2Block := &datastream.BookMark{ + Type: datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK, + Value: currentL2BlockNumber - 1, + } + + marshalledBookMarkPrevL2Block, err := proto.Marshal(bookMarkPrevL2Block) + if err != nil { + return err + } + + prevL2BlockEntry, err := streamServer.GetFirstEventAfterBookmark(marshalledBookMarkPrevL2Block) if err != nil { return err } @@ -683,20 +700,22 @@ func GenerateDataStreamFile(ctx context.Context, streamServer *datastreamer.Stre } } - batchEnd := &datastream.BatchEnd{ - Number: batch.BatchNumber, - LocalExitRoot: batch.LocalExitRoot.Bytes(), - StateRoot: batch.StateRoot.Bytes(), - } + if !batch.WIP { + batchEnd := &datastream.BatchEnd{ + Number: batch.BatchNumber, + LocalExitRoot: batch.LocalExitRoot.Bytes(), + StateRoot: batch.StateRoot.Bytes(), + } - marshalledBatch, err := proto.Marshal(batchEnd) - if err != nil { - return err - } + marshalledBatch, err := proto.Marshal(batchEnd) + if err != nil { + return err + } - _, err = streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH_END), marshalledBatch) - if err != nil { - return err + _, err = streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH_END), marshalledBatch) + if err != nil { + return err + } } // Commit at the end of each batch group