
WIP: native threads #299

Conversation

mariokostelac
Contributor

No description provided.

def dispatch_single_messages(queue)
  messages = @fetcher.fetch(queue, 1)
  @polling_strategy.messages_found(queue.name, messages.size)
  messages.each { |message| process(queue.name, message) }
Collaborator

@mariokostelac not sure if I'm following it, but you can't process all received messages in a single thread (one at a time). That would eat into the visibility timeout of the other messages while they sit idle waiting for the processor; for example, with a 30-second visibility timeout and jobs that take 10 seconds each, the fourth message in a batch would become visible again before it is even picked up.

Contributor Author

Oh, I wasn't really paying attention to style here. We hardcode the number of messages to 1 :).

Collaborator

@mariokostelac that's a good point. Maybe if we wait a little longer (~1 second, like in the other PR) we will have more processors available and can fetch more messages in a single request, instead of fetching one message at a time for one processor, which consumes more SQS capacity and means more requests against AWS, each taking a few milliseconds.
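A minimal sketch of that idea, sizing each fetch to the number of idle processors; ready_count and assign are illustrative names, not Shoryuken's actual API:

```ruby
# Sketch only: fetch a batch sized to the idle processors instead of one message
# per request. `ready_count` and `assign` are hypothetical names.
def dispatch_batch(queue)
  batch_size = [ready_count, 10].min # SQS returns at most 10 messages per receive
  return if batch_size.zero?

  messages = @fetcher.fetch(queue, batch_size)
  @polling_strategy.messages_found(queue.name, messages.size)
  messages.each { |message| assign(queue.name, message) }
end
```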

def loop
  while !@done
    do_work
  end
Collaborator

@mariokostelac wouldn't this cause non-stop fetching?

Contributor Author

This is the point where the polling strategy kicks in. But you are right, it will melt :o

Contributor Author

But it's not difficult to fix. We can always tell it to sleep if there are no active queues.
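A minimal sketch of that fix, assuming the polling strategy's next_queue returns nil when every queue is paused or empty; the 1-second pause is a placeholder, not a value from this PR:

```ruby
# Sketch only: back off instead of busy-looping when there is no active queue.
def loop
  until @done
    queue = @polling_strategy.next_queue
    if queue.nil?
      sleep 1 # no active queues right now; pause instead of hammering SQS
    else
      dispatch_single_messages(queue)
    end
  end
end
```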

Collaborator

@mariokostelac actually it isn't just about active queues, it's also about having an available thread to process; we can't fetch and then wait.

@phstc
Collaborator

phstc commented Dec 23, 2016

@mariokostelac maybe you could target the PR at my thread-pool branch instead of master; it would make it easier to see the differences.

@mariokostelac
Contributor Author

mariokostelac commented Dec 23, 2016

Done

@mariokostelac mariokostelac changed the base branch from master to thread-pool December 23, 2016 13:15
  # does updating and evaluating @thread have to be wrapped in a mutex?
  return if !@thread
  @thread.raise ::Shoryuken::Shutdown
end
Collaborator

@mariokostelac that's where ^ I see some advantage of using a concurrent-ruby thread pool: it abstracts some of this logic (shutdown, kill, wait_for_termination, etc.).

It also replaces dead worker threads automatically. Here, we would need to re-spawn the thread ourselves in case it dies.
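For reference, a minimal sketch of the lifecycle calls a concurrent-ruby pool exposes; the pool size and timeout are placeholders:

```ruby
require 'concurrent'

pool = Concurrent::FixedThreadPool.new(8) # size = configured concurrency (placeholder)

pool.post { process(queue_name, message) } # runs on a pooled thread; dead threads are replaced

pool.shutdown                              # stop accepting work, let queued work finish
pool.wait_for_termination(30) || pool.kill # wait up to 30s (placeholder), then hard-kill
```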

Contributor Author

That is correct. It's not the easiest thing to get right. I have to give it more thought, but a ThreadPool alone does not seem like the best solution. Futures on top of a ThreadPool could be better, though; I have to investigate that a little more.
Still, I think it's difficult to get the same throughput if we do not use the Actor model or make the Processor self-sufficient.

Collaborator

@phstc phstc Dec 23, 2016

> Still, I think it's difficult to get the same throughput if we do not use the Actor model or make the Processor self-sufficient.

@mariokostelac I don't see why not. We have a thread pool sized to the configured concurrency and we only enqueue that many messages, so there won't be any idle message waiting for a thread.

Contributor Author

I am not too opinionated about whether we should use some library or not; this was just an attempt to see what it looks like. I like it, but it means handling low-level stuff ourselves.
I think meeting half-way would look like this (sketched below):

  • processors look the same as here, self-sufficient
  • processors are handled by a thread pool, pool.post { processor.loop }
  • some watchdog periodically checks that all processors are alive.

That way the manager becomes simple, we do not share state (so no concurrency problems), and we do not have to handle threads ourselves.
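A rough sketch of that half-way arrangement, assuming concurrent-ruby; Processor, its loop/alive? methods, and the 5-second watchdog interval are illustrative assumptions, not code from this PR:

```ruby
require 'concurrent'

class Manager
  def initialize(fetcher, polling_strategy, concurrency)
    @fetcher          = fetcher
    @polling_strategy = polling_strategy
    @concurrency      = concurrency
  end

  def start
    @pool       = Concurrent::FixedThreadPool.new(@concurrency)
    @processors = Array.new(@concurrency) { Processor.new(@fetcher, @polling_strategy) }

    # Each processor owns its fetch/process loop; the manager shares no state with it.
    @processors.each { |processor| @pool.post { processor.loop } }

    # Watchdog: periodically re-post any processor whose loop has died.
    @watchdog = Concurrent::TimerTask.new(execution_interval: 5) do
      @processors.reject(&:alive?).each { |processor| @pool.post { processor.loop } }
    end
    @watchdog.execute
  end
end
```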

Contributor Author

@mariokostelac mariokostelac Dec 23, 2016

> @mariokostelac I don't see why not. We have a thread pool sized to the configured concurrency and we only enqueue that many messages, so there won't be any idle message waiting for a thread.

Actually, we have a problem when a processor fails and we do not know it (the @ready variable never gets incremented back). It could be that we can tell how many "available" processors we have by asking the pool "how many threads do you have available?".

Contributor Author

> @mariokostelac I don't see why not. We have a thread pool sized to the configured concurrency and we only enqueue that many messages, so there won't be any idle message waiting for a thread.

As far as I understand the code, we poll once per second. Regardless of the concurrency level, a lot of time can be spent waiting for the next dispatch.
Let's say every job takes 20 ms and concurrency is set to 8:
t=0 ms: we fetch 8 jobs and dispatch them
t=20 ms: all processors have finished
t=1000 ms: we fetch 8 jobs and dispatch them
t=1020 ms: all processors have finished

We actually spend 980 ms waiting for the next dispatch. In this example throughput tops out at about 8 jobs per second, even though 8 processors running 20 ms jobs could handle around 400 per second if kept busy.

Collaborator

@phstc phstc Dec 23, 2016

> Actually, we have a problem when a processor fails and we do not know it (the @ready variable never gets incremented back). It could be that we can tell how many "available" processors we have by asking the pool "how many threads do you have available?".

@mariokostelac great call! I totally missed that. Maybe we could use remaining_capacity in place of @ready.

Contributor Author

That implies that you ARE improving the system throughput by increasing the concurrency setting, but never to the level it should reach.

Collaborator

@mariokostelac

> We actually spend 980 ms waiting for the next dispatch.

How long would it take to perform 8 requests against SQS, fetching one message per processor?

But anyway, as I mentioned in another comment, we can remove the heartbeat and call dispatch once a processor is done, so we dispatch whenever a processor is ready.

TimerTask says that the delay is in seconds, but maybe it supports fractional values like 0.5.

Collaborator

@phstc phstc Dec 23, 2016

@mariokostelac I think for most people 1 second does not matter much; also, it isn't exactly a second, since the second starts counting before the fetch, and it runs in the background anyway. But I see your point. I'm just saying it's a trade-off: a ~1 second delay versus an increased number of requests.

@phstc
Collaborator

phstc commented Dec 23, 2016 via email

@phstc
Collaborator

phstc commented Dec 23, 2016

@mariokostelac we can try a test similar to the one I did in here. We test both thread strategies like this:

  • Make sure Shoryuken isn't running
  • Enqueue 1k messages for PutsReqWorker, a worker that makes POSTs to a PutsReq bucket (sketched below)
  • Start Shoryuken
  • Check the difference between the first and last received-at timestamps in PutsReq

Run the same test for the other strategy. We shouldn't run them in parallel, to avoid latencies in PutsReq.

Then we can compare the timing differences.

Why PutsReq? IMO any test like this is somewhat subjective, but making an HTTP call to an external service feels like a more realistic use case for a generic worker.
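For concreteness, such a worker could look roughly like this; the queue name and PutsReq bucket URL are placeholders:

```ruby
require 'shoryuken'
require 'net/http'

# Hypothetical benchmark worker: POSTs each message body to a PutsReq bucket.
class PutsReqWorker
  include Shoryuken::Worker
  shoryuken_options queue: 'default', auto_delete: true # placeholders

  PUTSREQ_URL = URI('https://putsreq.com/REPLACE_WITH_BUCKET_ID')

  def perform(_sqs_msg, body)
    Net::HTTP.post_form(PUTSREQ_URL, payload: body)
  end
end
```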

@phstc
Collaborator

phstc commented Dec 23, 2016

@mariokostelac maybe a nonsense idea: you could try to use the thread pool as in the previous PR, but keep the processors fetching the messages. That is, move the dispatch part into the processor, as you did already, but keep the thread pool in the manager, and every time it receives a processor_done and/or every second, it calls pool.post as many times as possible given the remaining_capacity.
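A minimal sketch of that dispatch loop; for simplicity it tracks the in-flight count with an atomic counter rather than the pool's remaining_capacity, and all names are illustrative, not the PR's code:

```ruby
require 'concurrent'

class Manager
  def initialize(fetcher, polling_strategy, concurrency)
    @fetcher          = fetcher
    @polling_strategy = polling_strategy
    @concurrency      = concurrency
    @busy             = Concurrent::AtomicFixnum.new(0)
    @pool             = Concurrent::FixedThreadPool.new(concurrency)
    @dispatch_lock    = Mutex.new
  end

  def start
    # Safety net: try to fill free slots once per second.
    @heartbeat = Concurrent::TimerTask.new(execution_interval: 1) { dispatch }
    @heartbeat.execute
  end

  def dispatch
    @dispatch_lock.synchronize do
      while @busy.value < @concurrency
        queue = @polling_strategy.next_queue
        break if queue.nil?

        messages = @fetcher.fetch(queue, 1)
        @polling_strategy.messages_found(queue.name, messages.size)
        break if messages.empty?

        @busy.increment
        # Pass the message as a post argument so each task keeps its own copy.
        @pool.post(queue.name, messages.first) do |queue_name, message|
          begin
            process(queue_name, message)
          ensure
            @busy.decrement
            dispatch # "processor_done": immediately try to pick up more work
          end
        end
      end
    end
  end
end
```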

@mariokostelac
Copy link
Contributor Author

@phstc exactly that! Will try to sketch something like that to avoid handling threads manually. Left some comments on throughput/price trade-off on #291 :).

  messages = @fetcher.fetch(queue, 1)
  @polling_strategy.messages_found(queue.name, messages.size)
  process(queue.name, messages.first) if messages.length > 0
end
Collaborator

@mariokostelac thinking more about processors fetching messages... Maybe we can start using long polling (wait_time_seconds: ...), so a processor does not need to hit SQS non-stop when the queue is empty; it can wait a few seconds instead. Better to spend that time waiting for messages than waiting for nothing.
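For illustration, long polling with the AWS SDK for Ruby looks roughly like this; the queue URL and the 10-second wait are placeholders:

```ruby
require 'aws-sdk-sqs'

sqs = Aws::SQS::Client.new

# Long polling: the call blocks for up to wait_time_seconds when the queue is
# empty, instead of returning immediately with no messages.
resp = sqs.receive_message(
  queue_url:              'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue', # placeholder
  max_number_of_messages: 1,
  wait_time_seconds:      10 # placeholder; 0 disables long polling, 20 is the SQS maximum
)

resp.messages.each { |message| puts message.body }
```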

Collaborator

@mariokostelac ^ but if we go with that approach, we will need to make sure the polling_strategy is thread-safe (it isn't currently); otherwise we won't be able to guarantee the ordering/load balancing, as each processor will work on its own.

Contributor Author

It is not possible with the current set of interfaces (all queues are paused, so we get nil), but our interfaces are not written in stone.

It's good that you mentioned long polling: since processors are independent, it should be totally OK to use long polling all the time. Ish. If we have just one queue, we should use long polling for sure.
If we have multiple queues, we might not want to block too long on q1 while q2 is full. I am wondering if the long-polling wait time is the actual parameter we want to tweak, instead of the heartbeat interval we've discussed in #291. I guess being able to configure queue_empty_pause for an empty queue and long_polling_wait_time would be sufficient for all kinds of workloads.

Just to recap what that approach would bring us:

  • optimal performance for "one queue" configurations, regardless of the concurrency level
  • a configurable cost vs. throughput trade-off via the long_polling_wait_time and queue_empty_pause parameters. People could optimise for the number of calls to SQS or for the throughput of the system, depending on their needs.
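To make the proposal concrete, the two knobs could be exposed roughly like this; neither option exists in Shoryuken today, they are only the names suggested above:

```ruby
# Hypothetical configuration, illustrating the proposed options only.
Shoryuken.options[:long_polling_wait_time] = 10 # seconds a ReceiveMessage call may wait for messages
Shoryuken.options[:queue_empty_pause]      = 2  # seconds to pause a queue after an empty receive
```

A higher long_polling_wait_time trims SQS requests on quiet queues; a lower queue_empty_pause gets an empty queue checked again sooner once traffic returns.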

Collaborator

@mariokostelac I think we can deprecate delay in favor of wait_time_seconds. If you don't want to hold up checking all the queues, you can set it to zero; if you want to wait and save some $ (too little, as you mentioned), you can bump it up to whatever you like.

> instead of the heartbeat interval we've discussed in #291

That implementation is straightforward and working. But as you raised, it isn't as optimal as it could be: there will always be a possible 25 ms delay, which isn't a huge thing in some cases, but I bet it is in others. If we need to wait, it feels better to spend that wait waiting for messages, not just for the heartbeat. But if we go with processors fetching messages and long polling (which does feel cool), I think we will need to make the polling strategies thread-safe, otherwise we won't be able to load-balance the queue consumption. So we would move the complexity around, but we might end up with a very optimal setup.

I think Shoryuken doesn't need to address every need. We can go with a solution that's as good and straightforward as possible for most users and for us to maintain. Apps that can't tolerate waiting less than a second to process are a very edge case.

@phstc phstc force-pushed the thread-pool branch 2 times, most recently from 41c14f8 to fed9d81 on February 13, 2017 04:02
@phstc phstc closed this Mar 19, 2017