Merge pull request cockroachdb#8 from RaduBerinde/dist-sql-rfc-work
Addressing Andrei's last set of comments from PR #6
RaduBerinde committed Apr 12, 2016
2 parents 07444ee + 31fd49e commit 5bacdb2
Showing 1 changed file with 41 additions and 29 deletions: docs/RFCS/distributed_sql.md
record (row or index entry).

## Logical model and logical plans


We compile SQL into a *logical plan* (similar on the surface to the current
`planNode` tree) which represents the abstract data flow through computation
stages. The logical plan is agnostic to the way data is partitioned and
distributed in the cluster; however, it contains enough information about the
structure of the planned computation to allow us to exploit data parallelism
later - in a subsequent phase, the logical plan will be converted into a
*physical plan*, which maps the abstract computation and data flow to concrete
data processors and communication channels between them.

The logical plan is made up of **aggregators**. Each aggregator consumes an **input
stream** of rows (or more streams for joins, but let's leave that aside for now)
columns and types, with each row having a datum for each column. Again, we
emphasize that the streams are a logical concept and might not map to a single
data stream in the actual computation.
We introduce the concept of **grouping** to characterize a specific aspect of
the computation that happens inside an aggregator. The groups are defined based
on a **group key**, which is a subset of the columns in the input stream schema.
The computation that happens for each group is independent of the data in the
other groups, and the aggregator emits a concatenation of the results for all
the groups. The ordering between group results in the output stream is not
fixed - some aggregators may guarantee a certain ordering, others may not.
More precisely, we can define the computation in an aggregator using a function
`agg` that takes a sequence of input rows that are in a single group (same group
key).
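
To make this concrete, here is a minimal, non-normative Go sketch of these
grouping semantics; `Row`, `runAggregator`, and the helper names are invented
for this illustration and are not part of the actual implementation:

```go
package main

import "fmt"

// Row is a tuple of datums; the stream schema (column names and types) is
// implicit in this sketch.
type Row []interface{}

// pick extracts the datums of the given columns from a row.
func pick(r Row, cols []int) Row {
	out := make(Row, len(cols))
	for i, c := range cols {
		out[i] = r[c]
	}
	return out
}

// runAggregator models the logical semantics described above: the input is
// partitioned by group key, agg runs independently on each group, and the
// output is the concatenation of the per-group results, in no fixed order.
func runAggregator(input []Row, keyCols []int, agg func([]Row) []Row) []Row {
	groups := map[string][]Row{}
	for _, r := range input {
		k := fmt.Sprint(pick(r, keyCols)) // group key: datums of the key columns
		groups[k] = append(groups[k], r)
	}
	var output []Row
	for _, group := range groups { // map iteration order is arbitrary
		output = append(output, agg(group)...)
	}
	return output
}

func main() {
	// Rows of (dept, salary); compute SUM(salary) grouped by dept.
	input := []Row{{"eng", 10}, {"sales", 7}, {"eng", 5}}
	sum := func(group []Row) []Row {
		total := 0
		for _, r := range group {
			total += r[1].(int)
		}
		return []Row{{group[0][0], total}}
	}
	fmt.Println(runAggregator(input, []int{0}, sum)) // e.g. [[eng 15] [sales 7]]
}
```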
A special type of aggregator is the **program** aggregator which is a
"programmable" aggregator which processes the input stream sequentially (one
element at a time), potentially emitting output elements. This is an aggregator
with no grouping (group key is the full set of columns); the processing of each
row is independent. A program can be used, for example, to generate new values
from arbitrary expressions (like the `a+b` in `SELECT a+b FROM ..`); or to
filter rows according to a predicate.
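
As another hedged illustration (names invented; not the real code), a program
aggregator amounts to a per-row function that may emit zero or more output rows
per input row; this sketch shows both uses mentioned above, rendering `a+b` and
filtering on a predicate:

```go
package main

import "fmt"

type Row []int

// runProgram models a "program" aggregator: it scans the input stream one row
// at a time and may emit zero or more output rows per input row; there is no
// grouping, so each row is processed independently.
func runProgram(input []Row, program func(Row) []Row) []Row {
	var out []Row
	for _, r := range input {
		out = append(out, program(r)...)
	}
	return out
}

func main() {
	rows := []Row{{1, 2}, {3, 4}, {5, 6}} // rows of (a, b)

	// SELECT a+b FROM t: emit one row containing a+b for every input row.
	render := func(r Row) []Row { return []Row{{r[0] + r[1]}} }

	// WHERE a > 2: emit the row unchanged only if it passes the predicate.
	filter := func(r Row) []Row {
		if r[0] > 2 {
			return []Row{r}
		}
		return nil
	}

	fmt.Println(runProgram(rows, render)) // [[3] [7] [11]]
	fmt.Println(runProgram(rows, filter)) // [[3 4] [5 6]]
}
```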
Special **table reader** aggregators with no inputs are used as data sources; a
table reader can be configured to output only certain columns, as needed.
query/statement.
Some aggregators (final, limit) have an **ordering requirement** on the input
stream (a list of columns with corresponding ascending/descending requirements).
Some aggregators (like table readers) can guarantee a certain ordering on their
output stream, called an **ordering guarantee** (same as the `orderingInfo` in
the current code). All aggregators have an associated **ordering
characterization** function `ord(input_order) -> output_order` that maps
`input_order` (an ordering guarantee on the input stream) into `output_order`
(an ordering guarantee for the output stream) - meaning that if the rows in the
input stream are ordered according to `input_order`, then the rows in the output
stream will be ordered according to `output_order`.
The ordering guarantee of the table readers along with the characterization
functions can be used to propagate ordering information across the logical plan.
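
A rough Go sketch of how such a characterization function could be represented
and used to propagate ordering through a plan (all types and names here are
hypothetical, not the actual `orderingInfo` code):

```go
package main

import "fmt"

// ColumnOrder is one element of an ordering: a column index plus direction.
type ColumnOrder struct {
	Col  int
	Desc bool
}

// Ordering is an ordering guarantee (or requirement): a list of columns with
// ascending/descending directions, in order of significance.
type Ordering []ColumnOrder

// Aggregator carries an ordering characterization function Ord: given a
// guarantee on the input stream, it returns the guarantee on the output.
type Aggregator struct {
	Name string
	Ord  func(input Ordering) (output Ordering)
}

// propagate pushes the table reader's guarantee through a chain of
// aggregators, mirroring how ordering information flows across a logical plan.
func propagate(readerGuarantee Ordering, plan []Aggregator) Ordering {
	ord := readerGuarantee
	for _, a := range plan {
		ord = a.Ord(ord)
		fmt.Printf("after %s: %v\n", a.Name, ord)
	}
	return ord
}

func main() {
	// A program aggregator that keeps all columns preserves the input ordering.
	project := Aggregator{"program", func(in Ordering) Ordering { return in }}

	// A hash-based grouping aggregator makes no promise about output order.
	hashGroup := Aggregator{"summer (hash map)", func(Ordering) Ordering { return nil }}

	reader := Ordering{{Col: 0, Desc: false}} // table reader sorted by column 0
	propagate(reader, []Aggregator{project, hashGroup})
}
```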
There is also the possibility that `summer` uses an ordered map, in which case
it will always output the results in age order; that would mean we are always in
case 1 above, regardless of the ordering of `src`.
TODO add a section on back-propagation of ordering requirements to help choose
the desired ordering characterization function for `summer` (i.e. only use an
ordered map if the order it provides is useful).
### Example 3
The logical plan above could be instantiated as the following physical plan:
Each box in the physical plan is a *processor*:
- `src` is a table reader and performs KV Get operations and forms rows; it is
programmed to read the spans that belong to the respective node. It evaluates
the `Date > 2015` filter before outputting rows.
- `summer-stage1` is the first stage of the `summer` aggregator; its purpose is
to do the aggregation it can do locally and distribute the partial results to
the `summer-stage2` processes, such that all values for a certain group key
Processors are generally made up of three components:
1. The *input synchronizer* merges the input streams into a single stream of
data. Types:
* single-input (pass-through)
* unsynchronized: passes rows from all input streams, arbitrarily
interleaved.
* ordered: the input physical streams have an ordering guarantee (namely the
guarantee of the corresponding logical stream); the synchronizer is careful
3. The *output router* splits the data processor's output to multiple streams;
types:
* single-output (pass-through)
* mirror: every row is sent to all output streams
* hashing: each row goes to a single output stream, chosen according
to a hash function applied on certain elements of the data tuples (see the
sketch after this list).
* by range: TODO (for index-join)
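
For illustration, a hedged Go sketch of the mirror and hashing router types
(the function names and the use of FNV hashing are assumptions made for this
example, not the actual implementation):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

type Row []string

// mirrorRoute sends every row to all output streams.
func mirrorRoute(input []Row, numOutputs int) [][]Row {
	outs := make([][]Row, numOutputs)
	for _, r := range input {
		for i := range outs {
			outs[i] = append(outs[i], r)
		}
	}
	return outs
}

// hashRoute sends each row to exactly one output stream, chosen by hashing
// the datums in the given columns; all rows that agree on those columns
// (e.g. the group key) end up on the same stream.
func hashRoute(input []Row, hashCols []int, numOutputs int) [][]Row {
	outs := make([][]Row, numOutputs)
	for _, r := range input {
		h := fnv.New32a()
		for _, c := range hashCols {
			h.Write([]byte(r[c]))
			h.Write([]byte{0}) // separator between datums
		}
		i := int(h.Sum32()) % numOutputs
		outs[i] = append(outs[i], r)
	}
	return outs
}

func main() {
	rows := []Row{{"eng", "10"}, {"sales", "7"}, {"eng", "5"}}
	fmt.Println(mirrorRoute(rows, 2))                // every row on both streams
	fmt.Println(hashRoute(rows, []int{0}, 2))        // both "eng" rows share a stream
}
```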

KV integration involves three aspects:

1. Range information lookup

At the physical planning stage we need to break up key spans into ranges and
determine who is the leader for each range. The KV layer already has a range
cache that maintains this information, but we will need to make changes to be
more aggressive in terms of how much information we maintain, and how we
invalidate/update it. (A simplified sketch of this span-to-range mapping
appears at the end of this section.)

2. Distributed reads

pass this information to the KV layer. There are also likely various cases
where checking for error cases must be relaxed.

The details of all these need to be further investigated. Only 1 and 2 are
required for M1; 3 is required for M2.
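
As a loose illustration of aspect 1 (all types and logic here are invented for
this sketch; the real KV range cache and range descriptors look different),
breaking a key span into per-range sub-spans grouped by leader node might look
like:

```go
package main

import "fmt"

// RangeDesc is a stand-in for a cached range descriptor: the key span the
// range covers plus the node we currently believe is the leader for it.
type RangeDesc struct {
	Start, End string // the range covers keys in [Start, End)
	LeaderNode int
}

func maxKey(a, b string) string {
	if a > b {
		return a
	}
	return b
}

func minKey(a, b string) string {
	if a < b {
		return a
	}
	return b
}

// splitSpan breaks the key span [start, end) into per-range sub-spans using a
// sorted list of cached descriptors, grouping the pieces by leader node;
// physical planning would then place a table reader on (or near) each node.
func splitSpan(start, end string, ranges []RangeDesc) map[int][]string {
	spansByNode := map[int][]string{}
	for _, rd := range ranges {
		lo, hi := maxKey(start, rd.Start), minKey(end, rd.End)
		if lo < hi { // the query span overlaps this range
			spansByNode[rd.LeaderNode] = append(spansByNode[rd.LeaderNode],
				fmt.Sprintf("[%s,%s)", lo, hi))
		}
	}
	return spansByNode
}

func main() {
	ranges := []RangeDesc{{"a", "g", 1}, {"g", "p", 2}, {"p", "z", 1}}
	// Reading [c, s) touches all three ranges, on two distinct leader nodes.
	fmt.Println(splitSpan("c", "s", ranges))
	// Output (map order may vary): map[1:[[c,g) [p,s)] 2:[[g,p)]]
}
```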

# Alternatives

This general approach can be used for distributed SQL operations as well as
remote-side filtering and updates. The main drawback of this approach is that it
is very general and not prescriptive on how to build reusable pieces of
functionality. It is not clear how we could break apart the work in modular
pieces, and it has the potential of evolving into a monster of unmanageable
complexity.

