
Advanced listening with cluster support #211

Closed
ronag opened this issue Jun 19, 2016 · 28 comments


ronag commented Jun 19, 2016

This is a future feature request.

We are interested in being able to run providers in a high availability setup with possibly some load balancing.

Currently, anyone that registers a listener for a pattern becomes the "owner" of a record. This basically means that we cannot have overlapping providers, and we end up with a single point of failure.

We would like a setup where only one provider is registered as the owner, using e.g. some kind of hash-based scheme. Providers would then be health-checked on a regular basis, and if one stops responding its records are re-balanced across the available hosts.

Basically we would like to be able to use deepstream providers in the same way we would set up an HTTP server cluster with load balancing and health checking through e.g. HAProxy or Nginx. So if one of our servers goes down, we are able to automatically and quickly fail over to another server.
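A rough sketch of what such a hash-based ownership scheme could look like (purely illustrative; `hashCode` and `ownerFor` are hypothetical names, not deepstream APIs):

```javascript
// Hypothetical sketch: deterministically assign each record to one
// provider by hashing the record name over the list of live providers.
// When a provider fails its health check, it is removed from the list
// and ownership is re-balanced simply by recomputing ownerFor.

function hashCode(str) {
  // simple string hash (djb2 variant), kept unsigned via >>> 0
  let hash = 5381
  for (let i = 0; i < str.length; i++) {
    hash = ((hash * 33) ^ str.charCodeAt(i)) >>> 0
  }
  return hash
}

function ownerFor(recordName, providers) {
  // providers: array of live provider ids; sorted so every node
  // computes the same assignment regardless of discovery order
  const sorted = [...providers].sort()
  return sorted[hashCode(recordName) % sorted.length]
}
```

Every node that knows the set of live providers can compute the same owner locally, with no coordination beyond the membership list.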


yasserf commented Jun 30, 2016

This is something we discussed internally and we'll update once it's on our near roadmap!

@yasserf yasserf added the epic label Jun 30, 2016
@yasserf yasserf added this to the 1.1 milestone Jul 28, 2016
@yasserf yasserf changed the title record.listen HA Advanced listening with cluster support Jul 28, 2016

yasserf commented Jul 28, 2016

We'll be looking to add this to 1.1

To boil down the initial requirements:

  • heartbeats to ensure providers are active ( not spinning with an open connection )
  • single provider per matched name ( not the pattern itself ), only a single one in the entire cluster
  • share listen state correctly across the entire cluster
  • load balancing to not give everything to one provider ( currently happens, as it is selected randomly )

Issue:

Sometimes we may have providers that populate certain parts of a record. If you have a pricing feed you might have the bid/ask price from one provider, and metadata from another. You could successfully argue that those should be two records, however, is that really a design choice we want to make permanent?

The very first suggested API is exactly the same one we have now:

listen( pattern, ( name, isSubscribed ) => {
} )

except that it has to work correctly with isSubscribed.


ronag commented Jul 28, 2016

Sometimes we may have providers that populate certain parts of a record. If you have a pricing feed you might have the bid/ask price from one provider, and metadata from another. You could successfully argue that those should be two records, however, is that really a design choice we want to make permanent?

You could have match + group name.


yasserf commented Jul 28, 2016

Can you elaborate a bit on what your definition of group name is?


ronag commented Jul 28, 2016

NATS (which we currently use) has something it calls "queue groups" (http://www.nats.io/documentation/concepts/nats-queueing/). I think you could do something similar here.

In NATS there is the "global" group, e.g.

nats.subscribe('foo',  function() {
  received += 1;
});

But you can also subscribe to a named group, e.g.

nats.subscribe('foo', {'queue':'job.workers'}, function() {
  received += 1;
});

I think something similar could be applied here as well.


ronag commented Jul 28, 2016

Also I would like to add my last comment from deepstreamIO/deepstream.io-client-js#170.

I think isSubscribed should be replaced with numSubscribers. That way one can get around the issue by detecting numSubscribers === 1 in case the listener has its own reference.

i.e.:

listen( pattern, ( name, numSubscribers ) => {
} )


ronag commented Jul 28, 2016

Another thing we would find useful is being able to reject a listen, similar to how RPCs work, so that we have a bit more control over the load balancing.


yasserf commented Jul 29, 2016

We'll take these points into review during the design meeting later today. It would be good to get any other API requests in within the next 5 hours if possible.


ronag commented Jul 29, 2016

I think we've covered it:

  • heartbeats to ensure providers are active ( not spinning with an open connection )
  • single provider per matched name ( not the pattern itself ), only a single one in the entire cluster
  • share listen state correctly across the entire cluster
  • handle the case where the listener has its own subscription ( "listen example is incorrect", deepstream.io-client-js#170 )
  • some form of control over load balancing
  • graceful shutdown

In terms of load balancing our specific wish list (in order of importance) would be for a listener to be able to say:

  1. No more resources for additional records, try again later, i.e. 503
  2. Unsupported record data, i.e. 501 (this is important to be able to support different versions of software with breaking changes in the record data model).
  3. Different scheduling algorithms:
    • round-robin
    • least connection
    • hash of source ip
    • hash of record name (probably simplest to implement)
    • random
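For illustration, the 503-style "no more resources" case could look something like this on the provider side (a hypothetical sketch only; `onListenMatch`, `MAX_RECORDS` and the `response` object are assumptions based on the accept/reject API discussed in this thread, not the current client API):

```javascript
// Hypothetical sketch: a provider that rejects new records once it is
// at capacity, so the server can pick another provider or retry later.
const MAX_RECORDS = 2
const active = new Set()

function onListenMatch(name, isSubscribed, response) {
  if (!isSubscribed) {
    // the last subscriber went away; free up capacity
    active.delete(name)
    return
  }
  if (active.size >= MAX_RECORDS) {
    // the "503" case: no more resources for additional records
    response.reject('at capacity')
  } else {
    active.add(name)
    response.accept()
  }
}
```

Once a record is unsubscribed and capacity frees up, the same provider can accept the next match it is offered.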


yasserf commented Jul 29, 2016

Outcome of planning:

  • We will implement heartbeats as a general concept and not just for listening.
  • Single provider ( check )
  • Share state ( check )
  • Handle self state as provider ( check, considering we only have one publisher per record )
  • Control over load balancing ( check, same way as rpc )
  • Graceful shutdown, not related to this story

We will implement the same load-balancing logic as RPC; if we decide to move towards something else, it will be part of another story. We need to cut scope to deliver this and other things in a sprint, and our solution should cover all the important aspects.

Consumer:

record.hasProvider // bool
record.on( 'provideStart' )
record.on( 'provideStop' )

Provider:

ds.record.listen( 'car/*', ( name, isSubscribed, response ) => {
    // optional reason, will only be logged
    response.reject( reason );

    // or accept
    response.accept( accepted => {} );
})

Deepstream State Registry used for clustering listen state api:

this.subscribeStateRegistry = new StateRegistry( 'record-subscribe-state', options )
this.subscribeStateRegistry.add( name );
this.subscribeStateRegistry.remove( name );
this.subscribeStateRegistry.on( 'added', ( name ) => {} );
this.subscribeStateRegistry.on( 'removed', ( name ) => {} );

Message bus events:

// subscribeAdd recordName
// subscribeRemove recordName
// publishAdd recordName
// publishRemove recordName
// requestSubscribeState
// subscribeState:
{
    subscribed: [ <recordNames> ],
    published: [ <recordNames> ]
}

WolframHempel (Member) commented:

I'd suggest we also add checksums to all update events. Whenever StateRegistry.add( name ) is called, we calculate a simple checksum ( the sum of name.charCodeAt() for each character ) and store it alongside the name. Whenever add or remove messages are sent, we send both the name and the combined checksum of all names. If the checksum of all local names equals the checksum of all remote names, our state is in sync; otherwise we send a reconciliation request.
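A sketch of the proposed checksum scheme (function names here are hypothetical):

```javascript
// Per-name checksum: the sum of the name's character codes, as proposed.
function nameChecksum(name) {
  let sum = 0
  for (let i = 0; i < name.length; i++) {
    sum += name.charCodeAt(i)
  }
  return sum
}

// Combined checksum over all names in the registry. Because addition is
// commutative, the result is independent of the order names were added,
// which is what lets two nodes compare state cheaply.
function combinedChecksum(names) {
  return names.reduce((total, name) => total + nameChecksum(name), 0)
}
```

Note the weakness of a plain sum: different name sets can collide on the same checksum, so it detects drift cheaply but a reconciliation request is still needed to resolve it.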

WolframHempel (Member) commented:

Likewise, we'll need to introduce a global server-shutdown message that's sent over the message connector to notify nodes that they need to remove an entry from a registry.

Happy to have a stab at an initial implementation in feature/distributed-state-registry

WolframHempel (Member) commented:

PR for distributed state registry here #312

@yasserf yasserf added ready and removed roadmap labels Aug 1, 2016

ronag commented Aug 3, 2016

@yasserf: What happens if all listeners reject? Is there a retry after a certain duration?


yasserf commented Aug 3, 2016

Yes, you'll have an optional rediscovery timer that goes through all of the subscriptions that don't have a provider and figures out whether any of the providers changed their minds.

Obviously the tradeoff for the timeout is between how "realtime" providers that change their decisions can be, and the load on deepstream/providers.
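For illustration, the server-side rediscovery pass described above could be sketched like this (hypothetical helper names; `getUnprovidedRecords` and `reofferRecord` are assumed callbacks, not actual deepstream internals):

```javascript
// One rediscovery pass: re-offer every record that currently has no
// provider, in case a previously rejecting listener can now accept.
function rediscoverOnce(getUnprovidedRecords, reofferRecord) {
  for (const name of getUnprovidedRecords()) {
    reofferRecord(name)
  }
}

// The optional rediscovery timer: run a pass every intervalMs.
// Returns the timer handle so callers can clearInterval() on shutdown.
function startRediscovery(intervalMs, getUnprovidedRecords, reofferRecord) {
  return setInterval(
    () => rediscoverOnce(getUnprovidedRecords, reofferRecord),
    intervalMs
  )
}
```

A short interval re-provides records quickly but increases load on deepstream and the providers, which is exactly the tradeoff mentioned above.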


ronag commented Aug 3, 2016

Could a provider somehow notify that it has changed its mind?


yasserf commented Aug 3, 2016

In what sense?


ronag commented Aug 3, 2016

@yasserf: Take the following scenario:

listener 1

  1. Accept record1
  2. Accept record2
  3. Reject record3 (only allowed 2 records)
  4. record2 is unsubscribed
  5. Notify server that new records could be accepted (clear rediscovery timer)
  6. Accept record2

Basically short-circuiting the rediscovery timer.


yasserf commented Aug 3, 2016

Interesting. We discussed this today. If you unlisten/listen, that would work, but it will screw up your other states, which is overkill.

We also need a way to let the listener tell the server to stop providing a specific record, for example if the backend system it was using went down and it depends on another one.

@wolfram @timaschew We'll need to see if we can do this somehow... maybe like:

client.record.listen( pattern ) // assuming pattern already exists, else throws error expecting callback

and

client.record.unpublish( pattern, recordName ) // discards specific listen

thoughts? Could also do something specific. Feels better than holding onto the response state.


ronag commented Aug 26, 2016

@yasserf: What did you end up with in regards to the "notify change its mind"?


ronag commented Aug 26, 2016

I assume "Handle self state as provider" is related to #170. How is this resolved? Does it just work out of the box i.e, the "race horse" example works as intended without changes?


ronag commented Aug 26, 2016

How does response behave when isSubscribed=false? And what is the argument to the response.accept callback?


ronag commented Aug 26, 2016

What happens with listening when a record is deleted?

yasserf added a commit that referenced this issue Aug 26, 2016
* Adding most listen cluster support

* Moving distributed-state-registry to cluster package

* Adding unit tests for cluster functionality

* Adding more cluster based listen tests

* Removing test listener leak

* comments and minor tweaks

* Code review

* Changing default timeout values

* Reverting package updates

* Code review suggestions

yasserf commented Aug 26, 2016

What did you end up with in regards to the "notify change its mind"?

Currently the listener would need to unlisten and listen. Given the number of happy-case scenarios, we haven't fit in the ability to notify that it has changed its mind, but the code is structured in a way that this can very easily be added. The concept of stopping publishing a single record is extremely rare, since we support unlistens for all subscriptions with a pattern. I can see some useful use cases and will add it as a feature improvement in the near future ( need to tackle non-listening tasks for a bit! )

I assume "Handle self state as provider" is related to #170. How is this resolved? Does it just work out of the box i.e, the "race horse" example works as intended without changes?

Yup, it works in that regard. There is a catch though ( as always ): a publisher won't be notified if it is publishing data to itself. This is a pretty bad anti-pattern though, since if the provider needs to get data from itself, it should ideally be able to hook into that code directly without depending on deepstream to tell it to publish...

How does response behave when isSubscribed=false?

There is no response when isSubscribed is false, since there is no state to follow after that other than cleaning up.

And what is the argument to the response.accept callback?

None; accepting is the only required data.

What happens with listening when a record is deleted?

It counts as an implicit discard, meaning the provider gets notified with isSubscribed = false.

All good questions!


ronag commented Aug 27, 2016

Correct me if I'm wrong but with the current implementation accept must be called synchronously in the listen callback?

I've got a scenario where I would like to do the following:

ds.record.listen('^file/.+', async (match, isSubscribed, response) => {
  // observe() is our RxJS wrapper around the record; pluck('path')
  // resolves its 'path' field
  const path = await observe(match).pluck('path').toPromise()
  // fileExists: a promise-returning wrapper around fs.access
  if (await fileExists(path)) {
    response.accept()
  } else {
    response.reject()
  }
})


ronag commented Aug 29, 2016

Question: What happens if all providers reject a record? Is there an infinite timeout + retry?


ronag commented Aug 29, 2016

Question: I noticed there was some form of memory usage stats in the distributed state. Is there some form of default load balancing that is more advanced than round robin?

@yasserf yasserf closed this as completed Sep 8, 2016
@yasserf yasserf removed the in review label Sep 8, 2016

ronag commented Oct 26, 2016

@yasserf: Bump previous question.
