Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor locking; add more debug locking #60

Merged
merged 1 commit into from
Jun 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions src/chain/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,22 @@ impl State {

/// Persist the current consensus state to the state file as JSON.
///
/// The state is serialized with `serde_json` and written through an
/// `AtomicFile` configured to allow overwriting the existing file. A debug
/// log line is emitted before the write and another after it succeeds.
///
/// # Errors
///
/// Propagates any error raised by serialization or by the file write.
fn sync_to_disk(&self) -> io::Result<()> {
    let path = &self.state_file_path;

    debug!(
        "writing new consensus state to {}: {:?}",
        path.display(),
        &self.consensus_state
    );

    let serialized = serde_json::to_string(&self.consensus_state)?;

    let state_file = AtomicFile::new(path, OverwriteBehavior::AllowOverwrite);
    state_file.write(|file| file.write_all(serialized.as_bytes()))?;

    debug!(
        "successfully wrote new consensus state to {}",
        path.display(),
    );

    Ok(())
}
}
Expand Down
177 changes: 112 additions & 65 deletions src/session.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! A session with a validator node

use crate::{
chain::{self, state::StateErrorKind},
chain::{self, state::StateErrorKind, Chain},
config::{TendermintVersion, ValidatorConfig},
connection::{tcp, unix::UnixConnection, Connection},
error::{Error, ErrorKind::*},
Expand All @@ -11,9 +11,7 @@ use crate::{
use prost_amino::Message;
use std::{fmt::Debug, os::unix::net::UnixStream, time::Instant};
use tendermint::{
amino_types::{
PingRequest, PingResponse, PubKeyRequest, PubKeyResponse, RemoteError, SignedMsgType,
},
amino_types::{PingResponse, PubKeyRequest, PubKeyResponse, RemoteError, SignedMsgType},
consensus, net,
};

Expand All @@ -36,7 +34,10 @@ impl Session {
host,
port,
} => {
debug!("{}: Connecting to {}...", &config.chain_id, &config.addr);
debug!(
"[{}@{}] connecting to validator...",
&config.chain_id, &config.addr
);

let seed = config.load_secret_key()?;
let v0_33_handshake = match config.protocol_version {
Expand All @@ -61,7 +62,7 @@ impl Session {
if peer_id.is_none() {
// TODO(tarcieri): make peer verification mandatory
warn!(
"[{}] {}: unverified validator peer ID! ({})",
"[{}@{}]: unverified validator peer ID! ({})",
&config.chain_id,
&config.addr,
conn.remote_pubkey().peer_id()
Expand Down Expand Up @@ -105,20 +106,20 @@ impl Session {
fn handle_request(&mut self) -> Result<bool, Error> {
let request = Request::read(&mut self.connection)?;
debug!(
"[{}:{}] received request: {:?}",
"[{}@{}] received request: {:?}",
&self.config.chain_id, &self.config.addr, &request
);

let response = match request {
Request::SignProposal(req) => self.sign(req)?,
Request::SignVote(req) => self.sign(req)?,
// non-signable requests:
Request::ReplyPing(ref req) => self.reply_ping(req),
Request::ReplyPing(_) => Response::Ping(PingResponse {}),
Request::ShowPublicKey(ref req) => self.get_public_key(req)?,
};

debug!(
"[{}:{}] sending response: {:?}",
"[{}@{}] sending response: {:?}",
&self.config.chain_id, &self.config.addr, &response
);

Expand All @@ -142,28 +143,56 @@ impl Session {
R: TendermintRequest + Debug,
{
request.validate()?;
self.check_max_height(&mut request)?;

debug!(
"[{}@{}] acquiring chain registry (for signing)",
&self.config.chain_id, &self.config.addr
);

let registry = chain::REGISTRY.get();

// unwrap is acceptable here as chain presence is validated in client.rs's
// `register_chain` function.
let chain = registry.get_chain(&self.config.chain_id).unwrap();
debug!(
"[{}@{}] acquiring read-only shared lock on chain",
&self.config.chain_id, &self.config.addr
);

let (_, request_state) = parse_request(&request)?;
let mut chain_state = chain.state.lock().unwrap();
let chain = registry
.get_chain(&self.config.chain_id)
.unwrap_or_else(|| {
panic!("chain '{}' missing from registry!", &self.config.chain_id);
});

if let Err(e) = chain_state.update_consensus_state(request_state) {
// Report double signing error back to the validator
if e.kind() == StateErrorKind::DoubleSign {
return self.handle_double_signing(
request,
&chain_state.consensus_state().block_id_prefix(),
);
} else {
return Err(e.into());
}
if let Some(remote_err) = self.update_consensus_state(chain, &request)? {
// In the event of double signing we send a response to notify the validator
return Ok(request.build_response(Some(remote_err)));
}

let mut to_sign = vec![];
request.sign_bytes(self.config.chain_id, &mut to_sign)?;

debug!(
"[{}@{}] performing signature",
&self.config.chain_id, &self.config.addr
);

let started_at = Instant::now();

// TODO(ismail): figure out which key to use here instead of taking the only key
let signature = chain.keyring.sign_ed25519(None, &to_sign)?;

self.log_signing_request(&request, started_at).unwrap();
request.set_signature(&signature);

Ok(request.build_response(None))
}

/// If a max block height is configured, ensure the block we're signing
/// doesn't exceed it
fn check_max_height<R>(&mut self, request: &mut R) -> Result<(), Error>
where
R: TendermintRequest + Debug,
{
if let Some(max_height) = self.config.max_height {
if let Some(height) = request.height() {
if height > max_height.value() as i64 {
Expand All @@ -177,32 +206,75 @@ impl Session {
}
}

let mut to_sign = vec![];
request.sign_bytes(self.config.chain_id, &mut to_sign)?;
Ok(())
}

// TODO(ismail): figure out which key to use here instead of taking the only key
let started_at = Instant::now();
let signature = chain.keyring.sign_ed25519(None, &to_sign)?;
/// Update our local knowledge of the chain's consensus state, detecting
/// attempted double signing and returning a `RemoteError` for the caller to
/// report back to the validator in the event it happens
fn update_consensus_state<R>(
&mut self,
chain: &Chain,
request: &R,
) -> Result<Option<RemoteError>, Error>
where
R: TendermintRequest + Debug,
{
let (msg_type, request_state) = parse_request(request)?;

self.log_signing_request(&request, started_at).unwrap();
debug!(
"[{}@{}] acquiring read-write exclusive lock on chain",
&self.config.chain_id, &self.config.addr
);

request.set_signature(&signature);
let mut chain_state = chain.state.lock().unwrap();

Ok(request.build_response(None))
}
debug!(
"[{}@{}] updating consensus state to: {:?}",
&self.config.chain_id, &self.config.addr, &request_state
);

/// Reply to a ping request
fn reply_ping(&mut self, _request: &PingRequest) -> Response {
debug!("replying with PingResponse");
Response::Ping(PingResponse {})
match chain_state.update_consensus_state(request_state.clone()) {
Ok(()) => Ok(None),
Err(e) if e.kind() == StateErrorKind::DoubleSign => {
// Report double signing error back to the validator
let original_block_id = chain_state.consensus_state().block_id_prefix();

error!(
"[{}@{}] attempted double sign {:?} at h/r/s: {} ({} != {})",
&self.config.chain_id,
&self.config.addr,
msg_type,
request_state,
original_block_id,
request_state.block_id_prefix()
);

let remote_err = RemoteError::double_sign(request_state.height.into());
Ok(Some(remote_err))
}
Err(e) => Err(e.into()),
}
}

/// Get the (only) public key in the keyring
fn get_public_key(&mut self, _request: &PubKeyRequest) -> Result<Response, Error> {
// unwrap is acceptable here as chain presence is validated in client.rs's
// `register_chain` function.
debug!(
"[{}@{}] acquiring chain registry (for public key)",
&self.config.chain_id, &self.config.addr
);

let registry = chain::REGISTRY.get();
let chain = registry.get_chain(&self.config.chain_id).unwrap();

debug!(
"[{}@{}] acquiring read-only shared lock on chain",
&self.config.chain_id, &self.config.addr
);

let chain = registry
.get_chain(&self.config.chain_id)
.unwrap_or_else(|| {
panic!("chain '{}' missing from registry!", &self.config.chain_id);
});

Ok(Response::PublicKey(PubKeyResponse::from(
*chain.keyring.default_pubkey()?,
Expand All @@ -228,31 +300,6 @@ impl Session {

Ok(())
}

/// Handle attempted double signing
///
/// Logs the attempt at error level, showing the originally recorded block ID
/// prefix next to the one carried by the offending request, and then builds
/// a response for the request that carries a double-sign `RemoteError`.
///
/// # Errors
///
/// Propagates any error returned by `parse_request`.
///
/// # Panics
///
/// Panics if `request.height()` is `None`, since the height is unwrapped.
fn handle_double_signing<R>(
    &self,
    request: R,
    original_block_id: &str,
) -> Result<Response, Error>
where
    R: TendermintRequest + Debug,
{
    let (signed_msg_type, attempted_state) = parse_request(&request)?;

    error!(
        "[{}:{}] attempted double sign {:?} at h/r/s: {} ({} != {})",
        &self.config.chain_id,
        &self.config.addr,
        signed_msg_type,
        attempted_state,
        original_block_id,
        attempted_state.block_id_prefix()
    );

    // The height is needed to construct the error reported to the validator.
    let height = request.height().unwrap();
    let double_sign_err = RemoteError::double_sign(height);

    Ok(request.build_response(Some(double_sign_err)))
}
}

/// Parse the consensus state from an incoming request
Expand Down