Skip to content

Commit

Permalink
feat: add bytes_sent and bytes_received as metrics (#856)
Browse files Browse the repository at this point in the history
Adding additional metrics to be reported by Go Connector.

The Connector will now report the number of bytes_sent to Cloud SQL
and bytes_received from Cloud SQL.
  • Loading branch information
jackwotherspoon authored Aug 13, 2024
1 parent f482119 commit d0e493f
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 3 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,9 @@ Supported metrics include:
- `cloudsqlconn/refresh_success_count`: The number of successful certificate
refresh operations
- `cloudsqlconn/refresh_failure_count`: The number of failed refresh
operations.
operations
- `cloudsqlconn/bytes_sent`: The number of bytes sent to Cloud SQL
- `cloudsqlconn/bytes_received`: The number of bytes received from Cloud SQL

Supported traces include:

Expand Down
28 changes: 26 additions & 2 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func (d *Dialer) Dial(ctx context.Context, icn string, opts ...DialOption) (conn
return newInstrumentedConn(tlsConn, func() {
n := atomic.AddUint64(c.openConns, ^uint64(0))
trace.RecordOpenConnections(context.Background(), int64(n), d.dialerID, cn.String())
}), nil
}, d.dialerID, cn.String()), nil
}

// removeCached stops all background refreshes and deletes the connection
Expand Down Expand Up @@ -479,10 +479,12 @@ func (d *Dialer) Warmup(ctx context.Context, icn string, opts ...DialOption) err

