Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

change: telegraf reqired tags added at internal code #182

Merged
merged 5 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 174 additions & 16 deletions core/wave-autoscale/src/metric_collector_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use flate2::read::GzDecoder;
use futures_util::StreamExt;
use log::{debug, error, info};
use std::cmp::min;
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use tar::Archive;
Expand Down Expand Up @@ -383,9 +384,10 @@ impl MetricCollectorManager {
continue;
}

// metadata.outputs remove
// metadata.outputs, metadata.inputs remove
let mut metadata = metric_definition.metadata.clone();
metadata.remove("outputs");
metadata.remove("inputs");

// convert metric_definition.metadata to toml
let Ok(metadata_toml) =
Expand All @@ -395,7 +397,64 @@ impl MetricCollectorManager {
};

let mut outputs_toml = toml::value::Array::new();
let mut inputs_map = HashMap::<String, Vec<serde_json::Map<std::string::String, serde_json::Value>>>::new();
if metric_definition.collector == "telegraf" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicated code

metric_definition.collector == "telegraf"


// metadata.inputs
let Some(inputs) = metric_definition.metadata.get("inputs") else {
error!("[telegraf] inputs not found");
continue;
};
let Some(inputs_object) = inputs.as_object() else {
error!("[telegraf] Failed to convert metadata.inputs to as_object");
continue;
};

for (inputs_key, array_objects) in inputs_object.iter() {
let Some(object) = array_objects.as_array() else {
error!("[telegraf] Failed to convert metadata.inputs to as_array");
continue;
};

let mut inputs_object_arr = Vec::<serde_json::Map<std::string::String, serde_json::Value>>::new();

// insert required tags to inputs
object.iter().for_each(|object| {
let inputs_tags = object.get("tags");
let mut inputs_tags_append = serde_json::Map::new();
if let Some(inputs_tags) = inputs_tags {
let Some(inputs_tags) = inputs_tags.as_object() else {
error!("[telegraf] Failed to convert metadata.inputs.tags to as_object");
return;
};
inputs_tags_append = inputs_tags.clone();
}
inputs_tags_append.insert(
"metric_id".to_string(),
serde_json::json!(metric_definition.id.clone()),
);
let Some(object) = object.as_object() else {
error!("[telegraf] Failed to convert metadata.inputs array object to as_object");
return;
};

let mut object_tags = serde_json::Map::new();
object_tags.insert(
"tags".to_string(),
serde_json::json!(inputs_tags_append),
);
let mut object_append = object.clone();
object_append.append(&mut object_tags);
inputs_object_arr.push(object_append);
});

inputs_map.insert(
inputs_key.to_string(),
inputs_object_arr
);
}

// metadata.outputs
let Some(outputs) = metric_definition.metadata.get("outputs") else {
error!("[telegraf] outputs not found");
continue;
Expand Down Expand Up @@ -427,25 +486,64 @@ impl MetricCollectorManager {
outputs_toml.push(toml::Value::Table(output_metric));
}
}

// [[inputs]]
let Ok(inputs_toml) =
serde_json::from_value::<toml::Value>(serde_json::json!(inputs_map)) else {
error!("[telegraf] Failed to convert inputs to toml");
continue;
};
let mut root_inputs = toml::value::Table::new();
root_inputs.insert("inputs".to_string(), inputs_toml);
let Ok(inputs_toml_str) = toml::to_string(&root_inputs) else {
error!("[telegraf] Failed to convert metadata to toml string");
continue;
};

// [[outputs]]
let mut outputs = toml::value::Table::new();
outputs.insert("http".to_string(), toml::Value::Array(outputs_toml));
let mut root_outputs = toml::value::Table::new();
root_outputs.insert("outputs".to_string(), toml::Value::Table(outputs));
let Ok(root_outputs_toml_str) = toml::to_string(&root_outputs) else {
error!("[telegraf] Failed to convert metadata.output to toml string");
continue;
};

// other metadata
let Ok(metadata_toml_str) = toml::to_string(&metadata_toml) else {
error!("[telegraf] Failed to convert metadata to toml string");
continue;
};
let Ok(root_outputs_toml_str) = toml::to_string(&root_outputs) else {
error!("[telegraf] Failed to convert metadata.output to toml string");
continue;
};

root_toml =
root_toml + "\n" + &metadata_toml_str + "\n" + &root_outputs_toml_str + "\n";
root_toml + "\n" + &metadata_toml_str + "\n" + &inputs_toml_str + "\n" + &root_outputs_toml_str + "\n";
}

// [agent]
let Ok(agent_toml) = serde_json::from_value::<toml::Value>(serde_json::json!({
"agent": {
"interval": "1s",
"round_interval": true,
"metric_batch_size": 1000,
"metric_buffer_limit": 10000,
"collection_jitter": "0s",
"flush_interval": "1s",
"flush_jitter": "0s",
"precision": "0s",
"debug": false
}
})) else {
error!("[telegraf] Failed to convert agent to toml");
return;
};
let Ok(agent_toml_str) = toml::to_string(&agent_toml) else {
error!("[telegraf] Failed to convert agent to toml string");
return;
};

root_toml = root_toml + "\n" + &agent_toml_str + "\n";

debug!("Telegraf config:\n{}", root_toml);

