Skip to content

Commit

Permalink
Merge branch 'main' into wcy/prefix-bloom-filter
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Dec 8, 2022
2 parents 62a68a5 + 0439b74 commit e524946
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 11 deletions.
24 changes: 13 additions & 11 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,23 +126,25 @@ where
let mut info = self.resolve_actor_info_for_recovery().await;
let mut new_epoch = prev_epoch.next();

// Migrate expired actors to newly joined node by changing actor_map
let migrated = self.migrate_actors(&info).await?;
// Migrate actors in expired CN to newly joined one.
let migrated = self.migrate_actors(&info).await.inspect_err(|err| {
error!(err = ?err, "migrate actors failed");
})?;
if migrated {
info = self.resolve_actor_info_for_recovery().await;
}

// Reset all compute nodes, stop and drop existing actors.
self.reset_compute_nodes(&info).await.inspect_err(|e| {
error!("reset compute nodes failed: {}", e);
self.reset_compute_nodes(&info).await.inspect_err(|err| {
error!(err = ?err, "reset compute nodes failed");
})?;

// update and build all actors.
self.update_actors(&info).await.inspect_err(|e| {
error!("update actors failed: {}", e);
self.update_actors(&info).await.inspect_err(|err| {
error!(err = ?err, "update actors failed");
})?;
self.build_actors(&info).await.inspect_err(|e| {
error!("build_actors failed: {}", e);
self.build_actors(&info).await.inspect_err(|err| {
error!(err = ?err, "build_actors failed");
})?;

// get split assignments for all actors
Expand Down Expand Up @@ -174,13 +176,13 @@ where
match barrier_complete_rx.recv().await.unwrap() {
(_, Ok(response)) => {
if let Err(err) = command_ctx.post_collect().await {
error!("post_collect failed: {}", err);
error!(err = ?err, "post_collect failed");
return Err(err);
}
Ok((new_epoch, response))
}
(_, Err(err)) => {
error!("inject_barrier failed: {}", err);
error!(err = ?err, "inject_barrier failed");
Err(err)
}
}
Expand Down Expand Up @@ -333,7 +335,7 @@ where
async fn reset_compute_nodes(&self, info: &BarrierActorInfo) -> MetaResult<()> {
let futures = info.node_map.iter().map(|(_, worker_node)| async move {
let client = self.env.stream_client_pool().get(worker_node).await?;
debug!("force stop actors: {}", worker_node.id);
debug!(worker = ?worker_node.id, "force stop actors");
client
.force_stop_actors(ForceStopActorsRequest {
request_id: Uuid::new_v4().to_string(),
Expand Down
2 changes: 2 additions & 0 deletions src/risedevtool/src/task/etcd_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ impl EtcdService {
.arg("risedev-meta")
.arg("--max-txn-ops")
.arg("999999")
.arg("--max-request-bytes")
.arg("10485760")
.arg("--auto-compaction-mode")
.arg("periodic")
.arg("--auto-compaction-retention")
Expand Down

0 comments on commit e524946

Please sign in to comment.