Skip to content

Commit

Permalink
Merge #73555 #73676
Browse files Browse the repository at this point in the history
73555: kvclient: fix bug in batches containing both get and rev scan requests r=nvanbenschoten,AlexTalks a=arulajmani

This patch fixes a bug which could cause Get requests to be dropped
when:
- They were in the same batch as a ReverseScan request.
- The key referenced in the Get request was to the left of the scan
bounds
- The key referenced in the Get request was on its own range.

This was because of a bug in `prev()`, which the distsender uses to
seek to the next range when scanning keys in reverse. A call to `prev()`
informally means that all requests intersecting with keys `>=k` have
already been executed. Here, we were incorrectly assuming that a point
request for `x` had already been served and skipping over it -- but this
isn't true when `x < k`. This patch ensures we no longer do that by
treating [x] = [x, x.next()) and letting the existing logic take care of the
rest.

Fixes #73370

Release note: None

73676: cli/start: use the same format for server details in multi-tenant r=stevendanna a=knz

cc @tbg who wanted this.

This ensures that the server details are printed using the same format
as regular nodes for multi-tenant SQL servers.

Before:

```
SQL server for tenant 123 listening at 127.0.0.1:26258, http at 127.0.0.1:8081
```

After:
```
CockroachDB SQL server starting at 2021-12-10 12:11:57.684183707 +0000 UTC (took 0.1s)
build:               CCL v21.2.0-alpha.00000000-6604-g6f243c67e2-dirty @  (go1.17.4)
webui:               http://kenax:8081
sql:                 postgresql://root@kenax:26258/defaultdb?sslmode=disable
sql (JDBC):          jdbc:postgresql://kenax:26258/defaultdb?sslmode=disable&user=root
RPC client flags:    ./cockroach <client cmd> --host=kenax:26258 --insecure
temp dir:            /data/home/kena/src/go/src/github.com/cockroachdb/cockroach/mt-data/cockroach-temp981776782
external I/O path:   <disabled>
store[0]:            path=/data/home/kena/src/go/src/github.com/cockroachdb/cockroach/mt-data
storage engine:      pebble
clusterID:           6622219f-89b7-4d0d-919f-22db8fa83dfd
tenantID:            123
instanceID:          3
```

Release note: None

Co-authored-by: arulajmani <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
3 people committed Dec 10, 2021
3 parents 4422323 + a4b34a5 + d7f8ebe commit e4ecb4c
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 102 deletions.
1 change: 1 addition & 0 deletions docs/generated/redact_safe.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pkg/roachpb/metadata.go | `ReplicaID`
pkg/roachpb/metadata.go | `ReplicaType`
pkg/roachpb/metadata.go | `StoreID`
pkg/roachpb/method.go | `Method`
pkg/roachpb/tenant.go | `TenantID`
pkg/rpc/connection_class.go | `ConnectionClass`
pkg/sql/catalog/descpb/structured.go | `ColumnID`
pkg/sql/catalog/descpb/structured.go | `ConstraintType`
Expand Down
11 changes: 9 additions & 2 deletions pkg/cli/mt_start_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/sdnotify"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -62,6 +63,8 @@ well unless it can be verified using a trusted root certificate store. That is,
}

