Skip to content

Commit

Permalink
Refactor locking; add more debug locking
Browse files Browse the repository at this point in the history
This is an attempt to help address #37.

Based on `strace` logging it appears at least one of the instances of
this bug occurred during a lock acquisition happening immediately after
persisting the chain state. The system call sequence looked something
like this:

```
close(12)   = 0
rmdir("/.../.atomicwrite.InysUcmuRax7") = 0
futex(0x..., FUTEX_WAIT_PRIVATE, 2, NULL
```

Unfortunately this isn't a whole lot to go on, but makes it appear as if
it's hanging trying to acquire a lock immediately after persisting the
consensus state to disk.

This commit does a couple things to try to narrow down what is
happening:

1. Ensures that an exclusive lock to the chain state isn't held while
   the signing operation is being performed (i.e. while communicating
   with the HSM). If we were able to update the consensus state, that
   means the signing operation is authorized, and we no longer need to
   hold the lock. In the event the signing operation fails, the
   validator will miss the block in question, but with no risk of
   double-signing.
2. Adds a significant amount of additional debug logging, particularly
   around things like lock acquisition and writing to disk. While this
   commit is unlikely to fix #37 in and of itself, the additional
   debug logging should be helpful in isolating the problem.
  • Loading branch information
tony-iqlusion committed Jun 8, 2020
1 parent 45a7370 commit f9e5de0
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 66 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions src/chain/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,22 @@ impl State {

/// Sync the current state to disk
fn sync_to_disk(&self) -> io::Result<()> {
debug!(
"writing new consensus state to {}: {:?}",
self.state_file_path.display(),
&self.consensus_state
);

let json = serde_json::to_string(&self.consensus_state)?;

AtomicFile::new(&self.state_file_path, OverwriteBehavior::AllowOverwrite)
.write(|f| f.write_all(json.as_bytes()))?;

debug!(
"successfully wrote new consensus state to {}",
self.state_file_path.display(),
);

Ok(())
}
}
Expand Down
177 changes: 112 additions & 65 deletions src/session.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! A session with a validator node
use crate::{
chain::{self, state::StateErrorKind},
chain::{self, state::StateErrorKind, Chain},
config::{TendermintVersion, ValidatorConfig},
connection::{tcp, unix::UnixConnection, Connection},
error::{Error, ErrorKind::*},
Expand All @@ -11,9 +11,7 @@ use crate::{
use prost_amino::Message;
use std::{fmt::Debug, os::unix::net::UnixStream, time::Instant};
use tendermint::{
amino_types::{
PingRequest, PingResponse, PubKeyRequest, PubKeyResponse, RemoteError, SignedMsgType,
},
amino_types::{PingResponse, PubKeyRequest, PubKeyResponse, RemoteError, SignedMsgType},
consensus, net,
};

Expand All @@ -36,7 +34,10 @@ impl Session {
host,
port,
} => {
debug!("{}: Connecting to {}...", &config.chain_id, &config.addr);
debug!(
"[{}@{}] connecting to validator...",
&config.chain_id, &config.addr
);

let seed = config.load_secret_key()?;
let v0_33_handshake = match config.protocol_version {
Expand All @@ -61,7 +62,7 @@ impl Session {
if peer_id.is_none() {
// TODO(tarcieri): make peer verification mandatory
warn!(
"[{}] {}: unverified validator peer ID! ({})",
"[{}@{}]: unverified validator peer ID! ({})",
&config.chain_id,
&config.addr,
conn.remote_pubkey().peer_id()
Expand Down Expand Up @@ -105,20 +106,20 @@ impl Session {
fn handle_request(&mut self) -> Result<bool, Error> {
let request = Request::read(&mut self.connection)?;
debug!(
"[{}:{}] received request: {:?}",
"[{}@{}] received request: {:?}",
&self.config.chain_id, &self.config.addr, &request
);

let response = match request {
Request::SignProposal(req) => self.sign(req)?,
Request::SignVote(req) => self.sign(req)?,
// non-signable requests:
Request::ReplyPing(ref req) => self.reply_ping(req),
Request::ReplyPing(_) => Response::Ping(PingResponse {}),
Request::ShowPublicKey(ref req) => self.get_public_key(req)?,
};

debug!(
"[{}:{}] sending response: {:?}",
"[{}@{}] sending response: {:?}",
&self.config.chain_id, &self.config.addr, &response
);

Expand All @@ -142,28 +143,56 @@ impl Session {
R: TendermintRequest + Debug,
{
request.validate()?;
self.check_max_height(&mut request)?;

debug!(
"[{}@{}] acquiring chain registry (for signing)",
&self.config.chain_id, &self.config.addr
);

let registry = chain::REGISTRY.get();

// unwrap is acceptable here as chain presence is validated in client.rs's
// `register_chain` function.
let chain = registry.get_chain(&self.config.chain_id).unwrap();
debug!(
"[{}@{}] acquiring read-only shared lock on chain",
&self.config.chain_id, &self.config.addr
);

let (_, request_state) = parse_request(&request)?;
let mut chain_state = chain.state.lock().unwrap();
let chain = registry
.get_chain(&self.config.chain_id)
.unwrap_or_else(|| {
panic!("chain '{}' missing from registry!", &self.config.chain_id);
});

if let Err(e) = chain_state.update_consensus_state(request_state) {
// Report double signing error back to the validator
if e.kind() == StateErrorKind::DoubleSign {
return self.handle_double_signing(
request,
&chain_state.consensus_state().block_id_prefix(),
);
} else {
return Err(e.into());
}
if let Some(remote_err) = self.update_consensus_state(chain, &request)? {
// In the event of double signing we send a response to notify the validator
return Ok(request.build_response(Some(remote_err)));
}

let mut to_sign = vec![];
request.sign_bytes(self.config.chain_id, &mut to_sign)?;

debug!(
"[{}@{}] performing signature",
&self.config.chain_id, &self.config.addr
);

let started_at = Instant::now();

// TODO(ismail): figure out which key to use here instead of taking the only key
let signature = chain.keyring.sign_ed25519(None, &to_sign)?;

self.log_signing_request(&request, started_at).unwrap();
request.set_signature(&signature);

Ok(request.build_response(None))
}

/// If a max block height is configured, ensure the block we're signing
/// doesn't exceed it
fn check_max_height<R>(&mut self, request: &mut R) -> Result<(), Error>
where
R: TendermintRequest + Debug,
{
if let Some(max_height) = self.config.max_height {
if let Some(height) = request.height() {
if height > max_height.value() as i64 {
Expand All @@ -177,32 +206,75 @@ impl Session {
}
}

let mut to_sign = vec![];
request.sign_bytes(self.config.chain_id, &mut to_sign)?;
Ok(())
}

// TODO(ismail): figure out which key to use here instead of taking the only key
let started_at = Instant::now();
let signature = chain.keyring.sign_ed25519(None, &to_sign)?;
/// Update our local knowledge of the chain's consensus state, detecting
/// attempted double signing and sending a response in the event it happens
fn update_consensus_state<R>(
&mut self,
chain: &Chain,
request: &R,
) -> Result<Option<RemoteError>, Error>
where
R: TendermintRequest + Debug,
{
let (msg_type, request_state) = parse_request(request)?;

self.log_signing_request(&request, started_at).unwrap();
debug!(
"[{}@{}] acquiring read-write exclusive lock on chain",
&self.config.chain_id, &self.config.addr
);

request.set_signature(&signature);
let mut chain_state = chain.state.lock().unwrap();

Ok(request.build_response(None))
}
debug!(
"[{}@{}] updating consensus state to: {:?}",
&self.config.chain_id, &self.config.addr, &request_state
);

/// Reply to a ping request
fn reply_ping(&mut self, _request: &PingRequest) -> Response {
debug!("replying with PingResponse");
Response::Ping(PingResponse {})
match chain_state.update_consensus_state(request_state.clone()) {
Ok(()) => Ok(None),
Err(e) if e.kind() == StateErrorKind::DoubleSign => {
// Report double signing error back to the validator
let original_block_id = chain_state.consensus_state().block_id_prefix();

error!(
"[{}@{}] attempted double sign {:?} at h/r/s: {} ({} != {})",
&self.config.chain_id,
&self.config.addr,
msg_type,
request_state,
original_block_id,
request_state.block_id_prefix()
);

let remote_err = RemoteError::double_sign(request_state.height.into());
Ok(Some(remote_err))
}
Err(e) => Err(e.into()),
}
}

/// Get the public key for (the only) public key in the keyring
fn get_public_key(&mut self, _request: &PubKeyRequest) -> Result<Response, Error> {
// unwrap is acceptable here as chain presence is validated in client.rs's
// `register_chain` function.
debug!(
"[{}@{}] acquiring chain registry (for public key)",
&self.config.chain_id, &self.config.addr
);

let registry = chain::REGISTRY.get();
let chain = registry.get_chain(&self.config.chain_id).unwrap();

debug!(
"[{}@{}] acquiring read-only shared lock on chain",
&self.config.chain_id, &self.config.addr
);

let chain = registry
.get_chain(&self.config.chain_id)
.unwrap_or_else(|| {
panic!("chain '{}' missing from registry!", &self.config.chain_id);
});

Ok(Response::PublicKey(PubKeyResponse::from(
*chain.keyring.default_pubkey()?,
Expand All @@ -228,31 +300,6 @@ impl Session {

Ok(())
}

/// Handle attempted double signing
fn handle_double_signing<R>(
&self,
request: R,
original_block_id: &str,
) -> Result<Response, Error>
where
R: TendermintRequest + Debug,
{
let (msg_type, request_state) = parse_request(&request)?;

error!(
"[{}:{}] attempted double sign {:?} at h/r/s: {} ({} != {})",
&self.config.chain_id,
&self.config.addr,
msg_type,
request_state,
original_block_id,
request_state.block_id_prefix()
);

let remote_err = RemoteError::double_sign(request.height().unwrap());
Ok(request.build_response(Some(remote_err)))
}
}

/// Parse the consensus state from an incoming request
Expand Down

0 comments on commit f9e5de0

Please sign in to comment.