Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outbound data architecture changes #680

Merged
merged 28 commits into from
Jun 12, 2018
Merged

Outbound data architecture changes #680

merged 28 commits into from
Jun 12, 2018

Conversation

derekcollison
Copy link
Member

The original outbound architecture would potentially spend time from a receiving client's Go routine to process inbound messages. This would do ok for small to medium fanout. This changes that such that it can switch from that model with a time bound architecture to a dedicated outbound Go routine. Routes will always default to using the dedicated outbound routine when ingesting messages. Clients may spend time in place to balance ingress vs egress rates and optimize for latency. There are also changes to help when routes connect with large number of subscriptions and to do a better job with slow consumer status. Various tests that were flapping were also fixed.

Resolves #675
Resolves #659
Resolves #550

/cc @nats-io/core

derekcollison and others added 26 commits June 4, 2018 17:45
Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
Use pending bytes as slow consumer trigger, so reintroduce max_pending.
Improve latency with inplace flush calls when appropriate. Utilize simple
time budget for readLoop routine.

Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
Signed-off-by: Derek Collison <[email protected]>
@coveralls
Copy link

coveralls commented Jun 11, 2018

Coverage Status

Coverage increased (+0.3%) to 92.548% when pulling f7cb616 on fanout into e597043 on master.

// Will return if data was attempted to be written.
// Lock must be held
func (c *client) flushOutbound() bool {
if c.flags.isSet(flushOutbound) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering why use that since this needs to be called under client lock, but then I realized that this function release/reacquire the lock. This is quite dangerous for rest of code calling this function under the client lock. The state after calling flushOutbound() may have changed, which means that code would have to check and not assume state is as it was before the call to flushOutboud.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes correct but was a specific design goal to not hols a lock during IO similar to what read does now, but with outbound it can be called from multiple places hence the flag.

client.mu.Unlock()

// Remember for when we return to the top of the loop.
c.pcd[client] = needFlush
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe outside the client lock? In sendOK() this is set under the client lock, and I see that you added a FIXME there. I think the idea of setting needsFlush there is because we call sendProto() with false, to indicate that this does not require a send in place, but ultimately, we want OK to be sent.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your comment does not seem to match the highlighted code. But in the code, client and c are different.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. And any place we possibly touch this pcd map is from the client's readLoop go routine so we are ok.

Copy link
Member

@kozlovic kozlovic left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments.. did not fully review everything yet. There are some changes required I believe (locking client -> server is problematic for sure).

c.nc.SetWriteDeadline(time.Now().Add(c.srv.getOpts().WriteDeadline))
deadlineSet = true
// queueOutbound queues data for client/route connections.
// Return pending length.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really ;-)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on the return or what it does. It queues data for the connection. flushOutbound is what may or may not send to the socket.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was just the comment about returning pending length, which this function does not do (no return value).

// Snapshot opts
srv := c.srv

// Place primary on nb, assign primary to secondary, nil out nb and secondary.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assign secondary to primary

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read this as assign foo = 3, which when read assign primary to secondary is what happens.

Copy link
Member

@kozlovic kozlovic Jun 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for my english: I read foo = 3 as assign 3 to foo, not foo to 3, hence if you have p = s I would have said assign secondary to primary.

c.mu.Lock()

// Update flush time statistics
c.out.lft = lft
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function could return the flush time instead, since it is read only after calling this function in the readLoop.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may use it for more advanced stats on /connz


// Re-acquire client lock
c.mu.Lock()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check if connection was closed, and if so simply return true here? Not sure it makes sense to continue after this point if it has been closed. Also, we may end-up logging a flush error if the write returned an error due to socket close.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We check for err != nil right below here. If connection is properly closed the err should be nil, and the code below just does accounting. But we could check for that condition if you think its critical.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible that connection is closed before we grab the lock but after the nb.WriteTo() so err would be nil. Again, as long as the remaining code is safe if the connection is closed I am fine.

// flushSignal will use server to queue the flush IO operation to a pool of flushers.
// Lock must be held.
func (c *client) flushSignal() {
c.out.sg.Signal()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have an out.inWait = true and out.inWait = false surrounding the call to sg.Wait() and flush only if inWait is true? That would possibly reduce the number of Signal() calls.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far has not been a problem. Also, when Wait returns it will have the lock but it is not guaranteed the next instruction to reset the flag gets run.

server/client.go Outdated
// Check for a big message, and if found place directly on nb
// FIXME(dlc) - do we need signaling of ownership here if we want len(data) <
if len(data) > maxBufSize {
c.out.nb = append(c.out.nb, data)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the data is referenced here? What I mean is for instance when delivering a message, we queue the message header and then the payload. But the header is already referencing a client's buffer that is reused for each deliverMsg call. Is that safe?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good observation. The code above is safe since anything over maxBufSize is a heap allocation for the message, hence the conditional. I want to make it more explicit such that I could scale out fan out cpu wise. Ran out of time but still on my list. I could add some more comments about how it is safe now.

}
if !didDeliver && c.srv != nil {
group := c.srv.lookupRemoteQGroup(string(c.pa.sid))
c.reRouteQMsg(r, msgh, msg, group)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we should still account for group being not found and in this case simply return.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is handled via a DebugF on route.go:159

s.rqsMu.RLock()
rqsub := s.rqsubs[sid]
s.rqsMu.RUnlock()
return rqsub.group
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory, the group could not be found...

Copy link
Member Author

@derekcollison derekcollison Jun 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its a struct in the map, so empty struct return so group would be default of nil.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I see.

server/client.go Outdated
// so we need to know what to do to avoid unnecessary message drops
// from [auto-]unsubscribe.
if c.typ == CLIENT && c.srv != nil &&
len(sub.queue) > 0 && c.srv.NumRoutes() > 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a big no-no. NumRoutes() grabs the server lock. We can't do that. We have code doing s.mu.Lock() -> c.mu.Lock() so we should not do the opposite otherwise we have the risk of getting a lock inversion.
If you really want to optimize, it would be with an atomic (or immutable depending of config reload capabilities) on if routing is enabled or not. But if it is, I would not check the number of routes, simply store the q group.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I will snapshot the variables and release client lock.

server/client.go Outdated
// pubAllowed checks on publish permissioning.
func (c *client) pubAllowed() bool {
// Disallow publish to _SYS.>, these are reserved for internals.
if c.pa.subject[0] == '_' && len(c.pa.subject) > 4 &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that this is moved and not new code, but someone previously reported that bytes.Equal() would probably be as fast if not faster?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used to be much slower but you are probably right. I could run a micro benchmark or just check the normal benchmarks. Will test quickly and if so will change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will need to be bytes.HasPrefix(). Running benchmarks now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5ns slower with bytes.HasPrefix(), so will leave as is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I was basing my comments on this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but agreed, not worth the change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tweaked a bit and it made a small difference with string cast and subslice with ==.

@kozlovic
Copy link
Member

LGTM

@derekcollison derekcollison merged commit 5598d5c into master Jun 12, 2018
@derekcollison derekcollison deleted the fanout branch June 12, 2018 20:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants