-
Notifications
You must be signed in to change notification settings - Fork 1k
/
Copy pathcontext.rs
76 lines (71 loc) · 2.09 KB
/
context.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
use crate::{
polling_monitor::{
ipfs_service::IpfsService, spawn_monitor, PollingMonitor, PollingMonitorMetrics,
},
subgraph::SubgraphInstance,
};
use anyhow::{self, Error};
use bytes::Bytes;
use cid::Cid;
use graph::{
blockchain::Blockchain,
components::store::DeploymentId,
data_source::offchain,
env::ENV_VARS,
ipfs_client::IpfsClient,
prelude::{CancelGuard, DeploymentHash, MetricsRegistry, RuntimeHostBuilder},
slog::Logger,
tokio::sync::mpsc,
};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
pub type SharedInstanceKeepAliveMap = Arc<RwLock<HashMap<DeploymentId, CancelGuard>>>;
pub struct IndexingContext<C, T>
where
T: RuntimeHostBuilder<C>,
C: Blockchain,
{
pub instance: SubgraphInstance<C, T>,
pub instances: SharedInstanceKeepAliveMap,
pub filter: C::TriggerFilter,
pub offchain_monitor: OffchainMonitor,
}
pub struct OffchainMonitor {
ipfs_monitor: PollingMonitor<Cid>,
pub ipfs_monitor_rx: mpsc::Receiver<(Cid, Bytes)>,
pub data_sources: Vec<offchain::DataSource>,
}
impl OffchainMonitor {
pub fn new(
logger: Logger,
registry: Arc<dyn MetricsRegistry>,
subgraph_hash: &DeploymentHash,
client: IpfsClient,
) -> Self {
let (ipfs_monitor_tx, ipfs_monitor_rx) = mpsc::channel(10);
let ipfs_service = IpfsService::new(
client,
ENV_VARS.mappings.max_ipfs_file_bytes.unwrap_or(1 << 20) as u64,
ENV_VARS.mappings.ipfs_timeout,
10,
);
let ipfs_monitor = spawn_monitor(
ipfs_service,
ipfs_monitor_tx,
logger,
PollingMonitorMetrics::new(registry, subgraph_hash),
);
Self {
ipfs_monitor,
ipfs_monitor_rx,
data_sources: Vec::new(),
}
}
pub fn add_data_source(&mut self, ds: offchain::DataSource) -> Result<(), Error> {
match ds.source {
offchain::Source::Ipfs(cid) => self.ipfs_monitor.monitor(cid.clone()),
};
self.data_sources.push(ds);
Ok(())
}
}