Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev msaff scenario runner #21

Closed
wants to merge 10 commits into from
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ Cargo.lock
*.pdb

.vscode/
.idea/
.idea/

*.png

run.sh
36 changes: 33 additions & 3 deletions apps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,54 @@ edition = "2021"
name = "server"
path = "./src/server.rs"

[[bin]]
name = "run_scenario"
path = "./src/test_client/src/run_scenario.rs"

[[bin]]
name = "producer"
path = "./src/test_client/src/producer.rs"

[[bin]]
name = "consumer"
path = "./src/test_client/src/consumer.rs"

[[bin]]
name = "scenario_validator"
path = "./src/test_client/src/scenario_validator.rs"

[[bin]]
name = "scenario_builder"
path = "./src/test_client/src/scenario_builder.rs"

[dependencies]
opentelemetry = { version = "0.23" }
opentelemetry_sdk = { version = "0.23", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.16", features = ["tonic"] }
opentelemetry-semantic-conventions = { version = "0.15" }
opentelemetry-stdout = { version = "0.2.0", features = ["trace"] }
opentelemetry-stdout = { version = "0.4.0", features = ["trace"] }

tracing = "0.1.4"
tracing-opentelemetry = "0.24"
tracing-subscriber = "0.3"

tracing-appender = "0.2.3"

tonic = "0.11.0"
tonic-reflection = "0.11.0"
tokio = { version = "1.37.0", features = ["tokio-macros", "macros", "rt-multi-thread"] }
tokio-stream = "0.1.15"

uuid = { version = "1.8.0", features = ["v4"]}
rand = "0.8.5"
thiserror = "1.0.61"

serde_json = "1.0.117"
serde = { version = "1.0.203", features = ["derive"] }
serde_derive = "1.0.203"

exchanges = { path = "../exchanges" }
ruuster-grpc = { path = "../ruuster_grpc" }
protos = { path = "../protos" }
utils = { path = "../utils" }
utils = { path = "../utils" }

clap = { version = "4.5.6", features=["derive"]}
145 changes: 77 additions & 68 deletions apps/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,68 +1,77 @@
use std::fs;
use protos::ruuster_server::RuusterServer;
use ruuster_grpc::RuusterQueuesGrpc;
use tonic::transport::Server;
use opentelemetry::{trace::TraceError, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
runtime, trace as sdktrace, Resource,
};
use opentelemetry_semantic_conventions::resource::SERVICE_NAME;
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Registry;

const SERVER_IP: &str = "127.0.0.1";
const SERVER_PORT: &str = "50051";

fn init_tracer() -> Result<opentelemetry_sdk::trace::Tracer, TraceError> {
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint("http://localhost:4317"),
)
.with_trace_config(
sdktrace::config().with_resource(Resource::new(vec![KeyValue::new(
SERVICE_NAME,
"ruuster-tracer",
)])),
)
.install_batch(runtime::Tokio)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let tracer = init_tracer().expect("Failed to initialize tracer.");
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = Registry::default().with(telemetry);
tracing::subscriber::set_global_default(subscriber)?;

let addr = format!("{}:{}", SERVER_IP, SERVER_PORT).parse().unwrap();
let ruuster_queue_service = RuusterQueuesGrpc::new();
let current_dir = std::env::current_dir()?;
let ruuster_descriptor_path = current_dir
.join("protos")
.join("defs")
.join("ruuster_descriptor.bin");

let ruuster_descriptor_content = fs::read(ruuster_descriptor_path)?;

let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(&ruuster_descriptor_content)
.build()?;
{
let span = tracing::info_span!("app_start");
let _enter = span.enter();
info!("starting server on address: {}", &addr);
}

Server::builder()
.add_service(RuusterServer::new(ruuster_queue_service))
.add_service(reflection_service)
.serve(addr)
.await?;

Ok(())
}
use opentelemetry::{trace::TraceError, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{runtime, trace as sdktrace, Resource};
use opentelemetry_semantic_conventions::resource::SERVICE_NAME;
use protos::ruuster_server::RuusterServer;
use ruuster_grpc::RuusterQueuesGrpc;
use std::fs;
use std::io::Stdout;
use tonic::transport::Server;
use tracing::info;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::{filter, fmt, Layer, Registry};

const SERVER_IP: &str = "127.0.0.1";
const SERVER_PORT: &str = "50051";

fn init_tracer() -> Result<opentelemetry_sdk::trace::Tracer, TraceError> {
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint("http://localhost:4317"),
)
.with_trace_config(
sdktrace::config().with_resource(Resource::new(vec![KeyValue::new(
SERVICE_NAME,
"ruuster-tracer",
)])),
)
.install_batch(runtime::Tokio)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let tracer = init_tracer().expect("Failed to initialize tracer.");
let filter_layer = filter::Targets::new().with_targets([
("ruuster_grpc", LevelFilter::INFO),
("queues", LevelFilter::INFO),
("exchanges", LevelFilter::INFO),
]);
let stdout_log = tracing_subscriber::fmt::layer().pretty();
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = Registry::default()
.with(telemetry)
.with(filter_layer)
.with(stdout_log.with_filter(filter::LevelFilter::INFO));
tracing::subscriber::set_global_default(subscriber)?;

let addr = format!("{}:{}", SERVER_IP, SERVER_PORT).parse().unwrap();
let ruuster_queue_service = RuusterQueuesGrpc::new();
let current_dir = std::env::current_dir()?;
let ruuster_descriptor_path = current_dir
.join("protos")
.join("defs")
.join("ruuster_descriptor.bin");

let ruuster_descriptor_content = fs::read(ruuster_descriptor_path)?;

let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(&ruuster_descriptor_content)
.build()?;
{
let span = tracing::info_span!("app_start");
let _enter = span.enter();
info!("starting server on address: {}", &addr);
}

Server::builder()
.add_service(RuusterServer::new(ruuster_queue_service))
.add_service(reflection_service)
.serve(addr)
.await?;

Ok(())
}
45 changes: 45 additions & 0 deletions apps/src/test_client/scenarios/all_consumers.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"server_metadata": {
"name": "small_test",
"server_addr": "http://127.0.0.1:50051"
},
"queues": [
{
"name": "q4"
}
],
"exchanges": [
{
"name": "e0",
"kind": 0,
"bindings": [
{
"queue_name": "q4",
"metadata": null
}
]
}
],
"producers": [
{
"name": "p0",
"destination": "e0",
"messages_produced": 2530,
"message_payload_bytes": 10,
"post_message_delay_ms": 10,
"metadata": null
}
],
"consumers": [
{
"name": "c4",
"source": "q4",
"consuming_method": "stream",
"ack_method": "bulk",
"workload_ms": {
"min": 0,
"max": 0
}
}
]
}
Loading
Loading