Skip to content

Commit

Permalink
enhancement(dedupe transform): add internal events (#3809)
Browse files Browse the repository at this point in the history
Closes #3393

Signed-off-by: Luke Steensen <[email protected]>
Co-authored-by: Jean Mertz <[email protected]>
  • Loading branch information
lukesteensen and JeanMertz authored Sep 11, 2020
1 parent 54a4d8e commit ff555aa
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 5 deletions.
36 changes: 36 additions & 0 deletions src/internal_events/dedupe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub(crate) struct DedupeEventProcessed;

impl InternalEvent for DedupeEventProcessed {
fn emit_metrics(&self) {
counter!("events_processed", 1,
"component_kind" => "transform",
"component_type" => "dedupe",
);
}
}

#[derive(Debug)]
pub(crate) struct DedupeEventDiscarded {
pub event: crate::Event,
}

impl InternalEvent for DedupeEventDiscarded {
fn emit_logs(&self) {
warn!(
message = "Encountered duplicate event; discarding.",
rate_limit_secs = 30
);
trace!(message = "Encountered duplicate event; discarding.", event = ?self.event);
}

fn emit_metrics(&self) {
counter!("duplicate_events_discarded", 1,
"component_kind" => "transform",
"component_type" => "dedupe",
);
}
}
4 changes: 4 additions & 0 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ mod blackhole;
mod coercer;
#[cfg(feature = "transforms-concat")]
mod concat;
#[cfg(feature = "transforms-dedupe")]
mod dedupe;
#[cfg(feature = "sources-docker")]
mod docker;
mod elasticsearch;
Expand Down Expand Up @@ -80,6 +82,8 @@ pub use self::blackhole::*;
pub(crate) use self::coercer::*;
#[cfg(feature = "transforms-concat")]
pub use self::concat::*;
#[cfg(feature = "transforms-dedupe")]
pub(crate) use self::dedupe::*;
#[cfg(feature = "sources-docker")]
pub use self::docker::*;
pub use self::elasticsearch::*;
Expand Down
8 changes: 3 additions & 5 deletions src/transforms/dedupe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::Transform;
use crate::{
config::{log_schema, DataType, TransformConfig, TransformContext, TransformDescription},
event::{Event, Value},
internal_events::{DedupeEventDiscarded, DedupeEventProcessed},
};
use bytes::Bytes;
use lru::LruCache;
Expand Down Expand Up @@ -182,13 +183,10 @@ fn build_cache_entry(event: &Event, fields: &FieldMatchConfig) -> CacheEntry {

impl Transform for Dedupe {
fn transform(&mut self, event: Event) -> Option<Event> {
emit!(DedupeEventProcessed);
let cache_entry = build_cache_entry(&event, &self.config.fields);
if self.cache.put(cache_entry, true).is_some() {
warn!(
message = "Encountered duplicate event; discarding",
rate_limit_secs = 30
);
trace!(message = "Encountered duplicate event; discarding", ?event);
emit!(DedupeEventDiscarded { event });
None
} else {
Some(event)
Expand Down

0 comments on commit ff555aa

Please sign in to comment.