116444: sql: atomic flushing of sql stats r=koorosh a=koorosh

This change refactors the existing logic for flushing SQL stats. Its main goal is to fix an issue where flushing and clearing in-memory stats are not an atomic operation, which can cause not-yet-persisted stats to be cleared unintentionally.

This change introduces the following:
1. A `PopAllStatementsStats` function that prepares stats for persistence and then clears the in-memory stats as a single atomic operation.
2. The flush process is now: pop all stats from in-memory storage; reset the in-memory stats; persist the local copy of the popped stats.
3. Previously, the process was: iterate over the in-memory stats, which could be updated during iteration; persisting stats could take some time, and iterating over the stats is slow; after flushing all stats, the in-memory storage was cleared, but there was no guarantee that nothing had been added to the SQL stats in the meantime.

The new implementation does have a disadvantage: between popping stats from in-memory storage and persisting them, users might not see up-to-date information. This is assumed to be better than losing statistics permanently.
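A minimal sketch of the pop-then-persist idea, assuming a simplified in-memory store guarded by a mutex (the `memStore`, `popAll`, and `persist` names are illustrative, not the actual CockroachDB API):

```go
package main

import (
	"fmt"
	"sync"
)

// memStore is a simplified stand-in for the in-memory SQL stats container.
type memStore struct {
	mu    sync.Mutex
	stats map[string]int64
}

// popAll atomically returns the accumulated stats and resets the container,
// so no concurrent writer can add entries between the copy and the reset.
func (s *memStore) popAll() map[string]int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	popped := s.stats
	s.stats = make(map[string]int64)
	return popped
}

// record adds one statement execution to the in-memory stats.
func (s *memStore) record(fingerprint string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.stats[fingerprint]++
}

// flush persists a local copy of the popped stats; persistence may be slow,
// but it no longer races with concurrent updates to the in-memory store.
func flush(s *memStore, persist func(map[string]int64) error) error {
	return persist(s.popAll())
}

func main() {
	s := &memStore{stats: map[string]int64{}}
	s.record("SELECT _")
	_ = flush(s, func(m map[string]int64) error {
		fmt.Println("persisting", m)
		return nil
	})
}
```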

Release note: None

Resolves: #115168

119723: kvserver: record QPS stats on successful batch request only r=koorosh a=koorosh

Previously, QPS stats for a batch request were recorded before the request executed, which could lead to cases where the request failed but the calculated QPS was still recorded. This behavior is inconsistent with other replica stats (e.g. Write Keys or Write Bytes), which are recorded only after a successful batch request.

This patch changes the order of QPS accounting so that stats are recorded after
the request completes. In addition, QPS is calculated on the final batch request
(after it may have been mutated by the `maybeStripInFlightWrites` function).
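A self-contained sketch of the reordering, using a toy replica type (the `replica`, `batchRequest`, and `send` names are illustrative, not the kvserver API):

```go
package main

import (
	"errors"
	"fmt"
)

// replica is a toy stand-in for a kvserver replica that tracks QPS.
type replica struct {
	qps float64
}

type batchRequest struct{ key string }

// execute is a placeholder for evaluating the batch; it may fail.
func (r *replica) execute(ba *batchRequest) error {
	if ba.key == "poisoned" {
		return errors.New("failed batch request")
	}
	return nil
}

// send executes the batch first and records QPS only once it has succeeded,
// mirroring the reordering described above.
func (r *replica) send(ba *batchRequest) error {
	if err := r.execute(ba); err != nil {
		// Failed requests no longer contribute to QPS.
		return err
	}
	r.qps++ // accounted on the final, successfully executed batch
	return nil
}

func main() {
	r := &replica{}
	_ = r.send(&batchRequest{key: "poisoned"})
	_ = r.send(&batchRequest{key: "a"})
	fmt.Println("qps:", r.qps) // 1: only the successful request counted
}
```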

Release note: None

Related issue: #119388
Related issue: #119206

Epic: None

119763: roachprod: add VM label for "spot" instance r=nameisbhaskar a=nameisbhaskar

Previously, there was no way to identify spot instances with a label for tracking (billing). To address this, this patch adds a new `spot: true` label so that spot VMs can be tracked for billing.
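A minimal sketch of the labeling, assuming a hypothetical `buildLabels` helper (the real providers assemble labels from `vm.GetDefaultLabelMap` and the provider options, as in the diff below):

```go
package main

import "fmt"

// buildLabels attaches the standard roachprod labels plus a "spot: true"
// marker when the cluster was requested with spot/preemptible instances,
// so that billing reports can filter on it.
func buildLabels(useSpot bool) map[string]string {
	labels := map[string]string{"roachprod": "true"}
	if useSpot {
		labels["spot"] = "true"
	}
	return labels
}

func main() {
	fmt.Println(buildLabels(true)) // map[roachprod:true spot:true]
}
```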

Fixes: #119629
Epic: none

Co-authored-by: Andrii Vorobiov <[email protected]>
Co-authored-by: Bhaskarjyoti Bora <[email protected]>
3 people committed Mar 4, 2024
4 parents 9addf03 + 046641b + ec58ad1 + f3beddf commit a2420ac
Showing 28 changed files with 656 additions and 145 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/serverccl/statusccl/tenant_status_test.go
@@ -589,8 +589,8 @@ func testResetSQLStatsRPCForTenant(
}

if flushed {
testTenant.TenantSQLStats().Flush(ctx)
controlCluster.TenantSQLStats(serverccl.RandomServer).Flush(ctx)
testTenant.TenantSQLStats().Flush(ctx, testTenant.GetTenant().AppStopper())
controlCluster.TenantSQLStats(serverccl.RandomServer).Flush(ctx, controlCluster.Tenant(0).GetTenant().AppStopper())
}

status := testTenant.TenantStatusSrv()
30 changes: 17 additions & 13 deletions pkg/cmd/roachprod/main.go
@@ -324,23 +324,27 @@ hosts file.
// We use a hacky workaround below to color the empty string.
// [1] https://github.com/golang/go/issues/12073

// Print header.
fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t\n",
"Cluster", "Clouds", "Size", "VM", "Arch",
color.HiWhiteString("$/hour"), color.HiWhiteString("$ Spent"),
color.HiWhiteString("Uptime"), color.HiWhiteString("TTL"),
color.HiWhiteString("$/TTL"))
// Print separator.
fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t\n",
"", "", "", "",
color.HiWhiteString(""), color.HiWhiteString(""),
color.HiWhiteString(""), color.HiWhiteString(""),
color.HiWhiteString(""))
if !listDetails {
// Print header only if we are not printing cluster details.
fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t\n",
"Cluster", "Clouds", "Size", "VM", "Arch",
color.HiWhiteString("$/hour"), color.HiWhiteString("$ Spent"),
color.HiWhiteString("Uptime"), color.HiWhiteString("TTL"),
color.HiWhiteString("$/TTL"))
// Print separator.
fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t\n",
"", "", "", "",
color.HiWhiteString(""), color.HiWhiteString(""),
color.HiWhiteString(""), color.HiWhiteString(""),
color.HiWhiteString(""))
}
totalCostPerHour := 0.0
for _, name := range names {
c := filteredCloud.Clusters[name]
if listDetails {
c.PrintDetails(config.Logger)
if err = c.PrintDetails(config.Logger); err != nil {
return err
}
} else {
// N.B. Tabwriter doesn't support per-column alignment. It looks odd to have the cluster names right-aligned,
// so we make it left-aligned.
93 changes: 93 additions & 0 deletions pkg/kv/kvserver/replica_rankings_test.go
@@ -15,9 +15,11 @@ import (
"fmt"
"math/rand"
"reflect"
"sync/atomic"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
aload "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load"
@@ -295,6 +297,97 @@ func TestWriteLoadStatsAccounting(t *testing.T) {
}
}

// TestLoadQPSStats validates that replica stats are accounted consistently when a batch request succeeds or fails.
func TestLoadQPSStats(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

failBatchReq := atomic.Bool{}
failBatchReq.Store(false)
var key roachpb.Key
var qps, writeBytes float64

tc := serverutils.StartCluster(t, 1, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &StoreTestingKnobs{
TestingRequestFilter: func(_ context.Context, ba *kvpb.BatchRequest) *kvpb.Error {
if failBatchReq.Load() {
for _, req := range ba.Requests {
if req.GetInner().Header().Key.Equal(key) {
return kvpb.NewError(fmt.Errorf("failed batch request"))
}
}
}
return nil
},
},
},
},
})

defer tc.Stopper().Stop(ctx)
ts := tc.Server(0)
db := ts.DB()
conn := tc.ServerConn(0)
sqlDB := sqlutils.MakeSQLRunner(conn)

// Disable the consistency checker, to avoid interleaving requests
// artificially inflating QPS due to consistency checking.
sqlDB.Exec(t, `SET CLUSTER SETTING server.consistency_check.interval = '0'`)
sqlDB.Exec(t, `SET CLUSTER SETTING kv.range_split.by_load.enabled = false`)

key = tc.ScratchRange(t)

req := &kvpb.PutRequest{
RequestHeader: kvpb.RequestHeader{Key: key},
Value: roachpb.MakeValueFromString("value"),
}
batchReq := &kvpb.BatchRequest{}
batchReq.Add(req)

store, err := ts.GetStores().(*Stores).GetStore(ts.GetFirstStoreID())
require.NoError(t, err)

repl := store.LookupReplica(roachpb.RKey(key))
require.NotNil(t, repl)
err = db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
failBatchReq.Store(true)
// Reset stats before sending request.
repl.loadStats.Reset()
_, pErr := txn.Send(ctx, batchReq)

qps = repl.loadStats.TestingGetSum(load.Queries)
writeBytes = repl.loadStats.TestingGetSum(load.WriteBytes)
failBatchReq.Store(false)
return pErr.GoError()
})

// Expected error for filtered out batch request.
require.Error(t, err)
require.ErrorContains(t, err, "failed batch request")

// Test that for a failed batch request, neither QPS nor write keys/bytes stats are accounted for.
require.Equal(t, 0.0, qps)
require.Equal(t, 0.0, writeBytes)

err = db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Reset stats before sending request.
repl.loadStats.Reset()
_, pErr := txn.Send(ctx, batchReq)
qps = repl.loadStats.TestingGetSum(load.Queries)
writeBytes = repl.loadStats.TestingGetSum(load.WriteBytes)
return pErr.GoError()
})
require.NoError(t, err)

// QPS, write bytes and write keys should be non-zero values.
require.Greater(t, qps, 0.0)
require.Greater(t, writeBytes, 0.0)
}

func TestReadLoadMetricAccounting(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_send.go
@@ -141,9 +141,6 @@ func (r *Replica) SendWithWriteBytes(
// recorded regardless of errors that are encountered.
startCPU := grunning.Time()
defer r.MeasureReqCPUNanos(startCPU)
// Record summary throughput information about the batch request for
// accounting.
r.recordBatchRequestLoad(ctx, ba)

// If the internal Raft group is quiesced, wake it and the leader.
r.maybeUnquiesce(ctx, true /* wakeLeader */, true /* mayCampaign */)
@@ -216,6 +213,9 @@ func (r *Replica) SendWithWriteBytes(
r.recordBatchForLoadBasedSplitting(ctx, ba, br, int(grunning.Difference(startCPU, grunning.Time())))
}

// Record summary throughput information about the batch request for
// accounting.
r.recordBatchRequestLoad(ctx, ba)
r.recordRequestWriteBytes(writeBytes)
r.recordImpactOnRateLimiter(ctx, br, isReadOnly)
return br, writeBytes, pErr
62 changes: 60 additions & 2 deletions pkg/roachprod/cloud/cluster_cloud.go
@@ -14,8 +14,10 @@ import (
"bytes"
"context"
"fmt"
"os"
"regexp"
"sort"
"text/tabwriter"
"time"

"github.com/cockroachdb/cockroach/pkg/roachprod/config"
@@ -26,6 +28,28 @@ import (
"golang.org/x/sync/errgroup"
)

const (
// The following constants are headers that are used for printing the VM details.
headerName = "Name"
headerDNS = "DNS"
headerPrivateIP = "Private IP"
headerPublicIP = "Public IP"
headerMachineType = "Machine Type"
headerCPUArch = "CPU Arch"
headerCPUFamily = "CPU Family"
headerProvisionModel = "Provision Model"

// Provision models that are used for printing VM details.
spotProvisionModel = "spot"
onDemandProvisionModel = "ondemand"
)

// printDetailsColumnHeaders are the headers to be printed in the defined sequence.
var printDetailsColumnHeaders = []string{
headerName, headerDNS, headerPrivateIP, headerPublicIP, headerMachineType, headerCPUArch, headerCPUFamily,
headerProvisionModel,
}

// Cloud contains information about all known clusters (across multiple cloud
// providers).
type Cloud struct {
@@ -143,7 +167,7 @@ func (c *Cluster) String() string {
}

// PrintDetails TODO(peter): document
func (c *Cluster) PrintDetails(logger *logger.Logger) {
func (c *Cluster) PrintDetails(logger *logger.Logger) error {
logger.Printf("%s: %s ", c.Name, c.Clouds())
if !c.IsLocal() {
l := c.LifetimeRemaining().Round(time.Second)
@@ -155,9 +179,43 @@ func (c *Cluster) PrintDetails(logger *logger.Logger) {
} else {
logger.Printf("(no expiration)")
}
// Align columns left and separate with at least two spaces.
tw := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
logPrettifiedHeader(tw, printDetailsColumnHeaders)

for _, vm := range c.VMs {
logger.Printf(" %s\t%s\t%s\t%s\t%s\t%s\t%s", vm.Name, vm.DNS, vm.PrivateIP, vm.PublicIP, vm.MachineType, vm.CPUArch, vm.CPUFamily)
provisionModel := onDemandProvisionModel
if vm.Preemptible {
provisionModel = spotProvisionModel
}
fmt.Fprintf(tw, "%s\n", prettifyRow(printDetailsColumnHeaders, map[string]string{
headerName: vm.Name, headerDNS: vm.DNS, headerPrivateIP: vm.PrivateIP, headerPublicIP: vm.PublicIP,
headerMachineType: vm.MachineType, headerCPUArch: string(vm.CPUArch), headerCPUFamily: vm.CPUFamily,
headerProvisionModel: provisionModel,
}))
}
return tw.Flush()
}

// logPrettifiedHeader writes a prettified row of headers to the tab writer.
func logPrettifiedHeader(tw *tabwriter.Writer, headers []string) {
for _, header := range headers {
fmt.Fprintf(tw, "%s\t", header)
}
fmt.Fprint(tw, "\n")
}

// prettifyRow returns a prettified row of values. The sequence of the headers is maintained.
func prettifyRow(headers []string, rowMap map[string]string) string {
row := ""
for _, header := range headers {
value := ""
if v, ok := rowMap[header]; ok {
value = v
}
row = fmt.Sprintf("%s%s\t", row, value)
}
return row
}

// IsLocal returns true if c is a local cluster.
7 changes: 4 additions & 3 deletions pkg/roachprod/roachprod.go
@@ -597,7 +597,9 @@ func SetupSSH(ctx context.Context, l *logger.Logger, clusterName string) error {
return err
}

cloudCluster.PrintDetails(l)
if err = cloudCluster.PrintDetails(l); err != nil {
return err
}
// Run ssh-keygen -R serially on each new VM in case an IP address has been recycled
for _, v := range cloudCluster.VMs {
cmd := exec.Command("ssh-keygen", "-R", v.PublicIP)
@@ -667,8 +669,7 @@ func Extend(l *logger.Logger, clusterName string, lifetime time.Duration) error
return fmt.Errorf("cluster %s does not exist", clusterName)
}

c.PrintDetails(l)
return nil
return c.PrintDetails(l)
}

// Default scheduled backup runs a full backup every hour and an incremental
14 changes: 10 additions & 4 deletions pkg/roachprod/vm/aws/aws.go
@@ -1090,11 +1090,17 @@ func (p *Provider) runInstance(
m := vm.GetDefaultLabelMap(opts)
m[vm.TagCreated] = timeutil.Now().Format(time.RFC3339)
m["Name"] = name

if providerOpts.UseSpot {
m[vm.TagSpotInstance] = "true"
}

var awsLabelsNameMap = map[string]string{
vm.TagCluster: "Cluster",
vm.TagCreated: "Created",
vm.TagLifetime: "Lifetime",
vm.TagRoachprod: "Roachprod",
vm.TagCluster: "Cluster",
vm.TagCreated: "Created",
vm.TagLifetime: "Lifetime",
vm.TagRoachprod: "Roachprod",
vm.TagSpotInstance: "Spot",
}

var labelPairs []string
4 changes: 4 additions & 0 deletions pkg/roachprod/vm/gce/gcloud.go
@@ -1060,6 +1060,10 @@ func computeLabelsArg(opts vm.CreateOpts, providerOpts *ProviderOpts) (string, e
addLabel(ManagedLabel, "true")
}

if providerOpts.UseSpot {
addLabel(vm.TagSpotInstance, "true")
}

for key, value := range opts.CustomLabels {
_, ok := m[strings.ToLower(key)]
if ok {
2 changes: 2 additions & 0 deletions pkg/roachprod/vm/vm.go
@@ -35,6 +35,8 @@ const (
TagLifetime = "lifetime"
// TagRoachprod is roachprod tag const, value is true & false.
TagRoachprod = "roachprod"
// TagSpotInstance is a tag added to spot instance VMs with the value "true".
TagSpotInstance = "spot"
// TagUsage indicates where a certain resource is used. "roachtest" is used
// as the key for roachtest created resources.
TagUsage = "usage"
4 changes: 2 additions & 2 deletions pkg/server/application_api/stats_test.go
@@ -137,7 +137,7 @@ func TestEnsureSQLStatsAreFlushedForTelemetry(t *testing.T) {

statusServer := s.StatusServer().(serverpb.StatusServer)
sqlServer := s.SQLServer().(*sql.Server)
sqlServer.GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
sqlServer.GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx, srv.AppStopper())
testutils.SucceedsSoon(t, func() error {
// Get the diagnostic info.
res, err := statusServer.Diagnostics(ctx, &serverpb.DiagnosticsRequest{NodeId: "local"})
@@ -346,7 +346,7 @@ func TestClusterResetSQLStats(t *testing.T) {
populateStats(t, sqlDB)
if flushed {
gateway.SQLServer().(*sql.Server).
GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx, gateway.AppStopper())
}

statsPreReset, err := status.Statements(ctx, &serverpb.StatementsRequest{
2 changes: 1 addition & 1 deletion pkg/server/drain.go
@@ -444,7 +444,7 @@ func (s *drainServer) drainClients(
statsProvider := s.sqlServer.pgServer.SQLServer.GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats)
// If the SQL server is disabled there is nothing to drain here.
if !s.sqlServer.cfg.DisableSQLServer {
statsProvider.Flush(ctx)
statsProvider.Flush(ctx, s.stopper)
}
statsProvider.Stop(ctx)
