-
Notifications
You must be signed in to change notification settings - Fork 1k
/
Copy pathmapper.rs
105 lines (92 loc) · 3.75 KB
/
mapper.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use crate::{Block, Chain, EntityChanges, TriggerData};
use graph::blockchain::block_stream::SubstreamsError::{
MultipleModuleOutputError, UnexpectedStoreDeltaOutput,
};
use graph::blockchain::block_stream::{
BlockStreamEvent, BlockWithTriggers, FirehoseCursor, SubstreamsError, SubstreamsMapper,
};
use graph::prelude::{async_trait, BlockHash, BlockNumber, BlockPtr, Logger};
use graph::substreams::module_output::Data;
use graph::substreams::{BlockScopedData, Clock, ForkStep};
use prost::Message;
/// Maps substreams `BlockScopedData` messages to block stream events for `Chain`.
///
/// The struct has no fields; its behavior is provided by the
/// `SubstreamsMapper<Chain>` implementation.
pub struct Mapper {}
#[async_trait]
impl SubstreamsMapper<Chain> for Mapper {
async fn to_block_stream_event(
&self,
logger: &Logger,
block_scoped_data: &BlockScopedData,
) -> Result<Option<BlockStreamEvent<Chain>>, SubstreamsError> {
let BlockScopedData {
outputs,
clock,
step,
cursor: _,
} = block_scoped_data;
let step = ForkStep::from_i32(*step).unwrap_or_else(|| {
panic!(
"unknown step i32 value {}, maybe you forgot update & re-regenerate the protobuf definitions?",
step
)
});
if outputs.len() == 0 {
return Ok(None);
}
if outputs.len() > 1 {
return Err(MultipleModuleOutputError);
}
//todo: handle step
let module_output = &block_scoped_data.outputs[0];
let cursor = &block_scoped_data.cursor;
let clock = match clock {
Some(clock) => clock,
None => return Err(SubstreamsError::MissingClockError),
};
let Clock {
id: hash,
number,
timestamp: _,
} = clock;
let hash: BlockHash = hash.as_str().try_into()?;
let number: BlockNumber = *number as BlockNumber;
match module_output.data.as_ref() {
Some(Data::MapOutput(msg)) => {
let changes: EntityChanges = Message::decode(msg.value.as_slice())
.map_err(SubstreamsError::DecodingError)?;
use ForkStep::*;
match step {
StepIrreversible | StepNew => Ok(Some(BlockStreamEvent::ProcessBlock(
// Even though the trigger processor for substreams doesn't care about TriggerData
// there are a bunch of places in the runner that check if trigger data
// empty and skip processing if so. This will prolly breakdown
// close to head so we will need to improve things.
// TODO(filipe): Fix once either trigger data can be empty
// or we move the changes into trigger data.
BlockWithTriggers::new(
Block {
hash,
number,
changes,
},
vec![TriggerData {}],
logger,
),
FirehoseCursor::from(cursor.clone()),
))),
StepUndo => {
let parent_ptr = BlockPtr { hash, number };
Ok(Some(BlockStreamEvent::Revert(
parent_ptr,
FirehoseCursor::from(cursor.clone()),
)))
}
StepUnknown => {
panic!("unknown step should not happen in the Firehose response")
}
}
}
Some(Data::StoreDeltas(_)) => Err(UnexpectedStoreDeltaOutput),
_ => Err(SubstreamsError::ModuleOutputNotPresentOrUnexpected),
}
}
}