func runStartSQL(cmd *cobra.Command, args []string) error {
tBegin := timeutil.Now()

// First things first: if the user wants background processing,
// relinquish the terminal ASAP by forking and exiting.
//
Expand Down Expand Up @@ -108,7 +111,7 @@ func runStartSQL(cmd *cobra.Command, args []string) error {

initGEOS(ctx)

sqlServer, addr, httpAddr, err := server.StartTenant(
sqlServer, _, _, err := server.StartTenant(
ctx,
stopper,
clusterName,
Expand Down Expand Up @@ -145,7 +148,11 @@ func runStartSQL(cmd *cobra.Command, args []string) error {
sqlServer.StartDiagnostics(ctx)
}

log.Infof(ctx, "SQL server for tenant %s listening at %s, http at %s", serverCfg.SQLConfig.TenantID, addr, httpAddr)
// Report the server identifiers and other server details
// in the same format as 'cockroach start'.
if err := reportServerInfo(ctx, tBegin, &serverCfg, st, false /* isHostNode */, false /* initialStart */); err != nil {
return err
}

// TODO(tbg): make the other goodies in `./cockroach start` reusable, such as
// logging to files, periodic memory output, heap and goroutine dumps, debug
Expand Down
206 changes: 123 additions & 83 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,93 +676,15 @@ If problems persist, please see %s.`
return err
}

// Now inform the user that the server is running and tell the
// user about its run-time derived parameters.
var buf redact.StringBuilder
info := build.GetInfo()
buf.Printf("CockroachDB node starting at %s (took %0.1fs)\n", timeutil.Now(), timeutil.Since(tBegin).Seconds())
buf.Printf("build:\t%s %s @ %s (%s)\n",
redact.Safe(info.Distribution), redact.Safe(info.Tag), redact.Safe(info.Time), redact.Safe(info.GoVersion))
buf.Printf("webui:\t%s\n", serverCfg.AdminURL())

// (Re-)compute the client connection URL. We cannot do this
// earlier (e.g. above, in the runStart function) because
// at this time the address and port have not been resolved yet.
sCtx := rpc.MakeSecurityContext(serverCfg.Config, security.ClusterTLSSettings(serverCfg.Settings), roachpb.SystemTenantID)
pgURL, err := sCtx.PGURL(url.User(security.RootUser))
if err != nil {
log.Ops.Errorf(ctx, "failed computing the URL: %v", err)
return err
}
buf.Printf("sql:\t%s\n", pgURL.ToPQ())
buf.Printf("sql (JDBC):\t%s\n", pgURL.ToJDBC())

buf.Printf("RPC client flags:\t%s\n", clientFlagsRPC())
if len(serverCfg.SocketFile) != 0 {
buf.Printf("socket:\t%s\n", serverCfg.SocketFile)
}
logNum := 1
_ = cliCtx.logConfig.IterateDirectories(func(d string) error {
if logNum == 1 {
// Backward-compatibility.
buf.Printf("logs:\t%s\n", d)
} else {
buf.Printf("logs[%d]:\t%s\n", logNum, d)
}
logNum++
return nil
})
if serverCfg.Attrs != "" {
buf.Printf("attrs:\t%s\n", serverCfg.Attrs)
}
if len(serverCfg.Locality.Tiers) > 0 {
buf.Printf("locality:\t%s\n", serverCfg.Locality)
}
if s.TempDir() != "" {
buf.Printf("temp dir:\t%s\n", s.TempDir())
}
if ext := s.ClusterSettings().ExternalIODir; ext != "" {
buf.Printf("external I/O path: \t%s\n", ext)
} else {
buf.Printf("external I/O path: \t<disabled>\n")
}
for i, spec := range serverCfg.Stores.Specs {
buf.Printf("store[%d]:\t%s\n", i, spec)
}
buf.Printf("storage engine: \t%s\n", &serverCfg.StorageEngine)
// Remember the server identifiers for logging.
// TODO(knz): Remove this.
nodeID := s.NodeID()
if initialStart {
if nodeID == kvserver.FirstNodeID {
buf.Printf("status:\tinitialized new cluster\n")
} else {
buf.Printf("status:\tinitialized new node, joined pre-existing cluster\n")
}
} else {
buf.Printf("status:\trestarted pre-existing node\n")
}

if baseCfg.ClusterName != "" {
buf.Printf("cluster name:\t%s\n", baseCfg.ClusterName)
}

// Remember the cluster ID for log file rotation.
clusterID := s.ClusterID().String()
log.SetNodeIDs(clusterID, int32(nodeID))
buf.Printf("clusterID:\t%s\n", clusterID)
buf.Printf("nodeID:\t%d\n", nodeID)

// Collect the formatted string and show it to the user.
msg, err := expandTabsInRedactableBytes(buf.RedactableBytes())
if err != nil {
return err
}
msgS := msg.ToString()
log.Ops.Infof(ctx, "node startup completed:\n%s", msgS)
if !startCtx.inBackground && !log.LoggingToStderr(severity.INFO) {
fmt.Print(msgS.StripMarkers())
}

return nil
// Now inform the user that the server is running and tell the
// user about its run-time derived parameters.
return reportServerInfo(ctx, tBegin, &serverCfg, s.ClusterSettings(), true /* isHostNode */, initialStart)
}(); err != nil {
errChan <- err
}
Expand Down Expand Up @@ -958,6 +880,124 @@ If problems persist, please see %s.`
return returnErr
}

// reportServerInfo prints out the server version and network details
// in a standardized format.
func reportServerInfo(
ctx context.Context,
startTime time.Time,
serverCfg *server.Config,
st *cluster.Settings,
isHostNode, initialStart bool,
) error {
srvS := redact.SafeString("SQL server")
if isHostNode {
srvS = "node"
}

var buf redact.StringBuilder
info := build.GetInfo()
buf.Printf("CockroachDB %s starting at %s (took %0.1fs)\n", srvS, timeutil.Now(), timeutil.Since(startTime).Seconds())
buf.Printf("build:\t%s %s @ %s (%s)\n",
redact.Safe(info.Distribution), redact.Safe(info.Tag), redact.Safe(info.Time), redact.Safe(info.GoVersion))
buf.Printf("webui:\t%s\n", serverCfg.AdminURL())

// (Re-)compute the client connection URL. We cannot do this
// earlier (e.g. above, in the runStart function) because
// at this time the address and port have not been resolved yet.
sCtx := rpc.MakeSecurityContext(serverCfg.Config, security.ClusterTLSSettings(serverCfg.Settings), roachpb.SystemTenantID)
pgURL, err := sCtx.PGURL(url.User(security.RootUser))
if err != nil {
log.Ops.Errorf(ctx, "failed computing the URL: %v", err)
return err
}
buf.Printf("sql:\t%s\n", pgURL.ToPQ())
buf.Printf("sql (JDBC):\t%s\n", pgURL.ToJDBC())

buf.Printf("RPC client flags:\t%s\n", clientFlagsRPC())
if len(serverCfg.SocketFile) != 0 {
buf.Printf("socket:\t%s\n", serverCfg.SocketFile)
}
logNum := 1
_ = cliCtx.logConfig.IterateDirectories(func(d string) error {
if logNum == 1 {
// Backward-compatibility.
buf.Printf("logs:\t%s\n", d)
} else {
buf.Printf("logs[%d]:\t%s\n", logNum, d)
}
logNum++
return nil
})
if serverCfg.Attrs != "" {
buf.Printf("attrs:\t%s\n", serverCfg.Attrs)
}
if len(serverCfg.Locality.Tiers) > 0 {
buf.Printf("locality:\t%s\n", serverCfg.Locality)
}
if tmpDir := serverCfg.SQLConfig.TempStorageConfig.Path; tmpDir != "" {
buf.Printf("temp dir:\t%s\n", tmpDir)
}
if ext := st.ExternalIODir; ext != "" {
buf.Printf("external I/O path: \t%s\n", ext)
} else {
buf.Printf("external I/O path: \t<disabled>\n")
}
for i, spec := range serverCfg.Stores.Specs {
buf.Printf("store[%d]:\t%s\n", i, spec)
}
buf.Printf("storage engine: \t%s\n", &serverCfg.StorageEngine)

// Print the commong server identifiers.
if baseCfg.ClusterName != "" {
buf.Printf("cluster name:\t%s\n", baseCfg.ClusterName)
}
clusterID := serverCfg.BaseConfig.ClusterIDContainer.Get().String()
buf.Printf("clusterID:\t%s\n", clusterID)

nodeID := serverCfg.BaseConfig.IDContainer.Get()
if isHostNode {
if initialStart {
if nodeID == kvserver.FirstNodeID {
buf.Printf("status:\tinitialized new cluster\n")
} else {
buf.Printf("status:\tinitialized new node, joined pre-existing cluster\n")
}
} else {
buf.Printf("status:\trestarted pre-existing node\n")
}
// Report the server identifiers.
buf.Printf("nodeID:\t%d\n", nodeID)
} else {
// Report the SQL server identifiers.
buf.Printf("tenantID:\t%s\n", serverCfg.SQLConfig.TenantID)
buf.Printf("instanceID:\t%d\n", nodeID)

if kvAddrs := serverCfg.SQLConfig.TenantKVAddrs; len(kvAddrs) > 0 {
// Report which KV servers are connected.
buf.Printf("KV addresses:\t")
comma := redact.SafeString("")
for _, addr := range serverCfg.SQLConfig.TenantKVAddrs {
buf.Printf("%s%s", comma, addr)
comma = ", "
}
buf.Printf("\n")
}
}

// Collect the formatted string and show it to the user.
msg, err := expandTabsInRedactableBytes(buf.RedactableBytes())
if err != nil {
return err
}
msgS := msg.ToString()
log.Ops.Infof(ctx, "%s startup completed:\n%s", srvS, msgS)
if !startCtx.inBackground && !log.LoggingToStderr(severity.INFO) {
fmt.Print(msgS.StripMarkers())
}

return nil
}

// expandTabsInRedactableBytes expands tabs in the redactable byte
// slice, so that columns are aligned. The correctness of this
// function depends on the assumption that the `tabwriter` does not
Expand Down
21 changes: 11 additions & 10 deletions pkg/kv/kvclient/kvcoord/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,19 @@ func prev(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) {
}
endKey := h.EndKey
if len(endKey) == 0 {
// This is unintuitive, but if we have a point request at `x=k` then that request has
// already been satisfied (since the batch has already been executed for all keys `>=
// k`). We treat `k` as `[k,k)` which does the right thing below. It also does when `x >
// k` and `x < k`, so we're good.
// If we have a point request for `x < k` then that request has not been
// satisfied (since the batch has only been executed for keys `>=k`). We
// treat `x` as `[x, x.Next())` which does the right thing below. This
// also works when `x > k` or `x=k` as the logic below will skip `x`.
//
// Note that if `x` is /Local/k/something, then AddrUpperBound below will turn it into
// `k\x00`, and so we're looking at the key range `[k, k\x00)`. This is exactly what we
// want since otherwise the result would be `k` and so the caller would restrict itself
// to `key < k`, but that excludes `k` itself and thus all local keys attached to it.
// Note that if the key is /Local/x/something, then instead of using
// /Local/x/something.Next() as the end key, we rely on AddrUpperBound to
// handle local keys. In particular, AddrUpperBound will turn it into
// `x\x00`, so we're looking at the key-range `[x, x.Next())`. This is
// exactly what we want as the local key is contained in that range.
//
// See TestBatchPrevNext for a test case with commentary.
endKey = h.Key
// See TestBatchPrevNext for test cases with commentary.
endKey = h.Key.Next()
}
eAddr, err := keys.AddrUpperBound(endKey)
if err != nil {
Expand Down
34 changes: 29 additions & 5 deletions pkg/kv/kvclient/kvcoord/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ func TestBatchPrevNext(t *testing.T) {
expFW: max,
expBW: min,
},
{
spans: span("a"), key: "a",
// Done with `key < a`, so `a <= key` is next.
expFW: "a",
// Done with `key >= a`, and there's nothing in `[min, a]`, so we return
// min here.
expBW: min,
},
{spans: span("a", "b", "c", "d"), key: "c",
// Done with `key < c`, so `c <= key` next.
expFW: "c",
Expand Down Expand Up @@ -87,19 +95,24 @@ func TestBatchPrevNext(t *testing.T) {
{spans: abc, key: "b",
// We've seen `key < b` so we still need to hit `b`.
expFW: "b",
// We've seen `key >= b`, so hop over the gap to `a`. Similar to the
// first test case.
expBW: "a",
// We've seen `key >=b`, so hop over the gap to `a`, similar to the first
// test case. [`a`] is represented as [`a`,`a.Next()`), so we expect prev
// to return a.Next() here..
expBW: "a\x00",
},
{spans: abc, key: "b\x00",
// No surprises.
expFW: "c",
expBW: "b",
// Similar to the explanation above, we expect [`b`] = [`b`, `b.Next()`)
// here.
expBW: "b\x00",
},
{spans: abc, key: "bb",
// Ditto.
expFW: "c",
expBW: "b",
// Similar to the explanation above, we expect [`b`] = [`b`, `b.Next()`)
// here.
expBW: "b\x00",
},

// Multiple candidates. No surprises, just a sanity check.
Expand Down Expand Up @@ -149,6 +162,17 @@ func TestBatchPrevNext(t *testing.T) {
// contain `loc(b)`.
expBW: "b\x00",
},
{
spans: span(loc("a"), "", loc("b"), ""), key: "b",
// We've dealt with any key that addresses to `< b`, and `loc(b)` is not
// covered by it. `loc(b)` lives between `b` and `b\x00`, so we start at
// `b`.
expFW: "b",
// We've dealt with any key that addresses to `>= b`, which includes
// `loc(b)`. The next thing cover is `loc(a)`, which lives between `a`
// and `a\x00`, so we return `a\x00` here.
expBW: "a\x00",
},

// Multiple candidates. No surprises, just a sanity check.
{spans: span(loc("a"), loc("b"), loc("c"), loc("d")), key: "e",
Expand Down
2 changes: 0 additions & 2 deletions pkg/kv/kvnemesis/kvnemesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func init() {

func TestKVNemesisSingleNode(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 73373, "flaky test")
defer log.Scope(t).Close(t)
skip.UnderRace(t)

Expand Down Expand Up @@ -72,7 +71,6 @@ func TestKVNemesisSingleNode(t *testing.T) {

func TestKVNemesisMultiNode(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 73370, "flaky test")
defer log.Scope(t).Close(t)
skip.UnderRace(t)

Expand Down
Loading

0 comments on commit e4ecb4c

Please sign in to comment.