Skip to content

Commit

Permalink
Merge #32937
Browse files Browse the repository at this point in the history
32937: distsql: move protos to distsqlpb from distsqlrun r=jordanlewis a=jordanlewis

This creates a new package, distsqlpb, that contains the distsql API
protobufs, generated files, and support methods.

The new package permits other packages to depend on distsql protobufs
without having to depend on the distsql runtime.

Release note: None

Co-authored-by: Jordan Lewis <[email protected]>
  • Loading branch information
craig[bot] and jordanlewis committed Dec 7, 2018
2 parents 6d733ad + 3f06cf5 commit 24c4c04
Show file tree
Hide file tree
Showing 114 changed files with 3,373 additions and 3,181 deletions.
19 changes: 10 additions & 9 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlplan"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -108,32 +109,32 @@ func distChangefeedFlow(
for _, sp := range spanPartitions {
// TODO(dan): Merge these watches with the span-level resolved
// timestamps from the job progress.
watches := make([]distsqlrun.ChangeAggregatorSpec_Watch, len(sp.Spans))
watches := make([]distsqlpb.ChangeAggregatorSpec_Watch, len(sp.Spans))
for i, nodeSpan := range sp.Spans {
watches[i] = distsqlrun.ChangeAggregatorSpec_Watch{
watches[i] = distsqlpb.ChangeAggregatorSpec_Watch{
Span: nodeSpan,
InitialResolved: initialHighWater,
}
}

changeAggregatorProcs = append(changeAggregatorProcs, distsqlplan.Processor{
Node: sp.Node,
Spec: distsqlrun.ProcessorSpec{
Core: distsqlrun.ProcessorCoreUnion{
ChangeAggregator: &distsqlrun.ChangeAggregatorSpec{
Spec: distsqlpb.ProcessorSpec{
Core: distsqlpb.ProcessorCoreUnion{
ChangeAggregator: &distsqlpb.ChangeAggregatorSpec{
Watches: watches,
Feed: details,
},
},
Output: []distsqlrun.OutputRouterSpec{{Type: distsqlrun.OutputRouterSpec_PASS_THROUGH}},
Output: []distsqlpb.OutputRouterSpec{{Type: distsqlpb.OutputRouterSpec_PASS_THROUGH}},
},
})
}
// NB: This SpanFrontier processor depends on the set of tracked spans being
// static. Currently there is no way for them to change after the changefeed
// is created, even if it is paused and unpaused, but #28982 describes some
// ways that this might happen in the future.
changeFrontierSpec := distsqlrun.ChangeFrontierSpec{
changeFrontierSpec := distsqlpb.ChangeFrontierSpec{
TrackedSpans: trackedSpans,
Feed: details,
JobID: jobID,
Expand All @@ -151,8 +152,8 @@ func distChangefeedFlow(

p.AddSingleGroupStage(
gatewayNodeID,
distsqlrun.ProcessorCoreUnion{ChangeFrontier: &changeFrontierSpec},
distsqlrun.PostProcessSpec{},
distsqlpb.ProcessorCoreUnion{ChangeFrontier: &changeFrontierSpec},
distsqlpb.PostProcessSpec{},
changefeedResultTypes,
)

Expand Down
13 changes: 7 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
Expand All @@ -32,7 +33,7 @@ type changeAggregator struct {
distsqlrun.ProcessorBase

flowCtx *distsqlrun.FlowCtx
spec distsqlrun.ChangeAggregatorSpec
spec distsqlpb.ChangeAggregatorSpec
memAcc mon.BoundAccount

// cancel shuts down the processor, both the `Next()` flow and the poller.
Expand Down Expand Up @@ -69,7 +70,7 @@ var _ distsqlrun.RowSource = &changeAggregator{}
func newChangeAggregatorProcessor(
flowCtx *distsqlrun.FlowCtx,
processorID int32,
spec distsqlrun.ChangeAggregatorSpec,
spec distsqlpb.ChangeAggregatorSpec,
output distsqlrun.RowReceiver,
) (distsqlrun.Processor, error) {
ctx := flowCtx.EvalCtx.Ctx()
Expand All @@ -81,7 +82,7 @@ func newChangeAggregatorProcessor(
}
if err := ca.Init(
ca,
&distsqlrun.PostProcessSpec{},
&distsqlpb.PostProcessSpec{},
nil, /* types */
flowCtx,
processorID,
Expand Down Expand Up @@ -281,7 +282,7 @@ type changeFrontier struct {
distsqlrun.ProcessorBase

flowCtx *distsqlrun.FlowCtx
spec distsqlrun.ChangeFrontierSpec
spec distsqlpb.ChangeFrontierSpec
memAcc mon.BoundAccount
a sqlbase.DatumAlloc

Expand Down Expand Up @@ -327,7 +328,7 @@ var _ distsqlrun.RowSource = &changeFrontier{}
func newChangeFrontierProcessor(
flowCtx *distsqlrun.FlowCtx,
processorID int32,
spec distsqlrun.ChangeFrontierSpec,
spec distsqlpb.ChangeFrontierSpec,
input distsqlrun.RowSource,
output distsqlrun.RowReceiver,
) (distsqlrun.Processor, error) {
Expand All @@ -341,7 +342,7 @@ func newChangeFrontierProcessor(
sf: makeSpanFrontier(spec.TrackedSpans...),
}
if err := cf.Init(
cf, &distsqlrun.PostProcessSpec{},
cf, &distsqlpb.PostProcessSpec{},
input.OutputTypes(),
flowCtx,
processorID,
Expand Down
9 changes: 5 additions & 4 deletions pkg/ccl/importccl/exportcsv.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -136,7 +137,7 @@ func exportPlanHook(
}
}

out := distsqlrun.ProcessorCoreUnion{CSVWriter: &distsqlrun.CSVWriterSpec{
out := distsqlpb.ProcessorCoreUnion{CSVWriter: &distsqlpb.CSVWriterSpec{
Destination: file,
NamePattern: exportFilePatternDefault,
Options: csvOpts,
Expand Down Expand Up @@ -166,7 +167,7 @@ func exportPlanHook(
func newCSVWriterProcessor(
flowCtx *distsqlrun.FlowCtx,
processorID int32,
spec distsqlrun.CSVWriterSpec,
spec distsqlpb.CSVWriterSpec,
input distsqlrun.RowSource,
output distsqlrun.RowReceiver,
) (distsqlrun.Processor, error) {
Expand All @@ -177,7 +178,7 @@ func newCSVWriterProcessor(
input: input,
output: output,
}
if err := c.out.Init(&distsqlrun.PostProcessSpec{}, sql.ExportPlanResultTypes, flowCtx.NewEvalCtx(), output); err != nil {
if err := c.out.Init(&distsqlpb.PostProcessSpec{}, sql.ExportPlanResultTypes, flowCtx.NewEvalCtx(), output); err != nil {
return nil, err
}
return c, nil
Expand All @@ -186,7 +187,7 @@ func newCSVWriterProcessor(
type csvWriter struct {
flowCtx *distsqlrun.FlowCtx
processorID int32
spec distsqlrun.CSVWriterSpec
spec distsqlpb.CSVWriterSpec
input distsqlrun.RowSource
out distsqlrun.ProcOutputHelper
output distsqlrun.RowReceiver
Expand Down
7 changes: 4 additions & 3 deletions pkg/ccl/importccl/read_import_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
Expand Down Expand Up @@ -342,7 +343,7 @@ var csvOutputTypes = []sqlbase.ColumnType{
func newReadImportDataProcessor(
flowCtx *distsqlrun.FlowCtx,
processorID int32,
spec distsqlrun.ReadImportDataSpec,
spec distsqlpb.ReadImportDataSpec,
output distsqlrun.RowReceiver,
) (distsqlrun.Processor, error) {
cp := &readImportDataProcessor{
Expand All @@ -352,7 +353,7 @@ func newReadImportDataProcessor(
output: output,
}

if err := cp.out.Init(&distsqlrun.PostProcessSpec{}, csvOutputTypes, flowCtx.NewEvalCtx(), output); err != nil {
if err := cp.out.Init(&distsqlpb.PostProcessSpec{}, csvOutputTypes, flowCtx.NewEvalCtx(), output); err != nil {
return nil, err
}
return cp, nil
Expand All @@ -369,7 +370,7 @@ type inputConverter interface {
type readImportDataProcessor struct {
flowCtx *distsqlrun.FlowCtx
processorID int32
spec distsqlrun.ReadImportDataSpec
spec distsqlpb.ReadImportDataSpec
out distsqlrun.ProcOutputHelper
output distsqlrun.RowReceiver
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/ccl/importccl/sst_writer_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
Expand All @@ -46,7 +47,7 @@ var sstOutputTypes = []sqlbase.ColumnType{
func newSSTWriterProcessor(
flowCtx *distsqlrun.FlowCtx,
processorID int32,
spec distsqlrun.SSTWriterSpec,
spec distsqlpb.SSTWriterSpec,
input distsqlrun.RowSource,
output distsqlrun.RowReceiver,
) (distsqlrun.Processor, error) {
Expand All @@ -62,7 +63,7 @@ func newSSTWriterProcessor(
progress: spec.Progress,
db: flowCtx.EvalCtx.Txn.DB(),
}
if err := sp.out.Init(&distsqlrun.PostProcessSpec{}, sstOutputTypes, flowCtx.NewEvalCtx(), output); err != nil {
if err := sp.out.Init(&distsqlpb.PostProcessSpec{}, sstOutputTypes, flowCtx.NewEvalCtx(), output); err != nil {
return nil, err
}
return sp, nil
Expand All @@ -71,14 +72,14 @@ func newSSTWriterProcessor(
type sstWriter struct {
flowCtx *distsqlrun.FlowCtx
processorID int32
spec distsqlrun.SSTWriterSpec
spec distsqlpb.SSTWriterSpec
input distsqlrun.RowSource
out distsqlrun.ProcOutputHelper
output distsqlrun.RowReceiver
tempStorage diskmap.Factory
settings *cluster.Settings
registry *jobs.Registry
progress distsqlrun.JobProgress
progress distsqlpb.JobProgress
db *client.DB
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/server/status/statuspb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
Expand Down Expand Up @@ -978,13 +978,13 @@ func parseGossipValues(gossipInfo *gossip.InfoStatus) (string, error) {
}
output = append(output, fmt.Sprintf("%q: %+v", key, healthAlert))
} else if strings.HasPrefix(key, gossip.KeyDistSQLNodeVersionKeyPrefix) {
var version distsqlrun.DistSQLVersionGossipInfo
var version distsqlpb.DistSQLVersionGossipInfo
if err := protoutil.Unmarshal(bytes, &version); err != nil {
return "", errors.Wrapf(err, "failed to parse value for key %q", key)
}
output = append(output, fmt.Sprintf("%q: %+v", key, version))
} else if strings.HasPrefix(key, gossip.KeyDistSQLDrainingPrefix) {
var drainingInfo distsqlrun.DistSQLDrainingInfo
var drainingInfo distsqlpb.DistSQLDrainingInfo
if err := protoutil.Unmarshal(bytes, &drainingInfo); err != nil {
return "", errors.Wrapf(err, "failed to parse value for key %q", key)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/debug"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -709,11 +709,11 @@ func TestGRPCAuthentication(t *testing.T) {
return err
}},
{"distSQL", func(ctx context.Context, conn *grpc.ClientConn) error {
stream, err := distsqlrun.NewDistSQLClient(conn).RunSyncFlow(ctx)
stream, err := distsqlpb.NewDistSQLClient(conn).RunSyncFlow(ctx)
if err != nil {
return err
}
_ = stream.Send(&distsqlrun.ConsumerSignal{})
_ = stream.Send(&distsqlpb.ConsumerSignal{})
_, err = stream.Recv()
return err
}},
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -532,7 +533,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
}

s.distSQLServer = distsqlrun.NewServer(ctx, distSQLCfg)
distsqlrun.RegisterDistSQLServer(s.grpc, s.distSQLServer)
distsqlpb.RegisterDistSQLServer(s.grpc, s.distSQLServer)

s.admin = newAdminServer(s)
s.status = newStatusServer(
Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package sql
import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
Expand Down Expand Up @@ -217,12 +218,12 @@ func (n *distinctNode) startExec(params runParams) error {
if err := input.startExec(params); err != nil {
return err
}
if err := input.InitWithOutput(&distsqlrun.PostProcessSpec{}, nil); err != nil {
if err := input.InitWithOutput(&distsqlpb.PostProcessSpec{}, nil); err != nil {
return err
}

post := &distsqlrun.PostProcessSpec{} // post is not used as we only use the processor for the core distinct logic.
var output distsqlrun.RowReceiver // output is never used as distinct is only run as a RowSource.
post := &distsqlpb.PostProcessSpec{} // post is not used as we only use the processor for the core distinct logic.
var output distsqlrun.RowReceiver // output is never used as distinct is only run as a RowSource.

proc, err := distsqlrun.NewDistinct(flowCtx, 0 /* processorID */, spec, input, post, output)
if err != nil {
Expand Down
Loading

0 comments on commit 24c4c04

Please sign in to comment.