-
Notifications
You must be signed in to change notification settings - Fork 489
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change over to Message Passing edge communication #1425
Conversation
@desa At this stage all nodes are using the legacy wrapper on the new MPI edge type. A few tests are still not passing so something is broken, but I wanted to get your feedback before going too much further. |
edge/consumer.go
Outdated
func (ec *Consumer) Run() error { | ||
for msg, ok := ec.edge.Next(); ok; msg, ok = ec.edge.Next() { | ||
switch typ := msg.Type(); typ { | ||
case BeginBatch: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My only concern, and I'm not sure that it's even valid, is that switch for every message will end up being expensive.
My concern comes from a note in go-lua
This exercises more of the bytecode interpreter’s inner loop. Here we see the performance impact of Go’s switch implementation. Both go-lua and gopher-lua are an order of magnitude slower than the C Lua interpreter.
I'm not exactly sure what to do about it, but my initial though is to do something like you would with a parser, Where we receive a BeginBatchMessage
and therefore we know that all further messages should be PointMessage
s, until we receive a EndBatchMessage
. This does assume that that there are some semantics to the order in which messages are received.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like that idea. I am too concerned that this will be less performant. For the most part I have ignored those questions and just been focusing on the API design first. Then we can come back through and fix slow spots. We will need to develop good benchmarks to test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I played around with this idea, and it currently can only remove two cases from the switch statement and has only one level of nesting. It doesn't seem like much of a gain for the complexity it adds. And I don't foresee many nested messages types emerging.
When we circle back to performance test this we can reconsider.
@nathanielc so far I really like it. Just have the one thought about the switch statement on each message. |
1865678
to
2c26302
Compare
@desa This is ready for a final review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Read through most of the things in the edge package and a few of the nodes.
|
||
// ForwardReceiver handles messages as they arrive and can return a message to be forwarded to output edges. | ||
// If a returned messages is nil, no message is forwarded. | ||
type ForwardReceiver interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't all of the nodes use a ForwardReceiver
? Reading through the combine.go, it seems like it would have been possible to implement it using a ForwardReceiver
instead of a Receiver
. Am I missing something?
join.go
Outdated
} | ||
|
||
func (n *JoinNode) Barrier(src int, b edge.BarrierMessage) error { | ||
//TODO: implement barrier |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't this need to forward the barrier message?
"github.com/influxdata/kapacitor/expvar" | ||
"github.com/influxdata/kapacitor/models" | ||
"github.com/influxdata/kapacitor/pipeline" | ||
"github.com/influxdata/kapacitor/timer" | ||
"github.com/pkg/errors" | ||
) | ||
|
||
type JoinNode struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why JoinNode
doesn't need to implement Receiver
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah because there are different types of Consumers
:)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Not sure I read over every change, but I think I've got the gist of things.
Change over to Message Passing edge communication
This PR introduces the new API for using MPI semantics for edge communication between nodes of the task DAG.
A LegacyEdge type is introduced that implements the original behavior on top of the new API, this way the existing code remains the same so it can be incrementally switched over to the new API.
All nodes have been reimplemented using the new API. The LegacyEdge type has been removed.