This repository was archived by the owner on May 12, 2021. It is now read-only.

METRON-590 Enable Use of Event Time in Profiler #965

Closed
wants to merge 6 commits

Conversation

@nickwallen (Contributor) commented Mar 15, 2018

This enables the use of event time processing in the Profiler.

By default, the Profiler will still use processing time. If you configure the profiler with a timestampField then it will extract the timestamps from that field contained within the incoming telemetry.

Review the updates that have been made to the Profiler README for more details on how this works and how it should be configured.

Changes

  • The core Profiler classes in metron-profiler-common were refactored slightly so that time is always injected from the outside. When run in Storm, the Storm bolts are responsible for keeping track of time. When run in the REPL, the StandAloneProfiler is responsible for keeping track of time.

  • A Clock abstraction was created (or rather, enhanced) to handle the differences between processing time and event time. If operating in processing time, the WallClock implementation will be used. If operating in event time, the EventTimeClock will be used. Given a message, a Clock will always tell us what time it is.

  • A ClockFactory will give us the right Clock based on what the Profiler configuration looks like. This factory is used in both the Storm and REPL interfaces.

  • The ProfileSplitterBolt takes a message, uses a Clock to get a timestamp, then sends the message and the correct timestamp to the downstream ProfileBuilderBolts. The ProfileBuilderBolt trusts the timestamp that it was given and continues on its merry way.

  • The ProfileBuilderBolt uses a FlushSignal to know when to flush. There is some important subtlety contained in this logic. If you accidentally inject a message with a recent timestamp when you are running with older, archived data, it will prevent the system from flushing when you expect it to. This is because the new message advances time to the most recent timestamp. I might have run into this when testing. :) I added an important check and log statement to help make this very noticeable.

  • There was one difference between processing and event time that had to be accounted for. When in processing time, if a profile stops receiving messages that profile still needs to flush at the end of the period. When in event time this is not the case. In event time, if you stop receiving messages, time effectively does not advance. This was accounted for by creating the concept of "expired" profiles in the MessageDistributor. The ProfileBuilderBolts then use a tick tuple to periodically flush expired profiles. See the javadoc for more explanation.

  • Additional properties were added to adjust the event time processing logic that we leverage in Storm. This includes specifying a window length and a window lag. There will be 1 or more windows in each profile period. A smaller window lets the profiler process a smaller chunk of messages at a time. The window lag allows you to adjust the Profiler depending on how out-of-order your incoming telemetry is.

  • The Mpack was updated to support the additional properties.

  • I added a lot of useful logging to help troubleshoot and debug issues when running in Storm. If you go into the Storm UI and turn on DEBUG level logging for org.apache.metron.profiler, you will get some useful information in the worker logs.
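The clock selection described above can be sketched as follows. This is a simplified, hypothetical illustration of the design, not the actual signatures of the Clock, WallClock, EventTimeClock, and ClockFactory classes in metron-profiler-common:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Given a message, a Clock always tells us what time it is.
interface Clock {
    Optional<Long> currentTimeMillis(Map<String, Object> message);
}

// Processing time: ignore the message and use system time.
class WallClock implements Clock {
    public Optional<Long> currentTimeMillis(Map<String, Object> message) {
        return Optional.of(System.currentTimeMillis());
    }
}

// Event time: extract the timestamp from a field within the message.
class EventTimeClock implements Clock {
    private final String timestampField;
    EventTimeClock(String timestampField) { this.timestampField = timestampField; }
    public Optional<Long> currentTimeMillis(Map<String, Object> message) {
        Object value = message.get(timestampField);
        return (value instanceof Number)
                ? Optional.of(((Number) value).longValue())
                : Optional.empty();  // missing/invalid timestamp; the caller decides what to do
    }
}

// Chooses the right Clock based on whether 'timestampField' is configured.
class ClockFactory {
    static Clock createClock(String timestampField) {
        return (timestampField == null) ? new WallClock() : new EventTimeClock(timestampField);
    }
}

public class ClockSketch {
    public static void main(String[] args) {
        Map<String, Object> msg = new HashMap<>();
        msg.put("timestamp", 1L);
        Clock clock = ClockFactory.createClock("timestamp");
        System.out.println(clock.currentTimeMillis(msg));  // Optional[1]
    }
}
```

Because the split between wall time and event time is hidden behind one interface, the ProfileSplitterBolt and the StandAloneProfiler can share the same time-handling logic.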

Manual Testing

This change can be tested manually when the Profiler is running atop Storm or when run in the REPL.

Testing in the REPL

  1. Create a simple profile and define a "timestampField" in the Profiler configuration. This will tell the Profiler to operate using event time.

    [Stellar]>>> conf := SHELL_EDIT(conf)
    {
    	"profiles" : [
    		{
    		"profile": "counter",
    		"foreach": "'counter'",
    		"init": { "counter": 0 },
    		"update": { "counter" : "counter + 1" },
    		"result": "counter"
    		}
    	],
    	"timestampField" : "timestamp"
    }
    
  2. Create a message that has a timestamp. In this example, the timestamp is really old, like 1970 old.

    [Stellar]>>> msg := SHELL_EDIT()
    {
    	"timestamp": 1
    }
    
  3. Create the Profiler.

    [Stellar]>>> p := PROFILER_INIT(conf)
    Profiler{1 profile(s), 0 messages(s), 0 route(s)}
    
  4. Apply the message to the Profiler.

    [Stellar]>>> PROFILER_APPLY(msg, p)
    Profiler{1 profile(s), 1 messages(s), 1 route(s)}
    
  5. Flush the Profiler. Notice that the 'period' of the measurement that was produced is also from 1970, which indicates that the Profiler successfully used event time.

    [Stellar]>>> PROFILER_FLUSH(p)
    [{period={duration=900000, period=0, start=0, end=900000}, profile=counter, groups=[], value=1, entity=counter}]
    
  6. Now let's do the same, but using processing time. Use the same profile, but this time do not specify a 'timestampField'.

    [Stellar]>>> conf := SHELL_EDIT(conf)
    {
      "profiles" : [
        {
          "profile": "counter",
          "foreach": "'counter'",
          "init": { "counter": 0 },
          "update": { "counter" : "counter + 1" },
          "result": "counter"
        }
      ]
    }
    
  7. Now run through the same steps. Notice how the period of the measurement is based on system time now.

    [Stellar]>>> p := PROFILER_INIT(conf)
    Profiler{1 profile(s), 0 messages(s), 0 route(s)}
    
    [Stellar]>>> PROFILER_APPLY(msg, p)
    Profiler{1 profile(s), 1 messages(s), 1 route(s)}
    
    [Stellar]>>> PROFILER_FLUSH(p)
    [{period={duration=900000, period=1690248, start=1521223200000, end=1521224100000}, profile=counter, groups=[], value=1, entity=counter}]
    

Testing in Storm

  1. Launch a development environment. Shut down Indexing, Elasticsearch, Kibana, YARN, and MapReduce2 to avoid any resource issues.

  2. Using Ambari, change the following settings and restart the Profiler.

    Set the "Period Duration" to 1 minute.
    Set the "Window Duration" to 15 seconds.
    Set the "Window Lag" to 30 seconds.

  3. Replace /opt/sensor-stubs/bin/start-bro-stub with the following.

    Instead of adding the current time into each Bro message, this will add a timestamp from 1 day ago.

    #!/bin/bash

    #
    # how long to delay between each 'batch' in seconds.
    #
    DELAY=${1:-2}
    
    #
    # how many messages to send in each 'batch'.  the messages are drawn randomly
    # from the entire set of canned data.
    #
    COUNT=${2:-10}
    
    INPUT="/opt/sensor-stubs/data/bro.out"
    PRODUCER="/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh"
    TOPIC="bro"
    
    while true; do
    
      # transform the bro timestamp and push to kafka
      SEARCH="\"ts\"\:[0-9]\+\."
      REPLACE="\"ts\"\:`date -d '1 day ago' +'%s'`\."
      shuf -n $COUNT $INPUT | sed -e "s/$SEARCH/$REPLACE/g" | $PRODUCER --broker-list node1:6667 --topic $TOPIC
    
      sleep $DELAY
    done
    
  4. Restart the Bro Sensor Stub.

    service sensor-stubs stop
    service sensor-stubs start bro
    
  5. Open up the REPL and configure the Profiler like so.

    Notice that we are setting the 'timestampField' within the Profiler configuration. This will tell the Profiler to extract the timestamp from this field rather than using system time.

    [Stellar]>>> conf := SHELL_EDIT(conf)
    {
      "profiles": [
        {
          "profile": "hello-world",
          "onlyif": "source.type == 'bro'",
          "foreach": "'global'",
          "init":    { "count": "0" },
          "update":  { "count": "count + 1" },
          "result":  "count"
        }
      ],
      "timestampField": "timestamp"
    }
    [Stellar]>>>
    [Stellar]>>>
    [Stellar]>>> CONFIG_PUT("PROFILER",conf)
    
  6. Query the Profiler data store. This will take a minute or so until you see a value written.

    [Stellar]>>> PROFILE_GET("hello-world", "global", PROFILE_FIXED(2, "DAYS"))
    []
    [Stellar]>>> PROFILE_GET("hello-world", "global", PROFILE_FIXED(2, "DAYS"))
    [200]
    
  7. Now query back just a couple hours instead. Notice that you should get no results. This indicates that the Profiler successfully used the timestamp from the Bro data which contained day old values.

    [Stellar]>>> PROFILE_GET("hello-world", "global", PROFILE_FIXED(2, "HOURS"))
    []
    
  8. Now change the Profiler configuration to remove the "timestampField" setting. This will switch the Profiler back to using system time, aka processing time.

    [Stellar]>>> conf := SHELL_EDIT(conf)
    {
      "profiles": [
        {
          "profile": "hello-world",
          "onlyif": "source.type == 'bro'",
          "foreach": "'global'",
          "init":    { "count": "0" },
          "update":  { "count": "count + 1" },
          "result":  "count"
        }
      ]
    }
    [Stellar]>>>
    [Stellar]>>> CONFIG_PUT("PROFILER",conf)
    
  9. The Profiler will pick up the change after the next flush event. Query for profile data from the past few minutes. This shows that the Profiler has switched back to using system time, aka processing time.

    [Stellar]>>> PROFILE_GET("hello-world", "global", PROFILE_FIXED(2, "MINUTES"))
    [180, 190]
    
  10. In Storm, you can also set logging to DEBUG for "org.apache.metron.profiler". This will output detailed worker logs that allow you to verify that the Profiler is using the correct timestamps.

Pull Request Checklist

  • Is there a JIRA ticket associated with this PR? If not, one needs to be created at Metron Jira.
  • Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
  • Has your PR been rebased against the latest commit within the target branch (typically master)?
  • Have you included steps to reproduce the behavior or problem that is being changed or addressed?
  • Have you included steps or a guide to how the change may be verified and tested manually?
  • Have you ensured that the full suite of tests and checks have been executed in the root metron
  • Have you written or updated unit tests and or integration tests to verify your changes?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?

| Name | | Description
|--- |--- |---
| [profiles](#profiles) | Required | A list of zero or more Profile definitions.
| [timestampField](#timestampfield) | Optional | Indicates whether processing time or event time should be used.
Member
Can we indicate the default here in the description (for quicker reference)?

Contributor Author
Sure. Update made.

@cestella (Member)
+1, thanks!

@asfgit asfgit closed this in 3083b47 Mar 20, 2018
@nickwallen nickwallen deleted the METRON-590-2018 branch September 17, 2018 19:29