Skip to content

Commit

Permalink
Issue 245: Retry when controller receives unauthenticated error (#250)
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Mou <[email protected]>
  • Loading branch information
Wenqi Mou authored Apr 28, 2021
1 parent a026f15 commit 766502e
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 36 deletions.
11 changes: 9 additions & 2 deletions controller-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,12 +568,19 @@ impl ControllerClientImpl {
| Code::AlreadyExists
| Code::PermissionDenied
| Code::OutOfRange
| Code::Unimplemented
| Code::Unauthenticated => ControllerError::OperationError {
| Code::Unimplemented => ControllerError::OperationError {
can_retry: false,
operation: operation_name.into(),
error_msg: status.to_string(),
},
Code::Unauthenticated => {
self.reset().await;
ControllerError::OperationError {
can_retry: true,
operation: operation_name.into(),
error_msg: status.to_string(),
}
}
Code::Unknown => {
self.reset().await;
ControllerError::ConnectionError {
Expand Down
6 changes: 3 additions & 3 deletions integration_test/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use tar::Archive;
use tracing::info;

const LIBRARY: &str = "pravega";
const VERSION: &str = "0.9.0";
const TAG: &str = "v0.9.0";
const VERSION: &str = "0.10.0-2876.1c3854e-SNAPSHOT";
const TAG: &str = "refresh-cert";
const BASE: &str = "./";

fn main() {
Expand Down Expand Up @@ -47,7 +47,7 @@ fn remove_suffix(value: &mut String, suffix: &str) {
/// Downloads and unpacks a prebuilt binary. Only works for certain platforms.
fn install_prebuilt() {
let url = format!(
"https://github.com/pravega/pravega/releases/download/{}/pravega-{}.tgz",
"https://github.com/Tristan1900/pravega/releases/download/{}/pravega-{}.tgz",
TAG, VERSION
);
let short_file_name = url.split('/').last().unwrap();
Expand Down
34 changes: 17 additions & 17 deletions integration_test/ca-cert.crt
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIDDDCCAfSgAwIBAgIJAL0NWaqWdxAsMA0GCSqGSIb3DQEBCwUAMBsxGTAXBgNV
BAMMEFByYXZlZ2EtU3RhY2stQ0EwHhcNMTkwNDI1MDM1ODExWhcNMjEwNDI0MDM1
ODExWjAbMRkwFwYDVQQDDBBQcmF2ZWdhLVN0YWNrLUNBMIIBIjANBgkqhkiG9w0B
AQEFAAOCAQ8AMIIBCgKCAQEA4CN6rLGJyoJkcekmrIKUyPFatuPjVGX+ECJeOYyA
BAgnSWWE0iXgNrGtwhUaPZomekyjiNFjskPYVxwSo1+Wf2Uf3T95K23ntX/IkMpw
35sXp2XsuyahvWt8yrnb6oCgQYHbECQvosHQ+nwHwqGo7ZM2AYwl7YG7ilLFVfcS
wtnBwM8/mU3MJ5tF6pbBEH4fDegYdqt5lSuugJc6Iba9+T0AFZggLhNU1Q127VAn
sLhv/CV1ye6lXhCwFvQKppgiWdgwMwmui1TV/AYGHKPa2NzLFhegXjkzmGBeSDW2
71Wq8NPmyjOITPep2Il11bmpZlONp/l7LP+0QHim1GoAlQIDAQABo1MwUTAdBgNV
HQ4EFgQU4CdeRGkrmZSuj7QPTZRLhUYimckwHwYDVR0jBBgwFoAU4CdeRGkrmZSu
j7QPTZRLhUYimckwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEA
cja7eO93nwRrptcxIPT38yzFk74pcJ4kCyWVw497SRF9bMp24knnFT6uw4EPJQbZ
ovSbWlvTQzvm1CwnOnhHORzM/wLBkYvxIFkTOkQBpEuViks01BoPEOTAgcH22aJ9
SlYkOmg8csQFCLePlZXrD+Jmh8xOrmuomyVeo4oBcXjUnPmkUG4lEqFvi0y0VHMv
Fg0+8YWa4FkE/ZKzqpO4oWb8HxAtuJ6yRzTy5/X01dae1xrPrs0tQ/jQj6cndsdp
wf+F4TDeQHcV+v2TiBXUl2sYjkV0kOfYggbJUZTyFvMGJuq4HMsNyIm2AVBtWEgZ
S4cqFeExqdaOvo+7mobMsg==
MIIDGTCCAgGgAwIBAgIUduPG6pTkGZWno81X0uddz0pBgeAwDQYJKoZIhvcNAQEL
BQAwGzEZMBcGA1UEAwwQUHJhdmVnYS1TdGFjay1DQTAgFw0yMTA0MjYyMTM2MzJa
GA8yMTIxMDQwMjIxMzYzMlowGzEZMBcGA1UEAwwQUHJhdmVnYS1TdGFjay1DQTCC
ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAJk/mT5WGDIibSLzOHo4Wg0/
CLTAWg3mnAn1ykVn5goFvP+Ec+LbwzUTXWXeVBPgts6t1VcE6PrpKlGt47l/XNLF
HoC0Q3Z0zohSTTYLW9CeqjZSOoEKl+l/UAiGxwr31+9p9Arjda+1WDR1oE4pN7YP
R4hn7G2211eUkfaqYrWK6BeFFAn9XPGdzp+VB9DGm3QyWY6UIwx0WxrlbiFC8W/H
n990we266l00P0D2kzTVquJ+Q6+JLhQCWhZYrjVbh6Hoo58A6ygwsfw1ZJ7lQk/D
6KVTD2BHjQhhONFhjyghNXC/fOLUUOGX764M4jVcaa1/sMPh+XP2ncZ6wRDrbTkC
AwEAAaNTMFEwHQYDVR0OBBYEFKm6SWz9ibOjfTRSymKlzLyWucMnMB8GA1UdIwQY
MBaAFKm6SWz9ibOjfTRSymKlzLyWucMnMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZI
hvcNAQELBQADggEBADICwBwyIjvOFIqdIHUr8EkrZ2o2MSYszD14tGkB92quAX/I
Pfx6hIxMSXx3VIQxHvAuJgeWQ7+s0Bj2k+9VjIsvWKbvwCunaEkimswmpvdTeM+B
Hjh2U60Y6dJvBmWLupdIhthA0q4AOkniQbpTDYIwSKlEXv9qeqNUzsfmaZ78dOUm
UWLKKw8xY5Jk5AEHn5pZDWD05i2N/WpZ9ZFZH1zMtXqm8Zv+SwULU9NGVNlVS0u6
WFzLbO2GY5W8HRVNI/7cMe3MlqwnRxIevCBFXPFRBFHfZ79Daki6pN0Pe3w2vDXg
OS5gnlNBdJS7N6m7tPtjFOHqknNVIcGuColFbCw=
-----END CERTIFICATE-----
28 changes: 15 additions & 13 deletions src/transaction/pinger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ use pravega_client_shared::{PingStatus, ScopedStream, TxId};
use std::collections::HashSet;
use std::time::Duration;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::time::sleep;
use tracing::{debug, error, info};

#[derive(Debug)]
enum PingerEvent {
Add(TxId),
Remove(TxId),
Terminate,
}

/// Pinger is used to ping transactions periodically. It spawns a task that runs in the background
Expand All @@ -34,6 +34,7 @@ pub(crate) struct Pinger {
ping_interval_millis: u64,
factory: ClientFactory,
receiver: UnboundedReceiver<PingerEvent>,
shutdown: oneshot::Receiver<()>,
}

/// PingerHandle is just a wrapped channel sender which is used to communicate with the Pinger.
Expand Down Expand Up @@ -65,28 +66,24 @@ impl PingerHandle {
}
}

impl Drop for PingerHandle {
fn drop(&mut self) {
let _res = self.0.send(PingerEvent::Terminate);
}
}

impl Pinger {
pub(crate) fn new(
stream: ScopedStream,
txn_lease_millis: u64,
factory: ClientFactory,
) -> (Self, PingerHandle) {
) -> (Self, PingerHandle, oneshot::Sender<()>) {
let (tx, rx) = unbounded_channel();
let (oneshot_tx, oneshot_rx) = oneshot::channel();
let pinger = Pinger {
stream,
txn_lease_millis,
ping_interval_millis: Pinger::get_ping_interval(txn_lease_millis),
factory,
receiver: rx,
shutdown: oneshot_rx,
};
let handle = PingerHandle(tx);
(pinger, handle)
(pinger, handle, oneshot_tx)
}

pub(crate) async fn start_ping(&mut self) {
Expand All @@ -108,9 +105,6 @@ impl Pinger {
PingerEvent::Remove(id) => {
txn_list.remove(&id);
}
PingerEvent::Terminate => {
return;
}
}
} else {
panic!("pinger sender gone");
Expand Down Expand Up @@ -149,7 +143,15 @@ impl Pinger {
info!("sending transaction pings complete.");

// delay for transaction lease milliseconds.
sleep(Duration::from_millis(self.txn_lease_millis)).await;
tokio::select! {
_ = sleep(Duration::from_millis(self.txn_lease_millis)) => {
debug!("pinger wake up after {}ms", self.txn_lease_millis);
}
_ = &mut self.shutdown => {
info!("shut down pinger");
return;
}
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/transaction/transactional_event_stream_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use snafu::ResultExt;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::oneshot;
use tracing::{info, info_span};
use tracing_futures::Instrument;

Expand Down Expand Up @@ -68,12 +69,13 @@ pub struct TransactionalEventStreamWriter {
factory: ClientFactory,
pinger_handle: PingerHandle,
delegation_token_provider: Arc<DelegationTokenProvider>,
shutdown: oneshot::Sender<()>,
}

impl TransactionalEventStreamWriter {
// use ClientFactory to initialize a TransactionalEventStreamWriter.
pub(crate) async fn new(stream: ScopedStream, writer_id: WriterId, factory: ClientFactory) -> Self {
let (mut pinger, pinger_handle) = Pinger::new(
let (mut pinger, pinger_handle, shutdown) = Pinger::new(
stream.clone(),
factory.get_config().transaction_timeout_time,
factory.clone(),
Expand All @@ -90,6 +92,7 @@ impl TransactionalEventStreamWriter {
factory,
pinger_handle,
delegation_token_provider,
shutdown,
}
}

Expand Down

0 comments on commit 766502e

Please sign in to comment.