-
Notifications
You must be signed in to change notification settings - Fork 1k
/
Copy pathfirehose.rs
102 lines (95 loc) · 3.26 KB
/
firehose.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use anyhow::Error;
use graph::{
env::env_var,
prelude::{prost, tokio, tonic},
{firehose, firehose::FirehoseEndpoint, firehose::ForkStep},
};
use graph_chain_ethereum::codec;
use hex::ToHex;
use prost::Message;
use std::sync::Arc;
use tonic::Streaming;
#[tokio::main]
async fn main() -> Result<(), Error> {
let mut cursor: Option<String> = None;
let token_env = env_var("SF_API_TOKEN", "".to_string());
let mut token: Option<String> = None;
if token_env.len() > 0 {
token = Some(token_env);
}
let firehose = Arc::new(FirehoseEndpoint::new(
"firehose",
"https://api.streamingfast.io:443",
token,
false,
false,
1,
));
loop {
println!("Connecting to the stream!");
let mut stream: Streaming<firehose::Response> = match firehose
.clone()
.stream_blocks(firehose::Request {
start_block_num: 12369739,
stop_block_num: 12369739,
start_cursor: match &cursor {
Some(c) => c.clone(),
None => String::from(""),
},
fork_steps: vec![ForkStep::StepNew as i32, ForkStep::StepUndo as i32],
..Default::default()
})
.await
{
Ok(s) => s,
Err(e) => {
println!("Could not connect to stream! {}", e);
continue;
}
};
loop {
let resp = match stream.message().await {
Ok(Some(t)) => t,
Ok(None) => {
println!("Stream completed");
return Ok(());
}
Err(e) => {
println!("Error getting message {}", e);
break;
}
};
let b = codec::Block::decode(resp.block.unwrap().value.as_ref());
match b {
Ok(b) => {
println!(
"Block #{} ({}) ({})",
b.number,
hex::encode(b.hash),
resp.step
);
b.transaction_traces.iter().for_each(|trx| {
let mut logs: Vec<String> = vec![];
trx.calls.iter().for_each(|call| {
call.logs.iter().for_each(|log| {
logs.push(format!(
"Log {} Topics, Address {}, Trx Index {}, Block Index {}",
log.topics.len(),
log.address.encode_hex::<String>(),
log.index,
log.block_index
));
})
});
if logs.len() > 0 {
println!("Transaction {}", trx.hash.encode_hex::<String>());
logs.iter().for_each(|log| println!("{}", log));
}
});
cursor = Some(resp.cursor)
}
Err(e) => panic!("Unable to decode {:?}", e),
}
}
}
}