Skip to content

Commit 400c24d

Browse files
committed
core, graph, store: Reset runner state only when needed
We only need to reset the state of the runner when the `WritableStore` actually had to be restarted because of an error; if it had an error, we have to reset the state. Use `SubgraphRunner.revert_state` to properly reset the runner state.
1 parent 2574b37 commit 400c24d

File tree

5 files changed

+20
-16
lines changed

5 files changed

+20
-16
lines changed

core/src/subgraph/runner.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -183,13 +183,13 @@ where
183183
// Restart the store to clear any errors that it
184184
// might have encountered and use that from now on
185185
let store = self.inputs.store.cheap_clone();
186-
let store = store.restart().await?;
187-
self.inputs = Arc::new(self.inputs.with_store(store));
188-
// Also clear the entity cache since we might have
189-
// entries in there that never made it to the
190-
// database
191-
self.state.entity_lfu_cache = LfuCache::new();
192-
self.state.synced = self.inputs.store.is_deployment_synced().await?;
186+
if let Some(store) = store.restart().await? {
187+
let last_good_block =
188+
store.block_ptr().map(|ptr| ptr.number).unwrap_or(0);
189+
self.revert_state(last_good_block)?;
190+
self.inputs = Arc::new(self.inputs.with_store(store));
191+
self.state.synced = self.inputs.store.is_deployment_synced().await?;
192+
}
193193
break;
194194
}
195195
};

graph/src/components/store/traits.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -331,10 +331,11 @@ pub trait WritableStore: ReadStore + DeploymentCursorTracker {
331331
/// contain unprocessed write requests that will be discarded by this
332332
/// call.
333333
///
334-
/// After this call, `self` should not be used anymore, as it will
335-
/// continue to produce errors for any write requests, and instead, the
336-
/// returned `WritableStore` should be used.
337-
async fn restart(self: Arc<Self>) -> Result<Arc<dyn WritableStore>, StoreError>;
334+
/// This call returns `None` if a restart was not needed because `self`
335+
/// had no errors. If it returns `Some`, `self` should not be used
336+
/// anymore, as it will continue to produce errors for any write
337+
/// requests, and instead, the returned `WritableStore` should be used.
338+
async fn restart(self: Arc<Self>) -> Result<Option<Arc<dyn WritableStore>>, StoreError>;
338339
}
339340

340341
#[async_trait]

store/postgres/src/writable.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -1316,16 +1316,19 @@ impl WritableStoreTrait for WritableStore {
13161316
self.writer.flush().await
13171317
}
13181318

1319-
async fn restart(self: Arc<Self>) -> Result<Arc<dyn WritableStoreTrait>, StoreError> {
1319+
async fn restart(self: Arc<Self>) -> Result<Option<Arc<dyn WritableStoreTrait>>, StoreError> {
13201320
if self.poisoned() {
13211321
let logger = self.store.logger.clone();
13221322
if let Err(e) = self.stop().await {
13231323
warn!(logger, "Writable had error when stopping, it is safe to ignore this error"; "error" => e.to_string());
13241324
}
13251325
let store = Arc::new(self.store.store.0.clone());
1326-
store.writable(logger, self.store.site.id.into()).await
1326+
store
1327+
.writable(logger, self.store.site.id.into())
1328+
.await
1329+
.map(|store| Some(store))
13271330
} else {
1328-
Ok(self)
1331+
Ok(None)
13291332
}
13301333
}
13311334
}

store/test-store/tests/graph/entity_cache.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ impl WritableStore for MockStore {
170170
unimplemented!()
171171
}
172172

173-
async fn restart(self: Arc<Self>) -> Result<Arc<dyn WritableStore>, StoreError> {
173+
async fn restart(self: Arc<Self>) -> Result<Option<Arc<dyn WritableStore>>, StoreError> {
174174
unimplemented!()
175175
}
176176
}

store/test-store/tests/postgres/writable.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ fn restart() {
192192

193193
// We now have a poisoned store. Restarting it gives us a new store
194194
// that works again
195-
let writable = writable.restart().await.unwrap();
195+
let writable = writable.restart().await.unwrap().unwrap();
196196
writable.flush().await.unwrap();
197197

198198
// Retry our write with correct data

0 commit comments

Comments
 (0)