Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

colexec, coldata: add support for INTERVAL type #43517

Merged
merged 2 commits into from
Jan 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/col/coldata/random_testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/col/coltypes"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

Expand Down Expand Up @@ -105,6 +106,11 @@ func RandomVec(
loc := locations[rng.Intn(len(locations))]
timestamps[i] = timestamps[i].In(loc)
}
case coltypes.Interval:
intervals := vec.Interval()
for i := 0; i < n; i++ {
intervals[i] = duration.FromFloat64(rng.Float64())
}
default:
panic(fmt.Sprintf("unhandled type %s", typ))
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/col/coldata/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ func AssertEquivalentBatches(t testingT, expected, actual Batch) {
t.Fatalf("Timestamp mismatch at index %d:\nexpected:\n%sactual:\n%s", i, expectedTimestamp[i], resultTimestamp[i])
}
}
} else if typ == coltypes.Interval {
// Cannot use require.Equal for this type.
// TODO(yuzefovich): Again, why not?
expectedInterval := expectedVec.Interval()[0:expected.Length()]
resultInterval := actualVec.Interval()[0:actual.Length()]
require.Equal(t, len(expectedInterval), len(resultInterval))
for i := range expectedInterval {
if expectedInterval[i].Compare(resultInterval[i]) != 0 {
t.Fatalf("Interval mismatch at index %d:\nexpected:\n%sactual:\n%s", i, expectedInterval[i], resultInterval[i])
}
}
} else {
require.Equal(
t,
Expand Down
5 changes: 5 additions & 0 deletions pkg/col/coldata/unknown_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
"github.com/cockroachdb/cockroach/pkg/util/duration"
)

// unknown is a Vec that represents an unhandled type. Used when a batch needs a placeholder Vec.
Expand Down Expand Up @@ -58,6 +59,10 @@ func (u unknown) Timestamp() []time.Time {
panic("Vec is of unknown type and should not be accessed")
}

func (u unknown) Interval() []duration.Duration {
panic("Vec is of unknown type and should not be accessed")
}

func (u unknown) Col() interface{} {
panic("Vec is of unknown type and should not be accessed")
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/col/coldata/vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
"github.com/cockroachdb/cockroach/pkg/util/duration"
)

// column is an interface that represents a raw array of a Go native type.
Expand Down Expand Up @@ -79,6 +80,8 @@ type Vec interface {
Decimal() []apd.Decimal
// Timestamp returns a time.Time slice.
Timestamp() []time.Time
// Interval returns a duration.Duration slice.
Interval() []duration.Duration

// Col returns the raw, typeless backing storage for this Vec.
Col() interface{}
Expand Down Expand Up @@ -171,6 +174,8 @@ func NewMemColumn(t coltypes.T, n int) Vec {
return &memColumn{t: t, col: make([]apd.Decimal, n), nulls: nulls}
case coltypes.Timestamp:
return &memColumn{t: t, col: make([]time.Time, n), nulls: nulls}
case coltypes.Interval:
return &memColumn{t: t, col: make([]duration.Duration, n), nulls: nulls}
case coltypes.Unhandled:
return unknown{}
default:
Expand Down Expand Up @@ -218,6 +223,10 @@ func (m *memColumn) Timestamp() []time.Time {
return m.col.([]time.Time)
}

func (m *memColumn) Interval() []duration.Duration {
return m.col.([]duration.Duration)
}

func (m *memColumn) Col() interface{} {
return m.col
}
Expand Down Expand Up @@ -256,6 +265,8 @@ func (m *memColumn) Length() int {
return len(m.col.([]apd.Decimal))
case coltypes.Timestamp:
return len(m.col.([]time.Time))
case coltypes.Interval:
return len(m.col.([]duration.Duration))
default:
panic(fmt.Sprintf("unhandled type %s", m.t))
}
Expand All @@ -279,6 +290,8 @@ func (m *memColumn) SetLength(l int) {
m.col = m.col.([]apd.Decimal)[:l]
case coltypes.Timestamp:
m.col = m.col.([]time.Time)[:l]
case coltypes.Interval:
m.col = m.col.([]duration.Duration)[:l]
default:
panic(fmt.Sprintf("unhandled type %s", m.t))
}
Expand All @@ -302,6 +315,8 @@ func (m *memColumn) Capacity() int {
return cap(m.col.([]apd.Decimal))
case coltypes.Timestamp:
return cap(m.col.([]time.Time))
case coltypes.Interval:
return cap(m.col.([]duration.Duration))
default:
panic(fmt.Sprintf("unhandled type %s", m.t))
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/col/coldata/vec_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ import (
// {{/*
"github.com/cockroachdb/cockroach/pkg/sql/colexec/execgen"
// */}}
// HACK: crlfmt removes the "*/}}" comment if it's the last line in the import
// block. This was picked because it sorts after "pkg/sql/exec/execgen" and
// has no deps.
_ "github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/duration"
)

// {{/*
Expand All @@ -49,6 +46,9 @@ var _ apd.Decimal
// Dummy import to pull in "time" package.
var _ time.Time

// Dummy import to pull in "duration" package.
var _ duration.Duration

// */}}

func (m *memColumn) Append(args SliceArgs) {
Expand Down
20 changes: 19 additions & 1 deletion pkg/col/colserde/arrowbatchconverter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,17 @@ func randomBatch(allocator *colexec.Allocator) ([]coltypes.T, coldata.Batch) {
const maxTyps = 16
rng, _ := randutil.NewPseudoRand()

availableTyps := make([]coltypes.T, 0, len(coltypes.AllTypes))
for _, typ := range coltypes.AllTypes {
// TODO(yuzefovich): We do not support interval conversion yet.
if typ == coltypes.Interval {
continue
}
availableTyps = append(availableTyps, typ)
}
typs := make([]coltypes.T, rng.Intn(maxTyps)+1)
for i := range typs {
typs[i] = coltypes.AllTypes[rng.Intn(len(coltypes.AllTypes))]
typs[i] = availableTyps[rng.Intn(len(availableTyps))]
}

capacity := rng.Intn(int(coldata.BatchSize())) + 1
Expand All @@ -40,6 +48,16 @@ func randomBatch(allocator *colexec.Allocator) ([]coltypes.T, coldata.Batch) {
return typs, b
}

func TestArrowBatchConverterRejectsUnsupportedTypes(t *testing.T) {
defer leaktest.AfterTest(t)()

unsupportedTypes := []coltypes.T{coltypes.Interval}
for _, typ := range unsupportedTypes {
_, err := colserde.NewArrowBatchConverter([]coltypes.T{typ})
require.Error(t, err)
}
}

func TestArrowBatchConverterRandom(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
10 changes: 9 additions & 1 deletion pkg/col/colserde/record_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,17 @@ func TestRecordBatchSerializerSerializeDeserializeRandom(t *testing.T) {
dataLen = rng.Intn(maxDataLen) + 1
nullProbability = rng.Float64()
buf = bytes.Buffer{}
supportedTypes = make([]coltypes.T, 0, len(coltypes.AllTypes))
)

supportedTypes := coltypes.AllTypes
// We do not support intervals.
for _, t := range coltypes.AllTypes {
if t == coltypes.Interval {
continue
}
supportedTypes = append(supportedTypes, t)
}

for i := range typs {
typs[i] = supportedTypes[rng.Intn(len(supportedTypes))]
data[i] = randomDataFromType(rng, typs[i], dataLen, nullProbability)
Expand Down
7 changes: 4 additions & 3 deletions pkg/col/coltypes/t_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/col/coltypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/util/duration"
)

// T represents an exec physical type - a bytes representation of a particular
Expand All @@ -41,6 +42,8 @@ const (
Float64
// Timestamp is a column of type time.Time
Timestamp
// Interval is a column of type duration.Duration
Interval

// Unhandled is a temporary value that represents an unhandled type.
// TODO(jordan): this should be replaced by a panic once all types are
Expand Down Expand Up @@ -78,6 +81,7 @@ func init() {
CompatibleTypes[Int64] = append(CompatibleTypes[Int64], NumberTypes...)
CompatibleTypes[Float64] = append(CompatibleTypes[Float64], NumberTypes...)
CompatibleTypes[Timestamp] = append(CompatibleTypes[Timestamp], Timestamp)
CompatibleTypes[Interval] = append(CompatibleTypes[Interval], Interval)
}

// FromGoType returns the type for a Go value, if applicable. Shouldn't be used at
Expand All @@ -102,6 +106,8 @@ func FromGoType(v interface{}) T {
return Decimal
case time.Time:
return Timestamp
case duration.Duration:
return Interval
default:
panic(fmt.Sprintf("type %T not supported yet", t))
}
Expand All @@ -126,6 +132,8 @@ func (t T) GoTypeName() string {
return "float64"
case Timestamp:
return "time.Time"
case Interval:
return "duration.Duration"
default:
panic(fmt.Sprintf("unhandled type %d", t))
}
Expand Down
1 change: 1 addition & 0 deletions pkg/internal/sqlsmith/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ CREATE TABLE IF NOT EXISTS seed_vec AS
'2001-01-01'::DATE + g AS _date,
'2001-01-01'::TIMESTAMP + g * '1 day'::INTERVAL AS _timestamp,
'2001-01-01'::TIMESTAMPTZ + g * '1 day'::INTERVAL AS _timestamptz,
g * '1 day'::INTERVAL AS _interval,
g % 2 = 1 AS _bool,
g::DECIMAL AS _decimal,
g::STRING AS _string,
Expand Down
11 changes: 10 additions & 1 deletion pkg/sql/colcontainer/diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ func TestDiskQueue(t *testing.T) {
queueCfg, cleanup := colcontainer.NewTestingDiskQueueCfg(t, true /* inMem */)
defer cleanup()

availableTyps := make([]coltypes.T, 0, len(coltypes.AllTypes))
for _, typ := range coltypes.AllTypes {
// TODO(yuzefovich): We do not support interval serialization yet.
if typ == coltypes.Interval {
continue
}
availableTyps = append(availableTyps, typ)
}

rng, _ := randutil.NewPseudoRand()
for _, bufferSizeBytes := range []int{0, 16<<10 + rng.Intn(1<<20) /* 16 KiB up to 1 MiB */} {
for _, maxFileSizeBytes := range []int{10 << 10 /* 10 KiB */, 1<<20 + rng.Intn(64<<20) /* 1 MiB up to 64 MiB */} {
Expand All @@ -40,7 +49,7 @@ func TestDiskQueue(t *testing.T) {
// Create random input.
batches := make([]coldata.Batch, 0, 1+rng.Intn(2048))
op := colexec.NewRandomDataOp(testAllocator, rng, colexec.RandomDataOpArgs{
AvailableTyps: coltypes.AllTypes,
AvailableTyps: availableTyps,
NumBatches: cap(batches),
BatchSize: 1 + rng.Intn(int(coldata.BatchSize())),
Nulls: true,
Expand Down
13 changes: 13 additions & 0 deletions pkg/sql/colencoding/key_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -246,6 +247,14 @@ func decodeTableKeyToCol(
rkey, t, err = encoding.DecodeTimeDescending(key)
}
vec.Timestamp()[idx] = t
case types.IntervalFamily:
var d duration.Duration
if dir == sqlbase.IndexDescriptor_ASC {
rkey, d, err = encoding.DecodeDurationAscending(key)
} else {
rkey, d, err = encoding.DecodeDurationDescending(key)
}
vec.Interval()[idx] = d
default:
return rkey, false, errors.AssertionFailedf("unsupported type %+v", log.Safe(valType))
}
Expand Down Expand Up @@ -301,6 +310,10 @@ func UnmarshalColumnValueToCol(
var v time.Time
v, err = value.GetTime()
vec.Timestamp()[idx] = v
case types.IntervalFamily:
var v duration.Duration
v, err = value.GetDuration()
vec.Interval()[idx] = v
default:
return errors.AssertionFailedf("unsupported column type: %s", log.Safe(typ.Family()))
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/colencoding/value_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -97,6 +98,10 @@ func decodeUntaggedDatumToCol(vec coldata.Vec, idx uint16, t *types.T, buf []byt
var t time.Time
buf, t, err = encoding.DecodeUntaggedTimeValue(buf)
vec.Timestamp()[idx] = t
case types.IntervalFamily:
var d duration.Duration
buf, d, err = encoding.DecodeUntaggedDurationValue(buf)
vec.Interval()[idx] = d
default:
return buf, errors.AssertionFailedf(
"couldn't decode type: %s", log.Safe(t))
Expand Down
18 changes: 11 additions & 7 deletions pkg/sql/colexec/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/mon"
)

Expand Down Expand Up @@ -142,13 +143,14 @@ func (a *Allocator) Clear() {
}

const (
sizeOfBool = int(unsafe.Sizeof(true))
sizeOfInt16 = int(unsafe.Sizeof(int16(0)))
sizeOfInt32 = int(unsafe.Sizeof(int32(0)))
sizeOfInt64 = int(unsafe.Sizeof(int64(0)))
sizeOfFloat64 = int(unsafe.Sizeof(float64(0)))
sizeOfTime = int(unsafe.Sizeof(time.Time{}))
sizeOfUint16 = int(unsafe.Sizeof(uint16(0)))
sizeOfBool = int(unsafe.Sizeof(true))
sizeOfInt16 = int(unsafe.Sizeof(int16(0)))
sizeOfInt32 = int(unsafe.Sizeof(int32(0)))
sizeOfInt64 = int(unsafe.Sizeof(int64(0)))
sizeOfFloat64 = int(unsafe.Sizeof(float64(0)))
sizeOfTime = int(unsafe.Sizeof(time.Time{}))
sizeOfDuration = int(unsafe.Sizeof(duration.Duration{}))
sizeOfUint16 = int(unsafe.Sizeof(uint16(0)))
)

// sizeOfBatchSizeSelVector is the size (in bytes) of a selection vector of
Expand Down Expand Up @@ -196,6 +198,8 @@ func estimateBatchSizeBytes(vecTypes []coltypes.T, batchLength int) int {
// significantly overestimate.
// TODO(yuzefovich): figure out whether the caching does take place.
acc += sizeOfTime
case coltypes.Interval:
acc += sizeOfDuration
case coltypes.Unhandled:
// Placeholder coldata.Vecs of unknown types are allowed.
default:
Expand Down
Loading