Parallelization of bulk publisher for high loads #927
Conversation
messageProcessed := false
for !messageProcessed {
	for _, pipe := range pipes {
		if !pipe.isFull() {
Have you considered using something like this, so you don't need to check for space?

select {
case pipe.in <- msg:
	messageProcessed = true
default:
}
Thank you. I will give that a try.
The select worked great! Thanks Jay!
I don't think you need the default: line.
@afiune You need the default: line so that the send on pipe.in <- msg does not wait for room in the channel.
The code below does the following: if the channel is not full, send the message and set messageProcessed to true; if the channel is full, fall through to default and exit the select.

case pipe.in <- msg:
	messageProcessed = true
default:
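To make the behavior discussed above concrete, here is a minimal, self-contained sketch of the non-blocking send pattern. The `nonBlockingSend` helper name and the `int` channel type are illustrative stand-ins, not code from this PR:

```go
package main

import "fmt"

// nonBlockingSend demonstrates the select/default pattern: the default
// case makes the send non-blocking, so a full buffer causes the select
// to fall through immediately instead of waiting for room.
func nonBlockingSend(ch chan int, v int) bool {
	select {
	case ch <- v: // succeeds only if the buffer has room
		return true
	default: // buffer full: give up immediately
		return false
	}
}

func main() {
	ch := make(chan int, 2)
	fmt.Println(nonBlockingSend(ch, 1)) // true
	fmt.Println(nonBlockingSend(ch, 2)) // true
	fmt.Println(nonBlockingSend(ch, 3)) // false: buffer is full
}
```

Without the default case, the select would block until a receiver drained the channel, which is exactly the waiting behavior the distributor needs to avoid.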
Clever!!! ⭐️
Signed-off-by: Lance Finfrock <[email protected]>
Force-pushed from f1ec0c5 to a0be4a5 (Compare)
Neato
}

func sendMessage(pipeInChannels []chan message.ChefRun, msg message.ChefRun) {
	for true {
[nit] Suggested change: for true { → for {
Not sure how we feel about this style, but I think if you wanted you could do:

for !distributeMessage(pipeInChannels, msg) {
	// All pipes are full. Wait and try again
	time.Sleep(time.Millisecond * 10)
}
Yeah, I thought about making that change, but it does not seem as readable: the name distributeMessage does not say when the for-loop stops.
What about?

for messageProcessed := distributeMessage(pipeInChannels, msg); !messageProcessed; messageProcessed = distributeMessage(pipeInChannels, msg) {
	// All pipes are full. Wait and try again
	time.Sleep(time.Millisecond * 10)
}
Eh, I'd just leave it, it was pretty clear as is, I was just musing aloud :D
}

func mergeOutChannels(pipeOutChannels []<-chan message.ChefRun) chan message.ChefRun {
	mergedOut := make(chan message.ChefRun, 100)
💡 As a follow-up to this PR, I think it might make sense to do some arithmetic on the maximum number of messages we might end up with sitting in channel buffers if we get maximally backed up, and then tune or remove some of these buffers accordingly.
		default:
		}
	}
	return false
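For context, here is a self-contained sketch of the fill-first distribution this function implements, with `int` standing in for `message.ChefRun`. It is an illustration of the pattern, not the PR's exact code:

```go
package main

import "fmt"

// distributeMessage tries each pipe's inbox in order with a
// non-blocking send and returns true on the first success.
// Returning false means every inbox was full, which is the
// point where logging a warning makes sense.
func distributeMessage(pipeInChannels []chan int, msg int) bool {
	for _, in := range pipeInChannels {
		select {
		case in <- msg:
			return true
		default: // this pipe is full; try the next one
		}
	}
	// All pipes are full; the caller should wait and retry.
	return false
}

func main() {
	pipes := []chan int{make(chan int, 1), make(chan int, 1)}
	fmt.Println(distributeMessage(pipes, 1)) // true: first pipe takes it
	fmt.Println(distributeMessage(pipes, 2)) // true: overflow to second pipe
	fmt.Println(distributeMessage(pipes, 3)) // false: all pipes full
}
```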
Would it make sense to log a warning here? If we hit this, it means we are potentially no longer keeping up with requests, right?
That is right. I will add a warning log.
I did some load testing with this PR and a large Elasticsearch cluster and was able to achieve much higher throughput than before through the frontend. At peak I saw 60k 3MB nodes checking in on a c5.4xlarge, with CPU being the ultimate bottleneck. It looks like this moves the bottleneck from waiting on publishing to the Automate server's hardware. I would recommend a bit higher than 6, though; from the testing we did, 20 looks to be a good number.
@@ -35,7 +36,8 @@ func NewChefRunPipeline(client backend.Client, authzClient iam_v2.ProjectsClient
	processor.BuildRunProjectTagger(authzClient),
	publisher.BuildNodeManagerPublisher(nodeMgrClient),
	processor.BuildRunMsgToBulkRequestTransformer(client),
	publisher.BuildBulkRunPublisher(client, maxNumberOfBundledRunMsgs),
	publisher.BuildMsgDistributor(publisher.BuildBulkRunPublisher(
Super nit, but I can read this better if you indent it this way:

publisher.BuildMsgDistributor(
	publisher.BuildBulkRunPublisher(client, maxNumberOfBundledRunMsgs),
	numberOfParallelBulkPublishers,
	maxNumberOfBundledRunMsgs,
),
That does look better. Thanks.
for msg := range in {
	sendMessage(pipeInChannels, msg)
}
close(out)
I'm not so sure about it, but should this close call belong in a defer statement? 🤔 (I'm just starting to think about what happens when one of these goroutines fails for any reason.)
I think you might be correct. I don't want to make this change right now though, because this has already been thoroughly tested. This is a problem with all the processors in the pipeline and should be tested separately.
Good catch!
inChannels := make([]chan message.ChefRun, numProcessors)
outChannels := make([]<-chan message.ChefRun, numProcessors)
for index := range inChannels {
	in := make(chan message.ChefRun, childPipeInboxSize)
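For reference, a minimal sketch of the fan-in side that `mergeOutChannels` performs on these per-child channels: one goroutine per child output copies values into a single merged channel, which is closed once every child is drained. Types are simplified to `int` in place of `message.ChefRun`; this is an illustration, not the PR's exact implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// mergeOutChannels fans in several read-only channels into one
// buffered output channel. A WaitGroup tracks the copier goroutines
// so the merged channel is closed exactly once, after all inputs end.
func mergeOutChannels(pipeOutChannels []<-chan int) chan int {
	mergedOut := make(chan int, 100)
	var wg sync.WaitGroup
	for _, out := range pipeOutChannels {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				mergedOut <- v
			}
		}(out)
	}
	go func() {
		wg.Wait()
		close(mergedOut)
	}()
	return mergedOut
}

func main() {
	a, b := make(chan int, 2), make(chan int, 2)
	a <- 1
	b <- 2
	close(a)
	close(b)
	sum := 0
	for v := range mergeOutChannels([]<-chan int{a, b}) {
		sum += v
	}
	fmt.Println(sum) // 3
}
```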
Since this inbox size is coming all the way from the config file in Habitat, should we add a parameter check for -1 or 0 (negative int or zero)?
That is a good idea. I will want to do that in another PR. We have not fully published these parameters yet (meaning they are not in the docs), so only the CS teams should be playing around with them for now.
@@ -0,0 +1,122 @@
package publisher
nit: By convention, this file should be called msg_distributor_internal_test.go since it is testing the internals of the package.
Thouuuuugh, thanks for adding tests!!!!! 💯
Looking sharp! Loving the improvements you have made Lance. 💟
@@ -29,6 +29,7 @@ func DefaultConfigRequest() *ConfigRequest {
	c.V1.Sys.Service.MaxNumberOfBundledRunMsgs = w.Int32(2500)
	c.V1.Sys.Service.MaxNumberOfBundledActionMsgs = w.Int32(10000)
	c.V1.Sys.Service.NumberOfRunMsgsTransformers = w.Int32(9)
	c.V1.Sys.Service.NumberOfRunMsgPublishers = w.Int32(2)
We are currently going to default to two publishers. This should be good for a local Elasticsearch.
return bulkRunPublisherBundler(in, client, maxNumberOfBundledRunMsgs)
name := fmt.Sprintf("pub-%d", count)
count++
return bulkRunPublisherBundler(in, client, maxNumberOfBundledRunMsgs, name)
Adding a different name for each publisher to be able to see in the logs how many publishers are being used.
@@ -0,0 +1,94 @@
package publisher
This is the main addition.
📋 TODO
- Add numberOfParallelBulkPublishers to the Automate config

🔩 Description
Created a message distributor to send messages to multiple bulk publishers. The distributor creates a set number of bulk publishers that the messages are sent to. Messages go to the first publisher until its inbox is full; the overflow is then sent to the second publisher until it is full, and so on through the remaining publishers until they are all full. When they are all full, the distributor waits and tries to send the message again. This algorithm can easily be changed by updating the distributeMessage function.

The reason for filling one publisher before sending messages to the others is to increase the size of the bulk messages sent. With a round-robin or equal-distribution approach, all the bulk publishers would send small messages, with the overhead of more connections to Elasticsearch. For example, we would rather send 100 messages over one Elasticsearch connection than 25 messages apiece over 4 connections. Once the load on the system is high enough that the bottleneck is publishing the bulk messages, the second bulk publisher's inbox will fill up and the second bulk publisher will start sending bulk messages in parallel.
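The retry behavior described above can be sketched end to end. This is a simplified illustration with `int` in place of `message.ChefRun`, combining a fill-first `distributeMessage` with the wait-and-retry loop; it is not the PR's exact code:

```go
package main

import (
	"fmt"
	"time"
)

// distributeMessage is a stand-in for the fill-first helper: try each
// pipe in order with a non-blocking send; false means all were full.
func distributeMessage(pipes []chan int, msg int) bool {
	for _, in := range pipes {
		select {
		case in <- msg:
			return true
		default:
		}
	}
	return false
}

// sendMessage keeps retrying until some pipe accepts the message,
// sleeping briefly whenever every inbox is full.
func sendMessage(pipes []chan int, msg int) {
	for !distributeMessage(pipes, msg) {
		// All pipes are full. Wait and try again.
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	pipes := []chan int{make(chan int, 1)}
	sendMessage(pipes, 42)
	fmt.Println(<-pipes[0]) // 42
}
```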
Testing is needed. The distributor may only be useful with multiple Elasticsearch nodes.
Update numberOfParallelBulkPublishers to add more publishers sending messages to Elasticsearch.

⛓️ Related Resources
#924
✅ Checklist