Skip to content

Commit 7b4228c

Browse files
authored
Updated Substreams to latest version of Protobuf definition and activated ProductionMode by default (#4211)
The production mode is required to benefits from automatic backprocessing and downloading of block scoped data message as they are produced. This will drastically improve the ingestion speed of a substreams (time to gather some metrics!). Updated also the instructions to re-generate the `substreams.proto` file with more instructions of how we do it. Co-authored-by: Matthieu Vachon <[email protected]>
1 parent 94d1104 commit 7b4228c

File tree

4 files changed

+123
-37
lines changed

4 files changed

+123
-37
lines changed

chain/substreams/src/mapper.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl SubstreamsMapper<Chain> for Mapper {
9898
}
9999
}
100100
}
101-
Some(Data::StoreDeltas(_)) => Err(UnexpectedStoreDeltaOutput),
101+
Some(Data::DebugStoreDeltas(_)) => Err(UnexpectedStoreDeltaOutput),
102102
_ => Err(SubstreamsError::ModuleOutputNotPresentOrUnexpected),
103103
}
104104
}

graph/proto/substreams.proto

+66-21
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,27 @@
1+
// File generated using this command at the root of `graph-node` project
2+
// and assuming `substreams` repository is a sibling of `graph-node` (note that you
3+
// might need to adjust the `head -nN` and `skip N` values in the commands below to skip
4+
// more/less lines):
5+
//
6+
// ```
7+
// cat graph/proto/substreams.proto | head -n16 > /tmp/substreams.proto && mv /tmp/substreams.proto graph/proto/substreams.proto
8+
// cat ../substreams/proto/sf/substreams/v1/substreams.proto | grep -Ev 'import *"sf/substreams' >> graph/proto/substreams.proto
9+
// cat ../substreams/proto/sf/substreams/v1/modules.proto | skip 6 >> graph/proto/substreams.proto
10+
// cat ../substreams/proto/sf/substreams/v1/package.proto | skip 9 >> graph/proto/substreams.proto
11+
// cat ../substreams/proto/sf/substreams/v1/clock.proto | skip 7 >> graph/proto/substreams.proto
12+
// # Manually add line `import "google/protobuf/descriptor.proto";` below `import "google/protobuf/timestamp.proto";`
13+
// ```
14+
//
15+
// FIXME: We copy over and inline most of the substreams files, this is bad and we need a better way to
16+
// generate that, outside of doing this copying over.
117
syntax = "proto3";
218

319
package sf.substreams.v1;
4-
520
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/v1;pbsubstreams";
621

722
import "google/protobuf/any.proto";
8-
import "google/protobuf/descriptor.proto";
923
import "google/protobuf/timestamp.proto";
10-
11-
// FIXME: I copied over and inlined most of the substreams files, this is bad and we need a better way to
12-
// generate that, outside of doing this copying over. We should check maybe `buf` or a pre-populated
13-
// package.
24+
import "google/protobuf/descriptor.proto";
1425

1526
service Stream {
1627
rpc Blocks(Request) returns (stream Response);
@@ -23,13 +34,26 @@ message Request {
2334
repeated ForkStep fork_steps = 4;
2435
string irreversibility_condition = 5;
2536

37+
// By default, the engine runs in developer mode, with richer and deeper output,
38+
// * support for multiple `output_modules`, of `store` and `map` kinds
39+
// * support for `initial_store_snapshot_for_modules`
40+
// * log outputs for output modules
41+
//
42+
// With `production_mode`, however, you trade off functionality for high speed, where it:
43+
// * restricts the possible requested `output_modules` to a single mapper module,
44+
// * turns off support for `initial_store_snapshot_for_modules`,
45+
// * still streams output linearly, with a cursor, but at higher speeds
46+
// * and purges log outputs from responses.
47+
bool production_mode = 9;
48+
2649
Modules modules = 6;
2750
repeated string output_modules = 7;
2851
repeated string initial_store_snapshot_for_modules = 8;
2952
}
3053

3154
message Response {
3255
oneof message {
56+
SessionInit session = 5; // Always sent first
3357
ModulesProgress progress = 1; // Progress of data preparation, before sending in the stream of `data` events.
3458
InitialSnapshotData snapshot_data = 2;
3559
InitialSnapshotComplete snapshot_complete = 3;
@@ -51,6 +75,10 @@ enum ForkStep {
5175
reserved 5;
5276
}
5377

78+
message SessionInit {
79+
string trace_id = 1;
80+
}
81+
5482
message InitialSnapshotComplete {
5583
string cursor = 1;
5684
}
@@ -71,17 +99,36 @@ message BlockScopedData {
7199

72100
message ModuleOutput {
73101
string name = 1;
102+
74103
oneof data {
75104
google.protobuf.Any map_output = 2;
76-
StoreDeltas store_deltas = 3;
77-
}
78-
repeated string logs = 4;
79105

106+
// StoreDeltas are produced for store modules in development mode.
107+
// It is not possible to retrieve store models in production, with parallelization
108+
// enabled. If you need the deltas directly, write a pass through mapper module
109+
// that will get them down to you.
110+
StoreDeltas debug_store_deltas = 3;
111+
}
112+
repeated string debug_logs = 4;
80113
// LogsTruncated is a flag that tells you if you received all the logs or if they
81114
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
82-
bool logs_truncated = 5;
115+
bool debug_logs_truncated = 5;
116+
117+
bool cached = 6;
83118
}
84119

120+
// think about:
121+
// message ModuleOutput { ...
122+
// ModuleOutputDebug debug_info = 6;
123+
// ...}
124+
//message ModuleOutputDebug {
125+
// StoreDeltas store_deltas = 3;
126+
// repeated string logs = 4;
127+
// // LogsTruncated is a flag that tells you if you received all the logs or if they
128+
// // were truncated because you logged too much (fixed limit currently is set to 128 KiB).
129+
// bool logs_truncated = 5;
130+
//}
131+
85132
message ModulesProgress {
86133
repeated ModuleProgress modules = 1;
87134
}
@@ -116,8 +163,8 @@ message ModuleProgress {
116163
}
117164

118165
message BlockRange {
119-
uint64 start_block = 1;
120-
uint64 end_block = 2;
166+
uint64 start_block = 2;
167+
uint64 end_block = 3;
121168
}
122169

123170
message StoreDeltas {
@@ -144,7 +191,6 @@ message Output {
144191
google.protobuf.Timestamp timestamp = 4;
145192
google.protobuf.Any value = 10;
146193
}
147-
148194
message Modules {
149195
repeated Module modules = 1;
150196
repeated Binary binaries = 2;
@@ -199,8 +245,9 @@ message Module {
199245
UPDATE_POLICY_MIN = 4;
200246
// Provides a store where you can `max_*()` keys, where two stores merge by leaving the maximum value.
201247
UPDATE_POLICY_MAX = 5;
248+
// Provides a store where you can `append()` keys, where two stores merge by concatenating the bytes in order.
249+
UPDATE_POLICY_APPEND = 6;
202250
}
203-
204251
}
205252

206253
message Input {
@@ -232,13 +279,6 @@ message Module {
232279
string type = 1;
233280
}
234281
}
235-
236-
message Clock {
237-
string id = 1;
238-
uint64 number = 2;
239-
google.protobuf.Timestamp timestamp = 3;
240-
}
241-
242282
message Package {
243283
// Needs to be one so this file can be used _directly_ as a
244284
// buf `Image` andor a ProtoSet for grpcurl and other tools
@@ -265,3 +305,8 @@ message ModuleMetadata {
265305
uint64 package_index = 1;
266306
string doc = 2;
267307
}
308+
message Clock {
309+
string id = 1;
310+
uint64 number = 2;
311+
google.protobuf.Timestamp timestamp = 3;
312+
}

graph/src/blockchain/substreams_block_stream.rs

+1
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ fn stream_blocks<C: Blockchain, F: SubstreamsMapper<C>>(
186186
irreversibility_condition: "".to_string(),
187187
modules,
188188
output_modules: vec![module_name],
189+
production_mode: true,
189190
..Default::default()
190191
};
191192

graph/src/substreams/sf.substreams.v1.rs

+55-15
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,18 @@ pub struct Request {
1010
pub fork_steps: ::prost::alloc::vec::Vec<i32>,
1111
#[prost(string, tag="5")]
1212
pub irreversibility_condition: ::prost::alloc::string::String,
13+
/// By default, the engine runs in developer mode, with richer and deeper output,
14+
/// * support for multiple `output_modules`, of `store` and `map` kinds
15+
/// * support for `initial_store_snapshot_for_modules`
16+
/// * log outputs for output modules
17+
///
18+
/// With `production_mode`, however, you trade off functionality for high speed, where it:
19+
/// * restricts the possible requested `output_modules` to a single mapper module,
20+
/// * turns off support for `initial_store_snapshot_for_modules`,
21+
/// * still streams output linearly, with a cursor, but at higher speeds
22+
/// * and purges log outputs from responses.
23+
#[prost(bool, tag="9")]
24+
pub production_mode: bool,
1325
#[prost(message, optional, tag="6")]
1426
pub modules: ::core::option::Option<Modules>,
1527
#[prost(string, repeated, tag="7")]
@@ -19,13 +31,16 @@ pub struct Request {
1931
}
2032
#[derive(Clone, PartialEq, ::prost::Message)]
2133
pub struct Response {
22-
#[prost(oneof="response::Message", tags="1, 2, 3, 4")]
34+
#[prost(oneof="response::Message", tags="5, 1, 2, 3, 4")]
2335
pub message: ::core::option::Option<response::Message>,
2436
}
2537
/// Nested message and enum types in `Response`.
2638
pub mod response {
2739
#[derive(Clone, PartialEq, ::prost::Oneof)]
2840
pub enum Message {
41+
/// Always sent first
42+
#[prost(message, tag="5")]
43+
Session(super::SessionInit),
2944
/// Progress of data preparation, before sending in the stream of `data` events.
3045
#[prost(message, tag="1")]
3146
Progress(super::ModulesProgress),
@@ -38,6 +53,11 @@ pub mod response {
3853
}
3954
}
4055
#[derive(Clone, PartialEq, ::prost::Message)]
56+
pub struct SessionInit {
57+
#[prost(string, tag="1")]
58+
pub trace_id: ::prost::alloc::string::String,
59+
}
60+
#[derive(Clone, PartialEq, ::prost::Message)]
4161
pub struct InitialSnapshotComplete {
4262
#[prost(string, tag="1")]
4363
pub cursor: ::prost::alloc::string::String,
@@ -69,11 +89,13 @@ pub struct ModuleOutput {
6989
#[prost(string, tag="1")]
7090
pub name: ::prost::alloc::string::String,
7191
#[prost(string, repeated, tag="4")]
72-
pub logs: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
92+
pub debug_logs: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
7393
/// LogsTruncated is a flag that tells you if you received all the logs or if they
7494
/// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
7595
#[prost(bool, tag="5")]
76-
pub logs_truncated: bool,
96+
pub debug_logs_truncated: bool,
97+
#[prost(bool, tag="6")]
98+
pub cached: bool,
7799
#[prost(oneof="module_output::Data", tags="2, 3")]
78100
pub data: ::core::option::Option<module_output::Data>,
79101
}
@@ -83,10 +105,26 @@ pub mod module_output {
83105
pub enum Data {
84106
#[prost(message, tag="2")]
85107
MapOutput(::prost_types::Any),
108+
/// StoreDeltas are produced for store modules in development mode.
109+
/// It is not possible to retrieve store models in production, with parallelization
110+
/// enabled. If you need the deltas directly, write a pass through mapper module
111+
/// that will get them down to you.
86112
#[prost(message, tag="3")]
87-
StoreDeltas(super::StoreDeltas),
113+
DebugStoreDeltas(super::StoreDeltas),
88114
}
89115
}
116+
// think about:
117+
// message ModuleOutput { ...
118+
// ModuleOutputDebug debug_info = 6;
119+
// ...}
120+
//message ModuleOutputDebug {
121+
// StoreDeltas store_deltas = 3;
122+
// repeated string logs = 4;
123+
// // LogsTruncated is a flag that tells you if you received all the logs or if they
124+
// // were truncated because you logged too much (fixed limit currently is set to 128 KiB).
125+
// bool logs_truncated = 5;
126+
//}
127+
90128
#[derive(Clone, PartialEq, ::prost::Message)]
91129
pub struct ModulesProgress {
92130
#[prost(message, repeated, tag="1")]
@@ -143,9 +181,9 @@ pub mod module_progress {
143181
}
144182
#[derive(Clone, PartialEq, ::prost::Message)]
145183
pub struct BlockRange {
146-
#[prost(uint64, tag="1")]
147-
pub start_block: u64,
148184
#[prost(uint64, tag="2")]
185+
pub start_block: u64,
186+
#[prost(uint64, tag="3")]
149187
pub end_block: u64,
150188
}
151189
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -258,6 +296,8 @@ pub mod module {
258296
Min = 4,
259297
/// Provides a store where you can `max_*()` keys, where two stores merge by leaving the maximum value.
260298
Max = 5,
299+
/// Provides a store where you can `append()` keys, where two stores merge by concatenating the bytes in order.
300+
Append = 6,
261301
}
262302
}
263303
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -320,15 +360,6 @@ pub mod module {
320360
}
321361
}
322362
#[derive(Clone, PartialEq, ::prost::Message)]
323-
pub struct Clock {
324-
#[prost(string, tag="1")]
325-
pub id: ::prost::alloc::string::String,
326-
#[prost(uint64, tag="2")]
327-
pub number: u64,
328-
#[prost(message, optional, tag="3")]
329-
pub timestamp: ::core::option::Option<::prost_types::Timestamp>,
330-
}
331-
#[derive(Clone, PartialEq, ::prost::Message)]
332363
pub struct Package {
333364
/// Needs to be one so this file can be used _directly_ as a
334365
/// buf `Image` andor a ProtoSet for grpcurl and other tools
@@ -362,6 +393,15 @@ pub struct ModuleMetadata {
362393
#[prost(string, tag="2")]
363394
pub doc: ::prost::alloc::string::String,
364395
}
396+
#[derive(Clone, PartialEq, ::prost::Message)]
397+
pub struct Clock {
398+
#[prost(string, tag="1")]
399+
pub id: ::prost::alloc::string::String,
400+
#[prost(uint64, tag="2")]
401+
pub number: u64,
402+
#[prost(message, optional, tag="3")]
403+
pub timestamp: ::core::option::Option<::prost_types::Timestamp>,
404+
}
365405
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
366406
#[repr(i32)]
367407
pub enum ForkStep {

0 commit comments

Comments
 (0)