Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

substreams proto #3765

Merged
merged 1 commit into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions chain/substreams/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ name = "graph-chain-substreams"
version = "0.26.0"
edition = "2021"

[build-dependencies]
tonic-build = { version = "0.7.2", features = ["prost"] }

[dependencies]
envconfig = "0.10.0"
futures = "0.1.21"
Expand All @@ -19,6 +22,7 @@ anyhow = "1.0"
tiny-keccak = "1.5.0"
hex = "0.4.3"
semver = "1.0.12"
tonic = { version = "0.7.1", features = ["tls-roots"] }

itertools = "0.10.3"

Expand All @@ -30,5 +34,5 @@ graph-core = { path = "../../core" }
test-store = { path = "../../store/test-store" }
base64 = "0.13.0"

[build-dependencies]
tonic-build = { version = "0.7.2", features = ["prost"] }


7 changes: 7 additions & 0 deletions chain/substreams/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
fn main() {
println!("cargo:rerun-if-changed=proto");
tonic_build::configure()
.out_dir("src/protobuf")
.compile(&["proto/substreams.proto"], &["proto"])
.expect("Failed to compile Substreams proto(s)");
}
267 changes: 267 additions & 0 deletions chain/substreams/proto/substreams.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
syntax = "proto3";

package sf.substreams.v1;

option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/v1;pbsubstreams";

import "google/protobuf/any.proto";
import "google/protobuf/descriptor.proto";
import "google/protobuf/timestamp.proto";

// FIXME: I copied over and inlined most of the substreams files, this is bad and we need a better way to
// generate that, outside of doing this copying over. We should check maybe `buf` or a pre-populated
// package.

service Stream {
rpc Blocks(Request) returns (stream Response);
}

message Request {
int64 start_block_num = 1;
string start_cursor = 2;
uint64 stop_block_num = 3;
repeated ForkStep fork_steps = 4;
string irreversibility_condition = 5;

Modules modules = 6;
repeated string output_modules = 7;
repeated string initial_store_snapshot_for_modules = 8;
}

message Response {
oneof message {
ModulesProgress progress = 1; // Progress of data preparation, before sending in the stream of `data` events.
InitialSnapshotData snapshot_data = 2;
InitialSnapshotComplete snapshot_complete = 3;
BlockScopedData data = 4;
}
}

enum ForkStep {
STEP_UNKNOWN = 0;
// Block is new head block of the chain, that is linear with the previous block
STEP_NEW = 1;
// Block is now forked and should be undone, it's not the head block of the chain anymore
STEP_UNDO = 2;
// Removed, was STEP_REDO
reserved 3;
// Block is now irreversible and can be committed to (finality is chain specific, see chain documentation for more details)
STEP_IRREVERSIBLE = 4;
// Removed, was STEP_STALLED
reserved 5;
}

message InitialSnapshotComplete {
string cursor = 1;
}

message InitialSnapshotData {
string module_name = 1;
StoreDeltas deltas = 2;
uint64 sent_keys = 4;
uint64 total_keys = 3;
}

message BlockScopedData {
repeated ModuleOutput outputs = 1;
Clock clock = 3;
ForkStep step = 6;
string cursor = 10;
}

message ModuleOutput {
string name = 1;
oneof data {
google.protobuf.Any map_output = 2;
StoreDeltas store_deltas = 3;
}
repeated string logs = 4;

// LogsTruncated is a flag that tells you if you received all the logs or if they
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
bool logs_truncated = 5;
}

message ModulesProgress {
repeated ModuleProgress modules = 1;
}

message ModuleProgress {
string name = 1;

oneof type {
ProcessedRange processed_ranges = 2;
InitialState initial_state = 3;
ProcessedBytes processed_bytes = 4;
Failed failed = 5;
}

message ProcessedRange {
repeated BlockRange processed_ranges = 1;
}
message InitialState {
uint64 available_up_to_block = 2;
}
message ProcessedBytes {
uint64 total_bytes_read = 1;
uint64 total_bytes_written = 2;
}
message Failed {
string reason = 1;
repeated string logs = 2;
// FailureLogsTruncated is a flag that tells you if you received all the logs or if they
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
bool logs_truncated = 3;
}
}

