-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ingest stage to provide synthetic workload for benchmarks #395
Conversation
Codecov Report
@@ Coverage Diff @@
## main #395 +/- ##
==========================================
+ Coverage 64.09% 64.39% +0.29%
==========================================
Files 92 94 +2
Lines 6501 6560 +59
==========================================
+ Hits 4167 4224 +57
- Misses 2094 2096 +2
Partials 240 240
Flags with carried forward coverage won't be shown. Click here to find out more.
... and 1 file with indirect coverage changes Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. |
5b976fd
to
9fb9930
Compare
metricsProcessed = operational.DefineMetric( | ||
"ingest_synthetic_flows_processed", | ||
"Number of metrics processed", | ||
operational.TypeCounter, | ||
"stage", | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the variable name and the help string should be updated to reflect that it processes flow logs rather than metrics
// IngestSynthetic Ingest generates flow logs according to provided parameters | ||
func (ingestS *IngestSynthetic) Ingest(out chan<- config.GenericMap) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// IngestSynthetic Ingest generates flow logs according to provided parameters | |
func (ingestS *IngestSynthetic) Ingest(out chan<- config.GenericMap) { | |
// Ingest generates flow logs according to provided parameters | |
func (ingestS *IngestSynthetic) Ingest(out chan<- config.GenericMap) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
next := 0 | ||
|
||
// compute time interval between batches | ||
ticker := time.NewTicker(time.Duration(int(time.Minute*time.Duration(ingestS.params.BatchMaxLen)) / ingestS.params.FlowLogsPerMin)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The computation of the interval between batches looks complex to me.
I don't think I understand it.
What does it mean to create a Duration
from BatchMaxLen
and multiply it by time.Minute
?
time.Duration(ingestS.params.BatchMaxLen)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added a comment
|
||
// Start collecting flows from the ingester and ensure we have the specified number of distinct connections | ||
connectionMap := make(map[connection]int) | ||
for i := 0; i < (3 * connections); i++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the hardcoded 3 the batchMaxLen? or just an arbitrary number to make sure we have multiple flow logs per connection?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just to have (many) more flow logs than connections, and to verify that we accumulate the proper number of connections with multiple flow logs per connection.
jsonIngestSynthetic := api.IngestSynthetic{} | ||
if params.Ingest != nil || params.Ingest.Synthetic != nil { | ||
jsonIngestSynthetic = *params.Ingest.Synthetic | ||
} | ||
if jsonIngestSynthetic.Connections == 0 { | ||
jsonIngestSynthetic.Connections = defaultConnections | ||
} | ||
if jsonIngestSynthetic.FlowLogsPerMin == 0 { | ||
jsonIngestSynthetic.FlowLogsPerMin = defaultFlowLogsPerMin | ||
} | ||
if jsonIngestSynthetic.BatchMaxLen == 0 { | ||
jsonIngestSynthetic.BatchMaxLen = defaultBatchLen | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wouldn't use "json" as part of the variable name of jsonIngestSynthetic
because it's not limited to json only. Maybe "conf" could replace it?
4d59cae
to
b00857b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing my feedback and adding the comments
type IngestSynthetic struct { | ||
params api.IngestSynthetic | ||
exitChan <-chan struct{} | ||
metricsProcessed prometheus.Counter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rename metricsProcessed
here as well
for flowsLeft > 0 { | ||
remainder := nLogs - next | ||
if subBatchLen > remainder { | ||
subBatchLen = remainder | ||
} | ||
log.Debugf("flowsLeft = %d, remainder = %d, subBatchLen = %d", flowsLeft, remainder, subBatchLen) | ||
subBatch := flowLogs[next : next+subBatchLen] | ||
ingestS.sendBatch(subBatch, out) | ||
ingestS.metricsProcessed.Add(float64(subBatchLen)) | ||
flowsLeft -= subBatchLen | ||
next += subBatchLen | ||
if subBatchLen == remainder { | ||
next = 0 | ||
subBatchLen = flowsLeft | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I explored a different approach to achieve the goal of this loop:
https://github.com/ronensc/flowlogs-pipeline/blob/4266dc2e507e89c007a521d6eaeb7bc34de64abd/pkg/pipeline/ingest/ingest_synthetic.go#L73-L81
Please let me know if it is indeed equivalent and if you think it is clearer.
Since we loop over the sub-batch anyway in sendBatch()
I incorporated it into the loop and removed sendBatch()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
your suggestion is much simpler and cleaner. Done.
[APPROVALNOTIFIER] This PR is APPROVED Approval requirements bypassed by manually added approval. This pull-request has been approved by: The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
This PR introduces ingest_synthetic stage to provide a steady stream of flow logs for a configured number of simulated connections. This may be used to control the input flows to be able to perform benchmarking and memory usages of stages.
To configure the stage, add in the config file the following fields: