Skip to content

Commit

Permalink
feat(google): Set refresh interval for compute module (#59)
Browse files Browse the repository at this point in the history
This alters the `collector.Collect` method on the compute module to also check if the pricing map needs to be refreshed. This is a configurable paramater so that folks that want a tighter refresh interval are easily able to do it.

- closes #56
  • Loading branch information
Pokom authored Dec 19, 2023
1 parent df9de96 commit b14025f
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 9 deletions.
15 changes: 11 additions & 4 deletions pkg/google/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"regexp"
"strings"
"time"

"cloud.google.com/go/billing/apiv1/billingpb"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -37,7 +38,8 @@ var (
)

type Config struct {
Projects string
Projects string
ScrapeInterval time.Duration
}

// Collector implements the Collector interface for compute services in GKE.
Expand All @@ -47,6 +49,7 @@ type Collector struct {
PricingMap *StructuredPricingMap
config *Config
Projects []string
NextScrape time.Time
}

// New is a helper method to properly setup a compute.Collector struct.
Expand Down Expand Up @@ -225,9 +228,10 @@ func (c *Collector) Register(registry provider.Registry) error {
}

func (c *Collector) Collect() error {
log.Println("Collecting GKE metrics")
// TODO: Consider adding a timer to this so we can refresh after a certain period of time
if c.PricingMap == nil {
start := time.Now()
log.Printf("Collecting %s metrics", c.Name())
if c.PricingMap == nil || time.Now().After(c.NextScrape) {
log.Println("Refreshing pricing map")
serviceName, err := c.GetServiceName()
if err != nil {
return err
Expand All @@ -238,6 +242,8 @@ func (c *Collector) Collect() error {
return err
}
c.PricingMap = pricingMap
c.NextScrape = time.Now().Add(c.config.ScrapeInterval)
log.Printf("Finished refreshing pricing map in %s", time.Since(start))
}
for _, project := range c.Projects {
instances, err := c.ListInstances(project)
Expand Down Expand Up @@ -270,6 +276,7 @@ func (c *Collector) Collect() error {
}).Set(ramCost)
}
}
log.Printf("Finished collecting GKE metrics in %s", time.Since(start))

return nil
}
Expand Down
142 changes: 142 additions & 0 deletions pkg/google/compute/compute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"strings"
"testing"
"time"

billingv1 "cloud.google.com/go/billing/apiv1"
"cloud.google.com/go/billing/apiv1/billingpb"
Expand Down Expand Up @@ -467,6 +468,47 @@ func (s *fakeCloudCatalogServer) ListSkus(ctx context.Context, req *billingpb.Li
}, nil
}

type fakeCloudCatalogServerSlimResults struct {
billingpb.UnimplementedCloudCatalogServer
}

func (s *fakeCloudCatalogServerSlimResults) ListServices(ctx context.Context, req *billingpb.ListServicesRequest) (*billingpb.ListServicesResponse, error) {
return &billingpb.ListServicesResponse{
Services: []*billingpb.Service{
{
DisplayName: "Compute Engine",
Name: "compute-engine",
},
},
}, nil
}

func (s *fakeCloudCatalogServerSlimResults) ListSkus(ctx context.Context, req *billingpb.ListSkusRequest) (*billingpb.ListSkusResponse, error) {
return &billingpb.ListSkusResponse{
Skus: []*billingpb.Sku{
{
Name: "test",
Description: "N1 Predefined Instance Core running in Americas",
ServiceRegions: []string{"us-central1"},
PricingInfo: []*billingpb.PricingInfo{
{
PricingExpression: &billingpb.PricingExpression{
TieredRates: []*billingpb.PricingExpression_TierRate{
{
UnitPrice: &money.Money{
CurrencyCode: "USD",
Nanos: 1e9,
},
},
},
},
},
},
},
},
}, nil
}

func TestCollector_Collect(t *testing.T) {
tests := map[string]struct {
config *Config
Expand Down Expand Up @@ -585,3 +627,103 @@ func TestCollector_Collect(t *testing.T) {
})
}
}

func TestCollector_GetPricing(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
buf := &compute.InstanceAggregatedList{
Items: map[string]compute.InstancesScopedList{
"projects/testing/zones/us-central1-a": {
Instances: []*compute.Instance{
{
Name: "test-n1",
MachineType: "abc/n1-slim",
Zone: "testing/us-central1-a",
Scheduling: &compute.Scheduling{
ProvisioningModel: "test",
},
},
{
Name: "test-n2",
MachineType: "abc/n2-slim",
Zone: "testing/us-central1-a",
Scheduling: &compute.Scheduling{
ProvisioningModel: "test",
},
},
{
Name: "test-n1-spot",
MachineType: "abc/n1-slim",
Zone: "testing/us-central1-a",
Scheduling: &compute.Scheduling{
ProvisioningModel: "SPOT",
},
},
},
},
},
}
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(buf)
}))

computeService, err := computev1.NewService(context.Background(), option.WithoutAuthentication(), option.WithEndpoint(testServer.URL))
require.NoError(t, err)
// Create the collector with a nil billing service so we can override it on each test case
collector := New(&Config{
Projects: "testing",
}, computeService, nil)

var pricingMap *StructuredPricingMap
t.Run("Test that the pricing map is cached", func(t *testing.T) {
l, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
gsrv := grpc.NewServer()
defer gsrv.Stop()
go func() {
if err := gsrv.Serve(l); err != nil {
t.Errorf("failed to serve: %v", err)
}
}()

billingpb.RegisterCloudCatalogServer(gsrv, &fakeCloudCatalogServer{})
cloudCatalagClient, err := billingv1.NewCloudCatalogClient(context.Background(),
option.WithEndpoint(l.Addr().String()),
option.WithoutAuthentication(),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
)

collector.billingService = cloudCatalagClient

require.NotNil(t, collector)

err = collector.Collect()
require.NoError(t, err)

pricingMap = collector.PricingMap
err = collector.Collect()
require.Equal(t, pricingMap, collector.PricingMap)
})

t.Run("Test that the pricing map is updated after the next scrape", func(t *testing.T) {
l, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
gsrv := grpc.NewServer()
defer gsrv.Stop()
go func() {
if err := gsrv.Serve(l); err != nil {
t.Errorf("failed to serve: %v", err)
}
}()
billingpb.RegisterCloudCatalogServer(gsrv, &fakeCloudCatalogServerSlimResults{})
cloudCatalogClient, _ := billingv1.NewCloudCatalogClient(context.Background(),
option.WithEndpoint(l.Addr().String()),
option.WithoutAuthentication(),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
)

collector.billingService = cloudCatalogClient
collector.NextScrape = time.Now().Add(-1 * time.Minute)
err = collector.Collect()
require.NotEqual(t, pricingMap, collector.PricingMap)
})
}
7 changes: 3 additions & 4 deletions pkg/google/compute/pricing_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package compute
import (
"errors"
"fmt"
"log"
"regexp"
"strings"

Expand Down Expand Up @@ -106,15 +105,15 @@ func GeneratePricingMap(skus []*billingpb.Sku) (*StructuredPricingMap, error) {
rawData, err := getDataFromSku(sku)

if errors.Is(err, SkuNotRelevant) {
log.Println(fmt.Errorf("%w: %s", SkuNotRelevant, sku.Description))
fmt.Errorf("%w: %s\n", SkuNotRelevant, sku.Description)
continue
}
if errors.Is(err, PricingDataIsOff) {
log.Println(fmt.Errorf("%w: %s", PricingDataIsOff, sku.Description))
fmt.Errorf("%w: %s\n", PricingDataIsOff, sku.Description)
continue
}
if errors.Is(err, SkuNotParsable) {
log.Println(fmt.Errorf("%w: %s", SkuNotParsable, sku.Description))
fmt.Errorf("%w: %s\n", SkuNotParsable, sku.Description)
continue
}
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/google/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func New(config *Config) (*GCP, error) {
}
case "GKE":
collector = compute.New(&compute.Config{
Projects: config.Projects,
Projects: config.Projects,
ScrapeInterval: config.ScrapeInterval,
}, computeService, cloudCatalogClient)
default:
log.Printf("Unknown service %s", service)
Expand Down

0 comments on commit b14025f

Please sign in to comment.