From 444a53f178562dc04b35ebb1bd282dd2b4aa2873 Mon Sep 17 00:00:00 2001 From: cvauclair Date: Thu, 9 Jan 2025 16:51:18 -0500 Subject: [PATCH] feat(sink): Update sustreams config (#10) * feat: Add `SUBSTREAMS_START_BLOCK` to env vars * feat: Add `SUBSTREAMS_END_BLOCK` as env parameter * feat: Update protobufs to latest models * feat: Update substreams package * style: fmt --- docker/Dockerfile | 1 + docker/docker-compose.yaml | 1 + geo-substream.spkg | Bin 565461 -> 565461 bytes sdk/proto/geo.proto | 4 +- sdk/proto/ipfs.proto | 64 +++++++++++++++--------------- sdk/src/models/proposal.rs | 14 ++----- sdk/src/pb/ipfs.rs | 78 ++++++++++++++++++------------------- sink/src/main.rs | 26 +++++++++++-- 8 files changed, 97 insertions(+), 91 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index 85f1f45..30a3b25 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -22,6 +22,7 @@ FROM run AS sink ENV SUBSTREAMS_API_TOKEN "" ENV SUBSTREAMS_ENDPOINT_URL "" +ENV SUBSTREAMS_START_BLOCK "" COPY --from=builder /kg-node/target/release/sink . COPY --from=builder /kg-node/geo-substream.spkg . diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 097866b..b9a7174 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -29,6 +29,7 @@ services: neo4j_pass: neo4j SUBSTREAMS_API_TOKEN: ${SUBSTREAMS_API_TOKEN} SUBSTREAMS_ENDPOINT_URL: ${SUBSTREAMS_ENDPOINT_URL} + SUBSTREAMS_START_BLOCK: ${SUBSTREAMS_START_BLOCK} api: build: context: .. diff --git a/geo-substream.spkg b/geo-substream.spkg index f596193308f8fea14b70e560fb340b3cb02d01f3..08235e014f85c2e0597ddd16e9c36ef503400f68 100644 GIT binary patch delta 669 zcmZ9GOGrXd5QfjM)&YaE2SP7;K!LcZbwuQ*Wi5he5k+8&7Ci_`0>eN^YGDw;Ku~QW zgD41sOVIJa~!)I`krveHk{(9KY=lRA1C0?_BoQM zrM@KQIlM?>m95Xf8e81J3L8&lhzFeXo5CKycb;+0Hu{3Q%sp-m)jnzT*)=t>*)Omh4EgiQ3SR36_OrW!YFI+K^e=c8_NA@5>Mx z$QF@fXe4h$22e+z5ow~HZVzM!TD0#28LB~(=D(CTfQNdoWia3sLV(rK=@54I#T<`& zy%U*QJ`d;exrwQn1+L`tlU$zX2ZhrLJ9t^azJO>e4n&v20}5{|e6Fye@R!16mcj!D Yg=jAdgzzXlqOecl4TX=iny8GL|EPSKUH||9 delta 669 zcmZ9GPbkA-7{}l5jJI}}zb!NWnjGZ7(Lv0`jf+}J5zWEDpH}8FO(}~+N&J2Q2jSf<1ok#Bswm{EGYkdsP#Sz`Sc=z z7Aj3*tDt XE(nBhC_JRFN8t^HkF=_w45t17)sLbv diff --git a/sdk/proto/geo.proto b/sdk/proto/geo.proto index bda2ba5..d171d13 100644 --- a/sdk/proto/geo.proto +++ b/sdk/proto/geo.proto @@ -338,13 +338,11 @@ message GeoOutput { repeated GeoPersonalSpaceAdminPluginCreated personal_plugins_created = 12; repeated MemberRemoved members_removed = 13; repeated EditorRemoved editors_removed = 14; - repeated PublishEditProposalCreated edits = 15; - repeated AddMemberProposalCreated proposed_added_members = 16; repeated RemoveMemberProposalCreated proposed_removed_members = 17; repeated AddEditorProposalCreated proposed_added_editors = 18; repeated RemoveEditorProposalCreated proposed_removed_editors = 19; repeated AddSubspaceProposalCreated proposed_added_subspaces = 20; repeated RemoveSubspaceProposalCreated proposed_removed_subspaces = 21; -} +} \ No newline at end of file diff --git a/sdk/proto/ipfs.proto b/sdk/proto/ipfs.proto index e6f89f0..551e512 100644 --- a/sdk/proto/ipfs.proto +++ b/sdk/proto/ipfs.proto @@ -8,8 +8,6 @@ message IpfsMetadata { // independently of other proposal types. string version = 1; ActionType type = 2; - string id = 3; - string name = 4; } message Edit { @@ -28,16 +26,19 @@ message ImportEdit { string name = 4; repeated Op ops = 5; repeated string authors = 6; - string createdBy = 7; - string createdAt = 8; - string blockHash = 9; - string blockNumber = 10; - string transactionHash = 11; + string created_by = 7; + string created_at = 8; + string block_hash = 9; + string block_number = 10; + string transaction_hash = 11; } message Op { OpType type = 1; Triple triple = 2; + Entity entity = 3; + Relation relation = 4; + repeated Triple triples = 5; } message Triple { @@ -51,14 +52,31 @@ message Value { string value = 2; } +message Relation { + string id = 1; + string type = 2; + string from_entity = 3; + string to_entity = 4; + string index = 5; +} + +message Entity { + string id = 1; + repeated string types = 2; +} + enum OpType { - NONE = 0; + OP_TYPE_UNKNOWN = 0; SET_TRIPLE = 1; DELETE_TRIPLE = 2; + SET_TRIPLE_BATCH = 3; + DELETE_ENTITY = 4; + CREATE_RELATION = 5; + DELETE_RELATION = 6; } enum ValueType { - UNKNOWN = 0; + VALUE_TYPE_UNKNOWN = 0; TEXT = 1; NUMBER = 2; CHECKBOX = 3; @@ -67,44 +85,24 @@ enum ValueType { POINT = 6; } -message Membership { - ActionType type = 1; - string name = 2; - string version = 3; - string id = 4; - string user = 5; -} - -message Subspace { - ActionType type = 1; - string name = 2; - string version = 3; - string id = 4; - string subspace = 5; -} - enum ActionType { - EMPTY = 0; + ACTION_TYPE_UNKNOWN = 0; ADD_EDIT = 1; ADD_SUBSPACE = 2; REMOVE_SUBSPACE = 3; IMPORT_SPACE = 4; ARCHIVE_SPACE = 5; - ADD_EDITOR = 6; - REMOVE_EDITOR = 7; - ADD_MEMBER = 8; - REMOVE_MEMBER = 9; } message Import { string version = 1; ActionType type = 2; - string previousNetwork = 3; - string previousContractAddress = 4; + string previous_network = 3; + string previous_contract_address = 4; repeated string edits = 5; } message Options { string format = 1; string crop = 2; -} +} \ No newline at end of file diff --git a/sdk/src/models/proposal.rs b/sdk/src/models/proposal.rs index 027e539..786a54c 100644 --- a/sdk/src/models/proposal.rs +++ b/sdk/src/models/proposal.rs @@ -16,13 +16,10 @@ use super::BlockMetadata; #[serde(rename_all = "SCREAMING_SNAKE_CASE")] pub enum ProposalType { AddEdit, - ImportSpace, AddSubspace, RemoveSubspace, - AddEditor, - RemoveEditor, - AddMember, - RemoveMember, + ImportSpace, + ArchiveSpace, } impl TryFrom for ProposalType { @@ -30,14 +27,11 @@ impl TryFrom for ProposalType { fn try_from(action_type: pb::ipfs::ActionType) -> Result { match action_type { - pb::ipfs::ActionType::AddMember => Ok(Self::AddMember), - pb::ipfs::ActionType::RemoveMember => Ok(Self::RemoveMember), - pb::ipfs::ActionType::AddEditor => Ok(Self::AddEditor), - pb::ipfs::ActionType::RemoveEditor => Ok(Self::RemoveEditor), + pb::ipfs::ActionType::AddEdit => Ok(Self::AddEdit), pb::ipfs::ActionType::AddSubspace => Ok(Self::AddSubspace), pb::ipfs::ActionType::RemoveSubspace => Ok(Self::RemoveSubspace), - pb::ipfs::ActionType::AddEdit => Ok(Self::AddEdit), pb::ipfs::ActionType::ImportSpace => Ok(Self::ImportSpace), + pb::ipfs::ActionType::ArchiveSpace => Ok(Self::ArchiveSpace), _ => Err(format!("Invalid action type: {:?}", action_type)), } } diff --git a/sdk/src/pb/ipfs.rs b/sdk/src/pb/ipfs.rs index b826caa..0b5a2b4 100644 --- a/sdk/src/pb/ipfs.rs +++ b/sdk/src/pb/ipfs.rs @@ -10,10 +10,6 @@ pub struct IpfsMetadata { pub version: ::prost::alloc::string::String, #[prost(enumeration="ActionType", tag="2")] pub r#type: i32, - #[prost(string, tag="3")] - pub id: ::prost::alloc::string::String, - #[prost(string, tag="4")] - pub name: ::prost::alloc::string::String, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -64,6 +60,12 @@ pub struct Op { pub r#type: i32, #[prost(message, optional, tag="2")] pub triple: ::core::option::Option, + #[prost(message, optional, tag="3")] + pub entity: ::core::option::Option, + #[prost(message, optional, tag="4")] + pub relation: ::core::option::Option, + #[prost(message, repeated, tag="5")] + pub triples: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -85,31 +87,25 @@ pub struct Value { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct Membership { - #[prost(enumeration="ActionType", tag="1")] - pub r#type: i32, +pub struct Relation { + #[prost(string, tag="1")] + pub id: ::prost::alloc::string::String, #[prost(string, tag="2")] - pub name: ::prost::alloc::string::String, + pub r#type: ::prost::alloc::string::String, #[prost(string, tag="3")] - pub version: ::prost::alloc::string::String, + pub from_entity: ::prost::alloc::string::String, #[prost(string, tag="4")] - pub id: ::prost::alloc::string::String, + pub to_entity: ::prost::alloc::string::String, #[prost(string, tag="5")] - pub user: ::prost::alloc::string::String, + pub index: ::prost::alloc::string::String, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct Subspace { - #[prost(enumeration="ActionType", tag="1")] - pub r#type: i32, - #[prost(string, tag="2")] - pub name: ::prost::alloc::string::String, - #[prost(string, tag="3")] - pub version: ::prost::alloc::string::String, - #[prost(string, tag="4")] +pub struct Entity { + #[prost(string, tag="1")] pub id: ::prost::alloc::string::String, - #[prost(string, tag="5")] - pub subspace: ::prost::alloc::string::String, + #[prost(string, repeated, tag="2")] + pub types: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -136,9 +132,13 @@ pub struct Options { #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum OpType { - None = 0, + Unknown = 0, SetTriple = 1, DeleteTriple = 2, + SetTripleBatch = 3, + DeleteEntity = 4, + CreateRelation = 5, + DeleteRelation = 6, } impl OpType { /// String value of the enum field names used in the ProtoBuf definition. @@ -147,17 +147,25 @@ impl OpType { /// (if the ProtoBuf definition does not change) and safe for programmatic use. pub fn as_str_name(&self) -> &'static str { match self { - OpType::None => "NONE", + OpType::Unknown => "OP_TYPE_UNKNOWN", OpType::SetTriple => "SET_TRIPLE", OpType::DeleteTriple => "DELETE_TRIPLE", + OpType::SetTripleBatch => "SET_TRIPLE_BATCH", + OpType::DeleteEntity => "DELETE_ENTITY", + OpType::CreateRelation => "CREATE_RELATION", + OpType::DeleteRelation => "DELETE_RELATION", } } /// Creates an enum from field names used in the ProtoBuf definition. pub fn from_str_name(value: &str) -> ::core::option::Option { match value { - "NONE" => Some(Self::None), + "OP_TYPE_UNKNOWN" => Some(Self::Unknown), "SET_TRIPLE" => Some(Self::SetTriple), "DELETE_TRIPLE" => Some(Self::DeleteTriple), + "SET_TRIPLE_BATCH" => Some(Self::SetTripleBatch), + "DELETE_ENTITY" => Some(Self::DeleteEntity), + "CREATE_RELATION" => Some(Self::CreateRelation), + "DELETE_RELATION" => Some(Self::DeleteRelation), _ => None, } } @@ -180,7 +188,7 @@ impl ValueType { /// (if the ProtoBuf definition does not change) and safe for programmatic use. pub fn as_str_name(&self) -> &'static str { match self { - ValueType::Unknown => "UNKNOWN", + ValueType::Unknown => "VALUE_TYPE_UNKNOWN", ValueType::Text => "TEXT", ValueType::Number => "NUMBER", ValueType::Checkbox => "CHECKBOX", @@ -192,7 +200,7 @@ impl ValueType { /// Creates an enum from field names used in the ProtoBuf definition. pub fn from_str_name(value: &str) -> ::core::option::Option { match value { - "UNKNOWN" => Some(Self::Unknown), + "VALUE_TYPE_UNKNOWN" => Some(Self::Unknown), "TEXT" => Some(Self::Text), "NUMBER" => Some(Self::Number), "CHECKBOX" => Some(Self::Checkbox), @@ -206,16 +214,12 @@ impl ValueType { #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum ActionType { - Empty = 0, + Unknown = 0, AddEdit = 1, AddSubspace = 2, RemoveSubspace = 3, ImportSpace = 4, ArchiveSpace = 5, - AddEditor = 6, - RemoveEditor = 7, - AddMember = 8, - RemoveMember = 9, } impl ActionType { /// String value of the enum field names used in the ProtoBuf definition. @@ -224,31 +228,23 @@ impl ActionType { /// (if the ProtoBuf definition does not change) and safe for programmatic use. pub fn as_str_name(&self) -> &'static str { match self { - ActionType::Empty => "EMPTY", + ActionType::Unknown => "ACTION_TYPE_UNKNOWN", ActionType::AddEdit => "ADD_EDIT", ActionType::AddSubspace => "ADD_SUBSPACE", ActionType::RemoveSubspace => "REMOVE_SUBSPACE", ActionType::ImportSpace => "IMPORT_SPACE", ActionType::ArchiveSpace => "ARCHIVE_SPACE", - ActionType::AddEditor => "ADD_EDITOR", - ActionType::RemoveEditor => "REMOVE_EDITOR", - ActionType::AddMember => "ADD_MEMBER", - ActionType::RemoveMember => "REMOVE_MEMBER", } } /// Creates an enum from field names used in the ProtoBuf definition. pub fn from_str_name(value: &str) -> ::core::option::Option { match value { - "EMPTY" => Some(Self::Empty), + "ACTION_TYPE_UNKNOWN" => Some(Self::Unknown), "ADD_EDIT" => Some(Self::AddEdit), "ADD_SUBSPACE" => Some(Self::AddSubspace), "REMOVE_SUBSPACE" => Some(Self::RemoveSubspace), "IMPORT_SPACE" => Some(Self::ImportSpace), "ARCHIVE_SPACE" => Some(Self::ArchiveSpace), - "ADD_EDITOR" => Some(Self::AddEditor), - "REMOVE_EDITOR" => Some(Self::RemoveEditor), - "ADD_MEMBER" => Some(Self::AddMember), - "REMOVE_MEMBER" => Some(Self::RemoveMember), _ => None, } } diff --git a/sink/src/main.rs b/sink/src/main.rs index 16aa9d3..af86458 100644 --- a/sink/src/main.rs +++ b/sink/src/main.rs @@ -10,8 +10,8 @@ use tracing_subscriber::util::SubscriberInitExt; const PKG_FILE: &str = "geo-substream.spkg"; const MODULE_NAME: &str = "geo_out"; -const START_BLOCK: i64 = 28410; -const STOP_BLOCK: u64 = 0; +const DEFAULT_START_BLOCK: u64 = 880; +const DEFAULT_END_BLOCK: u64 = 0; #[tokio::main] async fn main() -> Result<(), Error> { @@ -19,6 +19,20 @@ async fn main() -> Result<(), Error> { init_tracing(); let endpoint_url = env::var("SUBSTREAMS_ENDPOINT_URL").expect("SUBSTREAMS_ENDPOINT_URL not set"); + let start_block = env::var("SUBSTREAMS_START_BLOCK").unwrap_or_else(|_| { + tracing::warn!( + "SUBSTREAMS_START_BLOCK not set. Using default value: {}", + DEFAULT_START_BLOCK + ); + DEFAULT_START_BLOCK.to_string() + }); + let end_block = env::var("SUBSTREAMS_END_BLOCK").unwrap_or_else(|_| { + tracing::warn!( + "SUBSTREAMS_END_BLOCK not set. Using default value: {}", + DEFAULT_END_BLOCK + ); + DEFAULT_END_BLOCK.to_string() + }); let args = AppArgs::parse(); @@ -39,8 +53,12 @@ async fn main() -> Result<(), Error> { &endpoint_url, PKG_FILE, MODULE_NAME, - START_BLOCK, - STOP_BLOCK, + start_block + .parse() + .unwrap_or_else(|_| panic!("Invalid start block: {}! Must be integer", start_block)), + end_block + .parse() + .unwrap_or_else(|_| panic!("Invalid end block: {}! Must be integer", end_block)), ) .await?;