Skip to content

Commit 42a4788

Browse files
authored
substreams proto (#3765)
1 parent a4ace83 commit 42a4788

File tree

7 files changed

+889
-2
lines changed

7 files changed

+889
-2
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chain/substreams/Cargo.toml

+6-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ name = "graph-chain-substreams"
33
version = "0.26.0"
44
edition = "2021"
55

6+
[build-dependencies]
7+
tonic-build = { version = "0.7.2", features = ["prost"] }
8+
69
[dependencies]
710
envconfig = "0.10.0"
811
futures = "0.1.21"
@@ -19,6 +22,7 @@ anyhow = "1.0"
1922
tiny-keccak = "1.5.0"
2023
hex = "0.4.3"
2124
semver = "1.0.12"
25+
tonic = { version = "0.7.1", features = ["tls-roots"] }
2226

2327
itertools = "0.10.3"
2428

@@ -30,5 +34,5 @@ graph-core = { path = "../../core" }
3034
test-store = { path = "../../store/test-store" }
3135
base64 = "0.13.0"
3236

33-
[build-dependencies]
34-
tonic-build = { version = "0.7.2", features = ["prost"] }
37+
38+

chain/substreams/build.rs

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
fn main() {
2+
println!("cargo:rerun-if-changed=proto");
3+
tonic_build::configure()
4+
.out_dir("src/protobuf")
5+
.compile(&["proto/substreams.proto"], &["proto"])
6+
.expect("Failed to compile Substreams proto(s)");
7+
}
+267
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
syntax = "proto3";
2+
3+
package sf.substreams.v1;
4+
5+
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/v1;pbsubstreams";
6+
7+
import "google/protobuf/any.proto";
8+
import "google/protobuf/descriptor.proto";
9+
import "google/protobuf/timestamp.proto";
10+
11+
// FIXME: I copied over and inlined most of the substreams files, this is bad and we need a better way to
12+
// generate that, outside of doing this copying over. We should check maybe `buf` or a pre-populated
13+
// package.
14+
15+
service Stream {
16+
rpc Blocks(Request) returns (stream Response);
17+
}
18+
19+
message Request {
20+
int64 start_block_num = 1;
21+
string start_cursor = 2;
22+
uint64 stop_block_num = 3;
23+
repeated ForkStep fork_steps = 4;
24+
string irreversibility_condition = 5;
25+
26+
Modules modules = 6;
27+
repeated string output_modules = 7;
28+
repeated string initial_store_snapshot_for_modules = 8;
29+
}
30+
31+
message Response {
32+
oneof message {
33+
ModulesProgress progress = 1; // Progress of data preparation, before sending in the stream of `data` events.
34+
InitialSnapshotData snapshot_data = 2;
35+
InitialSnapshotComplete snapshot_complete = 3;
36+
BlockScopedData data = 4;
37+
}
38+
}
39+
40+
enum ForkStep {
41+
STEP_UNKNOWN = 0;
42+
// Block is new head block of the chain, that is linear with the previous block
43+
STEP_NEW = 1;
44+
// Block is now forked and should be undone, it's not the head block of the chain anymore
45+
STEP_UNDO = 2;
46+
// Removed, was STEP_REDO
47+
reserved 3;
48+
// Block is now irreversible and can be committed to (finality is chain specific, see chain documentation for more details)
49+
STEP_IRREVERSIBLE = 4;
50+
// Removed, was STEP_STALLED
51+
reserved 5;
52+
}
53+
54+
message InitialSnapshotComplete {
55+
string cursor = 1;
56+
}
57+
58+
message InitialSnapshotData {
59+
string module_name = 1;
60+
StoreDeltas deltas = 2;
61+
uint64 sent_keys = 4;
62+
uint64 total_keys = 3;
63+
}
64+
65+
message BlockScopedData {
66+
repeated ModuleOutput outputs = 1;
67+
Clock clock = 3;
68+
ForkStep step = 6;
69+
string cursor = 10;
70+
}
71+
72+
message ModuleOutput {
73+
string name = 1;
74+
oneof data {
75+
google.protobuf.Any map_output = 2;
76+
StoreDeltas store_deltas = 3;
77+
}
78+
repeated string logs = 4;
79+
80+
// LogsTruncated is a flag that tells you if you received all the logs or if they
81+
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
82+
bool logs_truncated = 5;
83+
}
84+
85+
message ModulesProgress {
86+
repeated ModuleProgress modules = 1;
87+
}
88+
89+
message ModuleProgress {
90+
string name = 1;
91+
92+
oneof type {
93+
ProcessedRange processed_ranges = 2;
94+
InitialState initial_state = 3;
95+
ProcessedBytes processed_bytes = 4;
96+
Failed failed = 5;
97+
}
98+
99+
message ProcessedRange {
100+
repeated BlockRange processed_ranges = 1;
101+
}
102+
message InitialState {
103+
uint64 available_up_to_block = 2;
104+
}
105+
message ProcessedBytes {
106+
uint64 total_bytes_read = 1;
107+
uint64 total_bytes_written = 2;
108+
}
109+
message Failed {
110+
string reason = 1;
111+
repeated string logs = 2;
112+
// FailureLogsTruncated is a flag that tells you if you received all the logs or if they
113+
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
114+
bool logs_truncated = 3;
115+
}
116+
}
117+
118+
message BlockRange {
119+
uint64 start_block = 1;
120+
uint64 end_block = 2;
121+
}
122+
123+
message StoreDeltas {
124+
repeated StoreDelta deltas = 1;
125+
}
126+
127+
message StoreDelta {
128+
enum Operation {
129+
UNSET = 0;
130+
CREATE = 1;
131+
UPDATE = 2;
132+
DELETE = 3;
133+
}
134+
Operation operation = 1;
135+
uint64 ordinal = 2;
136+
string key = 3;
137+
bytes old_value = 4;
138+
bytes new_value = 5;
139+
}
140+
141+
message Output {
142+
uint64 block_num = 1;
143+
string block_id = 2;
144+
google.protobuf.Timestamp timestamp = 4;
145+
google.protobuf.Any value = 10;
146+
}
147+
148+
message Modules {
149+
repeated Module modules = 1;
150+
repeated Binary binaries = 2;
151+
}
152+
153+
// Binary represents some code compiled to its binary form.
154+
message Binary {
155+
string type = 1;
156+
bytes content = 2;
157+
}
158+
159+
message Module {
160+
string name = 1;
161+
oneof kind {
162+
KindMap kind_map = 2;
163+
KindStore kind_store = 3;
164+
};
165+
166+
uint32 binary_index = 4;
167+
string binary_entrypoint = 5;
168+
169+
repeated Input inputs = 6;
170+
Output output = 7;
171+
172+
uint64 initial_block = 8;
173+
174+
message KindMap {
175+
string output_type = 1;
176+
}
177+
178+
message KindStore {
179+
// The `update_policy` determines the functions available to mutate the store
180+
// (like `set()`, `set_if_not_exists()` or `sum()`, etc..) in
181+
// order to ensure that parallel operations are possible and deterministic
182+
//
183+
// Say a store cumulates keys from block 0 to 1M, and a second store
184+
// cumulates keys from block 1M to 2M. When we want to use this
185+
// store as a dependency for a downstream module, we will merge the
186+
// two stores according to this policy.
187+
UpdatePolicy update_policy = 1;
188+
string value_type = 2;
189+
190+
enum UpdatePolicy {
191+
UPDATE_POLICY_UNSET = 0;
192+
// Provides a store where you can `set()` keys, and the latest key wins
193+
UPDATE_POLICY_SET = 1;
194+
// Provides a store where you can `set_if_not_exists()` keys, and the first key wins
195+
UPDATE_POLICY_SET_IF_NOT_EXISTS = 2;
196+
// Provides a store where you can `add_*()` keys, where two stores merge by summing its values.
197+
UPDATE_POLICY_ADD = 3;
198+
// Provides a store where you can `min_*()` keys, where two stores merge by leaving the minimum value.
199+
UPDATE_POLICY_MIN = 4;
200+
// Provides a store where you can `max_*()` keys, where two stores merge by leaving the maximum value.
201+
UPDATE_POLICY_MAX = 5;
202+
}
203+
204+
}
205+
206+
message Input {
207+
oneof input {
208+
Source source = 1;
209+
Map map = 2;
210+
Store store = 3;
211+
}
212+
213+
message Source {
214+
string type = 1; // ex: "sf.ethereum.type.v1.Block"
215+
}
216+
message Map {
217+
string module_name = 1; // ex: "block_to_pairs"
218+
}
219+
message Store {
220+
string module_name = 1;
221+
Mode mode = 2;
222+
223+
enum Mode {
224+
UNSET = 0;
225+
GET = 1;
226+
DELTAS = 2;
227+
}
228+
}
229+
}
230+
231+
message Output {
232+
string type = 1;
233+
}
234+
}
235+
236+
message Clock {
237+
string id = 1;
238+
uint64 number = 2;
239+
google.protobuf.Timestamp timestamp = 3;
240+
}
241+
242+
message Package {
243+
// Needs to be one so this file can be used _directly_ as a
244+
// buf `Image` andor a ProtoSet for grpcurl and other tools
245+
repeated google.protobuf.FileDescriptorProto proto_files = 1;
246+
reserved 2; // In case protosets add a field some day.
247+
reserved 3; // In case protosets add a field some day.
248+
reserved 4; // In case protosets add a field some day.
249+
250+
uint64 version = 5;
251+
sf.substreams.v1.Modules modules = 6;
252+
repeated ModuleMetadata module_meta = 7;
253+
repeated PackageMetadata package_meta = 8;
254+
}
255+
256+
message PackageMetadata {
257+
string version = 1;
258+
string url = 2;
259+
string name = 3;
260+
string doc = 4;
261+
}
262+
263+
message ModuleMetadata {
264+
// Corresponds to the index in `Package.metadata.package_meta`
265+
uint64 package_index = 1;
266+
string doc = 2;
267+
}

chain/substreams/src/codec.rs

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#[rustfmt::skip]
2+
#[path = "protobuf/sf.substreams.v1.rs"]
3+
pub mod pbcodec;

chain/substreams/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod chain;
2+
mod codec;
23
mod data_source;
34
mod trigger;
45

0 commit comments

Comments
 (0)