Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize/monitor #4481

Merged
merged 10 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion controllers/pkg/database/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
type Interface interface {
Account
Auth
Traffic
}

type Auth interface {
Expand All @@ -51,11 +52,20 @@ type Account interface {
GetBillingCount(accountType common.Type, startTime, endTime time.Time) (count, amount int64, err error)
GenerateBillingData(startTime, endTime time.Time, prols *resources.PropertyTypeLS, namespaces []string, owner string) (orderID []string, amount int64, err error)
InsertMonitor(ctx context.Context, monitors ...*resources.Monitor) error
GetDistinctMonitorCombinations(startTime, endTime time.Time, namespace string) ([]resources.Monitor, error)
DropMonitorCollectionsOlderThan(days int) error
Disconnect(ctx context.Context) error
Creator
}

type Traffic interface {
GetTrafficSentBytes(startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error)
GetTrafficRecvBytes(startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error)

GetPodTrafficSentBytes(startTime, endTime time.Time, namespace string, name string) (int64, error)
GetPodTrafficRecvBytes(startTime, endTime time.Time, namespace string, name string) (int64, error)
}

type Creator interface {
CreateBillingIfNotExist() error
//suffix by day, eg: monitor_20200101
Expand All @@ -74,7 +84,8 @@ type MeteringOwnerTimeResult struct {
//}

const (
MongoURI = "MONGO_URI"
MongoURI = "MONGO_URI"
TrafficMongoURI = "TRAFFIC_MONGO_URI"
//MongoUsername = "MONGO_USERNAME"
//MongoPassword = "MONGO_PASSWORD"
//RetentionDay = "RETENTION_DAY"
Expand Down
135 changes: 49 additions & 86 deletions controllers/pkg/database/mongo/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,21 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/sync/errgroup"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
DefaultAccountDBName = "sealos-resources"
DefaultTrafficDBName = "sealos-networkmanager-synchronizer"
DefaultAuthDBName = "sealos-auth"
DefaultMeteringConn = "metering"
DefaultMonitorConn = "monitor"
DefaultBillingConn = "billing"
DefaultUserConn = "user"
DefaultPricesConn = "prices"
DefaultPropertiesConn = "properties"
//TODO fix
DefaultTrafficConn = "traffic"
)

const DefaultRetentionDay = 30
Expand All @@ -61,13 +63,15 @@ var cryptoKey = defaultCryptoKey
type mongoDB struct {
Client *mongo.Client
AccountDB string
TrafficDB string
AuthDB string
UserConn string
MonitorConnPrefix string
MeteringConn string
BillingConn string
PricesConn string
PropertiesConn string
TrafficConn string
}

type AccountBalanceSpecBSON struct {
Expand Down Expand Up @@ -247,6 +251,42 @@ func (m *mongoDB) InsertMonitor(ctx context.Context, monitors ...*resources.Moni
return err
}

func (m *mongoDB) GetDistinctMonitorCombinations(startTime, endTime time.Time, namespace string) ([]resources.Monitor, error) {
pipeline := mongo.Pipeline{
{{Key: "$match", Value: bson.M{
"time": bson.M{
"$gte": startTime.UTC(),
"$lt": endTime.UTC(),
},
"category": namespace,
}}},
{{Key: "$group", Value: bson.M{
"_id": bson.M{
"category": "$category",
"name": "$name",
"type": "$type",
},
}}},
}
cursor, err := m.getMonitorCollection(startTime).Aggregate(context.Background(), pipeline)
if err != nil {
return nil, fmt.Errorf("aggregate error: %v", err)
}
defer cursor.Close(context.Background())
var monitors []resources.Monitor
for cursor.Next(context.Background()) {
var result = make(map[string]resources.Monitor, 1)
if err := cursor.Decode(result); err != nil {
return nil, fmt.Errorf("decode error: %v", err)
}
monitors = append(monitors, result["_id"])
}
if err := cursor.Err(); err != nil {
return nil, fmt.Errorf("cursor error: %v", err)
}
return monitors, nil
}

func (m *mongoDB) GetAllPricesMap() (map[string]resources.Price, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -303,90 +343,6 @@ func (m *mongoDB) SavePropertyTypes(types []resources.PropertyType) error {
return err
}

// 2020-12-01 23:00:00 - 2020-12-02 00:00:00
// 2020-12-02 00:00:00 - 2020-12-02 01:00:00
func (m *mongoDB) GenerateMeteringData(startTime, endTime time.Time, prices map[string]resources.Price) error {
filter := bson.M{
"time": bson.M{
"$gte": startTime,
"$lt": endTime,
},
}
cursor, err := m.getMonitorCollection(startTime).Find(context.Background(), filter)
if err != nil {
return fmt.Errorf("find monitors error: %v", err)
}
defer cursor.Close(context.Background())

meteringMap := make(map[string]map[string]int64)
countMap := make(map[string]map[string]int64)
updateTimeMap := make(map[string]map[string]*time.Time)

for cursor.Next(context.Background()) {
var monitor resources.Monitor
if err := cursor.Decode(&monitor); err != nil {
return fmt.Errorf("decode monitor error: %v", err)
}

if _, ok := updateTimeMap[monitor.Category]; !ok {
updateTimeMap[monitor.Category] = make(map[string]*time.Time)
}
if _, ok := updateTimeMap[monitor.Category][monitor.Property]; !ok {
lastUpdateTime, err := m.GetUpdateTimeForCategoryAndPropertyFromMetering(monitor.Category, monitor.Property)
if err != nil {
logger.Debug(err, "get latest update time failed", "category", monitor.Category, "property", monitor.Property)
}
updateTimeMap[monitor.Category][monitor.Property] = &lastUpdateTime
}
lastUpdateTime := updateTimeMap[monitor.Category][monitor.Property].UTC()

if /* skip last update lte 1 hour*/ lastUpdateTime.Before(startTime) || lastUpdateTime.Equal(startTime) {
if _, ok := meteringMap[monitor.Category]; !ok {
meteringMap[monitor.Category] = make(map[string]int64)
countMap[monitor.Category] = make(map[string]int64)
}
//TODO interface will delete
//meteringMap[monitor.Category][monitor.Property] += monitor.Value
countMap[monitor.Category][monitor.Property]++
continue
}
logger.Debug("Info", "skip metering", "category", monitor.Category, "property", monitor.Property, "lastUpdateTime", updateTimeMap[monitor.Category][monitor.Property].UTC(), "startTime", startTime)
}

if err := cursor.Err(); err != nil {
return fmt.Errorf("cursor error: %v", err)
}
eg, _ := errgroup.WithContext(context.Background())

for category, propertyMap := range meteringMap {
for property, totalValue := range propertyMap {
count := countMap[category][property]
if count < 60 {
count = 60
}
unitValue := math.Ceil(float64(totalValue) / float64(count))
metering := &resources.Metering{
Category: category,
Property: property,
Time: endTime,
Amount: int64(unitValue * float64(prices[property].Price)),
Value: int64(unitValue),
//Detail: "",
}
_category, _property := category, property
eg.Go(func() error {
_, err := m.getMeteringCollection().InsertOne(context.Background(), metering)
if err != nil {
//TODO if insert failed, should todo?
logger.Error(err, "insert metering data failed", "category", _category, "property", _property)
}
return err
})
}
}
return eg.Wait()
}

/*
monitors = append(monitors, &common.Monitor{
Category: namespace.Name,
Expand Down Expand Up @@ -445,9 +401,14 @@ func (m *mongoDB) GenerateBillingData(startTime, endTime time.Time, prols *resou
}}}
continue
}
if value.PriceType == resources.SUM {
groupStage = append(groupStage, primitive.E{Key: keyStr, Value: bson.D{{Key: "$sum", Value: "$used." + keyStr}}})
usedStage[keyStr] = bson.D{{Key: "$toInt", Value: "$" + keyStr}}
continue
}
groupStage = append(groupStage, primitive.E{Key: keyStr, Value: bson.D{{Key: "$sum", Value: "$used." + keyStr}}})
usedStage[keyStr] = bson.D{{Key: "$toInt", Value: bson.D{{Key: "$round", Value: bson.D{{Key: "$divide", Value: bson.A{
"$" + keyStr, bson.D{{Key: "$cond", Value: bson.A{bson.D{{Key: "$gt", Value: bson.A{"$count", minutes}}}, "$count", minutes}}}}}}}}}}
"$" + keyStr, minutes}}}}}}}
}

// add the used phase to the $project phase
Expand Down Expand Up @@ -981,12 +942,14 @@ func NewMongoInterface(ctx context.Context, URL string) (database.Interface, err
return &mongoDB{
Client: client,
AccountDB: DefaultAccountDBName,
TrafficDB: DefaultTrafficDBName,
AuthDB: DefaultAuthDBName,
UserConn: DefaultUserConn,
MeteringConn: DefaultMeteringConn,
MonitorConnPrefix: DefaultMonitorConn,
BillingConn: DefaultBillingConn,
PricesConn: DefaultPricesConn,
PropertiesConn: DefaultPropertiesConn,
TrafficConn: DefaultTrafficConn,
}, err
}
131 changes: 131 additions & 0 deletions controllers/pkg/database/mongo/traffic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright © 2024 sealos.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mongo

import (
"context"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

/* example:
{
_id: ObjectId("60eea26373c4cdcb6356827d"),
traffic_meta: {
pod_name: "my-pod",
pod_namespace: "my-namespace",
pod_address: "100.64.0.1",
traffic_tag: "port:80",
pod_type: 1,
pod_type_name: "mongodb"
},
timestamp: "2024-01-04T04:02:25",
sent_bytes: 31457280,
recv_bytes: 15728640
}
*/

func (m *mongoDB) GetTrafficRecvBytes(startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error) {
return m.getTrafficBytes(false, startTime, endTime, namespace, _type, name)
}

func (m *mongoDB) GetTrafficSentBytes(startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error) {
return m.getTrafficBytes(true, startTime, endTime, namespace, _type, name)
}

func (m *mongoDB) GetPodTrafficSentBytes(startTime, endTime time.Time, namespace string, name string) (int64, error) {
return m.getPodTrafficBytes(true, startTime, endTime, namespace, name)
}

func (m *mongoDB) GetPodTrafficRecvBytes(startTime, endTime time.Time, namespace string, name string) (int64, error) {
return m.getPodTrafficBytes(false, startTime, endTime, namespace, name)
}

func (m *mongoDB) getPodTrafficBytes(sent bool, startTime, endTime time.Time, namespace string, name string) (int64, error) {
filter := bson.M{
"traffic_meta.pod_namespace": namespace,
"traffic_meta.pod_name": name,
"timestamp": bson.M{
"$gte": startTime,
"$lt": endTime,
},
}
pipeline := mongo.Pipeline{
bson.D{{Key: "$match", Value: filter}},
}
if sent {
pipeline = append(pipeline, bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: nil}, {Key: "total", Value: bson.D{{Key: "$sum", Value: "$sent_bytes"}}}}}})
} else {
pipeline = append(pipeline, bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: nil}, {Key: "total", Value: bson.D{{Key: "$sum", Value: "$recv_bytes"}}}}}})
}
cur, err := m.getTrafficCollection().Aggregate(context.Background(), pipeline)
if err != nil {
return 0, err
}
defer cur.Close(context.Background())
total := int64(0)
for cur.Next(context.Background()) {
var result struct {
Total int64 `bson:"total"`
}
if err := cur.Decode(&result); err != nil {
return 0, err
}
total += result.Total
}
return total, nil
}

