Skip to content

Commit 8ea2995

Browse files
authored
Firehose cursor newtype (#3754)
* firehose: Add a newtype for FirehoseCursor This makes working with the cursor more uniform across various places. A practical benefit is that it normalizes `Some("")` to `None`. This also removes `update_firehose_cursor` by merging it with `revert_block_ptr`. * firehose: Remove dead `fn delete_block_cursor`
1 parent e9daeae commit 8ea2995

File tree

19 files changed

+167
-179
lines changed

19 files changed

+167
-179
lines changed

chain/arweave/src/chain.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::{
2929
codec,
3030
data_source::{DataSource, UnresolvedDataSource},
3131
};
32-
use graph::blockchain::block_stream::BlockStream;
32+
use graph::blockchain::block_stream::{BlockStream, FirehoseCursor};
3333

3434
pub struct Chain {
3535
logger_factory: LoggerFactory,
@@ -98,7 +98,7 @@ impl Blockchain for Chain {
9898
async fn new_firehose_block_stream(
9999
&self,
100100
deployment: DeploymentLocator,
101-
block_cursor: Option<String>,
101+
block_cursor: FirehoseCursor,
102102
start_blocks: Vec<BlockNumber>,
103103
subgraph_current_block: Option<BlockPtr>,
104104
filter: Arc<Self::TriggerFilter>,
@@ -287,7 +287,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
287287
match step {
288288
StepNew => Ok(BlockStreamEvent::ProcessBlock(
289289
adapter.triggers_in_block(logger, block, filter).await?,
290-
Some(response.cursor.clone()),
290+
FirehoseCursor::from(response.cursor.clone()),
291291
)),
292292

293293
StepUndo => {
@@ -297,7 +297,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
297297

298298
Ok(BlockStreamEvent::Revert(
299299
parent_ptr,
300-
Some(response.cursor.clone()),
300+
FirehoseCursor::from(response.cursor.clone()),
301301
))
302302
}
303303

chain/cosmos/src/chain.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::sync::Arc;
22

3+
use graph::blockchain::block_stream::FirehoseCursor;
34
use graph::cheap_clone::CheapClone;
45
use graph::data::subgraph::UnifiedMappingApiVersion;
56
use graph::prelude::MetricsRegistry;
@@ -95,7 +96,7 @@ impl Blockchain for Chain {
9596
async fn new_firehose_block_stream(
9697
&self,
9798
deployment: DeploymentLocator,
98-
block_cursor: Option<String>,
99+
block_cursor: FirehoseCursor,
99100
start_blocks: Vec<BlockNumber>,
100101
subgraph_current_block: Option<BlockPtr>,
101102
filter: Arc<Self::TriggerFilter>,
@@ -310,7 +311,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
310311
match step {
311312
ForkStep::StepNew => Ok(BlockStreamEvent::ProcessBlock(
312313
adapter.triggers_in_block(logger, sp, filter).await?,
313-
Some(response.cursor.clone()),
314+
FirehoseCursor::from(response.cursor.clone()),
314315
)),
315316

316317
ForkStep::StepUndo => {
@@ -321,7 +322,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
321322

322323
Ok(BlockStreamEvent::Revert(
323324
parent_ptr,
324-
Some(response.cursor.clone()),
325+
FirehoseCursor::from(response.cursor.clone()),
325326
))
326327
}
327328

chain/ethereum/src/chain.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
5555
&self,
5656
chain: &Chain,
5757
deployment: DeploymentLocator,
58-
block_cursor: Option<String>,
58+
block_cursor: FirehoseCursor,
5959
start_blocks: Vec<BlockNumber>,
6060
subgraph_current_block: Option<BlockPtr>,
6161
filter: Arc<<Chain as Blockchain>::TriggerFilter>,
@@ -277,7 +277,7 @@ impl Blockchain for Chain {
277277
async fn new_firehose_block_stream(
278278
&self,
279279
deployment: DeploymentLocator,
280-
block_cursor: Option<String>,
280+
block_cursor: FirehoseCursor,
281281
start_blocks: Vec<BlockNumber>,
282282
subgraph_current_block: Option<BlockPtr>,
283283
filter: Arc<Self::TriggerFilter>,
@@ -623,7 +623,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
623623

624624
Ok(BlockStreamEvent::ProcessBlock(
625625
block_with_triggers,
626-
FirehoseCursor::Some(response.cursor.clone()),
626+
FirehoseCursor::from(response.cursor.clone()),
627627
))
628628
}
629629

@@ -634,7 +634,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
634634

635635
Ok(BlockStreamEvent::Revert(
636636
parent_ptr,
637-
FirehoseCursor::Some(response.cursor.clone()),
637+
FirehoseCursor::from(response.cursor.clone()),
638638
))
639639
}
640640

chain/near/src/chain.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::{
3030
codec,
3131
data_source::{DataSource, UnresolvedDataSource},
3232
};
33-
use graph::blockchain::block_stream::{BlockStream, BlockStreamBuilder};
33+
use graph::blockchain::block_stream::{BlockStream, BlockStreamBuilder, FirehoseCursor};
3434

3535
pub struct NearStreamBuilder {}
3636

@@ -40,7 +40,7 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
4040
&self,
4141
chain: &Chain,
4242
deployment: DeploymentLocator,
43-
block_cursor: Option<String>,
43+
block_cursor: FirehoseCursor,
4444
start_blocks: Vec<BlockNumber>,
4545
subgraph_current_block: Option<BlockPtr>,
4646
filter: Arc<<Chain as Blockchain>::TriggerFilter>,
@@ -161,7 +161,7 @@ impl Blockchain for Chain {
161161
async fn new_firehose_block_stream(
162162
&self,
163163
deployment: DeploymentLocator,
164-
block_cursor: Option<String>,
164+
block_cursor: FirehoseCursor,
165165
start_blocks: Vec<BlockNumber>,
166166
subgraph_current_block: Option<BlockPtr>,
167167
filter: Arc<Self::TriggerFilter>,
@@ -354,7 +354,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
354354
match step {
355355
StepNew => Ok(BlockStreamEvent::ProcessBlock(
356356
adapter.triggers_in_block(logger, block, filter).await?,
357-
Some(response.cursor.clone()),
357+
FirehoseCursor::from(response.cursor.clone()),
358358
)),
359359

360360
StepUndo => {
@@ -365,7 +365,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
365365

366366
Ok(BlockStreamEvent::Revert(
367367
parent_ptr,
368-
Some(response.cursor.clone()),
368+
FirehoseCursor::from(response.cursor.clone()),
369369
))
370370
}
371371

core/src/subgraph/runner.rs

+8-10
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::subgraph::state::IndexingState;
66
use crate::subgraph::stream::new_block_stream;
77
use crate::subgraph::SubgraphInstance;
88
use atomic_refcell::AtomicRefCell;
9-
use graph::blockchain::block_stream::{BlockStreamEvent, BlockWithTriggers};
9+
use graph::blockchain::block_stream::{BlockStreamEvent, BlockWithTriggers, FirehoseCursor};
1010
use graph::blockchain::{Block, Blockchain, DataSource, TriggerFilter as _};
1111
use graph::components::{
1212
store::ModificationsAndCache,
@@ -139,7 +139,7 @@ where
139139
&mut self,
140140
block_stream_cancel_handle: &CancelHandle,
141141
block: BlockWithTriggers<C>,
142-
firehose_cursor: Option<String>,
142+
firehose_cursor: FirehoseCursor,
143143
) -> Result<Action, BlockProcessingError> {
144144
let triggers = block.trigger_data;
145145
let block = Arc::new(block.block);
@@ -556,13 +556,13 @@ trait StreamEventHandler<C: Blockchain> {
556556
async fn handle_process_block(
557557
&mut self,
558558
block: BlockWithTriggers<C>,
559-
cursor: Option<String>,
559+
cursor: FirehoseCursor,
560560
cancel_handle: &CancelHandle,
561561
) -> Result<Action, Error>;
562562
async fn handle_revert(
563563
&mut self,
564564
revert_to_ptr: BlockPtr,
565-
cursor: Option<String>,
565+
cursor: FirehoseCursor,
566566
) -> Result<Action, Error>;
567567
async fn handle_err(
568568
&mut self,
@@ -580,7 +580,7 @@ where
580580
async fn handle_process_block(
581581
&mut self,
582582
block: BlockWithTriggers<C>,
583-
cursor: Option<String>,
583+
cursor: FirehoseCursor,
584584
cancel_handle: &CancelHandle,
585585
) -> Result<Action, Error> {
586586
let block_ptr = block.ptr();
@@ -614,9 +614,7 @@ where
614614

615615
let start = Instant::now();
616616

617-
let res = self
618-
.process_block(&cancel_handle, block, cursor.into())
619-
.await;
617+
let res = self.process_block(&cancel_handle, block, cursor).await;
620618

621619
let elapsed = start.elapsed().as_secs_f64();
622620
self.metrics
@@ -780,7 +778,7 @@ where
780778
async fn handle_revert(
781779
&mut self,
782780
revert_to_ptr: BlockPtr,
783-
cursor: Option<String>,
781+
cursor: FirehoseCursor,
784782
) -> Result<Action, Error> {
785783
// Current deployment head in the database / WritableAgent Mutex cache.
786784
//
@@ -797,7 +795,7 @@ where
797795
if let Err(e) = self
798796
.inputs
799797
.store
800-
.revert_block_operations(revert_to_ptr, cursor.as_deref())
798+
.revert_block_operations(revert_to_ptr, cursor)
801799
.await
802800
{
803801
error!(&self.logger, "Could not revert block. Retrying"; "error" => %e);

graph/src/blockchain/block_stream.rs

+50-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use anyhow::Error;
22
use async_stream::stream;
33
use futures03::Stream;
4+
use std::fmt;
45
use std::sync::Arc;
56
use thiserror::Error;
67
use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -89,7 +90,7 @@ pub trait BlockStreamBuilder<C: Blockchain>: Send + Sync {
8990
&self,
9091
chain: &C,
9192
deployment: DeploymentLocator,
92-
block_cursor: Option<String>,
93+
block_cursor: FirehoseCursor,
9394
start_blocks: Vec<BlockNumber>,
9495
subgraph_current_block: Option<BlockPtr>,
9596
filter: Arc<C::TriggerFilter>,
@@ -107,7 +108,50 @@ pub trait BlockStreamBuilder<C: Blockchain>: Send + Sync {
107108
) -> Result<Box<dyn BlockStream<C>>>;
108109
}
109110

110-
pub type FirehoseCursor = Option<String>;
111+
#[derive(Debug, Clone)]
112+
pub struct FirehoseCursor(Option<String>);
113+
114+
impl FirehoseCursor {
115+
#[allow(non_upper_case_globals)]
116+
pub const None: Self = FirehoseCursor(None);
117+
118+
pub fn is_none(&self) -> bool {
119+
self.0.is_none()
120+
}
121+
}
122+
123+
impl fmt::Display for FirehoseCursor {
124+
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
125+
f.write_str(&self.0.as_deref().unwrap_or(""))
126+
}
127+
}
128+
129+
impl From<String> for FirehoseCursor {
130+
fn from(cursor: String) -> Self {
131+
// Treat a cursor of "" as None, not absolutely necessary for correctness since the firehose
132+
// treats both as the same, but makes it a little clearer.
133+
if cursor == "" {
134+
FirehoseCursor::None
135+
} else {
136+
FirehoseCursor(Some(cursor))
137+
}
138+
}
139+
}
140+
141+
impl From<Option<String>> for FirehoseCursor {
142+
fn from(cursor: Option<String>) -> Self {
143+
match cursor {
144+
None => FirehoseCursor::None,
145+
Some(s) => FirehoseCursor::from(s),
146+
}
147+
}
148+
}
149+
150+
impl AsRef<Option<String>> for FirehoseCursor {
151+
fn as_ref(&self) -> &Option<String> {
152+
&self.0
153+
}
154+
}
111155

112156
#[derive(Debug)]
113157
pub struct BlockWithTriggers<C: Blockchain> {
@@ -335,7 +379,9 @@ mod test {
335379
ext::futures::{CancelableError, SharedCancelGuard, StreamExtension},
336380
};
337381

338-
use super::{BlockStream, BlockStreamEvent, BlockWithTriggers, BufferedBlockStream};
382+
use super::{
383+
BlockStream, BlockStreamEvent, BlockWithTriggers, BufferedBlockStream, FirehoseCursor,
384+
};
339385

340386
#[derive(Debug)]
341387
struct TestStream {
@@ -359,7 +405,7 @@ mod test {
359405
},
360406
trigger_data: vec![],
361407
},
362-
None,
408+
FirehoseCursor::None,
363409
))))
364410
}
365411
}

0 commit comments

Comments
 (0)