// Write the string to a file
Expand Down Expand Up @@ -977,30 +1075,64 @@ mod tests {
interval: "10s"
namepass: ["process_cpu_seconds_*"]
tags:
metric_id: prometheus_metrics
test: test_tag
outputs:
wave-autoscale:
tagpass:
metric_id: prometheus_metrics
agent:
interval: "1s"
metric_batch_size: 1000
metric_buffer_limit: 10000
flush_interval: "1s"
secretstores:
http:
- id: "secretstore"
url: "http://localhost/secrets"
"#;

let metric_definition = serde_yaml::from_str::<MetricDefinition>(yaml).unwrap();

// metadata.outputs remove
// metadata.outputs, metadata.inputs remove
let mut metadata = metric_definition.metadata.clone();
metadata.remove("outputs");
metadata.remove("inputs");

// convert metric_definition.metadata to toml
let metadata_toml =
serde_json::from_value::<toml::Value>(serde_json::json!(metadata)).unwrap();

let mut outputs_toml = toml::value::Array::new();
let mut inputs_map = HashMap::<String, Vec<serde_json::Map<std::string::String, serde_json::Value>>>::new();
if metric_definition.collector == "telegraf" {

// metadata.inputs
let inputs_object = metric_definition.metadata.get("inputs").unwrap().as_object().unwrap();
for (inputs_key, array_objects) in inputs_object.iter() {
let object = array_objects.as_array().unwrap();
let mut inputs_object_arr = Vec::<serde_json::Map<std::string::String, serde_json::Value>>::new();

// insert required tags to inputs
object.iter().for_each(|object| {
let inputs_tags = object.get("tags");
let mut inputs_tags_append = serde_json::Map::new();
if let Some(inputs_tags) = inputs_tags {
inputs_tags_append = inputs_tags.as_object().unwrap().clone();
}
inputs_tags_append.insert(
"metric_id".to_string(),
serde_json::json!(metric_definition.id.clone()),
);
let mut object_tags = serde_json::Map::new();
object_tags.insert(
"tags".to_string(),
serde_json::json!(inputs_tags_append),
);
let mut object_append = object.as_object().unwrap().clone();
object_append.append(&mut object_tags);
inputs_object_arr.push(object_append);
});
inputs_map.insert(
inputs_key.to_string(),
inputs_object_arr
);
}

let outputs = metric_definition.metadata.get("outputs").unwrap();

// find output waveautoscale
Expand Down Expand Up @@ -1042,11 +1174,37 @@ mod tests {
let mut root_outputs = toml::value::Table::new();
root_outputs.insert("outputs".to_string(), toml::Value::Table(outputs));

// [[inputs]]
let inputs_toml = serde_json::from_value::<toml::Value>(serde_json::json!(inputs_map)).unwrap();
let mut root_inputs = toml::value::Table::new();
root_inputs.insert("inputs".to_string(), inputs_toml);
let inputs_toml_str = toml::to_string(&root_inputs).unwrap();

// [agent]
let agent_toml = serde_json::from_value::<toml::Value>(serde_json::json!({
"agent": {
"flush_interval": "1s",
}
})).unwrap();
let agent_toml_str = toml::to_string(&agent_toml).unwrap();

// other metadata
let metadata_toml_str = toml::to_string(&metadata_toml).unwrap();

debug!("metadata_toml:\n{}", metadata_toml_str);
assert!(metadata_toml_str.contains("flush_interval = \"1s\""));
assert!(metadata_toml_str.contains("[[inputs.prometheus]]"));
assert!(metadata_toml_str.contains("namepass = [\"process_cpu_seconds_*\"]"));
assert!(metadata_toml_str.contains("[[secretstores.http]]"));
assert!(metadata_toml_str.contains("id = \"secretstore\""));

debug!("agent_toml:\n{}", agent_toml_str);
assert!(agent_toml_str.contains("flush_interval = \"1s\""));

debug!("inputs_toml:\n{}", inputs_toml_str);
assert!(inputs_toml_str.contains("[[inputs.prometheus]]"));
assert!(inputs_toml_str.contains("namepass = [\"process_cpu_seconds_*\"]"));
assert!(inputs_toml_str.contains("[inputs.prometheus.tags]"));
assert!(inputs_toml_str.contains("metric_id = \"prometheus_metrics\""));
assert!(inputs_toml_str.contains("test = \"test_tag\""));

let outputs_http_str = toml::to_string(&root_outputs).unwrap();
debug!("output_toml:\n{}", outputs_http_str);
assert!(outputs_http_str.contains("[[outputs.http]]"));
Expand Down
14 changes: 0 additions & 14 deletions core/wave-autoscale/tests/yaml/plan_k8s_istio_rds_sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ metadata:
dimensions:
- name: DBInstanceIdentifier
value: "wave-eks-istio-postgresql"
tags:
metric_id: cloudwatch_rds_metrics # add tags
- region: ap-northeast-1
profile: "default"
period: "1m"
Expand All @@ -30,20 +28,8 @@ metadata:
dimensions:
- name: DBInstanceIdentifier
value: "wave-eks-istio-postgresql"
tags:
metric_id: cloudwatch_rds_metrics # add tags
outputs:
wave-autoscale: {}
agent:
interval: "1s"
round_interval: true
metric_batch_size: 1000
metric_buffer_limit: 10000
collection_jitter: "0s"
flush_interval: "1s"
flush_jitter: "0s"
precision: "0s"
debug: false
---
kind: ScalingComponent
id: k8s_patch
Expand Down