
benchmark-data-update: export csv
farnyser committed Mar 29, 2024
1 parent 8fcc11e commit 5b1b658
Showing 7 changed files with 137 additions and 0 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default; the Cargo.lock diff is not shown.

1 change: 1 addition & 0 deletions bin/benchmark-data-update/Cargo.toml
@@ -52,3 +52,4 @@ rustls = "0.20.8"
tracing = { version = "0.1", features = ["log"] }

hdrhistogram = "7.5.4"
csv = "1.0"
2 changes: 2 additions & 0 deletions bin/benchmark-data-update/src/configuration.rs
@@ -6,6 +6,8 @@ use services_mango_lib::env_helper::string_or_env
pub struct Configuration {
    #[serde(deserialize_with = "string_or_env")]
    pub mango_group: String,
    #[serde(deserialize_with = "string_or_env")]
    pub export_csv_path: String,
    pub source_configuration: SourceConfiguration,
}

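For context, export_csv_path is deserialized the same way as mango_group, via string_or_env, which presumably accepts either a literal string or an environment-variable reference. A hypothetical snippet of the service configuration — the file format (TOML assumed here) and the source_configuration contents are not part of this diff:

mango_group = "<group address or env reference>"
export_csv_path = "/tmp/benchmark-data.csv"

[source_configuration]
# unchanged, omitted here
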
10 changes: 10 additions & 0 deletions bin/benchmark-data-update/src/main.rs
@@ -11,6 +11,7 @@ use std::sync::atomic::Ordering;
use crate::configuration::Configuration;
use crate::processors::data::{DataEventSource, DataProcessor};
use crate::processors::exit::ExitProcessor;
use crate::processors::exporter::ExporterProcessor;
use crate::processors::logger::LoggerProcessor;

#[tokio::main]
@@ -53,11 +54,20 @@ async fn main() -> anyhow::Result<()> {
    )
    .await?;

    let exporter_processor = ExporterProcessor::init(
        &configuration,
        &ws_processor.channel,
        &grpc_processor.channel,
        exit_processor.exit.clone(),
    )
    .await?;

    let jobs = vec![
        exit_processor.job,
        ws_processor.job,
        grpc_processor.job,
        logger_processor.job,
        exporter_processor.job,
    ];
    let mut jobs: futures::stream::FuturesUnordered<_> = jobs.into_iter().collect();

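The remainder of main() is collapsed in this view. Typically a FuturesUnordered built like the one above is then drained so the process keeps running until its processor tasks finish (the exit processor owns the shared exit flag that the task loops check before breaking). A minimal sketch of that pattern — not the elided code itself:

use futures::StreamExt;

// wait for the processor tasks; each one returns once the exit flag is set
while let Some(result) = jobs.next().await {
    if let Err(err) = result {
        tracing::warn!("processor job failed: {:?}", err);
    }
}
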
100 changes: 100 additions & 0 deletions bin/benchmark-data-update/src/processors/exporter.rs
@@ -0,0 +1,100 @@
use csv::Writer;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::task::JoinHandle;
use tracing::warn;

use crate::configuration::Configuration;

use super::data::{AccountUpdateEvent, DataEvent, DataEventSource};

pub struct ExporterProcessor {
    pub job: JoinHandle<()>,
}

impl ExporterProcessor {
    pub async fn init(
        configuration: &Configuration,
        data_sender_1: &tokio::sync::broadcast::Sender<DataEvent>,
        data_sender_2: &tokio::sync::broadcast::Sender<DataEvent>,
        exit: Arc<AtomicBool>,
    ) -> anyhow::Result<ExporterProcessor> {
        let export_csv_path = configuration.export_csv_path.clone();
        let mut data_1 = data_sender_1.subscribe();
        let mut data_2: tokio::sync::broadcast::Receiver<DataEvent> = data_sender_2.subscribe();

        let job = tokio::spawn(async move {
            let mut wtr = Writer::from_path(export_csv_path).expect("could not create csv file");
            // flush buffered rows to disk every 5 seconds
            let mut interval = tokio::time::interval(std::time::Duration::from_millis(5 * 1000));

            wtr.write_record(&["slot", "time", "source", "account", "snap"])
                .expect("failed to write header");

            loop {
                if exit.load(Ordering::Relaxed) {
                    warn!("shutting down exporter processor...");
                    break;
                }

                tokio::select! {
                    _ = interval.tick() => {
                        wtr.flush().expect("flushing csv file failed");
                    },
                    Ok(msg) = data_1.recv() => Self::handle(msg, &mut wtr),
                    Ok(msg) = data_2.recv() => Self::handle(msg, &mut wtr),
                }
            }
        });

        let result = ExporterProcessor { job };

        Ok(result)
    }

    fn handle_account<T: std::io::Write>(
        upd: AccountUpdateEvent,
        writer: &mut Writer<T>,
        is_snapshot: bool,
    ) {
        let source = match upd.source {
            DataEventSource::Websocket => "ws".to_string(),
            DataEventSource::Grpc => "grpc".to_string(),
        };
        let snap = match is_snapshot {
            true => "snapshot".to_string(),
            false => "single".to_string(),
        };
        writer
            .write_record(&[
                upd.slot.to_string(),
                upd.received_at.to_string(),
                source,
                upd.account.to_string(),
                snap,
            ])
            .expect("failed to write account update");
    }

    fn handle<T: std::io::Write>(msg: DataEvent, writer: &mut Writer<T>) {
        match msg {
            DataEvent::Other => {}
            DataEvent::Snapshot(upd) => {
                for acc in upd.accounts {
                    Self::handle_account(
                        AccountUpdateEvent {
                            received_at: upd.received_at,
                            account: acc,
                            source: upd.source,
                            slot: upd.slot,
                        },
                        writer,
                        true,
                    );
                }
            }
            DataEvent::AccountUpdate(upd) => {
                Self::handle_account(upd, writer, false);
            }
        }
    }
}
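
For illustration, the exporter writes the header row once at startup and then one row per account update from either channel; snapshot events are flattened into one "snapshot" row per contained account, while individual updates are tagged "single". Placeholder values below stand in for the real ones (the exact formatting of time and account depends on the to_string() impls of the event fields):

slot,time,source,account,snap
<slot>,<received_at>,ws,<account>,single
<slot>,<received_at>,grpc,<account>,snapshot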
1 change: 1 addition & 0 deletions bin/benchmark-data-update/src/processors/logger.rs
@@ -36,6 +36,7 @@ impl LoggerProcessor {
                warn!("shutting down logger processor...");
                break;
            }

            tokio::select! {
                _ = interval.tick() => {
                    if !first {
1 change: 1 addition & 0 deletions bin/benchmark-data-update/src/processors/mod.rs
@@ -1,3 +1,4 @@
pub mod data;
pub mod exit;
pub mod exporter;
pub mod logger;
