[WIP] Refactor of the plugin for better performance and scaling #84

Closed
wants to merge 13 commits from feature/refactor-to-add-threading-support

Conversation

@ph (Contributor) commented Jul 26, 2016

State

This PR is not ready for a full review, but I would like feedback on the architecture I chose.

Motivation:

The code base of the S3 input was a bit monolithic and really hard to test. This change creates smaller classes with fewer responsibilities that are easier to test and reason about.

This PR also makes a really important change by introducing multithreaded workers, so the plugin can effectively read files and download them at the same time.

These commits and changes pave the way to supporting S3 notifications; most of the code will be reusable to create a new, stateless input. I believe we only need to replace the Poller class with an SQSListener and the rest of the code shouldn't change.

Remaining

  • Rebase with latest master
  • More integration tests
  • Make the code compatible with 5.0 and 2.x
  • Fix the test suite, currently broken on Travis

@ph self-assigned this Jul 26, 2016
@brandond commented Sep 9, 2016

Are you still working on this? I have a bucket with about a year of CloudTrail data from a dozen or so accounts that I'd like to pull in, and the current S3 input plugin is nowhere near capable of handling it. It took a half hour just to pull in the last week of data from one region in one little-used account.

Ideally, the refactored code would (a rough sketch follows this list):

  • Use Aws::S3::Client#list_objects instead of Aws::S3::Bucket#objects
  • Support pagination (using max_keys / is_truncated / continuation_token) for batching
  • Store the marker between polls, rather than re-scanning the entire bucket and comparing individual object timestamps
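
For illustration, here is a rough Ruby sketch of that marker-based approach using the aws-sdk-s3 gem's list_objects_v2 (the paginated successor to list_objects). The bucket name, prefix, and marker file path are placeholders, not anything from this plugin:

require 'aws-sdk-s3'

client = Aws::S3::Client.new(region: 'us-east-1')
marker_file = '/var/lib/logstash/s3-input.marker'            # hypothetical location
last_key = File.exist?(marker_file) ? File.read(marker_file).strip : nil

continuation_token = nil
loop do
  params = { bucket: 'my-cloudtrail-bucket', prefix: 'AWSLogs/', max_keys: 1000 }
  params[:start_after] = last_key if last_key                 # resume where the previous poll stopped
  params[:continuation_token] = continuation_token if continuation_token

  resp = client.list_objects_v2(params)
  resp.contents.each do |object|
    # download and ingest object.key here
    last_key = object.key
  end

  File.write(marker_file, last_key) if last_key               # persist the marker between polls
  break unless resp.is_truncated
  continuation_token = resp.next_continuation_token           # page through the remaining batches
end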

I'd be glad to contribute to some of this, but unfortunately this plugin appears to be dead in the water, with multiple stale open issues and PRs that have had no response.

@ph (Contributor, Author) commented Sep 9, 2016

@brandond The plugin isn't dead; refactoring and review take time.

I think the core in this PR is flexible enough to accommodate your recommendations concerning polling. In fact, I wanted to make the core easy to reuse to support S3 Notifications, so we can scale the processing horizontally with multiple instances.

@brandond commented Sep 9, 2016

Thanks @ph. I was a little scared off by the period of inactivity and open items, although I do see that some of the open PRs have been merged but not closed.

I have some additional thoughts for things that I'd like to see for my use case - configurable parallelism and batch sizes, depth-first searching through subkeys (a la directory hierarchy), etc. Are you open to PRs, and if so, should I work on master, or your threading fork? Is the threading fork even usable in its current state?

@ph (Contributor, Author) commented Sep 9, 2016

@brandond It was usable; give me a few days to clean it up and I will try to get it merged in the upcoming weeks. I will keep you in the loop and would really like to collaborate to make it better. I've been doing the same work on the S3 output, so these plugins should be much better in the near future.

Concerning parallelism, I am not sure we should put a lot of energy into that part of the plugin. Polling is OK for older buckets that we want to consume, but for real-world usage I think people should use S3 Event Notifications via SQS; that model is easier to deal with than having to read from the bucket, since AWS will just push new files as SQS messages. At the same time, the parallelism will come from multiple instances consuming the same queue. I took great care in this refactor to support that feature; it's really only a matter of adding new fields for the SQS queue and replacing the Poller class with a Push class.
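
As a purely illustrative sketch of that idea (the class name, queue URL, and yielded values are assumptions, not code from this PR), an SQS-based listener built on the aws-sdk-sqs gem could look roughly like this:

require 'aws-sdk-sqs'
require 'json'

class SQSListener
  def initialize(queue_url)
    @poller = Aws::SQS::QueuePoller.new(queue_url)
  end

  # Yields one bucket/key pair per S3 event notification received from the queue.
  def run
    @poller.poll(wait_time_seconds: 20) do |message|
      body = JSON.parse(message.body)
      (body['Records'] || []).each do |record|
        bucket = record.dig('s3', 'bucket', 'name')
        key    = record.dig('s3', 'object', 'key')
        yield(bucket, key) if bucket && key
      end
      # QueuePoller deletes the message automatically when this block returns without raising.
    end
  end
end

# Hypothetical usage:
#   listener = SQSListener.new('https://sqs.us-east-1.amazonaws.com/123456789012/s3-new-objects')
#   listener.run { |bucket, key| download_and_process(bucket, key) }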

@ph (Contributor, Author) commented Sep 9, 2016

To keep you in the loop, @geekpete.

@brandond commented Sep 9, 2016

I opened #86 to describe some of my thoughts around optimized parallel polling for CloudTrail. You're probably right that SQS notification is more efficient than polling, at least once things are up and running.

I still don't have a good way to get my existing 2.5 million (and growing) files ingested though - the current approach of enumerating the entire bucket every poll is a non-starter for a lot of use cases.

@geekpete commented Sep 9, 2016

How about a Lambda to queue up notifications when new files land?

@brandond commented Sep 9, 2016

Not sure why you'd need a Lambda to queue it up when you can just point the notifications directly at SQS. Alternatively, you could point S3 at SNS and subscribe multiple endpoints to the topic (Lambda, SQS, etc.). That's the best way to do fanout, since each S3 event can only notify a single endpoint.
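
For reference, pointing the bucket at an SNS topic for fanout is a single API call; here is a minimal sketch with the aws-sdk-s3 gem, using placeholder bucket and topic names:

require 'aws-sdk-s3'

s3 = Aws::S3::Client.new(region: 'us-east-1')
s3.put_bucket_notification_configuration(
  bucket: 'my-cloudtrail-bucket',
  notification_configuration: {
    topic_configurations: [
      {
        topic_arn: 'arn:aws:sns:us-east-1:123456789012:s3-new-objects',
        events: ['s3:ObjectCreated:*']
      }
    ]
  }
)
# Subscribing both an SQS queue and a Lambda function to that SNS topic then gives
# each consumer its own copy of every ObjectCreated event.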

@geekpete commented Sep 9, 2016

Ah, I forgot about that; that makes it easy.

So @Jarpy had an idea of loading the files into ES directly from the cloud:
a service that ingests CloudTrail files into an ES endpoint, plus a Lambda that sends newly detected files to the service.
This allows reprocessing manually against the service if needed, e.g. a reindex.

@brandond commented Sep 9, 2016

Yeah you could definitely cut out the middle man and have Lambda get the S3 notification, pull down the file, unpack events, and stuff them into ElasticSearch. We've been doing that (with some filtering and data augmentation) and storing stuff into Dynamo. This is specifically what we're looking into replacing with LogStash, if we could get the S3 ingest streamlined ;)

If you wanted to go halfway with it, you could probably use Lambda to grab the S3 notification, download the archives, unpack the event JSON, and then toss them in as SQS message payloads. Get LogStash to pull off the queue, do filtering, and output to ElasticSearch. Wouldn't even require any new code on the LogStash side.
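
A rough sketch of that halfway approach, written as a Ruby Lambda handler (the queue URL environment variable is an assumption, and error handling is omitted):

require 'aws-sdk-s3'
require 'aws-sdk-sqs'
require 'json'
require 'zlib'
require 'stringio'

S3  = Aws::S3::Client.new
SQS = Aws::SQS::Client.new
QUEUE_URL = ENV['EVENT_QUEUE_URL']   # hypothetical environment variable

def handler(event:, context:)
  event['Records'].each do |record|
    bucket = record.dig('s3', 'bucket', 'name')
    key    = record.dig('s3', 'object', 'key')

    # CloudTrail delivers gzipped JSON archives; download and unpack one.
    gz   = S3.get_object(bucket: bucket, key: key).body.read
    json = Zlib::GzipReader.new(StringIO.new(gz)).read

    # Each archive holds a "Records" array of individual CloudTrail events;
    # forward each event as its own SQS message for Logstash to consume.
    JSON.parse(json).fetch('Records', []).each do |cloudtrail_event|
      SQS.send_message(queue_url: QUEUE_URL, message_body: JSON.generate(cloudtrail_event))
    end
  end
end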

@joshuaspence

+1 on this pull request. If S3 event notifications are supported then I will be able to use this plugin instead of logstash-input-s3sqs.

@brandond

@joshuaspence you might check out this fork that I'm using to pull in CloudTrail logs:
https://github.com/brandond/logstash-input-s3/tree/simple-sincedb

@kureus commented Jul 25, 2017

Hey @ph. What's the state of this PR? Is its original intent still valid and relevant? It looks like @brandond made progress towards getting it merged in #87, but it never made it into the 5.0 beta. Is your existing todo list on this issue a good place to start in terms of contributing? Any context on Elastic's current view/plan for this plugin would be great. Thanks.

@brandond

@kureus @ph I ended up doing basically a complete rewrite based on the threading fork, which I've been using in production for about 9 months. The branch name is probably not super accurate at this point, but you can find it here:
https://github.com/brandond/logstash-input-s3/tree/simple-sincedb

It would probably take a lot of work to reconcile its configuration options with the current Elastic-provided module, but I'd be willing to put some time into it if there's interest.

@suyograo (Contributor)

@brandond thanks for all your work on this. We would support your efforts if you can spend time reconciling this PR with what you have.

We can probably open a new PR with your changes?

@robgil commented Dec 14, 2017

Might be cool to make a queue optional here and merge it with this. Threading support plus a queue would be awesome.

https://github.com/logstash-plugins/logstash-input-s3sqs/tree/feature/logstash-6.x

@jordansissel (Contributor)

I'm going to take this PR over from @ph <3

@jordansissel jordansissel force-pushed the feature/refactor-to-add-threading-support branch from d9f368a to 85238d2 Compare December 20, 2017 17:08
@brandond

My (wildly divergent at this point) fork has been working well for the last year or so. The design goals may be a bit different though.

@jordansissel (Contributor)

@brandond Good info! I'll take a look at your fork (https://github.com/brandond/logstash-input-s3/tree/simple-sincedb, right?) and see what I can combine here.

@brandond

@jordansissel Yep, that's the latest.

The design goal for my fork was to 'tail' multiple prefixes. We have a bunch of stuff streaming into various buckets from multiple accounts. Within each bucket, files are created under a nested prefix structure (roughly account/region/year/month/day). Under each account/region prefix, files are guaranteed to be created with ascending names - so you can 'tail' that prefix by remembering the last object you processed out of that prefix and asking the S3 API to list everything after it. The plugin discovers top-level prefixes down to a configurable depth at startup, and then stores both the prefix and the name of the last processed object in the SinceDB.
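
A rough sketch of that per-prefix 'tail', with the helper name and SinceDB handling simplified as placeholders (the real fork's code will differ):

require 'aws-sdk-s3'

# Lists everything created after the last object processed under this prefix.
# Because object names under the prefix are ascending, the last key seen is the newest.
def tail_prefix(client, bucket, prefix, last_seen_key = nil)
  params = { bucket: bucket, prefix: prefix }
  params[:start_after] = last_seen_key if last_seen_key
  newest = last_seen_key
  client.list_objects_v2(params).each do |page|
    page.contents.each do |object|
      # download and ingest object.key here
      newest = object.key
    end
  end
  newest # write this back to the SinceDB next to the prefix
end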

There is some basic multi-threading support. I'd intended to have it process prefixes in parallel, but right now it just works through them one at a time, sequentially assigning objects to be processed by a configurable number of worker threads.

The handling of files that fail to process also needs work. Objects are marked as failed after a fixed number of retries, and the SinceDB does not handle this well - it can grow out of control. I've only seen this happen once, but it did end up corrupting the DB file due to sheer size.

end

def load_database
  return !::File.exists?(@file)

Should this maybe read:

return if !::File.exists?(@file)
       ^^

def download_to
  # Lazy create FD
  @download_to ||= begin
    FileUtils.mkdir_p(download_to_path)

It seems like this doesn't work if the key is a full path instead of a single file, so maybe this should read more like...

full_local_path = ::File.join(download_to_path, key)
FileUtils.mkdir_p(::File.dirname(full_local_path))
::File.open(full_local_path, FILE_MODE)

@ph (Contributor, Author) commented Nov 1, 2023

I will close this PR; I currently have no time to push it forward. Feel free to make it your own.

@ph closed this Nov 1, 2023
@eherot commented Nov 1, 2023

Doing that as we speak. Thanks for the A+ framework though!