// newInstrumentedConn initializes an instrumentedConn that on closing will
// decrement the number of open connections and record the result.
func newInstrumentedConn(conn net.Conn, closeFunc func()) *instrumentedConn {
func newInstrumentedConn(conn net.Conn, closeFunc func(), dialerID, connName string) *instrumentedConn {
	// Populate the wrapper field by field; dialerID and connName are kept so
	// the Read and Write methods can pass them to the trace recorders.
	ic := new(instrumentedConn)
	ic.Conn = conn
	ic.closeFunc = closeFunc
	ic.dialerID = dialerID
	ic.connName = connName
	return ic
}

Expand All @@ -491,6 +493,28 @@ func newInstrumentedConn(conn net.Conn, closeFunc func()) *instrumentedConn {
type instrumentedConn struct {
// Conn is the wrapped connection; Read and Write delegate to it.
net.Conn
// closeFunc is the callback supplied to newInstrumentedConn; presumably it
// is invoked by Close (the Close body is not shown here — confirm).
closeFunc func()
// dialerID is passed to the trace recorders as the dialer ID tag value.
dialerID string
// connName is passed to the trace recorders as the instance tag value.
connName string
}

// Read delegates to the underlying net.Conn and records the number of bytes
// read as bytes received.
//
// The count is recorded whenever at least one byte was read, even when the
// underlying Read also returns an error: an io.Reader may return n > 0
// together with a non-nil error (for example io.EOF), and those bytes are
// still handed to the caller. Zero-byte reads are not recorded. The metric is
// recorded on a separate goroutine with a background context.
//
// It returns the byte count and error from the underlying Read unchanged.
func (i *instrumentedConn) Read(b []byte) (int, error) {
	bytesRead, err := i.Conn.Read(b)
	if bytesRead > 0 {
		go trace.RecordBytesReceived(context.Background(), int64(bytesRead), i.connName, i.dialerID)
	}
	return bytesRead, err
}

// Write delegates to the underlying net.Conn and records the number of bytes
// written as bytes sent.
//
// The count is recorded whenever at least one byte was written, even when the
// underlying Write also returns an error: a short write reports n > 0 together
// with a non-nil error, and those bytes were still sent. Zero-byte writes are
// not recorded. The metric is recorded on a separate goroutine with a
// background context.
//
// It returns the byte count and error from the underlying Write unchanged.
func (i *instrumentedConn) Write(b []byte) (int, error) {
	bytesWritten, err := i.Conn.Write(b)
	if bytesWritten > 0 {
		go trace.RecordBytesSent(context.Background(), int64(bytesWritten), i.connName, i.dialerID)
	}
	return bytesWritten, err
}

// Close delegates to the underlying net.Conn interface and reports the close
Expand Down
38 changes: 38 additions & 0 deletions internal/trace/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ var (
"A failed certificate refresh operation",
stats.UnitDimensionless,
)
mBytesSent = stats.Int64(
"cloudsqlconn/bytes_sent",
"The bytes sent to Cloud SQL",
stats.UnitDimensionless,
)
mBytesReceived = stats.Int64(
"cloudsqlconn/bytes_received",
"The bytes received from Cloud SQL",
stats.UnitDimensionless,
)

latencyView = &view.View{
Name: "cloudsqlconn/dial_latency",
Expand Down Expand Up @@ -94,6 +104,20 @@ var (
Aggregation: view.Count(),
TagKeys: []tag.Key{keyInstance, keyDialerID, keyErrorCode},
}
// bytesSentView exposes the mBytesSent measure as "cloudsqlconn/bytes_sent",
// tagged by instance and dialer ID.
// NOTE(review): with LastValue aggregation this view presumably reports only
// the most recently recorded write size rather than a running total of bytes
// sent — confirm whether a sum aggregation was intended. The accompanying
// test asserts a last-value metric, so the two must change together.
bytesSentView = &view.View{
Name: "cloudsqlconn/bytes_sent",
Measure: mBytesSent,
Description: "The number of bytes sent to Cloud SQL",
Aggregation: view.LastValue(),
TagKeys: []tag.Key{keyInstance, keyDialerID},
}
// bytesReceivedView exposes the mBytesReceived measure as
// "cloudsqlconn/bytes_received", tagged by instance and dialer ID.
// NOTE(review): with LastValue aggregation this view presumably reports only
// the most recently recorded read size rather than a running total of bytes
// received — confirm whether a sum aggregation was intended. The accompanying
// test asserts a last-value metric, so the two must change together.
bytesReceivedView = &view.View{
Name: "cloudsqlconn/bytes_received",
Measure: mBytesReceived,
Description: "The number of bytes received from Cloud SQL",
Aggregation: view.LastValue(),
TagKeys: []tag.Key{keyInstance, keyDialerID},
}

registerOnce sync.Once
registerErr error
Expand All @@ -110,6 +134,8 @@ func InitMetrics() error {
dialFailureView,
refreshCountView,
failedRefreshCountView,
bytesSentView,
bytesReceivedView,
); rErr != nil {
registerErr = fmt.Errorf("failed to initialize metrics: %v", rErr)
}
Expand Down Expand Up @@ -157,6 +183,18 @@ func RecordRefreshResult(ctx context.Context, instance, dialerID string, err err
stats.Record(ctx, mSuccessfulRefresh.M(1))
}

// RecordBytesSent records num as a bytes-sent measurement, tagged with the
// given instance and dialer ID.
func RecordBytesSent(ctx context.Context, num int64, instance, dialerID string) {
	// The error from tag.New is deliberately ignored, as in the original;
	// the returned context is used for recording either way.
	tagged, _ := tag.New(
		ctx,
		tag.Upsert(keyInstance, instance),
		tag.Upsert(keyDialerID, dialerID),
	)
	sent := mBytesSent.M(num)
	stats.Record(tagged, sent)
}

// RecordBytesReceived records num as a bytes-received measurement, tagged
// with the given instance and dialer ID.
func RecordBytesReceived(ctx context.Context, num int64, instance, dialerID string) {
	// The error from tag.New is deliberately ignored, as in the original;
	// the returned context is used for recording either way.
	tagged, _ := tag.New(
		ctx,
		tag.Upsert(keyInstance, instance),
		tag.Upsert(keyDialerID, dialerID),
	)
	received := mBytesReceived.M(num)
	stats.Record(tagged, received)
}

// errorCode returns an error code as given from the SQL Admin API, provided the
// error wraps a googleapi.Error type. If multiple error codes are returned from
// the API, then a comma-separated string of all codes is returned.
Expand Down
17 changes: 17 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package cloudsqlconn

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -159,6 +160,20 @@ func TestDialerWithMetrics(t *testing.T) {
if err != nil {
t.Fatalf("expected Dial to succeed, but got error: %v", err)
}
// write to conn to test bytes_sent and bytes_received
buf := &bytes.Buffer{}
err = buf.WriteByte('a')
if err != nil {
t.Fatalf("buf.WriteByte failed: %v", err)
}
_, err = conn2.Write(buf.Bytes())
if err != nil {
t.Fatalf("conn.Write failed: %v", err)
}
_, err = conn2.Read(buf.Bytes())
if err != nil {
t.Fatalf("conn.Read failed: %v", err)
}
defer conn2.Close()
// dial a bogus instance
_, err = d.Dial(context.Background(), "my-project:my-region:notaninstance")
Expand All @@ -172,6 +187,8 @@ func TestDialerWithMetrics(t *testing.T) {
wantLastValueMetric(t, "cloudsqlconn/open_connections", spy.data(), 2)
wantDistributionMetric(t, "cloudsqlconn/dial_latency", spy.data())
wantCountMetric(t, "cloudsqlconn/refresh_success_count", spy.data())
wantLastValueMetric(t, "cloudsqlconn/bytes_sent", spy.data(), 1)
wantLastValueMetric(t, "cloudsqlconn/bytes_received", spy.data(), 1)

// failure metrics from dialing bogus instance
wantCountMetric(t, "cloudsqlconn/dial_failure_count", spy.data())
Expand Down

0 comments on commit d0e493f

Please sign in to comment.