func (m *mongoDB) getTrafficBytes(sent bool, startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error) {
filter := bson.M{
"traffic_meta.pod_namespace": namespace,
"traffic_meta.pod_type": _type,
"traffic_meta.pod_type_name": name,
"timestamp": bson.M{
"$gte": startTime,
"$lte": endTime,
},
}
pipeline := mongo.Pipeline{
bson.D{{Key: "$match", Value: filter}},
}
if sent {
pipeline = append(pipeline, bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: nil}, {Key: "total", Value: bson.D{{Key: "$sum", Value: "$sent_bytes"}}}}}})
} else {
pipeline = append(pipeline, bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: nil}, {Key: "total", Value: bson.D{{Key: "$sum", Value: "$recv_bytes"}}}}}})
}
cur, err := m.getTrafficCollection().Aggregate(context.Background(), pipeline)
if err != nil {
return 0, err
}
defer cur.Close(context.Background())
total := int64(0)
for cur.Next(context.Background()) {
var result struct {
Total int64 `bson:"total"`
}
if err := cur.Decode(&result); err != nil {
return 0, err
}
total += result.Total
}
return total, nil
}

func (m *mongoDB) getTrafficCollection() *mongo.Collection {
return m.Client.Database(m.TrafficDB).Collection(m.TrafficConn)
}
Loading
Loading