Skip to content

Commit

Permalink
Migrated beacon analysis to use the full timestamp list instead of the unique timestamp list
Browse files Browse the repository at this point in the history
  • Loading branch information
lisaSW committed Aug 18, 2022
1 parent 6a8a022 commit 7149f81
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 83 deletions.
4 changes: 2 additions & 2 deletions database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ var MinMongoDBVersion = semver.Version{
//versions of MongoDB compatible with RITA
var MaxMongoDBVersion = semver.Version{
Major: 4,
Minor: 3,
Patch: 0,
Minor: 4,
Patch: 11,
}

// DB is the workhorse container for messing with the database
Expand Down
31 changes: 12 additions & 19 deletions pkg/beacon/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (a *analyzer) start() {
// and need to update uconn table with the strobe flag. This is being done
// here and not in uconns because uconns doesn't do reads, and doesn't know
// the updated conn count
if (res.TsList) == nil {
if (res.TsList == nil) || (res.UniqueTsListLength <= 0) {
// copy variables to be used by bulk callback to prevent capturing by reference
pairSelector := res.Hosts.BSONKey()
update := mgoBulkActions{
Expand All @@ -96,25 +96,15 @@ func (a *analyzer) start() {
tsLength := len(res.TsList) - 1
dsLength := len(res.OrigBytesList)

//find the delta times between the timestamps
//find the delta times between the timestamps and sort
diff := make([]int64, tsLength)
for i := 0; i < tsLength; i++ {
diff[i] = res.TsList[i+1] - res.TsList[i]
}

//find the delta times between full list of timestamps
//(this will be used for the intervals list. Bowleys skew
//must use a unique timestamp list with no duplicates)
tsLengthFull := len(res.TsListFull) - 1
//find the delta times between the timestamps
diffFull := make([]int64, tsLengthFull)
for i := 0; i < tsLengthFull; i++ {
diffFull[i] = res.TsListFull[i+1] - res.TsListFull[i]
}
sort.Sort(util.SortableInt64(diff))

//perfect beacons should have symmetric delta time and size distributions
//Bowley's measure of skew is used to check symmetry
sort.Sort(util.SortableInt64(diff))
tsSkew := float64(0)
dsSkew := float64(0)

Expand Down Expand Up @@ -169,9 +159,7 @@ func (a *analyzer) start() {
//get a list of the intervals found in the data,
//the number of times the interval was found,
//and the most occurring interval
//sort intervals list (origbytes already sorted)
sort.Sort(util.SortableInt64(diffFull))
intervals, intervalCounts, tsMode, tsModeCount := createCountMap(diffFull)
intervals, intervalCounts, tsMode, tsModeCount := createCountMap(diff)
dsSizes, dsCounts, dsMode, dsModeCount := createCountMap(res.OrigBytesList)

//more skewed distributions receive a lower score
Expand All @@ -180,9 +168,14 @@ func (a *analyzer) start() {
dsSkewScore := 1.0 - math.Abs(dsSkew) //smush dsSkew

//lower dispersion is better
tsMadmScore := 1.0 - float64(tsMadm)/float64(tsMid)
if tsMadmScore < 0 {
tsMadmScore = 0
// tsMadmScore := 1.0 - float64(tsMadm)/float64(tsMid)
// if tsMadmScore < 0 {
// tsMadmScore = 0
// }

tsMadmScore := 1.0
if tsMid >= 1 {
tsMadmScore = 1.0 - float64(tsMadm)/float64(tsMid)
}

//lower dispersion is better
Expand Down
50 changes: 25 additions & 25 deletions pkg/beacon/dissector.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,39 +104,39 @@ func (d *dissector) start() {
{"$unwind": "$ts"},
{"$unwind": "$ts"},
{"$group": bson.M{
"_id": "$_id",
"ts": bson.M{"$addToSet": "$ts"},
"ts_full": bson.M{"$push": "$ts"},
"bytes": bson.M{"$first": "$bytes"},
"count": bson.M{"$first": "$count"},
"tbytes": bson.M{"$first": "$tbytes"},
"_id": "$_id",
"ts_unique": bson.M{"$addToSet": "$ts"},
"ts": bson.M{"$push": "$ts"},
"bytes": bson.M{"$first": "$bytes"},
"count": bson.M{"$first": "$count"},
"tbytes": bson.M{"$first": "$tbytes"},
}},
{"$unwind": "$bytes"},
{"$unwind": "$bytes"},
{"$group": bson.M{
"_id": "$_id",
"ts": bson.M{"$first": "$ts"},
"ts_full": bson.M{"$first": "$ts_full"},
"bytes": bson.M{"$push": "$bytes"},
"count": bson.M{"$first": "$count"},
"tbytes": bson.M{"$first": "$tbytes"},
"_id": "$_id",
"ts_unique": bson.M{"$first": "$ts_unique"},
"ts": bson.M{"$first": "$ts"},
"bytes": bson.M{"$push": "$bytes"},
"count": bson.M{"$first": "$count"},
"tbytes": bson.M{"$first": "$tbytes"},
}},
{"$project": bson.M{
"_id": "$_id",
"ts": 1,
"ts_full": 1,
"bytes": 1,
"count": 1,
"tbytes": 1,
"_id": "$_id",
"ts_unique_len": bson.M{"$size": "$ts_unique"},
"ts": 1,
"bytes": 1,
"count": 1,
"tbytes": 1,
}},
}

var res struct {
Count int64 `bson:"count"`
Ts []int64 `bson:"ts"`
TsFull []int64 `bson:"ts_full"`
Bytes []int64 `bson:"bytes"`
TBytes int64 `bson:"tbytes"`
Count int64 `bson:"count"`
TsUniqueLen int64 `bson:"ts_unique_len"`
Ts []int64 `bson:"ts"`
Bytes []int64 `bson:"bytes"`
TBytes int64 `bson:"tbytes"`
}

_ = ssn.DB(d.db.GetSelectedDB()).C(d.conf.T.Structure.UniqueConnTable).Pipe(uconnFindQuery).AllowDiskUse().One(&res)
Expand All @@ -156,11 +156,11 @@ func (d *dissector) start() {

} else { // otherwise, parse timestamps and orig ip bytes
analysisInput.TsList = res.Ts
analysisInput.TsListFull = res.TsFull
analysisInput.UniqueTsListLength = res.TsUniqueLen
analysisInput.OrigBytesList = res.Bytes
// the analysis worker requires that we have over UNIQUE 3 timestamps
// we drop the input here since it is the earliest place in the pipeline to do so
if len(analysisInput.TsList) > 3 {
if analysisInput.UniqueTsListLength > 3 {
d.dissectedCallback(analysisInput)
}
}
Expand Down
43 changes: 21 additions & 22 deletions pkg/beacon/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"github.com/activecm/rita/util"

"github.com/globalsign/mgo"
"github.com/vbauerster/mpb"
"github.com/vbauerster/mpb/decor"

log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -114,20 +112,21 @@ func (r *repo) Upsert(uconnMap map[string]*uconn.Input, hostMap map[string]*host
}

// progress bar for troubleshooting
p := mpb.New(mpb.WithWidth(20))
bar := p.AddBar(int64(len(uconnMap)),
mpb.PrependDecorators(
decor.Name("\t[-] Beacon Analysis:", decor.WC{W: 30, C: decor.DidentRight}),
decor.CountersNoUnit(" %d / %d ", decor.WCSyncWidth),
),
mpb.AppendDecorators(decor.Percentage()),
)
// p := mpb.New(mpb.WithWidth(20))
// bar := p.AddBar(int64(len(uconnMap)),
// mpb.PrependDecorators(
// decor.Name("\t[-] Beacon Analysis:", decor.WC{W: 30, C: decor.DidentRight}),
// decor.CountersNoUnit(" %d / %d ", decor.WCSyncWidth),
// ),
// mpb.AppendDecorators(decor.Percentage()),
// )

// loop over map entries
for _, entry := range uconnMap {
dissectorWorker.collect(entry)
bar.IncrBy(1)
// bar.IncrBy(1)
}
p.Wait()
// p.Wait()

// start the closing cascade (this will also close the other channels)
dissectorWorker.close()
Expand Down Expand Up @@ -161,22 +160,22 @@ func (r *repo) Upsert(uconnMap map[string]*uconn.Input, hostMap map[string]*host
}

// add a progress bar for troubleshooting
p = mpb.New(mpb.WithWidth(20))
bar = p.AddBar(int64(len(localHosts)),
mpb.PrependDecorators(
decor.Name("\t[-] Beacon Aggregation:", decor.WC{W: 30, C: decor.DidentRight}),
decor.CountersNoUnit(" %d / %d ", decor.WCSyncWidth),
),
mpb.AppendDecorators(decor.Percentage()),
)
// p = mpb.New(mpb.WithWidth(20))
// bar = p.AddBar(int64(len(localHosts)),
// mpb.PrependDecorators(
// decor.Name("\t[-] Beacon Aggregation:", decor.WC{W: 30, C: decor.DidentRight}),
// decor.CountersNoUnit(" %d / %d ", decor.WCSyncWidth),
// ),
// mpb.AppendDecorators(decor.Percentage()),
// )

// loop over the local hosts that need to be summarized
for _, localHost := range localHosts {
summarizerWorker.collect(localHost)
bar.IncrBy(1)
// bar.IncrBy(1)
}

p.Wait()
// p.Wait()

// start the closing cascade (this will also close the other channels)
summarizerWorker.close()
Expand Down
1 change: 0 additions & 1 deletion pkg/beacon/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func (s *sorter) start() {
if (data.TsList) != nil {
//sort the size and timestamps to compute quantiles in the analyzer
sort.Sort(util.SortableInt64(data.TsList))
sort.Sort(util.SortableInt64(data.TsListFull))
sort.Sort(util.SortableInt64(data.OrigBytesList))
}
s.sortedCallback(data)
Expand Down
28 changes: 14 additions & 14 deletions pkg/uconn/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@ type update struct {

//Input holds aggregated connection information between two hosts in a dataset
type Input struct {
Hosts data.UniqueIPPair
ConnectionCount int64
IsLocalSrc bool
IsLocalDst bool
TotalBytes int64
MaxDuration float64
TotalDuration float64
TsList []int64
TsListFull []int64
OrigBytesList []int64
Tuples data.StringSet
InvalidCertFlag bool
UPPSFlag bool
ConnStateMap map[string]*ConnState
Hosts data.UniqueIPPair
ConnectionCount int64
IsLocalSrc bool
IsLocalDst bool
TotalBytes int64
MaxDuration float64
TotalDuration float64
TsList []int64
UniqueTsListLength int64
OrigBytesList []int64
Tuples data.StringSet
InvalidCertFlag bool
UPPSFlag bool
ConnStateMap map[string]*ConnState
}

//LongConnResult represents a pair of hosts that communicated and
Expand Down

0 comments on commit 7149f81

Please sign in to comment.