message BlockRange {
uint64 start_block = 1;
uint64 end_block = 2;
}

message StoreDeltas {
repeated StoreDelta deltas = 1;
}

message StoreDelta {
enum Operation {
UNSET = 0;
CREATE = 1;
UPDATE = 2;
DELETE = 3;
}
Operation operation = 1;
uint64 ordinal = 2;
string key = 3;
bytes old_value = 4;
bytes new_value = 5;
}

message Output {
uint64 block_num = 1;
string block_id = 2;
google.protobuf.Timestamp timestamp = 4;
google.protobuf.Any value = 10;
}

message Modules {
repeated Module modules = 1;
repeated Binary binaries = 2;
}

// Binary represents some code compiled to its binary form.
message Binary {
string type = 1;
bytes content = 2;
}

message Module {
string name = 1;
oneof kind {
KindMap kind_map = 2;
KindStore kind_store = 3;
};

uint32 binary_index = 4;
string binary_entrypoint = 5;

repeated Input inputs = 6;
Output output = 7;

uint64 initial_block = 8;

message KindMap {
string output_type = 1;
}

message KindStore {
// The `update_policy` determines the functions available to mutate the store
// (like `set()`, `set_if_not_exists()` or `sum()`, etc..) in
// order to ensure that parallel operations are possible and deterministic
//
// Say a store cumulates keys from block 0 to 1M, and a second store
// cumulates keys from block 1M to 2M. When we want to use this
// store as a dependency for a downstream module, we will merge the
// two stores according to this policy.
UpdatePolicy update_policy = 1;
string value_type = 2;

enum UpdatePolicy {
UPDATE_POLICY_UNSET = 0;
// Provides a store where you can `set()` keys, and the latest key wins
UPDATE_POLICY_SET = 1;
// Provides a store where you can `set_if_not_exists()` keys, and the first key wins
UPDATE_POLICY_SET_IF_NOT_EXISTS = 2;
// Provides a store where you can `add_*()` keys, where two stores merge by summing its values.
UPDATE_POLICY_ADD = 3;
// Provides a store where you can `min_*()` keys, where two stores merge by leaving the minimum value.
UPDATE_POLICY_MIN = 4;
// Provides a store where you can `max_*()` keys, where two stores merge by leaving the maximum value.
UPDATE_POLICY_MAX = 5;
}

}

message Input {
oneof input {
Source source = 1;
Map map = 2;
Store store = 3;
}

message Source {
string type = 1; // ex: "sf.ethereum.type.v1.Block"
}
message Map {
string module_name = 1; // ex: "block_to_pairs"
}
message Store {
string module_name = 1;
Mode mode = 2;

enum Mode {
UNSET = 0;
GET = 1;
DELTAS = 2;
}
}
}

message Output {
string type = 1;
}
}

message Clock {
string id = 1;
uint64 number = 2;
google.protobuf.Timestamp timestamp = 3;
}

message Package {
// Needs to be one so this file can be used _directly_ as a
// buf `Image` andor a ProtoSet for grpcurl and other tools
repeated google.protobuf.FileDescriptorProto proto_files = 1;
reserved 2; // In case protosets add a field some day.
reserved 3; // In case protosets add a field some day.
reserved 4; // In case protosets add a field some day.

uint64 version = 5;
sf.substreams.v1.Modules modules = 6;
repeated ModuleMetadata module_meta = 7;
repeated PackageMetadata package_meta = 8;
}

message PackageMetadata {
string version = 1;
string url = 2;
string name = 3;
string doc = 4;
}

message ModuleMetadata {
// Corresponds to the index in `Package.metadata.package_meta`
uint64 package_index = 1;
string doc = 2;
}
3 changes: 3 additions & 0 deletions chain/substreams/src/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#[rustfmt::skip]
#[path = "protobuf/sf.substreams.v1.rs"]
pub mod pbcodec;
1 change: 1 addition & 0 deletions chain/substreams/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod chain;
mod codec;
mod data_source;
mod trigger;

Expand Down
Loading