From 6ba5b4ae1d1b8e9871e51b1acd73258f64da09f8 Mon Sep 17 00:00:00 2001 From: David McNeil Date: Tue, 13 Aug 2019 09:24:22 -0400 Subject: [PATCH 01/14] Update event stream struct naming Signed-off-by: David McNeil --- components/sup/src/event.rs | 28 ++++++++++++++-------------- components/sup/src/event/nitox.rs | 10 +++++----- components/sup/src/event/ratsio.rs | 10 +++++----- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/components/sup/src/event.rs b/components/sup/src/event.rs index 252f6812f0..2662adbffe 100644 --- a/components/sup/src/event.rs +++ b/components/sup/src/event.rs @@ -69,7 +69,7 @@ pub fn init_stream(config: EventStreamConfig, event_core: EventCore) -> Result<( INIT.call_once(|| { let conn_info = - EventConnectionInfo::new(config.token, config.url, &event_core.supervisor_id); + EventStreamConnectionInfo::new(config.token, config.url, &event_core.supervisor_id); match stream_impl::init_stream(conn_info) { Ok(event_stream) => { EVENT_STREAM.set(event_stream); @@ -113,7 +113,7 @@ impl<'a> From<&'a ArgMatches<'a>> for EventStreamConfig { /// All the information needed to establish a connection to a NATS /// Streaming server. -pub struct EventConnectionInfo { +pub struct EventStreamConnectionInfo { pub name: String, pub verbose: bool, pub cluster_uri: String, @@ -121,13 +121,13 @@ pub struct EventConnectionInfo { pub auth_token: AutomateAuthToken, } -impl EventConnectionInfo { +impl EventStreamConnectionInfo { pub fn new(auth_token: AutomateAuthToken, cluster_uri: String, supervisor_id: &str) -> Self { - EventConnectionInfo { name: format!("hab_client_{}", supervisor_id), - verbose: true, - cluster_uri, - cluster_id: "event-service".to_string(), - auth_token } + EventStreamConnectionInfo { name: format!("hab_client_{}", supervisor_id), + verbose: true, + cluster_uri, + cluster_id: "event-service".to_string(), + auth_token } } } @@ -260,24 +260,24 @@ impl EventStream { /// How long should we for the event thread to start up before /// abandoning it and shutting down? #[derive(Clone, Debug)] -struct EventThreadStartupWait { +struct EventStreamConnectTimeout { secs: u64, } -impl Default for EventThreadStartupWait { +impl Default for EventStreamConnectTimeout { fn default() -> Self { Self { secs: 5 } } } -impl FromStr for EventThreadStartupWait { +impl FromStr for EventStreamConnectTimeout { type Err = ParseIntError; fn from_str(s: &str) -> ::std::result::Result { Ok(Self { secs: s.parse()? }) } } -impl EnvConfig for EventThreadStartupWait { - const ENVVAR: &'static str = "HAB_EVENT_THREAD_STARTUP_WAIT_SEC"; +impl EnvConfig for EventStreamConnectTimeout { + const ENVVAR: &'static str = "HAB_EVENT_STREAM_CONNECT_TIMEOUT"; } -impl Into for EventThreadStartupWait { +impl Into for EventStreamConnectTimeout { fn into(self) -> Duration { Duration::from_secs(self.secs) } } diff --git a/components/sup/src/event/nitox.rs b/components/sup/src/event/nitox.rs index ee85129233..95d1b5d995 100644 --- a/components/sup/src/event/nitox.rs +++ b/components/sup/src/event/nitox.rs @@ -1,7 +1,7 @@ -use super::EventThreadStartupWait; +use super::EventStreamConnectTimeout; use crate::event::{Error, - EventConnectionInfo, EventStream, + EventStreamConnectionInfo, Result}; use futures::{sync::mpsc as futures_mpsc, Future, @@ -19,7 +19,7 @@ use tokio::{executor, /// All messages are published under this subject. const HABITAT_SUBJECT: &str = "habitat"; -pub(super) fn init_stream(conn_info: EventConnectionInfo) -> Result { +pub(super) fn init_stream(conn_info: EventStreamConnectionInfo) -> Result { // TODO (CM): Investigate back-pressure scenarios let (event_tx, event_rx) = futures_mpsc::unbounded(); let (sync_tx, sync_rx) = std_mpsc::sync_channel(0); // rendezvous channel @@ -33,7 +33,7 @@ pub(super) fn init_stream(conn_info: EventConnectionInfo) -> Result #[rustfmt::skip] thread::Builder::new().name("events".to_string()) .spawn(move || { - let EventConnectionInfo { name, + let EventStreamConnectionInfo { name, verbose, cluster_uri, cluster_id, @@ -89,7 +89,7 @@ pub(super) fn init_stream(conn_info: EventConnectionInfo) -> Result }) .map_err(Error::SpawnEventThreadError)?; - sync_rx.recv_timeout(EventThreadStartupWait::configured_value().into()) + sync_rx.recv_timeout(EventStreamConnectTimeout::configured_value().into()) .map_err(Error::ConnectEventServerError)?; Ok(EventStream(event_tx)) } diff --git a/components/sup/src/event/ratsio.rs b/components/sup/src/event/ratsio.rs index c47d725fd8..886b40dd63 100644 --- a/components/sup/src/event/ratsio.rs +++ b/components/sup/src/event/ratsio.rs @@ -1,7 +1,7 @@ -use super::EventThreadStartupWait; +use super::EventStreamConnectTimeout; use crate::event::{Error, - EventConnectionInfo, EventStream, + EventStreamConnectionInfo, Result}; use futures::{sync::mpsc as futures_mpsc, Future, @@ -19,7 +19,7 @@ use tokio::{executor, /// All messages are published under this subject. const HABITAT_SUBJECT: &str = "habitat"; -pub(super) fn init_stream(conn_info: EventConnectionInfo) -> Result { +pub(super) fn init_stream(conn_info: EventStreamConnectionInfo) -> Result { let (event_tx, event_rx) = futures_mpsc::unbounded(); let (sync_tx, sync_rx) = std_mpsc::sync_channel(0); // rendezvous channel @@ -28,7 +28,7 @@ pub(super) fn init_stream(conn_info: EventConnectionInfo) -> Result #[rustfmt::skip] thread::Builder::new().name("events".to_string()) .spawn(move || { - let EventConnectionInfo { name, + let EventStreamConnectionInfo { name, verbose, cluster_uri, cluster_id, @@ -75,7 +75,7 @@ pub(super) fn init_stream(conn_info: EventConnectionInfo) -> Result }) .map_err(Error::SpawnEventThreadError)?; - sync_rx.recv_timeout(EventThreadStartupWait::configured_value().into()) + sync_rx.recv_timeout(EventStreamConnectTimeout::configured_value().into()) .map_err(Error::ConnectEventServerError)?; Ok(EventStream(event_tx)) } From 68f9e4339b9b52fb9abaeffde54a3479a9a596f0 Mon Sep 17 00:00:00 2001 From: David McNeil Date: Tue, 13 Aug 2019 10:27:56 -0400 Subject: [PATCH 02/14] Move event-stream-connect-timeout into structs Signed-off-by: David McNeil --- components/sup/src/event.rs | 91 +++++++++++++++++------------- components/sup/src/event/nitox.rs | 16 +++--- components/sup/src/event/ratsio.rs | 16 +++--- 3 files changed, 67 insertions(+), 56 deletions(-) diff --git a/components/sup/src/event.rs b/components/sup/src/event.rs index 2662adbffe..8d7d416ead 100644 --- a/components/sup/src/event.rs +++ b/components/sup/src/event.rs @@ -38,8 +38,7 @@ pub use error::{Error, use futures::sync::mpsc::UnboundedSender; use habitat_common::types::{AutomateAuthToken, EventStreamMetadata}; -use habitat_core::{env::Config as EnvConfig, - package::ident::PackageIdent}; +use habitat_core::package::ident::PackageIdent; use state::Container; use std::{net::SocketAddr, num::ParseIntError, @@ -68,8 +67,7 @@ pub fn init_stream(config: EventStreamConfig, event_core: EventCore) -> Result<( let mut return_value: Result<()> = Ok(()); INIT.call_once(|| { - let conn_info = - EventStreamConnectionInfo::new(config.token, config.url, &event_core.supervisor_id); + let conn_info = EventStreamConnectionInfo::new(&event_core.supervisor_id, config); match stream_impl::init_stream(conn_info) { Ok(event_stream) => { EVENT_STREAM.set(event_stream); @@ -86,48 +84,52 @@ pub fn init_stream(config: EventStreamConfig, event_core: EventCore) -> Result<( /// be passed in by a user #[derive(Clone, Debug)] pub struct EventStreamConfig { - environment: String, - application: String, - site: Option, - meta: EventStreamMetadata, - token: AutomateAuthToken, - url: String, + environment: String, + application: String, + site: Option, + meta: EventStreamMetadata, + token: AutomateAuthToken, + url: String, + connect_timeout: EventStreamConnectTimeout, } impl<'a> From<&'a ArgMatches<'a>> for EventStreamConfig { fn from(m: &ArgMatches) -> Self { - EventStreamConfig { environment: m.value_of("EVENT_STREAM_ENVIRONMENT") - .map(str::to_string) - .expect("Required option for EventStream feature"), - application: m.value_of("EVENT_STREAM_APPLICATION") - .map(str::to_string) - .expect("Required option for EventStream feature"), - site: m.value_of("EVENT_STREAM_SITE").map(str::to_string), - meta: EventStreamMetadata::from(m), - token: AutomateAuthToken::from(m), - url: m.value_of("EVENT_STREAM_URL") - .map(str::to_string) - .expect("Required option for EventStream feature"), } + EventStreamConfig { environment: m.value_of("EVENT_STREAM_ENVIRONMENT") + .map(str::to_string) + .expect("Required option for EventStream feature"), + application: m.value_of("EVENT_STREAM_APPLICATION") + .map(str::to_string) + .expect("Required option for EventStream feature"), + site: m.value_of("EVENT_STREAM_SITE").map(str::to_string), + meta: EventStreamMetadata::from(m), + token: AutomateAuthToken::from(m), + url: m.value_of("EVENT_STREAM_URL") + .map(str::to_string) + .expect("Required option for EventStream feature"), + connect_timeout: EventStreamConnectTimeout::from(m), } } } /// All the information needed to establish a connection to a NATS /// Streaming server. pub struct EventStreamConnectionInfo { - pub name: String, - pub verbose: bool, - pub cluster_uri: String, - pub cluster_id: String, - pub auth_token: AutomateAuthToken, + pub name: String, + pub verbose: bool, + pub cluster_uri: String, + pub cluster_id: String, + pub auth_token: AutomateAuthToken, + pub connect_timeout: EventStreamConnectTimeout, } impl EventStreamConnectionInfo { - pub fn new(auth_token: AutomateAuthToken, cluster_uri: String, supervisor_id: &str) -> Self { - EventStreamConnectionInfo { name: format!("hab_client_{}", supervisor_id), - verbose: true, - cluster_uri, - cluster_id: "event-service".to_string(), - auth_token } + pub fn new(supervisor_id: &str, config: EventStreamConfig) -> Self { + EventStreamConnectionInfo { name: format!("hab_client_{}", supervisor_id), + verbose: true, + cluster_uri: config.url, + cluster_id: "event-service".to_string(), + auth_token: config.token, + connect_timeout: config.connect_timeout, } } } @@ -260,12 +262,15 @@ impl EventStream { /// How long should we for the event thread to start up before /// abandoning it and shutting down? #[derive(Clone, Debug)] -struct EventStreamConnectTimeout { +pub struct EventStreamConnectTimeout { secs: u64, } -impl Default for EventStreamConnectTimeout { - fn default() -> Self { Self { secs: 5 } } +impl EventStreamConnectTimeout { + /// The name of the Clap argument. + pub const ARG_NAME: &'static str = "EVENT_STREAM_CONNECT_TIMEOUT"; + /// The environment variable to set this value. + pub const ENVVAR: &'static str = "HAB_EVENT_STREAM_CONNECT_TIMEOUT"; } impl FromStr for EventStreamConnectTimeout { @@ -274,10 +279,16 @@ impl FromStr for EventStreamConnectTimeout { fn from_str(s: &str) -> ::std::result::Result { Ok(Self { secs: s.parse()? }) } } -impl EnvConfig for EventStreamConnectTimeout { - const ENVVAR: &'static str = "HAB_EVENT_STREAM_CONNECT_TIMEOUT"; -} - impl Into for EventStreamConnectTimeout { fn into(self) -> Duration { Duration::from_secs(self.secs) } } + +impl<'a> From<&'a ArgMatches<'a>> for EventStreamConnectTimeout { + /// Create an instance of `EventStreamConnectTimeout` from validated user input. + fn from(m: &ArgMatches) -> Self { + m.value_of(Self::ARG_NAME) + .expect("EVENT_STREAM_CONNECT_TIMEOUT should be set") + .parse() + .expect("EVENT_STREAM_CONNECT_TIMEOUT should be validated") + } +} diff --git a/components/sup/src/event/nitox.rs b/components/sup/src/event/nitox.rs index 95d1b5d995..9870b5c436 100644 --- a/components/sup/src/event/nitox.rs +++ b/components/sup/src/event/nitox.rs @@ -1,4 +1,3 @@ -use super::EventStreamConnectTimeout; use crate::event::{Error, EventStream, EventStreamConnectionInfo, @@ -6,7 +5,6 @@ use crate::event::{Error, use futures::{sync::mpsc as futures_mpsc, Future, Stream}; -use habitat_core::env::Config as _; use nitox::{commands::ConnectCommand, streaming::client::NatsStreamingClient, NatsClient, @@ -24,6 +22,13 @@ pub(super) fn init_stream(conn_info: EventStreamConnectionInfo) -> Result Result Result Result Result Date: Tue, 13 Aug 2019 10:34:15 -0400 Subject: [PATCH 03/14] Move EventStreamConnectTimeout to types Signed-off-by: David McNeil --- components/common/src/types.rs | 38 ++++++++++++++++++++++++++++++++- components/sup/src/event.rs | 39 +--------------------------------- 2 files changed, 38 insertions(+), 39 deletions(-) diff --git a/components/common/src/types.rs b/components/common/src/types.rs index 0b845c1ac9..10dc332a95 100644 --- a/components/common/src/types.rs +++ b/components/common/src/types.rs @@ -8,11 +8,13 @@ use std::{collections::HashMap, SocketAddr, SocketAddrV4, ToSocketAddrs}, + num::ParseIntError, ops::{Deref, DerefMut}, option, result, - str::FromStr}; + str::FromStr, + time::Duration}; /// Bundles up information about the user and group that a supervised /// service should be run as. If the Supervisor itself is running with @@ -145,6 +147,40 @@ impl fmt::Display for AutomateAuthToken { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}", self.0) } } +/// How long should we for the event thread to start up before +/// abandoning it and shutting down? +#[derive(Clone, Debug)] +pub struct EventStreamConnectTimeout { + secs: u64, +} + +impl EventStreamConnectTimeout { + /// The name of the Clap argument. + pub const ARG_NAME: &'static str = "EVENT_STREAM_CONNECT_TIMEOUT"; + /// The environment variable to set this value. + pub const ENVVAR: &'static str = "HAB_EVENT_STREAM_CONNECT_TIMEOUT"; +} + +impl FromStr for EventStreamConnectTimeout { + type Err = ParseIntError; + + fn from_str(s: &str) -> ::std::result::Result { Ok(Self { secs: s.parse()? }) } +} + +impl Into for EventStreamConnectTimeout { + fn into(self) -> Duration { Duration::from_secs(self.secs) } +} + +impl<'a> From<&'a ArgMatches<'a>> for EventStreamConnectTimeout { + /// Create an instance of `EventStreamConnectTimeout` from validated user input. + fn from(m: &ArgMatches) -> Self { + m.value_of(Self::ARG_NAME) + .expect("EVENT_STREAM_CONNECT_TIMEOUT should be set") + .parse() + .expect("EVENT_STREAM_CONNECT_TIMEOUT should be validated") + } +} + habitat_core::env_config_socketaddr!(#[derive(Clone, Copy, PartialEq, Eq, Debug)] pub GossipListenAddr, HAB_LISTEN_GOSSIP, diff --git a/components/sup/src/event.rs b/components/sup/src/event.rs index 8d7d416ead..190c558f68 100644 --- a/components/sup/src/event.rs +++ b/components/sup/src/event.rs @@ -37,12 +37,11 @@ pub use error::{Error, Result}; use futures::sync::mpsc::UnboundedSender; use habitat_common::types::{AutomateAuthToken, + EventStreamConnectTimeout, EventStreamMetadata}; use habitat_core::package::ident::PackageIdent; use state::Container; use std::{net::SocketAddr, - num::ParseIntError, - str::FromStr, sync::Once, time::Duration}; @@ -256,39 +255,3 @@ impl EventStream { } } } - -//////////////////////////////////////////////////////////////////////// - -/// How long should we for the event thread to start up before -/// abandoning it and shutting down? -#[derive(Clone, Debug)] -pub struct EventStreamConnectTimeout { - secs: u64, -} - -impl EventStreamConnectTimeout { - /// The name of the Clap argument. - pub const ARG_NAME: &'static str = "EVENT_STREAM_CONNECT_TIMEOUT"; - /// The environment variable to set this value. - pub const ENVVAR: &'static str = "HAB_EVENT_STREAM_CONNECT_TIMEOUT"; -} - -impl FromStr for EventStreamConnectTimeout { - type Err = ParseIntError; - - fn from_str(s: &str) -> ::std::result::Result { Ok(Self { secs: s.parse()? }) } -} - -impl Into for EventStreamConnectTimeout { - fn into(self) -> Duration { Duration::from_secs(self.secs) } -} - -impl<'a> From<&'a ArgMatches<'a>> for EventStreamConnectTimeout { - /// Create an instance of `EventStreamConnectTimeout` from validated user input. - fn from(m: &ArgMatches) -> Self { - m.value_of(Self::ARG_NAME) - .expect("EVENT_STREAM_CONNECT_TIMEOUT should be set") - .parse() - .expect("EVENT_STREAM_CONNECT_TIMEOUT should be validated") - } -} From 0b2445ad01c7d2b74ef1433b3cbf805111fdeaaa Mon Sep 17 00:00:00 2001 From: David McNeil Date: Tue, 13 Aug 2019 10:47:55 -0400 Subject: [PATCH 04/14] Add event-stream-connect-timeout cli option Signed-off-by: David McNeil --- components/hab/src/cli.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/components/hab/src/cli.rs b/components/hab/src/cli.rs index 7595d5a241..6a529d1e9b 100644 --- a/components/hab/src/cli.rs +++ b/components/hab/src/cli.rs @@ -9,6 +9,7 @@ use habitat_common::{cli::{BINLINK_DIR_ENVVAR, RING_ENVVAR, RING_KEY_ENVVAR}, types::{AutomateAuthToken, + EventStreamConnectTimeout, EventStreamMetadata, GossipListenAddr, HttpListenAddr, @@ -1174,6 +1175,8 @@ fn sub_svc_unload() -> App<'static, 'static> { // For now, though, these at least provide a place to supply the // information; we can revise as we go. fn add_event_stream_options(app: App<'static, 'static>) -> App<'static, 'static> { + // Create shorter alias so formating works correctly + type ConnectTimeout = EventStreamConnectTimeout; app.arg(Arg::with_name("EVENT_STREAM_APPLICATION").help("The name of the application for \ event stream purposes. This is \ distinct from the `--application` \ @@ -1194,6 +1197,18 @@ fn add_event_stream_options(app: App<'static, 'static>) -> App<'static, 'static> .required(true) .takes_value(true) .validator(non_empty)) + .arg(Arg::with_name(ConnectTimeout::ARG_NAME).help("How long in seconds to wait for an \ + event stream connection before \ + exiting the supervisor. Set to '0' to \ + immediately start the supervisor and \ + continue running regardless of the \ + event stream status.") + .long("event-stream-connect-timeout") + .required(false) + .takes_value(true) + .env(ConnectTimeout::ENVVAR) + .default_value("5") + .validator(valid_numeric::)) .arg(Arg::with_name("EVENT_STREAM_URL").help("The event stream connection string \ (host:port) used by this Supervisor to send \ events to a messaging server.") From 4f1aef69096a7969aa7d4ff0cf65d136fefa6ea2 Mon Sep 17 00:00:00 2001 From: David McNeil Date: Tue, 13 Aug 2019 12:18:41 -0400 Subject: [PATCH 05/14] Add immediate event stream connect method Signed-off-by: David McNeil --- components/common/src/types.rs | 52 +++++++++++++++++------ components/hab/src/cli.rs | 28 ++++++------- components/sup/src/event.rs | 66 +++++++++++++++--------------- components/sup/src/event/nitox.rs | 16 +++++--- components/sup/src/event/ratsio.rs | 15 ++++--- 5 files changed, 107 insertions(+), 70 deletions(-) diff --git a/components/common/src/types.rs b/components/common/src/types.rs index 10dc332a95..5835c28e33 100644 --- a/components/common/src/types.rs +++ b/components/common/src/types.rs @@ -147,32 +147,40 @@ impl fmt::Display for AutomateAuthToken { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}", self.0) } } -/// How long should we for the event thread to start up before -/// abandoning it and shutting down? +/// The event stream connection method. #[derive(Clone, Debug)] -pub struct EventStreamConnectTimeout { - secs: u64, +pub enum EventStreamConnectMethod { + /// Immediately start the supervisor regardless of the event stream status. + Immediate, + /// Before starting the supervisor, wait the timeout for an event stream connection. + /// If after the timeout there is no connection, exit the supervisor. + Timeout { secs: u64 }, } -impl EventStreamConnectTimeout { +impl EventStreamConnectMethod { /// The name of the Clap argument. pub const ARG_NAME: &'static str = "EVENT_STREAM_CONNECT_TIMEOUT"; /// The environment variable to set this value. pub const ENVVAR: &'static str = "HAB_EVENT_STREAM_CONNECT_TIMEOUT"; } -impl FromStr for EventStreamConnectTimeout { +impl FromStr for EventStreamConnectMethod { type Err = ParseIntError; - fn from_str(s: &str) -> ::std::result::Result { Ok(Self { secs: s.parse()? }) } -} - -impl Into for EventStreamConnectTimeout { - fn into(self) -> Duration { Duration::from_secs(self.secs) } + /// A numeric value of zero indicates there is no timeout and the `Immediate` variant is + /// returned. Otherwise, the `Timeout` variant is returned. + fn from_str(s: &str) -> ::std::result::Result { + let secs = s.parse()?; + if secs == 0 { + Ok(EventStreamConnectMethod::Immediate) + } else { + Ok(EventStreamConnectMethod::Timeout { secs }) + } + } } -impl<'a> From<&'a ArgMatches<'a>> for EventStreamConnectTimeout { - /// Create an instance of `EventStreamConnectTimeout` from validated user input. +impl<'a> From<&'a ArgMatches<'a>> for EventStreamConnectMethod { + /// Create an instance of `EventStreamConnectMethod` from validated user input. fn from(m: &ArgMatches) -> Self { m.value_of(Self::ARG_NAME) .expect("EVENT_STREAM_CONNECT_TIMEOUT should be set") @@ -181,6 +189,24 @@ impl<'a> From<&'a ArgMatches<'a>> for EventStreamConnectTimeout { } } +impl EventStreamConnectMethod { + pub fn is_timeout(&self) -> bool { + match self { + EventStreamConnectMethod::Timeout { .. } => true, + _ => false, + } + } +} + +impl Into> for EventStreamConnectMethod { + fn into(self) -> Option { + match self { + EventStreamConnectMethod::Immediate => None, + EventStreamConnectMethod::Timeout { secs } => Some(Duration::from_secs(secs)), + } + } +} + habitat_core::env_config_socketaddr!(#[derive(Clone, Copy, PartialEq, Eq, Debug)] pub GossipListenAddr, HAB_LISTEN_GOSSIP, diff --git a/components/hab/src/cli.rs b/components/hab/src/cli.rs index 6a529d1e9b..4fcbb5ff22 100644 --- a/components/hab/src/cli.rs +++ b/components/hab/src/cli.rs @@ -9,7 +9,7 @@ use habitat_common::{cli::{BINLINK_DIR_ENVVAR, RING_ENVVAR, RING_KEY_ENVVAR}, types::{AutomateAuthToken, - EventStreamConnectTimeout, + EventStreamConnectMethod, EventStreamMetadata, GossipListenAddr, HttpListenAddr, @@ -1176,7 +1176,7 @@ fn sub_svc_unload() -> App<'static, 'static> { // information; we can revise as we go. fn add_event_stream_options(app: App<'static, 'static>) -> App<'static, 'static> { // Create shorter alias so formating works correctly - type ConnectTimeout = EventStreamConnectTimeout; + type ConnectMethod = EventStreamConnectMethod; app.arg(Arg::with_name("EVENT_STREAM_APPLICATION").help("The name of the application for \ event stream purposes. This is \ distinct from the `--application` \ @@ -1197,18 +1197,18 @@ fn add_event_stream_options(app: App<'static, 'static>) -> App<'static, 'static> .required(true) .takes_value(true) .validator(non_empty)) - .arg(Arg::with_name(ConnectTimeout::ARG_NAME).help("How long in seconds to wait for an \ - event stream connection before \ - exiting the supervisor. Set to '0' to \ - immediately start the supervisor and \ - continue running regardless of the \ - event stream status.") - .long("event-stream-connect-timeout") - .required(false) - .takes_value(true) - .env(ConnectTimeout::ENVVAR) - .default_value("5") - .validator(valid_numeric::)) + .arg(Arg::with_name(ConnectMethod::ARG_NAME).help("How long in seconds to wait for an \ + event stream connection before exiting \ + the supervisor. Set to '0' to \ + immediately start the supervisor and \ + continue running regardless of the \ + event stream status.") + .long("event-stream-connect-timeout") + .required(false) + .takes_value(true) + .env(ConnectMethod::ENVVAR) + .default_value("5") + .validator(valid_numeric::)) .arg(Arg::with_name("EVENT_STREAM_URL").help("The event stream connection string \ (host:port) used by this Supervisor to send \ events to a messaging server.") diff --git a/components/sup/src/event.rs b/components/sup/src/event.rs index 190c558f68..b6cff0db1b 100644 --- a/components/sup/src/event.rs +++ b/components/sup/src/event.rs @@ -37,7 +37,7 @@ pub use error::{Error, Result}; use futures::sync::mpsc::UnboundedSender; use habitat_common::types::{AutomateAuthToken, - EventStreamConnectTimeout, + EventStreamConnectMethod, EventStreamMetadata}; use habitat_core::package::ident::PackageIdent; use state::Container; @@ -83,52 +83,52 @@ pub fn init_stream(config: EventStreamConfig, event_core: EventCore) -> Result<( /// be passed in by a user #[derive(Clone, Debug)] pub struct EventStreamConfig { - environment: String, - application: String, - site: Option, - meta: EventStreamMetadata, - token: AutomateAuthToken, - url: String, - connect_timeout: EventStreamConnectTimeout, + environment: String, + application: String, + site: Option, + meta: EventStreamMetadata, + token: AutomateAuthToken, + url: String, + connect_method: EventStreamConnectMethod, } impl<'a> From<&'a ArgMatches<'a>> for EventStreamConfig { fn from(m: &ArgMatches) -> Self { - EventStreamConfig { environment: m.value_of("EVENT_STREAM_ENVIRONMENT") - .map(str::to_string) - .expect("Required option for EventStream feature"), - application: m.value_of("EVENT_STREAM_APPLICATION") - .map(str::to_string) - .expect("Required option for EventStream feature"), - site: m.value_of("EVENT_STREAM_SITE").map(str::to_string), - meta: EventStreamMetadata::from(m), - token: AutomateAuthToken::from(m), - url: m.value_of("EVENT_STREAM_URL") - .map(str::to_string) - .expect("Required option for EventStream feature"), - connect_timeout: EventStreamConnectTimeout::from(m), } + EventStreamConfig { environment: m.value_of("EVENT_STREAM_ENVIRONMENT") + .map(str::to_string) + .expect("Required option for EventStream feature"), + application: m.value_of("EVENT_STREAM_APPLICATION") + .map(str::to_string) + .expect("Required option for EventStream feature"), + site: m.value_of("EVENT_STREAM_SITE").map(str::to_string), + meta: EventStreamMetadata::from(m), + token: AutomateAuthToken::from(m), + url: m.value_of("EVENT_STREAM_URL") + .map(str::to_string) + .expect("Required option for EventStream feature"), + connect_method: EventStreamConnectMethod::from(m), } } } /// All the information needed to establish a connection to a NATS /// Streaming server. pub struct EventStreamConnectionInfo { - pub name: String, - pub verbose: bool, - pub cluster_uri: String, - pub cluster_id: String, - pub auth_token: AutomateAuthToken, - pub connect_timeout: EventStreamConnectTimeout, + pub name: String, + pub verbose: bool, + pub cluster_uri: String, + pub cluster_id: String, + pub auth_token: AutomateAuthToken, + pub connect_method: EventStreamConnectMethod, } impl EventStreamConnectionInfo { pub fn new(supervisor_id: &str, config: EventStreamConfig) -> Self { - EventStreamConnectionInfo { name: format!("hab_client_{}", supervisor_id), - verbose: true, - cluster_uri: config.url, - cluster_id: "event-service".to_string(), - auth_token: config.token, - connect_timeout: config.connect_timeout, } + EventStreamConnectionInfo { name: format!("hab_client_{}", supervisor_id), + verbose: true, + cluster_uri: config.url, + cluster_id: "event-service".to_string(), + auth_token: config.token, + connect_method: config.connect_method, } } } diff --git a/components/sup/src/event/nitox.rs b/components/sup/src/event/nitox.rs index 9870b5c436..592611b239 100644 --- a/components/sup/src/event/nitox.rs +++ b/components/sup/src/event/nitox.rs @@ -27,7 +27,8 @@ pub(super) fn init_stream(conn_info: EventStreamConnectionInfo) -> Result Result| { let publish_event = client .publish(HABITAT_SUBJECT.into(), event.into()) @@ -89,7 +93,9 @@ pub(super) fn init_stream(conn_info: EventStreamConnectionInfo) -> Result Result Result| { let stan_msg = @@ -75,7 +78,9 @@ pub(super) fn init_stream(conn_info: EventStreamConnectionInfo) -> Result Date: Wed, 14 Aug 2019 09:58:27 -0400 Subject: [PATCH 06/14] Remove natsio and nitxo clients Signed-off-by: David McNeil --- Cargo.lock | 321 ----------------------------- components/sup/Cargo.toml | 10 - components/sup/src/event.rs | 17 +- components/sup/src/event/nitox.rs | 101 --------- components/sup/src/event/ratsio.rs | 86 -------- components/sup/src/event/stream.rs | 33 +++ components/sup/src/subscriber.rs | 51 ----- 7 files changed, 38 insertions(+), 581 deletions(-) delete mode 100644 components/sup/src/event/nitox.rs delete mode 100644 components/sup/src/event/ratsio.rs create mode 100644 components/sup/src/event/stream.rs delete mode 100644 components/sup/src/subscriber.rs diff --git a/Cargo.lock b/Cargo.lock index 6c0bff80ba..3e6747e8b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -275,11 +275,6 @@ dependencies = [ "nodrop 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "atomic-counter" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "atty" version = "0.2.13" @@ -382,25 +377,6 @@ dependencies = [ "byte-tools 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "block-buffer" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "block-padding 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "byte-tools 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", - "generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "block-padding" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "byte-tools 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "broadcast" version = "0.1.0" @@ -411,11 +387,6 @@ name = "byte-tools" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "byte-tools" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "byteorder" version = "1.3.2" @@ -663,61 +634,6 @@ dependencies = [ "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "darling" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "darling_core 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", - "darling_macro 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "darling_core" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", - "ident_case 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", - "quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)", - "strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", - "syn 0.15.43 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "darling_macro" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "darling_core 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", - "quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)", - "syn 0.15.43 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "derive_builder" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "darling 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", - "derive_builder_core 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", - "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", - "quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)", - "syn 0.15.43 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "derive_builder_core" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "darling 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", - "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", - "quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)", - "syn 0.15.43 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "derive_more" version = "0.14.1" @@ -750,14 +666,6 @@ dependencies = [ "generic-array 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "digest" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "dirs" version = "1.0.5" @@ -888,17 +796,6 @@ name = "env" version = "0.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "env_logger" -version = "0.5.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "atty 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", - "humantime 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "termcolor 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "env_logger" version = "0.6.2" @@ -1095,14 +992,6 @@ dependencies = [ "typenum 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "generic-array" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "typenum 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "getrandom" version = "0.1.6" @@ -1526,7 +1415,6 @@ dependencies = [ "libc 0.2.54 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "log4rs 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", - "nitox 0.1.10 (git+https://github.com/habitat-sh/nitox?branch=feature/nats-server)", "notify 4.0.12 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "palaver 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1536,7 +1424,6 @@ dependencies = [ "prost-derive 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "prost-types 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", - "ratsio 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "regex 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "rustls 0.15.2 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.98 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1707,11 +1594,6 @@ dependencies = [ "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "ident_case" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "idna" version = "0.1.5" @@ -1792,14 +1674,6 @@ dependencies = [ "winreg 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "itertools" -version = "0.7.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "either 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "itertools" version = "0.8.0" @@ -2196,33 +2070,6 @@ dependencies = [ "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "nitox" -version = "0.1.10" -source = "git+https://github.com/habitat-sh/nitox?branch=feature/nats-server#b52bb6dc8fa87bfa9e841fed3542950a6112ef6e" -dependencies = [ - "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "derive_builder 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", - "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", - "failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", - "prost 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "prost-build 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "prost-derive 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.98 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.98 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-tls 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", - "url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "nix" version = "0.11.0" @@ -2257,16 +2104,6 @@ name = "nom" version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "nom" -version = "4.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "memchr 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)", - "regex 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", - "version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "notify" version = "4.0.12" @@ -2315,11 +2152,6 @@ name = "numtoa" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "opaque-debug" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "openssl" version = "0.10.24" @@ -2614,15 +2446,6 @@ dependencies = [ "spin 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "prost" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", - "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "prost" version = "0.5.0" @@ -2633,24 +2456,6 @@ dependencies = [ "prost-derive 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "prost-build" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)", - "heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "itertools 0.7.11 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)", - "prost 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "prost-types 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", - "which 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "prost-build" version = "0.5.0" @@ -2668,18 +2473,6 @@ dependencies = [ "which 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "prost-derive" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", - "itertools 0.7.11 (registry+https://github.com/rust-lang/crates.io-index)", - "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", - "quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)", - "syn 0.14.9 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "prost-derive" version = "0.5.0" @@ -2692,16 +2485,6 @@ dependencies = [ "syn 0.15.43 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "prost-types" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "prost 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "prost-derive 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "prost-types" version = "0.5.0" @@ -2907,37 +2690,6 @@ dependencies = [ "rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "ratsio" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "atomic-counter 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "chrono 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", - "derive_builder 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", - "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", - "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", - "failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "nom 4.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", - "protobuf 2.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", - "regex 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.98 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.98 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", - "sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-tls 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", - "url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "rdrand" version = "0.4.0" @@ -3313,7 +3065,6 @@ name = "serde_json" version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "ryu 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.98 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3357,17 +3108,6 @@ dependencies = [ "fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "sha2" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", - "digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "opaque-debug 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "shlex" version = "0.1.1" @@ -3450,26 +3190,11 @@ dependencies = [ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "strsim" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "strsim" version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "syn" -version = "0.14.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", - "quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)", - "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "syn" version = "0.15.43" @@ -3515,15 +3240,6 @@ name = "tee" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "tempdir" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "remove_dir_all 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "tempfile" version = "3.1.0" @@ -3811,16 +3527,6 @@ dependencies = [ "tokio-executor 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "tokio-tls" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", - "native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "tokio-udp" version = "0.1.3" @@ -4295,7 +4001,6 @@ dependencies = [ "checksum arc-swap 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1507f9b80b3ef096751728cf3f43bb0111ec906e44f5d8587e02c10643b9a2cd" "checksum arrayref 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "0d382e583f07208808f6b1249e60848879ba3543f57c32277bf52d69c2f0f0ee" "checksum arrayvec 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "b8d73f9beda665eaa98ab9e4f7442bd4e7de6652587de55b2525e52e29c1b0ba" -"checksum atomic-counter 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "62f447d68cfa5a9ab0c1c862a703da2a65b5ed1b7ce1153c9eb0169506d56019" "checksum atty 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "1803c647a3ec87095e7ae7acfca019e98de5ec9a7d01343f611cf3152ed71a90" "checksum autocfg 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "22130e92352b948e7e82a49cdb0aa94f2211761117f29e052dd397c1ac33542b" "checksum backtrace 0.3.34 (registry+https://github.com/rust-lang/crates.io-index)" = "b5164d292487f037ece34ec0de2fcede2faa162f085dd96d2385ab81b12765ba" @@ -4308,11 +4013,8 @@ dependencies = [ "checksum bitflags 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3d155346769a6855b86399e9bc3814ab343cd3d62c7e985113d46a0ec3c281fd" "checksum blake2b_simd 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "461f4b879a8eb70c1debf7d0788a9a5ff15f1ea9d25925fea264ef4258bed6b2" "checksum block-buffer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a076c298b9ecdb530ed9d967e74a6027d6a7478924520acddcddc24c1c8ab3ab" -"checksum block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b" -"checksum block-padding 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "6d4dc3af3ee2e12f3e5d224e5e1e3d73668abbeb69e566d361f7d5563a4fdf09" "checksum broadcast 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fb214f702da3cc6aa1666520f40ea66f506644db5e1065be4bbc972f7ec3750b" "checksum byte-tools 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "560c32574a12a89ecd91f5e742165893f86e3ab98d21f8ea548658eb9eef5f40" -"checksum byte-tools 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" "checksum byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c3dd8985a7111efc5c80b44e23ecdd8c007de8ade3b96595387e812b957cf5" "checksum bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" "checksum c2-chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7d64d04786e0f528460fc884753cf8dddcc466be308f6026f8e355c41a0e4101" @@ -4340,15 +4042,9 @@ dependencies = [ "checksum crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6" "checksum crypto-mac 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0999b4ff4d3446d4ddb19a63e9e00c1876e75cd7000d20e57a693b4b3f08d958" "checksum ctrlc 3.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "5531b7f0698d9220b4729f8811931dbe0e91a05be2f7b3245fdc50dd856bae26" -"checksum darling 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fcfbcb0c5961907597a7d1148e3af036268f2b773886b8bb3eeb1e1281d3d3d6" -"checksum darling_core 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6afc018370c3bff3eb51f89256a6bdb18b4fdcda72d577982a14954a7a0b402c" -"checksum darling_macro 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c6d8dac1c6f1d29a41c4712b4400f878cb4fcc4c7628f298dd75038e024998d1" -"checksum derive_builder 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3ac53fa6a3cda160df823a9346442525dcaf1e171999a1cf23e67067e4fd64d4" -"checksum derive_builder_core 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0288a23da9333c246bb18c143426074a6ae96747995c5819d2947b64cd942b37" "checksum derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6d944ac6003ed268757ef1ee686753b57efc5fcf0ebe7b64c9fc81e7e32ff839" "checksum derive_more 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a141330240c921ec6d074a3e188a7c7ef95668bb95e7d44fa0e5778ec2a7afe" "checksum digest 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)" = "03b072242a8cbaf9c145665af9d250c59af3b958f83ed6824e13533cf76d5b90" -"checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" "checksum dirs 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3fd78930633bd1c6e35c4b42b1df7b0cbc6bc191146e512bb3bedf243fcc3901" "checksum dirs 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "13aea89a5c93364a98e9b37b2fa237effbb694d5cfe01c5b70941f7eb087d5e3" "checksum dirs-sys 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "afa0b23de8fd801745c471deffa6e12d248f962c9fd4b4c33787b055599bde7b" @@ -4365,7 +4061,6 @@ dependencies = [ "checksum encoding_rs 0.8.17 (registry+https://github.com/rust-lang/crates.io-index)" = "4155785c79f2f6701f185eb2e6b4caf0555ec03477cb4c70db67b465311620ed" "checksum enum-as-inner 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3d58266c97445680766be408285e798d3401c6d4c378ec5552e78737e681e37d" "checksum env 0.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "876927d21ef1ae98001c8c35a1d8dfd682136914b23ef04276820fa6d43c3630" -"checksum env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)" = "15b0a4d2e39f8420210be8b27eeda28029729e2fd4291019455016c348240c38" "checksum env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "aafcde04e90a5226a6443b7aabdb016ba2f8307c847d524724bd9b346dd1a2d3" "checksum env_proxy 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "700798562fcbc0a4c89546df5dfa8586e82345026e3992242646d527dec948e4" "checksum errno 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c2a071601ed01b988f896ab14b95e67335d1eeb50190932a1320f7fe3cadc84e" @@ -4391,7 +4086,6 @@ dependencies = [ "checksum futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)" = "45dc39533a6cae6da2b56da48edae506bb767ec07370f86f70fc062e9d435869" "checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" "checksum gcc 0.3.55 (registry+https://github.com/rust-lang/crates.io-index)" = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" -"checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec" "checksum generic-array 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ef25c5683767570c2bbd7deba372926a55eaae9982d7726ee2a1050239d45b9d" "checksum getrandom 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "e65cce4e5084b14874c4e7097f38cab54f47ee554f9194673456ea379dcc4c55" "checksum glob 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" @@ -4409,7 +4103,6 @@ dependencies = [ "checksum humantime 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3ca7e5f2e110db35f93b837c81797f3714500b81d517bf20c431b16d3ca4f114" "checksum hyper 0.12.33 (registry+https://github.com/rust-lang/crates.io-index)" = "7cb44cbce9d8ee4fb36e4c0ad7b794ac44ebaad924b9c8291a63215bb44c2c8f" "checksum hyper-tls 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3a800d6aa50af4b5850b2b0f659625ce9504df908e9733b635720483be26174f" -"checksum ident_case 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" "checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" "checksum idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9" "checksum indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d" @@ -4418,7 +4111,6 @@ dependencies = [ "checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08" "checksum ipc-channel 0.9.0 (git+https://github.com/habitat-sh/ipc-channel?branch=hbt-windows)" = "" "checksum ipconfig 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "aa79fa216fbe60834a9c0737d7fcd30425b32d1c58854663e24d4c4b328ed83f" -"checksum itertools 0.7.11 (registry+https://github.com/rust-lang/crates.io-index)" = "0d47946d458e94a1b7bcabbf6521ea7c037062c81f534615abcad76e84d4970d" "checksum itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5b8467d9c1cebe26feb08c640139247fac215782d35371ade9a2136ed6085358" "checksum itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "501266b7edd0174f8530248f87f99c88fbe60ca4ef3dd486835b8d8d53136f7f" "checksum jemalloc-ctl 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c502a5ff9dd2924f1ed32ba96e3b65735d837b4bfd978d3161b1702e66aca4b7" @@ -4465,18 +4157,15 @@ dependencies = [ "checksum multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151" "checksum native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4b2df1a4c22fd44a62147fd8f13dd0f95c9d8ca7b2610299b2a2f9cf8964274e" "checksum net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88" -"checksum nitox 0.1.10 (git+https://github.com/habitat-sh/nitox?branch=feature/nats-server)" = "" "checksum nix 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d37e713a259ff641624b6cb20e3b12b2952313ba36b6823c0f16e6cfd9e5de17" "checksum nix 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "46f0f3210768d796e8fa79ec70ee6af172dacbe7147f5e69be5240a47778302b" "checksum nodrop 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "2f9667ddcc6cc8a43afc9b7917599d7216aa09c463919ea32c59ed6cac8bc945" "checksum nom 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf51a729ecf40266a2368ad335a5fdde43471f545a967109cd62146ecf8b66ff" -"checksum nom 4.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2ad2a91a8e869eeb30b9cb3119ae87773a8f4ae617f41b1eb9c154b2905f7bd6" "checksum notify 4.0.12 (registry+https://github.com/rust-lang/crates.io-index)" = "3572d71f13ea8ed41867accd971fd564aa75934cf7a1fae03ddb8c74a8a49943" "checksum num-integer 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)" = "b85e541ef8255f6cf42bbfe4ef361305c6c135d10919ecc26126c4e5ae94bc09" "checksum num-traits 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "6ba9a427cfca2be13aa6f6403b0b7e7368fe982bfa16fccc450ce74c46cd9b32" "checksum num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bcef43580c035376c0705c42792c294b66974abbfd2789b511784023f71f3273" "checksum numtoa 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b8f8bdf33df195859076e54ab11ee78a1b208382d3a26ec40d142ffc1ecc49ef" -"checksum opaque-debug 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" "checksum openssl 0.10.24 (registry+https://github.com/rust-lang/crates.io-index)" = "8152bb5a9b5b721538462336e3bef9a539f892715e5037fda0f984577311af15" "checksum openssl-probe 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" "checksum openssl-sys 0.9.48 (registry+https://github.com/rust-lang/crates.io-index)" = "b5ba300217253bcc5dc68bed23d782affa45000193866e025329aa8a7a9f05b8" @@ -4508,13 +4197,9 @@ dependencies = [ "checksum proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)" = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759" "checksum procinfo 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6ab1427f3d2635891f842892dda177883dca0639e05fe66796a62c9d2f23b49c" "checksum prometheus 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5567486d5778e2c6455b1b90ff1c558f29e751fc018130fa182e15828e728af1" -"checksum prost 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b9f36c478cd43382388dfc3a3679af175c03d19ed8039e79a3e4447e944cd3f3" "checksum prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "96d14b1c185652833d24aaad41c5832b0be5616a590227c1fbff57c616754b23" -"checksum prost-build 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b6325275b85605f58f576456a47af44417edf5956a6f670bb59fbe12aff69597" "checksum prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "eb788126ea840817128183f8f603dce02cb7aea25c2a0b764359d8e20010702e" -"checksum prost-derive 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9787d1977ea72e8066d58e46ae66100324a2815e677897fe78dfe54958f48252" "checksum prost-derive 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5e7dc378b94ac374644181a2247cebf59a6ec1c88b49ac77f3a94b86b79d0e11" -"checksum prost-types 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5644c57d56bc085f9570e113495c1f08d7185beca700dcc296cb4672f380a679" "checksum prost-types 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1de482a366941c8d56d19b650fac09ca08508f2a696119ee7513ad590c8bac6f" "checksum protobuf 2.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8aefcec9f142b524d98fc81d07827743be89dd6586a1ba6ab21fa66a500b3fa5" "checksum publicsuffix 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "5afecba86dcf1e4fd610246f89899d1924fe12e1e89f555eb7c7f710f3c5ad1d" @@ -4537,7 +4222,6 @@ dependencies = [ "checksum rand_os 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071" "checksum rand_pcg 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "abf9b09b01790cfe0364f52bf32995ea3c39f4d2dd011eac241d2914146d0b44" "checksum rand_xorshift 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cbf7e9e623549b0e21f6e97cf8ecf247c1a8fd2e8a992ae265314300b2455d5c" -"checksum ratsio 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "78a24e7d8d62fd9728ba17a3f814a2991aca1e98a6013c268ec8a0a70ed4c771" "checksum rdrand 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" "checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" "checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76" @@ -4581,7 +4265,6 @@ dependencies = [ "checksum serde_yaml 0.8.9 (registry+https://github.com/rust-lang/crates.io-index)" = "38b08a9a90e5260fe01c6480ec7c811606df6d3a660415808c3c3fa8ed95b582" "checksum sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" "checksum sha2 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9eb6be24e4c23a84d7184280d2722f7f2731fcdd4a9d886efbfe4413e4847ea0" -"checksum sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7b4d8bfd0e469f417657573d8451fb33d16cfe0989359b93baf3a1ffc639543d" "checksum shlex 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" "checksum signal-hook 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4f61c4d59f3aaa9f61bba6450a9b80ba48362fd7d651689e7a10c453b1f6dc68" "checksum signal-hook-registry 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1797d48f38f91643908bb14e35e79928f9f4b3cefb2420a564dde0991b4358dc" @@ -4594,15 +4277,12 @@ dependencies = [ "checksum stable_deref_trait 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8" "checksum state 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7345c971d1ef21ffdbd103a75990a15eb03604fc8b8852ca8cb418ee1a099028" "checksum string 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d24114bfcceb867ca7f71a0d3fe45d45619ec47a6fbfa98cb14e14250bfa5d6d" -"checksum strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb4f380125926a99e52bc279241539c018323fab05ad6368b56f93d9369ff550" "checksum strsim 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" -"checksum syn 0.14.9 (registry+https://github.com/rust-lang/crates.io-index)" = "261ae9ecaa397c42b960649561949d69311f08eeaea86a65696e6e46517cf741" "checksum syn 0.15.43 (registry+https://github.com/rust-lang/crates.io-index)" = "ee06ea4b620ab59a2267c6b48be16244a3389f8bfa0986bdd15c35b890b00af3" "checksum synstructure 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "02353edf96d6e4dc81aea2d8490a7e9db177bf8acb0e951c24940bf866cb313f" "checksum tabwriter 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9128e3a9149e51494cad59712a286e149fcb74e443d2298d69bd6eaa42cc4ebb" "checksum tar 0.4.25 (registry+https://github.com/rust-lang/crates.io-index)" = "7201214ded95b34e3bc00c9557b6dcec34fd1af428d343143f5db67c661762f0" "checksum tee 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "37c12559dba7383625faaff75be24becf35bfc885044375bcab931111799a3da" -"checksum tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" "checksum tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" "checksum termcolor 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "96d6098003bde162e4277c70665bd87c326f5a0c3f3fbfb285787fa482d54e6e" "checksum termion 1.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6a8fb22f7cde82c8220e5aeacb3258ed7ce996142c77cba193f203515e26c330" @@ -4627,7 +4307,6 @@ dependencies = [ "checksum tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1d14b10654be682ac43efee27401d792507e30fd8d26389e1da3b185de2e4119" "checksum tokio-threadpool 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)" = "90ca01319dea1e376a001e8dc192d42ebde6dd532532a5bad988ac37db365b19" "checksum tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "f2106812d500ed25a4f38235b9cae8f78a09edf43203e16e59c3b769a342a60e" -"checksum tokio-tls 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "354b8cd83825b3c20217a9dc174d6a0c67441a2fae5c41bcb1ea6679f6ae0f7c" "checksum tokio-udp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "66268575b80f4a4a710ef83d087fdfeeabdce9b74c797535fbac18a2cb906e92" "checksum tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "037ffc3ba0e12a0ab4aca92e5234e0dedeb48fddf6ccd260f1f150a36a9f2445" "checksum toml 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "736b60249cb25337bc196faa43ee12c705e426f3d55c214d73a4e7be06f92cb4" diff --git a/components/sup/Cargo.toml b/components/sup/Cargo.toml index 3f2ac32692..d68fcf08cf 100644 --- a/components/sup/Cargo.toml +++ b/components/sup/Cargo.toml @@ -14,15 +14,8 @@ name = "hab-sup" path = "src/main.rs" doc = false -[[bin]] -name = "subscriber" -path = "src/subscriber.rs" -doc = false - [dependencies] bytes = "*" -# This is temporary, until this is merged to the mainline and we can use the crate -nitox = { git = "https://github.com/habitat-sh/nitox", branch="feature/nats-server" } actix-web = { version = "*", default-features = false, features = [ "rust-tls" ] } byteorder = "*" clap = { version = "*", features = [ "suggestions", "color", "unstable" ] } @@ -51,7 +44,6 @@ prost = "*" prost-derive = "*" prost-types = "0.5.0" # This is current stable, but for some reason gets pulled in as 0.4.0 if we use "*" (2019-03-28) rand = "*" -ratsio = "*" regex = "*" rustls = "*" serde = { version = "*", features = ["rc"] } @@ -98,5 +90,3 @@ apidocs = [] ignore_integration_tests = [] lock_as_rwlock = ["habitat_common/lock_as_rwlock"] lock_as_mutex = ["habitat_common/lock_as_mutex"] -nitox_stream = [] -ratsio_stream = [] diff --git a/components/sup/src/event.rs b/components/sup/src/event.rs index b6cff0db1b..a9503a3518 100644 --- a/components/sup/src/event.rs +++ b/components/sup/src/event.rs @@ -1,25 +1,18 @@ //! Main interface for a stream of events the Supervisor can send out //! in the course of its operations. //! -//! Currently, the Supervisor is able to send events to a [NATS -//! Streaming][1] server. The `init_stream` function must be called +//! Currently, the Supervisor is able to send events to a [NATS][1] +//! server. The `init_stream` function must be called //! before sending events to initialize the publishing thread in the //! background. Thereafter, you can pass "event" structs to the //! `event` function, which will publish the event to the stream. //! //! All events are published under the "habitat" subject. //! -//! [1]:https://github.com/nats-io/nats-streaming-server +//! [1]:https://github.com/nats-io/nats-server mod error; -// ratsio_stream is the default, but setting it as a default in Cargo.toml -// makes it trickier to use nitox instead. -#[cfg(feature = "nitox_stream")] -#[path = "event/nitox.rs"] -mod stream_impl; -#[cfg(any(feature = "ratsio_stream", not(feature = "nitox_stream")))] -#[path = "event/ratsio.rs"] -mod stream_impl; +mod stream; mod types; pub(crate) use self::types::ServiceMetadata; @@ -67,7 +60,7 @@ pub fn init_stream(config: EventStreamConfig, event_core: EventCore) -> Result<( INIT.call_once(|| { let conn_info = EventStreamConnectionInfo::new(&event_core.supervisor_id, config); - match stream_impl::init_stream(conn_info) { + match stream::init_stream(conn_info) { Ok(event_stream) => { EVENT_STREAM.set(event_stream); EVENT_CORE.set(event_core); diff --git a/components/sup/src/event/nitox.rs b/components/sup/src/event/nitox.rs deleted file mode 100644 index 592611b239..0000000000 --- a/components/sup/src/event/nitox.rs +++ /dev/null @@ -1,101 +0,0 @@ -use crate::event::{Error, - EventStream, - EventStreamConnectionInfo, - Result}; -use futures::{sync::mpsc as futures_mpsc, - Future, - Stream}; -use nitox::{commands::ConnectCommand, - streaming::client::NatsStreamingClient, - NatsClient, - NatsClientOptions}; -use std::{sync::mpsc as std_mpsc, - thread}; -use tokio::{executor, - runtime::current_thread::Runtime as ThreadRuntime}; - -/// All messages are published under this subject. -const HABITAT_SUBJECT: &str = "habitat"; - -pub(super) fn init_stream(conn_info: EventStreamConnectionInfo) -> Result { - // TODO (CM): Investigate back-pressure scenarios - let (event_tx, event_rx) = futures_mpsc::unbounded(); - let (sync_tx, sync_rx) = std_mpsc::sync_channel(0); // rendezvous channel - - let EventStreamConnectionInfo { name, - verbose, - cluster_uri, - cluster_id, - auth_token, - connect_method, } = conn_info; - let connect_method_is_timeout = connect_method.is_timeout(); - - // TODO (CM): We could theoretically create this future and spawn - // it in the Supervisor's Tokio runtime, but there's currently a - // bug: https://github.com/YellowInnovation/nitox/issues/24 - - // Disabling rustfmt on this... I think we might be running into - // https://github.com/rust-lang/rustfmt/issues/1762 - #[rustfmt::skip] - thread::Builder::new().name("events".to_string()) - .spawn(move || { - let cc = - ConnectCommand::builder().name(Some(name)) - .verbose(verbose) - .auth_token(Some(auth_token.to_string())) - .tls_required(false) - .build() - .expect("Could not create NATS \ - ConnectCommand"); - let opts = - NatsClientOptions::builder().connect_command(cc) - .cluster_uri(cluster_uri.as_str()) - .build() - .expect("Could not create \ - NatsClientOptions"); - - let publisher = NatsClient::from_options(opts).map_err(|e| { - error!("Error connecting to NATS: {}", e); - e.into() - }) - .and_then(|client| { - NatsStreamingClient::from(client) - .cluster_id(cluster_id) - .connect() - }) - .map_err(|streaming_error| { - error!("Error upgrading to streaming NATS \ - client: {}", - streaming_error) - }) - .and_then(move |client| { - if connect_method_is_timeout { - sync_tx.send(()) - .expect("Couldn't synchronize event \ - thread!"); - } - event_rx.for_each(move |event: Vec| { - let publish_event = client - .publish(HABITAT_SUBJECT.into(), event.into()) - .map_err(|e| { - error!("Error publishing event: {}", e); - }); - executor::spawn(publish_event); - Ok(()) - }) - }); - - ThreadRuntime::new().expect("Couldn't create event stream runtime!") - .spawn(publisher) - .run() - .expect("Could not create Tokio runtime for \ - event publication thread"); - }) - .map_err(Error::SpawnEventThreadError)?; - - if let Some(connect_method) = connect_method.into() { - sync_rx.recv_timeout(connect_method) - .map_err(Error::ConnectEventServerError)?; - } - Ok(EventStream(event_tx)) -} diff --git a/components/sup/src/event/ratsio.rs b/components/sup/src/event/ratsio.rs deleted file mode 100644 index 2e83f2a5fa..0000000000 --- a/components/sup/src/event/ratsio.rs +++ /dev/null @@ -1,86 +0,0 @@ -use crate::event::{Error, - EventStream, - EventStreamConnectionInfo, - Result}; -use futures::{sync::mpsc as futures_mpsc, - Future, - Stream}; -use ratsio::{nats_client::NatsClientOptions, - stan_client::{StanClient, - StanMessage, - StanOptions}}; -use std::{sync::mpsc as std_mpsc, - thread}; -use tokio::{executor, - runtime::current_thread::Runtime as ThreadRuntime}; - -/// All messages are published under this subject. -const HABITAT_SUBJECT: &str = "habitat"; - -pub(super) fn init_stream(conn_info: EventStreamConnectionInfo) -> Result { - let (event_tx, event_rx) = futures_mpsc::unbounded(); - let (sync_tx, sync_rx) = std_mpsc::sync_channel(0); // rendezvous channel - - let EventStreamConnectionInfo { name, - verbose, - cluster_uri, - cluster_id, - auth_token, - connect_method, } = conn_info; - let connect_method_is_timeout = connect_method.is_timeout(); - - // Disabling rustfmt on this... I think we might be running into - // https://github.com/rust-lang/rustfmt/issues/1762 - #[rustfmt::skip] - thread::Builder::new().name("events".to_string()) - .spawn(move || { - let nats_options = - NatsClientOptions::builder().cluster_uris(cluster_uri) - .auth_token(auth_token.to_string()) - .verbose(verbose) - .build() - .unwrap(); - let stan_options = StanOptions::builder().nats_options(nats_options) - .cluster_id(cluster_id) - .client_id(name) - .build() - .unwrap(); - - let publisher = - StanClient::from_options(stan_options).map_err(|ratsio_error| { - error!("Error upgrading to streaming NATS client: {}", - ratsio_error) - }) - .and_then(move |client| { - if connect_method_is_timeout { - sync_tx.send(()) - .expect("Couldn't synchronize event thread!"); - } - - event_rx.for_each(move |event: Vec| { - let stan_msg = - StanMessage::new(HABITAT_SUBJECT.into(), - event); - let publish_event = client - .send(stan_msg) - .map_err(|e| { - error!("Error publishing event: {}", e) - }); - executor::spawn(publish_event); - Ok(()) - }) - }); - - ThreadRuntime::new().expect("Couldn't create event stream runtime!") - .spawn(publisher) - .run() - .expect("something seriously wrong has occurred"); - }) - .map_err(Error::SpawnEventThreadError)?; - - if let Some(connect_method) = connect_method.into() { - sync_rx.recv_timeout(connect_method) - .map_err(Error::ConnectEventServerError)?; - } - Ok(EventStream(event_tx)) -} diff --git a/components/sup/src/event/stream.rs b/components/sup/src/event/stream.rs new file mode 100644 index 0000000000..9e89e999bb --- /dev/null +++ b/components/sup/src/event/stream.rs @@ -0,0 +1,33 @@ +use crate::event::{Error, + EventStream, + EventStreamConnectionInfo, + Result}; +use futures::sync::mpsc as futures_mpsc; +use std::{sync::mpsc as std_mpsc, + thread}; + +/// All messages are published under this subject. +const HABITAT_SUBJECT: &str = "habitat"; + +pub(super) fn init_stream(conn_info: EventStreamConnectionInfo) -> Result { + let (event_tx, event_rx) = futures_mpsc::unbounded(); + let (sync_tx, sync_rx) = std_mpsc::sync_channel(0); // rendezvous channel + + let EventStreamConnectionInfo { name, + verbose, + cluster_uri, + cluster_id, + auth_token, + connect_method, } = conn_info; + let connect_method_is_timeout = connect_method.is_timeout(); + + thread::Builder::new().name("events".to_string()) + .spawn(move || {}) + .map_err(Error::SpawnEventThreadError)?; + + if let Some(connect_method) = connect_method.into() { + sync_rx.recv_timeout(connect_method) + .map_err(Error::ConnectEventServerError)?; + } + Ok(EventStream(event_tx)) +} diff --git a/components/sup/src/subscriber.rs b/components/sup/src/subscriber.rs deleted file mode 100644 index bdd1071c8b..0000000000 --- a/components/sup/src/subscriber.rs +++ /dev/null @@ -1,51 +0,0 @@ -extern crate futures; -extern crate nitox; -extern crate tokio; - -use futures::{future::{ok, - Future}, - Stream}; -use nitox::{commands::ConnectCommand, - streaming::{client::{NatsStreamingClient, - SubscribeOptionsBuilder}, - error::NatsStreamingError}, - NatsClient, - NatsClientOptions}; - -fn main() { - println!("Welcome to the testing subscriber for the Automate / Habitat Event Streaming \ - Prototype"); - println!("Press '^C' to end"); - let connect_cmd = ConnectCommand::builder().build().unwrap(); - let options = NatsClientOptions::builder().connect_command(connect_cmd) - .cluster_uri("127.0.0.1:4223") - .build() - .unwrap(); - - let listener = - NatsClient::from_options(options).map_err(NatsStreamingError::from) - .and_then(|client| { - NatsStreamingClient::from(client) - .cluster_id("test-cluster".to_string()) - .connect() - }) - .and_then(|client| { - let opts = SubscribeOptionsBuilder::default().build() - .unwrap(); - let topic = "habitat".to_string(); - println!("Subscribed to topic: '{}'", topic); - println!("==================="); - client.subscribe(topic, opts) - .and_then(move |message_stream| { - message_stream.for_each(|msg| { - println!("Message: {:#?}", String::from_utf8_lossy(&msg.proto.data)); - ok(()) - }) - }) - }) - .map_err(|e| println!("ERROR: {:?}", e)); - - tokio::runtime::Runtime::new().unwrap() - .block_on(listener) - .unwrap(); -} From 60df8c64d77125027af3c9864c96d1c166c774d0 Mon Sep 17 00:00:00 2001 From: David McNeil Date: Wed, 14 Aug 2019 12:51:28 -0400 Subject: [PATCH 07/14] Add non-streaming client Signed-off-by: David McNeil --- Cargo.lock | 14 +++++++ components/sup/Cargo.toml | 1 + components/sup/src/event.rs | 2 - components/sup/src/event/error.rs | 12 ++++++ components/sup/src/event/stream.rs | 66 ++++++++++++++++++++++++++---- 5 files changed, 85 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3e6747e8b7..ae4ec163f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1415,6 +1415,7 @@ dependencies = [ "libc 0.2.54 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "log4rs 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", + "nats 0.3.2 (git+https://github.com/habitat-sh/rust-nats)", "notify 4.0.12 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "palaver 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2060,6 +2061,18 @@ dependencies = [ "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "nats" +version = "0.3.2" +source = "git+https://github.com/habitat-sh/rust-nats#466149fa744dc61c9b7a2e5c8c7939ec75c6e86f" +dependencies = [ + "openssl 0.10.24 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.98 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", + "url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "net2" version = "0.2.33" @@ -4156,6 +4169,7 @@ dependencies = [ "checksum mktemp 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "edf4fc746c5c977923b802d86fc9a95ca79a27d8c487613f68830d68d07c7b27" "checksum multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151" "checksum native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4b2df1a4c22fd44a62147fd8f13dd0f95c9d8ca7b2610299b2a2f9cf8964274e" +"checksum nats 0.3.2 (git+https://github.com/habitat-sh/rust-nats)" = "" "checksum net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88" "checksum nix 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d37e713a259ff641624b6cb20e3b12b2952313ba36b6823c0f16e6cfd9e5de17" "checksum nix 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "46f0f3210768d796e8fa79ec70ee6af172dacbe7147f5e69be5240a47778302b" diff --git a/components/sup/Cargo.toml b/components/sup/Cargo.toml index d68fcf08cf..515e31e061 100644 --- a/components/sup/Cargo.toml +++ b/components/sup/Cargo.toml @@ -37,6 +37,7 @@ lazy_static = "*" libc = "= 0.2.54" log = "*" log4rs = "*" +nats = { git = "https://github.com/habitat-sh/rust-nats" } notify = "*" num_cpus = "*" prometheus = "*" diff --git a/components/sup/src/event.rs b/components/sup/src/event.rs index a9503a3518..96268524f8 100644 --- a/components/sup/src/event.rs +++ b/components/sup/src/event.rs @@ -109,7 +109,6 @@ pub struct EventStreamConnectionInfo { pub name: String, pub verbose: bool, pub cluster_uri: String, - pub cluster_id: String, pub auth_token: AutomateAuthToken, pub connect_method: EventStreamConnectMethod, } @@ -119,7 +118,6 @@ impl EventStreamConnectionInfo { EventStreamConnectionInfo { name: format!("hab_client_{}", supervisor_id), verbose: true, cluster_uri: config.url, - cluster_id: "event-service".to_string(), auth_token: config.token, connect_method: config.connect_method, } } diff --git a/components/sup/src/event/error.rs b/components/sup/src/event/error.rs index c54c10206c..1f474f5065 100644 --- a/components/sup/src/event/error.rs +++ b/components/sup/src/event/error.rs @@ -1,5 +1,6 @@ //! Event subsystem-specific error handling +use nats::NatsError; use std::{error, fmt, io, @@ -11,6 +12,7 @@ pub type Result = result::Result; #[derive(Debug)] pub enum Error { ConnectEventServerError(mpsc::RecvTimeoutError), + NatsError(NatsError), SpawnEventThreadError(io::Error), } @@ -28,6 +30,7 @@ impl fmt::Display for Error { Error::ConnectEventServerError(_) => { "Could not establish streaming connection to NATS server".fmt(f) } + Error::NatsError(e) => e.fmt(f), Error::SpawnEventThreadError(_) => "Could not spawn eventing thread".fmt(f), } } @@ -37,7 +40,16 @@ impl error::Error for Error { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { Error::ConnectEventServerError(ref e) => Some(e), + Error::NatsError(ref e) => Some(e), Error::SpawnEventThreadError(ref e) => Some(e), } } } + +impl From for Error { + fn from(error: NatsError) -> Self { Error::NatsError(error) } +} + +impl From for Error { + fn from(error: mpsc::RecvTimeoutError) -> Self { Error::ConnectEventServerError(error) } +} diff --git a/components/sup/src/event/stream.rs b/components/sup/src/event/stream.rs index 9e89e999bb..f11c0c680b 100644 --- a/components/sup/src/event/stream.rs +++ b/components/sup/src/event/stream.rs @@ -3,31 +3,81 @@ use crate::event::{Error, EventStreamConnectionInfo, Result}; use futures::sync::mpsc as futures_mpsc; +use nats::Client; use std::{sync::mpsc as std_mpsc, - thread}; + thread, + time::Duration}; +use tokio::{prelude::Stream, + runtime::current_thread::Runtime}; /// All messages are published under this subject. const HABITAT_SUBJECT: &str = "habitat"; pub(super) fn init_stream(conn_info: EventStreamConnectionInfo) -> Result { + // TODO (DM): This cannot be unbounded. We need backpressure. If the connection is down when we + // try to publish we try to reconnect this can be time consuming so we can easily get + // behind. let (event_tx, event_rx) = futures_mpsc::unbounded(); let (sync_tx, sync_rx) = std_mpsc::sync_channel(0); // rendezvous channel let EventStreamConnectionInfo { name, verbose, cluster_uri, - cluster_id, - auth_token, + // TODO (DM): The nats client we are using does not support + // auth tokens and will need to be patched + auth_token: _, connect_method, } = conn_info; - let connect_method_is_timeout = connect_method.is_timeout(); + let connection_is_timeout = connect_method.is_timeout(); + + // Note: With the way we are using the client, we will not respond to pings from the server + // (https://nats-io.github.io/docs/nats_protocol/nats-protocol.html#pingpong). + // Instead, we rely on publishing events to keep us connected to the server. This is completely + // valid according to the protocol, and in fact, the server will not send pings if there is + // other traffic from a client. If we have no events for an extended period of time, we will + // be automatically disconnected (because we do not respond to pings). When the next event comes + // in we will try to reconnect. + let mut client = Client::new(cluster_uri)?; + client.set_name(&name); + client.set_synchronous(verbose); + let closure = move || { + // Try to establish an intial connection to the NATS server. + loop { + if let Err(e) = client.connect() { + if connection_is_timeout { + error!("Failed to connect to NATS server '{}'. Retrying...", e); + } else { + warn!("Failed to connect to NATS server '{}'.", e); + break; + } + } else { + if connection_is_timeout { + sync_tx.send(()) + .expect("Couldn't synchronize event thread!"); + } + break; + } + thread::sleep(Duration::from_secs(1)) + } + + let event_handler = event_rx.for_each(move |event: Vec| { + if let Err(e) = client.publish(HABITAT_SUBJECT, &event) { + error!("Failed to publish event, '{}'", e); + } + Ok(()) + }); + + Runtime::new().expect("Couldn't create event stream runtime!") + .spawn(event_handler) + .run() + .expect("something seriously wrong has occurred"); + }; thread::Builder::new().name("events".to_string()) - .spawn(move || {}) + .spawn(closure) .map_err(Error::SpawnEventThreadError)?; - if let Some(connect_method) = connect_method.into() { - sync_rx.recv_timeout(connect_method) - .map_err(Error::ConnectEventServerError)?; + if let Some(timeout) = connect_method.into() { + sync_rx.recv_timeout(timeout)?; } Ok(EventStream(event_tx)) } From d137ddcb5684538bded321235620cf15fd1bc6f3 Mon Sep 17 00:00:00 2001 From: David McNeil Date: Thu, 15 Aug 2019 10:46:24 -0400 Subject: [PATCH 08/14] Add auth token support Signed-off-by: David McNeil --- Cargo.lock | 2 +- components/sup/src/event/error.rs | 2 +- components/sup/src/event/stream.rs | 42 +++++++++++++++++++++++++++--- 3 files changed, 40 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ae4ec163f6..8325bde3fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2064,7 +2064,7 @@ dependencies = [ [[package]] name = "nats" version = "0.3.2" -source = "git+https://github.com/habitat-sh/rust-nats#466149fa744dc61c9b7a2e5c8c7939ec75c6e86f" +source = "git+https://github.com/habitat-sh/rust-nats#cc51a37e49751d393abb0aee1971789f260410e5" dependencies = [ "openssl 0.10.24 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/components/sup/src/event/error.rs b/components/sup/src/event/error.rs index 1f474f5065..737d168c4d 100644 --- a/components/sup/src/event/error.rs +++ b/components/sup/src/event/error.rs @@ -30,7 +30,7 @@ impl fmt::Display for Error { Error::ConnectEventServerError(_) => { "Could not establish streaming connection to NATS server".fmt(f) } - Error::NatsError(e) => e.fmt(f), + Error::NatsError(e) => format!("NATS event stream error '{}'", e).fmt(f), Error::SpawnEventThreadError(_) => "Could not spawn eventing thread".fmt(f), } } diff --git a/components/sup/src/event/stream.rs b/components/sup/src/event/stream.rs index f11c0c680b..8c033e6925 100644 --- a/components/sup/src/event/stream.rs +++ b/components/sup/src/event/stream.rs @@ -12,6 +12,19 @@ use tokio::{prelude::Stream, /// All messages are published under this subject. const HABITAT_SUBJECT: &str = "habitat"; +const NATS_SCHEME: &str = "nats://"; + +fn nats_uri(uri: &str, auth_token: &str) -> String { + // Unconditionally, remove the scheme. We will add it back. + let uri = String::from(uri).replace(NATS_SCHEME, ""); + // If the uri contains credentials or the auth token is empty use the uri as is. Otherwise, add + // the auth token. + if uri.contains('@') || auth_token.is_empty() { + format!("{}{}", NATS_SCHEME, uri) + } else { + format!("{}{}@{}", NATS_SCHEME, auth_token, uri) + } +} pub(super) fn init_stream(conn_info: EventStreamConnectionInfo) -> Result { // TODO (DM): This cannot be unbounded. We need backpressure. If the connection is down when we @@ -23,11 +36,10 @@ pub(super) fn init_stream(conn_info: EventStreamConnectionInfo) -> Result Result Result Date: Thu, 15 Aug 2019 12:29:46 -0400 Subject: [PATCH 09/14] Switch to bounded event channel Signed-off-by: David McNeil --- Cargo.lock | 1 + components/sup/Cargo.toml | 1 + components/sup/src/event.rs | 20 +++++++++++--------- components/sup/src/event/stream.rs | 6 ++---- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8325bde3fc..3da00c8011 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1419,6 +1419,7 @@ dependencies = [ "notify 4.0.12 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "palaver 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", "prometheus 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/components/sup/Cargo.toml b/components/sup/Cargo.toml index 515e31e061..ecb3cc78d3 100644 --- a/components/sup/Cargo.toml +++ b/components/sup/Cargo.toml @@ -40,6 +40,7 @@ log4rs = "*" nats = { git = "https://github.com/habitat-sh/rust-nats" } notify = "*" num_cpus = "*" +parking_lot = "*" prometheus = "*" prost = "*" prost-derive = "*" diff --git a/components/sup/src/event.rs b/components/sup/src/event.rs index 96268524f8..dcc9316512 100644 --- a/components/sup/src/event.rs +++ b/components/sup/src/event.rs @@ -28,11 +28,12 @@ use crate::manager::{service::{HealthCheckResult, use clap::ArgMatches; pub use error::{Error, Result}; -use futures::sync::mpsc::UnboundedSender; +use futures::sync::mpsc::Sender; use habitat_common::types::{AutomateAuthToken, EventStreamConnectMethod, EventStreamMetadata}; use habitat_core::package::ident::PackageIdent; +use parking_lot::Mutex; use state::Container; use std::{net::SocketAddr, sync::Once, @@ -48,6 +49,7 @@ lazy_static! { /// Core information that is shared between all events. static ref EVENT_CORE: Container = Container::new(); } +type EventStreamContainer = Mutex; /// Starts a new thread for sending events to a NATS Streaming /// server. Stashes the handle to the stream, as well as the core @@ -62,7 +64,7 @@ pub fn init_stream(config: EventStreamConfig, event_core: EventCore) -> Result<( let conn_info = EventStreamConnectionInfo::new(&event_core.supervisor_id, config); match stream::init_stream(conn_info) { Ok(event_stream) => { - EVENT_STREAM.set(event_stream); + EVENT_STREAM.set(EventStreamContainer::new(event_stream)); EVENT_CORE.set(event_core); } Err(e) => return_value = Err(e), @@ -204,7 +206,7 @@ pub fn health_check(metadata: ServiceMetadata, /// Internal helper function to know whether or not to go to the trouble of /// creating event structures. If the event stream hasn't been /// initialized, then we shouldn't need to do anything. -fn stream_initialized() -> bool { EVENT_STREAM.try_get::().is_some() } +fn stream_initialized() -> bool { EVENT_STREAM.try_get::().is_some() } /// Publish an event. This is the main interface that client code will /// use. @@ -212,7 +214,7 @@ fn stream_initialized() -> bool { EVENT_STREAM.try_get::().is_some( /// If `init_stream` has not been called already, this function will /// be a no-op. fn publish(mut event: impl EventMessage) { - if let Some(e) = EVENT_STREAM.try_get::() { + if let Some(e) = EVENT_STREAM.try_get::() { // TODO (CM): Yeah... this is looking pretty gross. The // intention is to be able to timestamp the events right as // they go out. @@ -229,20 +231,20 @@ fn publish(mut event: impl EventMessage) { Some(std::time::SystemTime::now().into()), ..EVENT_CORE.get::().to_event_metadata() }); - e.send(event.to_bytes()); + e.lock().send(event.to_bytes()); } } /// A lightweight handle for the event stream. All events get to the /// event stream through this. -struct EventStream(UnboundedSender>); +struct EventStream(Sender>); impl EventStream { /// Queues an event to be sent out. - fn send(&self, event: Vec) { + fn send(&mut self, event: Vec) { trace!("About to queue an event: {:?}", event); - if let Err(e) = self.0.unbounded_send(event) { - error!("Failed to queue event: {:?}", e); + if let Err(e) = self.0.try_send(event) { + error!("Failed to queue event: {}", e); } } } diff --git a/components/sup/src/event/stream.rs b/components/sup/src/event/stream.rs index 8c033e6925..add6fe1bf4 100644 --- a/components/sup/src/event/stream.rs +++ b/components/sup/src/event/stream.rs @@ -13,6 +13,7 @@ use tokio::{prelude::Stream, /// All messages are published under this subject. const HABITAT_SUBJECT: &str = "habitat"; const NATS_SCHEME: &str = "nats://"; +const EVENT_CHANNEL_SIZE: usize = 1024; fn nats_uri(uri: &str, auth_token: &str) -> String { // Unconditionally, remove the scheme. We will add it back. @@ -27,10 +28,7 @@ fn nats_uri(uri: &str, auth_token: &str) -> String { } pub(super) fn init_stream(conn_info: EventStreamConnectionInfo) -> Result { - // TODO (DM): This cannot be unbounded. We need backpressure. If the connection is down when we - // try to publish we try to reconnect this can be time consuming so we can easily get - // behind. - let (event_tx, event_rx) = futures_mpsc::unbounded(); + let (event_tx, event_rx) = futures_mpsc::channel(EVENT_CHANNEL_SIZE); let (sync_tx, sync_rx) = std_mpsc::sync_channel(0); // rendezvous channel let EventStreamConnectionInfo { name, From 142fd6f980650f8216e0fdf8cbb32aba9514691d Mon Sep 17 00:00:00 2001 From: David McNeil Date: Thu, 15 Aug 2019 12:36:48 -0400 Subject: [PATCH 10/14] Remove nitox pipeline tests Signed-off-by: David McNeil --- .expeditor/verify.pipeline.yml | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/.expeditor/verify.pipeline.yml b/.expeditor/verify.pipeline.yml index 9ed733b0c2..da1eef2108 100644 --- a/.expeditor/verify.pipeline.yml +++ b/.expeditor/verify.pipeline.yml @@ -330,20 +330,6 @@ steps: timeout_in_minutes: 40 soft_fail: true - - label: "[unit] :linux: sup nitox_stream" - command: - - ./test/run_cargo_test.sh sup --features "ignore_integration_tests nitox_stream" - agents: - queue: 'default-privileged' - plugins: - docker#v3.0.1: - always-pull: true - user: "buildkite-agent" - group: "buildkite-agent" - image: "chefes/buildkite" - timeout_in_minutes: 10 - soft_fail: true - - label: "[unit] :linux: sup-client" command: - ./test/run_cargo_test.sh sup-client @@ -628,23 +614,6 @@ steps: automatic: limit: 1 - - label: "[unit] :windows: sup nitox_stream" - command: - # This test has test (not code) concurrency issues and will fail if we don't limit it - - ./test/run_cargo_test.ps1 sup -Features "nitox_stream" -TestOptions "--test-threads=1" - agents: - queue: 'default-windows-2016-privileged' - plugins: - docker#v3.2.0: - image: "chefes/buildkite-windows-2016" - shell: [ "powershell", "-Command" ] - always-pull: true - propagate-environment: true - timeout_in_minutes: 40 - retry: - automatic: - limit: 1 - - label: "[unit] :windows: sup-client" command: - ./test/run_cargo_test.ps1 sup-client From 799feaa6d603a7b010d51c781f30412819dffc04 Mon Sep 17 00:00:00 2001 From: David McNeil Date: Thu, 15 Aug 2019 16:03:04 -0400 Subject: [PATCH 11/14] Use correct healthcheck subject Signed-off-by: David McNeil --- Cargo.lock | 3 ++- components/sup/src/event/stream.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3da00c8011..fe9d6d7f90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2065,9 +2065,10 @@ dependencies = [ [[package]] name = "nats" version = "0.3.2" -source = "git+https://github.com/habitat-sh/rust-nats#cc51a37e49751d393abb0aee1971789f260410e5" +source = "git+https://github.com/habitat-sh/rust-nats#833fe87df57e21586848e61b612f94f154af7127" dependencies = [ "openssl 0.10.24 (registry+https://github.com/rust-lang/crates.io-index)", + "percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.98 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/components/sup/src/event/stream.rs b/components/sup/src/event/stream.rs index add6fe1bf4..58e8817447 100644 --- a/components/sup/src/event/stream.rs +++ b/components/sup/src/event/stream.rs @@ -11,7 +11,7 @@ use tokio::{prelude::Stream, runtime::current_thread::Runtime}; /// All messages are published under this subject. -const HABITAT_SUBJECT: &str = "habitat"; +const HABITAT_SUBJECT: &str = "habitat.event.healthcheck"; const NATS_SCHEME: &str = "nats://"; const EVENT_CHANNEL_SIZE: usize = 1024; From 15228661d18f3ff4adcde8d5072e66934b7e941c Mon Sep 17 00:00:00 2001 From: David McNeil Date: Fri, 16 Aug 2019 10:37:06 -0400 Subject: [PATCH 12/14] Move runtime into Manager Signed-off-by: David McNeil --- components/sup/src/manager.rs | 51 ++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/components/sup/src/manager.rs b/components/sup/src/manager.rs index c0c3266011..bea8350993 100644 --- a/components/sup/src/manager.rs +++ b/components/sup/src/manager.rs @@ -404,6 +404,9 @@ pub struct Manager { services_need_reconciliation: ReconciliationFlag, feature_flags: FeatureFlag, + /// The runtime for spawning various `Manager` futures. Eventually, `Manager` could become a + /// future itself. This would allow us to remove this runtime and simply use `tokio::spawn`. + runtime: Runtime, } impl Manager { @@ -447,6 +450,11 @@ impl Manager { /// * `MemberList::initial_members` (write) fn new_imlw(cfg: ManagerConfig, fs_cfg: FsCfg, launcher: LauncherCli) -> Result { debug!("new(cfg: {:?}, fs_cfg: {:?}", cfg, fs_cfg); + let runtime = + RuntimeBuilder::new().name_prefix("tokio-") + .core_threads(TokioThreadCount::configured_value().into()) + .build() + .expect("Couldn't build Tokio Runtime!"); let current = PackageIdent::from_str(&format!("{}/{}", SUP_PKG_IDENT, VERSION)).unwrap(); outputln!("{} ({})", SUP_PKG_IDENT, current); let cfg_static = cfg.clone(); @@ -537,7 +545,8 @@ impl Manager { http_disable: cfg.http_disable, busy_services: Arc::new(Mutex::new(HashSet::new())), services_need_reconciliation: ReconciliationFlag::new(false), - feature_flags: cfg.feature_flags }) + feature_flags: cfg.feature_flags, + runtime }) } /// Load the initial Butterly Member which is used in initializing the Butterfly server. This @@ -713,12 +722,6 @@ impl Manager { let mut next_cpu_measurement = SteadyTime::now(); let mut cpu_start = ProcessTime::now(); - let mut runtime = - RuntimeBuilder::new().name_prefix("tokio-") - .core_threads(TokioThreadCount::configured_value().into()) - .build() - .expect("Couldn't build Tokio Runtime!"); - // TODO (CM): consider bundling up these disparate channel // ends into a single struct that handles the communication // between the CtlAcceptor and this main loop. @@ -736,7 +739,7 @@ impl Manager { executor::spawn(handler); Ok(()) }); - runtime.spawn(ctl_handler); + self.runtime.spawn(ctl_handler); if let Some(svc_load) = svc { commands::service_load(&self.state, &mut CtlRequest::default(), svc_load)?; @@ -744,7 +747,7 @@ impl Manager { // This serves to start up any services that need starting // (which will be all of them at this point!) - self.maybe_spawn_service_futures_rsw_mlw(&mut runtime); + self.maybe_spawn_service_futures_rsw_mlw(); outputln!("Starting gossip-listener on {}", self.butterfly.gossip_addr()); @@ -916,12 +919,12 @@ impl Manager { warn!("Tried to stop '{}', but couldn't update the spec: {:?}", service_spec.ident, err); } - self.stop_service(&service_spec.ident, &shutdown_input, &mut runtime); + self.stop_service(&service_spec.ident, &shutdown_input); } SupervisorAction::UnloadService { service_spec, shutdown_input, } => { self.remove_spec_file(&service_spec.ident).ok(); - self.stop_service(&service_spec.ident, &shutdown_input, &mut runtime); + self.stop_service(&service_spec.ident, &shutdown_input); } } } @@ -948,14 +951,14 @@ impl Manager { // event in the specs directory is registered, or // another service finishes shutting down). self.services_need_reconciliation.toggle_if_set(); - self.maybe_spawn_service_futures_rsw_mlw(&mut runtime); + self.maybe_spawn_service_futures_rsw_mlw(); } self.update_peers_from_watch_file_mlr_imlw()?; self.update_running_services_from_user_config_watcher(); for f in self.stop_services_with_updates_rsw_mlr() { - runtime.spawn(f); + self.runtime.spawn(f); } self.restart_elections_rsw_mlr(self.feature_flags); @@ -986,7 +989,7 @@ impl Manager { // this var goes out of scope #[allow(unused_variables)] let service_timer = service_hist.start_timer(); - if service.tick(&self.census_ring, &self.launcher, &runtime.executor()) { + if service.tick(&self.census_ring, &self.launcher, &self.runtime.executor()) { self.gossip_latest_service_rumor_rsw_mlw(&service); } } @@ -1046,15 +1049,16 @@ impl Manager { .expect("Services lock is poisoned!"); for (_ident, svc) in svcs.drain() { - runtime.spawn(self.stop_service_future(svc, None)); + self.runtime.spawn(self.stop_service_future(svc, None)); } } } // Allow all existing futures to run to completion. - runtime.shutdown_on_idle() - .wait() - .expect("Error waiting on Tokio runtime to shutdown"); + self.runtime + .shutdown_on_idle() + .wait() + .expect("Error waiting on Tokio runtime to shutdown"); release_process_lock(&self.fs_cfg); self.butterfly.persist_data_rsr_mlr(); @@ -1283,13 +1287,10 @@ impl Manager { self.butterfly.restart_elections_rsw_mlr(feature_flags); } - fn stop_service(&mut self, - ident: &PackageIdent, - shutdown_input: &ShutdownInput, - runtime: &mut Runtime) { + fn stop_service(&mut self, ident: &PackageIdent, shutdown_input: &ShutdownInput) { if let Some(service) = self.remove_service_from_state(&ident) { let future = self.stop_service_future(service, Some(shutdown_input)); - runtime.spawn(future); + self.runtime.spawn(future); } else { warn!("Tried to stop '{}', but couldn't find it in our list of running services!", ident); @@ -1398,10 +1399,10 @@ impl Manager { /// # Locking (see locking.md) /// * `RumorStore::list` (write) /// * `MemberList::entries` (write) - fn maybe_spawn_service_futures_rsw_mlw(&mut self, runtime: &mut Runtime) { + fn maybe_spawn_service_futures_rsw_mlw(&mut self) { let ops = self.compute_service_operations(); for f in self.operations_into_futures_rsw_mlw(ops) { - runtime.spawn(f); + self.runtime.spawn(f); } } From b9d80a29d0b853f159fd97d8ea9efa828e25d7aa Mon Sep 17 00:00:00 2001 From: David McNeil Date: Fri, 16 Aug 2019 16:20:03 -0400 Subject: [PATCH 13/14] Use Manager's runtime for event stream Signed-off-by: David McNeil --- components/common/src/types.rs | 9 --- components/sup/src/event.rs | 42 +++++++++++--- components/sup/src/event/error.rs | 13 ++--- components/sup/src/event/stream.rs | 90 ++++++++++++++---------------- components/sup/src/manager.rs | 5 +- 5 files changed, 83 insertions(+), 76 deletions(-) diff --git a/components/common/src/types.rs b/components/common/src/types.rs index 5835c28e33..5bd43573f6 100644 --- a/components/common/src/types.rs +++ b/components/common/src/types.rs @@ -189,15 +189,6 @@ impl<'a> From<&'a ArgMatches<'a>> for EventStreamConnectMethod { } } -impl EventStreamConnectMethod { - pub fn is_timeout(&self) -> bool { - match self { - EventStreamConnectMethod::Timeout { .. } => true, - _ => false, - } - } -} - impl Into> for EventStreamConnectMethod { fn into(self) -> Option { match self { diff --git a/components/sup/src/event.rs b/components/sup/src/event.rs index dcc9316512..f7ccf0cb4a 100644 --- a/components/sup/src/event.rs +++ b/components/sup/src/event.rs @@ -22,9 +22,10 @@ use self::types::{EventMessage, ServiceStartedEvent, ServiceStoppedEvent, ServiceUpdateStartedEvent}; -use crate::manager::{service::{HealthCheckResult, - Service}, - sys::Sys}; +use crate::{manager::{service::{HealthCheckResult, + Service}, + sys::Sys}, + sup_futures::FutureHandle}; use clap::ArgMatches; pub use error::{Error, Result}; @@ -38,6 +39,7 @@ use state::Container; use std::{net::SocketAddr, sync::Once, time::Duration}; +use tokio::runtime::Runtime; static INIT: Once = Once::new(); lazy_static! { @@ -55,14 +57,17 @@ type EventStreamContainer = Mutex; /// server. Stashes the handle to the stream, as well as the core /// event information that will be a part of all events, in a global /// static reference for access later. -pub fn init_stream(config: EventStreamConfig, event_core: EventCore) -> Result<()> { +pub fn init_stream(config: EventStreamConfig, + event_core: EventCore, + runtime: &mut Runtime) + -> Result<()> { // call_once can't return a Result (or anything), so we'll fake it // by hanging onto any error we might receive. let mut return_value: Result<()> = Ok(()); INIT.call_once(|| { let conn_info = EventStreamConnectionInfo::new(&event_core.supervisor_id, config); - match stream::init_stream(conn_info) { + match stream::init_stream(conn_info, runtime) { Ok(event_stream) => { EVENT_STREAM.set(EventStreamContainer::new(event_stream)); EVENT_CORE.set(event_core); @@ -74,6 +79,19 @@ pub fn init_stream(config: EventStreamConfig, event_core: EventCore) -> Result<( return_value } +/// Stop the event stream future so we can gracefully shutdown the runtime. +/// +/// `init_stream` and `stop_stream` cannot be called more than once. The singleton event stream can +/// only be set once. If `init_stream` is called after calling `stop_stream` no new event stream +/// will be started. +pub fn stop_stream() { + if let Some(e) = EVENT_STREAM.try_get::() { + if let Some(h) = e.lock().handle.take() { + h.terminate(); + } + } +} + /// Captures all event stream-related configuration options that would /// be passed in by a user #[derive(Clone, Debug)] @@ -237,13 +255,23 @@ fn publish(mut event: impl EventMessage) { /// A lightweight handle for the event stream. All events get to the /// event stream through this. -struct EventStream(Sender>); +struct EventStream { + sender: Sender>, + handle: Option, +} impl EventStream { + fn new(sender: Sender>, handle: FutureHandle) -> Self { + Self { sender, + handle: Some(handle) } + } + /// Queues an event to be sent out. fn send(&mut self, event: Vec) { trace!("About to queue an event: {:?}", event); - if let Err(e) = self.0.try_send(event) { + // If the server is down, the event stream channel may fill up. If this happens we are ok + // dropping events. + if let Err(e) = self.sender.try_send(event) { error!("Failed to queue event: {}", e); } } diff --git a/components/sup/src/event/error.rs b/components/sup/src/event/error.rs index 737d168c4d..02c59daee6 100644 --- a/components/sup/src/event/error.rs +++ b/components/sup/src/event/error.rs @@ -4,14 +4,13 @@ use nats::NatsError; use std::{error, fmt, io, - result, - sync::mpsc}; + result}; pub type Result = result::Result; #[derive(Debug)] pub enum Error { - ConnectEventServerError(mpsc::RecvTimeoutError), + ConnectEventServerError, NatsError(NatsError), SpawnEventThreadError(io::Error), } @@ -27,7 +26,7 @@ pub enum Error { impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Error::ConnectEventServerError(_) => { + Error::ConnectEventServerError => { "Could not establish streaming connection to NATS server".fmt(f) } Error::NatsError(e) => format!("NATS event stream error '{}'", e).fmt(f), @@ -39,7 +38,7 @@ impl fmt::Display for Error { impl error::Error for Error { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { - Error::ConnectEventServerError(ref e) => Some(e), + Error::ConnectEventServerError => None, Error::NatsError(ref e) => Some(e), Error::SpawnEventThreadError(ref e) => Some(e), } @@ -49,7 +48,3 @@ impl error::Error for Error { impl From for Error { fn from(error: NatsError) -> Self { Error::NatsError(error) } } - -impl From for Error { - fn from(error: mpsc::RecvTimeoutError) -> Self { Error::ConnectEventServerError(error) } -} diff --git a/components/sup/src/event/stream.rs b/components/sup/src/event/stream.rs index 58e8817447..c7c8b2f657 100644 --- a/components/sup/src/event/stream.rs +++ b/components/sup/src/event/stream.rs @@ -1,14 +1,16 @@ -use crate::event::{Error, - EventStream, - EventStreamConnectionInfo, - Result}; -use futures::sync::mpsc as futures_mpsc; +use crate::{event::{Error, + EventStream, + EventStreamConnectionInfo, + Result}, + sup_futures}; +use futures::{future::Future, + sync::mpsc as futures_mpsc}; use nats::Client; -use std::{sync::mpsc as std_mpsc, - thread, - time::Duration}; +use std::{thread, + time::{Duration, + Instant}}; use tokio::{prelude::Stream, - runtime::current_thread::Runtime}; + runtime::Runtime}; /// All messages are published under this subject. const HABITAT_SUBJECT: &str = "habitat.event.healthcheck"; @@ -27,16 +29,16 @@ fn nats_uri(uri: &str, auth_token: &str) -> String { } } -pub(super) fn init_stream(conn_info: EventStreamConnectionInfo) -> Result { +pub(super) fn init_stream(conn_info: EventStreamConnectionInfo, + runtime: &mut Runtime) + -> Result { let (event_tx, event_rx) = futures_mpsc::channel(EVENT_CHANNEL_SIZE); - let (sync_tx, sync_rx) = std_mpsc::sync_channel(0); // rendezvous channel let EventStreamConnectionInfo { name, verbose, cluster_uri, auth_token, connect_method, } = conn_info; - let connection_is_timeout = connect_method.is_timeout(); let uri = nats_uri(&cluster_uri, &auth_token.to_string()); // Note: With the way we are using the client, we will not respond to pings from the server @@ -49,47 +51,37 @@ pub(super) fn init_stream(conn_info: EventStreamConnectionInfo) -> Result start + timeout { + return Err(Error::ConnectEventServerError); } - thread::sleep(Duration::from_secs(1)) + } else { + warn!("Failed to connect to NATS server '{}'.", e); + break; } + thread::sleep(Duration::from_secs(1)); + } - let event_handler = event_rx.for_each(move |event: Vec| { - if let Err(e) = client.publish(HABITAT_SUBJECT, &event) { - error!("Failed to publish event, '{}'", e); - } - Ok(()) - }); - - Runtime::new().expect("Couldn't create event stream runtime!") - .spawn(event_handler) - .run() - .expect("something seriously wrong has occurred"); - }; - - thread::Builder::new().name("events".to_string()) - .spawn(closure) - .map_err(Error::SpawnEventThreadError)?; + let (handle, event_handler) = + sup_futures::cancelable_future(event_rx.for_each(move |event: Vec| { + if let Err(e) = + client.publish(HABITAT_SUBJECT, &event) + { + error!("Failed to publish event, '{}'", e); + } + Ok(()) + })); + // Convert the error type so `spawn` can handle it. + let event_handler = event_handler.map_err(|_| ()); + runtime.spawn(event_handler); - if let Some(timeout) = connect_method.into() { - sync_rx.recv_timeout(timeout)?; - } - Ok(EventStream(event_tx)) + Ok(EventStream::new(event_tx, handle)) } #[cfg(test)] diff --git a/components/sup/src/manager.rs b/components/sup/src/manager.rs index bea8350993..18ceae63c9 100644 --- a/components/sup/src/manager.rs +++ b/components/sup/src/manager.rs @@ -450,7 +450,7 @@ impl Manager { /// * `MemberList::initial_members` (write) fn new_imlw(cfg: ManagerConfig, fs_cfg: FsCfg, launcher: LauncherCli) -> Result { debug!("new(cfg: {:?}, fs_cfg: {:?}", cfg, fs_cfg); - let runtime = + let mut runtime = RuntimeBuilder::new().name_prefix("tokio-") .core_threads(TokioThreadCount::configured_value().into()) .build() @@ -522,7 +522,7 @@ impl Manager { let ec = EventCore::new(&es_config, &sys, fqdn); // unwrap won't fail here; if there were an issue, from_env() // would have already propagated an error up the stack. - event::init_stream(es_config, ec)?; + event::init_stream(es_config, ec, &mut runtime)?; } Ok(Manager { state: Arc::new(ManagerState { cfg: cfg_static, @@ -1055,6 +1055,7 @@ impl Manager { } // Allow all existing futures to run to completion. + event::stop_stream(); self.runtime .shutdown_on_idle() .wait() From 0df9b3a550903386fbcf6a3cda59fbe056a9f270 Mon Sep 17 00:00:00 2001 From: David McNeil Date: Mon, 19 Aug 2019 08:56:48 -0400 Subject: [PATCH 14/14] Use correct subjects for event types Signed-off-by: David McNeil --- components/sup/src/event.rs | 43 ++++++++++++++++++++++-------- components/sup/src/event/stream.rs | 15 ++++++----- 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/components/sup/src/event.rs b/components/sup/src/event.rs index f7ccf0cb4a..a11778a57a 100644 --- a/components/sup/src/event.rs +++ b/components/sup/src/event.rs @@ -41,6 +41,11 @@ use std::{net::SocketAddr, time::Duration}; use tokio::runtime::Runtime; +const SERVICE_STARTED_SUBJECT: &str = "habitat.event.service_started"; +const SERVICE_STOPPED_SUBJECT: &str = "habitat.event.service_stopped"; +const SERVICE_UPDATE_STARTED_SUBJECT: &str = "habitat.event.service_update_started"; +const HEALTHCHECK_SUBJECT: &str = "habitat.event.healthcheck"; + static INIT: Once = Once::new(); lazy_static! { // TODO (CM): When const fn support lands in stable, we can ditch @@ -179,7 +184,8 @@ impl EventCore { /// Send an event for the start of a Service. pub fn service_started(service: &Service) { if stream_initialized() { - publish(ServiceStartedEvent { service_metadata: Some(service.to_service_metadata()), + publish(SERVICE_STARTED_SUBJECT, + ServiceStartedEvent { service_metadata: Some(service.to_service_metadata()), event_metadata: None, }); } } @@ -187,7 +193,8 @@ pub fn service_started(service: &Service) { /// Send an event for the stop of a Service. pub fn service_stopped(service: &Service) { if stream_initialized() { - publish(ServiceStoppedEvent { service_metadata: Some(service.to_service_metadata()), + publish(SERVICE_STOPPED_SUBJECT, + ServiceStoppedEvent { service_metadata: Some(service.to_service_metadata()), event_metadata: None, }); } } @@ -195,7 +202,8 @@ pub fn service_stopped(service: &Service) { /// Send an event at the start of a Service update. pub fn service_update_started(service: &Service, update: &PackageIdent) { if stream_initialized() { - publish(ServiceUpdateStartedEvent { event_metadata: None, + publish(SERVICE_UPDATE_STARTED_SUBJECT, + ServiceUpdateStartedEvent { event_metadata: None, service_metadata: Some(service.to_service_metadata()), update_package_ident: update.clone().to_string(), }); @@ -212,7 +220,8 @@ pub fn health_check(metadata: ServiceMetadata, execution: Option) { if stream_initialized() { let check_result: types::HealthCheckResult = check_result.into(); - publish(HealthCheckEvent { service_metadata: Some(metadata), + publish(HEALTHCHECK_SUBJECT, + HealthCheckEvent { service_metadata: Some(metadata), event_metadata: None, result: i32::from(check_result), execution: execution.map(Duration::into), }); @@ -231,7 +240,7 @@ fn stream_initialized() -> bool { EVENT_STREAM.try_get::() /// /// If `init_stream` has not been called already, this function will /// be a no-op. -fn publish(mut event: impl EventMessage) { +fn publish(subject: &'static str, mut event: impl EventMessage) { if let Some(e) = EVENT_STREAM.try_get::() { // TODO (CM): Yeah... this is looking pretty gross. The // intention is to be able to timestamp the events right as @@ -249,29 +258,41 @@ fn publish(mut event: impl EventMessage) { Some(std::time::SystemTime::now().into()), ..EVENT_CORE.get::().to_event_metadata() }); - e.lock().send(event.to_bytes()); + let packet = EventPacket::new(subject, event.to_bytes()); + e.lock().send(packet); } } +/// The subject and payload of an event message. +#[derive(Debug)] +struct EventPacket { + subject: &'static str, + payload: Vec, +} + +impl EventPacket { + fn new(subject: &'static str, payload: Vec) -> Self { Self { subject, payload } } +} + /// A lightweight handle for the event stream. All events get to the /// event stream through this. struct EventStream { - sender: Sender>, + sender: Sender, handle: Option, } impl EventStream { - fn new(sender: Sender>, handle: FutureHandle) -> Self { + fn new(sender: Sender, handle: FutureHandle) -> Self { Self { sender, handle: Some(handle) } } /// Queues an event to be sent out. - fn send(&mut self, event: Vec) { - trace!("About to queue an event: {:?}", event); + fn send(&mut self, event_packet: EventPacket) { + trace!("About to queue an event: {:?}", event_packet); // If the server is down, the event stream channel may fill up. If this happens we are ok // dropping events. - if let Err(e) = self.sender.try_send(event) { + if let Err(e) = self.sender.try_send(event_packet) { error!("Failed to queue event: {}", e); } } diff --git a/components/sup/src/event/stream.rs b/components/sup/src/event/stream.rs index c7c8b2f657..40c1c176d6 100644 --- a/components/sup/src/event/stream.rs +++ b/components/sup/src/event/stream.rs @@ -1,4 +1,5 @@ use crate::{event::{Error, + EventPacket, EventStream, EventStreamConnectionInfo, Result}, @@ -12,8 +13,6 @@ use std::{thread, use tokio::{prelude::Stream, runtime::Runtime}; -/// All messages are published under this subject. -const HABITAT_SUBJECT: &str = "habitat.event.healthcheck"; const NATS_SCHEME: &str = "nats://"; const EVENT_CHANNEL_SIZE: usize = 1024; @@ -57,10 +56,10 @@ pub(super) fn init_stream(conn_info: EventStreamConnectionInfo, let maybe_timeout = connect_method.into(); while let Err(e) = client.connect() { if let Some(timeout) = maybe_timeout { - error!("Failed to connect to NATS server '{}'. Retrying...", e); if Instant::now() > start + timeout { return Err(Error::ConnectEventServerError); } + error!("Failed to connect to NATS server '{}'. Retrying...", e); } else { warn!("Failed to connect to NATS server '{}'.", e); break; @@ -69,11 +68,13 @@ pub(super) fn init_stream(conn_info: EventStreamConnectionInfo, } let (handle, event_handler) = - sup_futures::cancelable_future(event_rx.for_each(move |event: Vec| { - if let Err(e) = - client.publish(HABITAT_SUBJECT, &event) + sup_futures::cancelable_future(event_rx.for_each(move |packet: EventPacket| { + if let Err(e) = client.publish(packet.subject, + &packet.payload) { - error!("Failed to publish event, '{}'", e); + error!("Failed to publish event to '{}', \ + '{}'", + packet.subject, e); } Ok(()) }));