Skip to content

Commit

Permalink
Use correct subjects for event types
Browse files Browse the repository at this point in the history
Signed-off-by: David McNeil <[email protected]>
  • Loading branch information
davidMcneil committed Aug 19, 2019
1 parent b9d80a2 commit 0df9b3a
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 18 deletions.
43 changes: 32 additions & 11 deletions components/sup/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ use std::{net::SocketAddr,
time::Duration};
use tokio::runtime::Runtime;

const SERVICE_STARTED_SUBJECT: &str = "habitat.event.service_started";
const SERVICE_STOPPED_SUBJECT: &str = "habitat.event.service_stopped";
const SERVICE_UPDATE_STARTED_SUBJECT: &str = "habitat.event.service_update_started";
const HEALTHCHECK_SUBJECT: &str = "habitat.event.healthcheck";

static INIT: Once = Once::new();
lazy_static! {
// TODO (CM): When const fn support lands in stable, we can ditch
Expand Down Expand Up @@ -179,23 +184,26 @@ impl EventCore {
/// Send an event for the start of a Service.
pub fn service_started(service: &Service) {
if stream_initialized() {
publish(ServiceStartedEvent { service_metadata: Some(service.to_service_metadata()),
publish(SERVICE_STARTED_SUBJECT,
ServiceStartedEvent { service_metadata: Some(service.to_service_metadata()),
event_metadata: None, });
}
}

/// Send an event for the stop of a Service.
pub fn service_stopped(service: &Service) {
if stream_initialized() {
publish(ServiceStoppedEvent { service_metadata: Some(service.to_service_metadata()),
publish(SERVICE_STOPPED_SUBJECT,
ServiceStoppedEvent { service_metadata: Some(service.to_service_metadata()),
event_metadata: None, });
}
}

/// Send an event at the start of a Service update.
pub fn service_update_started(service: &Service, update: &PackageIdent) {
if stream_initialized() {
publish(ServiceUpdateStartedEvent { event_metadata: None,
publish(SERVICE_UPDATE_STARTED_SUBJECT,
ServiceUpdateStartedEvent { event_metadata: None,
service_metadata:
Some(service.to_service_metadata()),
update_package_ident: update.clone().to_string(), });
Expand All @@ -212,7 +220,8 @@ pub fn health_check(metadata: ServiceMetadata,
execution: Option<Duration>) {
if stream_initialized() {
let check_result: types::HealthCheckResult = check_result.into();
publish(HealthCheckEvent { service_metadata: Some(metadata),
publish(HEALTHCHECK_SUBJECT,
HealthCheckEvent { service_metadata: Some(metadata),
event_metadata: None,
result: i32::from(check_result),
execution: execution.map(Duration::into), });
Expand All @@ -231,7 +240,7 @@ fn stream_initialized() -> bool { EVENT_STREAM.try_get::<EventStreamContainer>()
///
/// If `init_stream` has not been called already, this function will
/// be a no-op.
fn publish(mut event: impl EventMessage) {
fn publish(subject: &'static str, mut event: impl EventMessage) {
if let Some(e) = EVENT_STREAM.try_get::<EventStreamContainer>() {
// TODO (CM): Yeah... this is looking pretty gross. The
// intention is to be able to timestamp the events right as
Expand All @@ -249,29 +258,41 @@ fn publish(mut event: impl EventMessage) {
Some(std::time::SystemTime::now().into()),
..EVENT_CORE.get::<EventCore>().to_event_metadata() });

e.lock().send(event.to_bytes());
let packet = EventPacket::new(subject, event.to_bytes());
e.lock().send(packet);
}
}

/// The subject and payload of an event message.
#[derive(Debug)]
struct EventPacket {
subject: &'static str,
payload: Vec<u8>,
}

impl EventPacket {
fn new(subject: &'static str, payload: Vec<u8>) -> Self { Self { subject, payload } }
}

/// A lightweight handle for the event stream. All events get to the
/// event stream through this.
struct EventStream {
sender: Sender<Vec<u8>>,
sender: Sender<EventPacket>,
handle: Option<FutureHandle>,
}

impl EventStream {
fn new(sender: Sender<Vec<u8>>, handle: FutureHandle) -> Self {
fn new(sender: Sender<EventPacket>, handle: FutureHandle) -> Self {
Self { sender,
handle: Some(handle) }
}

/// Queues an event to be sent out.
fn send(&mut self, event: Vec<u8>) {
trace!("About to queue an event: {:?}", event);
fn send(&mut self, event_packet: EventPacket) {
trace!("About to queue an event: {:?}", event_packet);
// If the server is down, the event stream channel may fill up. If this happens we are ok
// dropping events.
if let Err(e) = self.sender.try_send(event) {
if let Err(e) = self.sender.try_send(event_packet) {
error!("Failed to queue event: {}", e);
}
}
Expand Down
15 changes: 8 additions & 7 deletions components/sup/src/event/stream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{event::{Error,
EventPacket,
EventStream,
EventStreamConnectionInfo,
Result},
Expand All @@ -12,8 +13,6 @@ use std::{thread,
use tokio::{prelude::Stream,
runtime::Runtime};

/// All messages are published under this subject.
const HABITAT_SUBJECT: &str = "habitat.event.healthcheck";
const NATS_SCHEME: &str = "nats://";
const EVENT_CHANNEL_SIZE: usize = 1024;

Expand Down Expand Up @@ -57,10 +56,10 @@ pub(super) fn init_stream(conn_info: EventStreamConnectionInfo,
let maybe_timeout = connect_method.into();
while let Err(e) = client.connect() {
if let Some(timeout) = maybe_timeout {
error!("Failed to connect to NATS server '{}'. Retrying...", e);
if Instant::now() > start + timeout {
return Err(Error::ConnectEventServerError);
}
error!("Failed to connect to NATS server '{}'. Retrying...", e);
} else {
warn!("Failed to connect to NATS server '{}'.", e);
break;
Expand All @@ -69,11 +68,13 @@ pub(super) fn init_stream(conn_info: EventStreamConnectionInfo,
}

let (handle, event_handler) =
sup_futures::cancelable_future(event_rx.for_each(move |event: Vec<u8>| {
if let Err(e) =
client.publish(HABITAT_SUBJECT, &event)
sup_futures::cancelable_future(event_rx.for_each(move |packet: EventPacket| {
if let Err(e) = client.publish(packet.subject,
&packet.payload)
{
error!("Failed to publish event, '{}'", e);
error!("Failed to publish event to '{}', \
'{}'",
packet.subject, e);
}
Ok(())
}));
Expand Down

0 comments on commit 0df9b3a

Please sign in to comment.