[data/docs] Key Concepts Page (#50129)
## Why are these changes needed?

Refresher for #50022, but on a
separate page and a bit more holistic.

It's not tightly integrated into the other pages yet but I will do a
revision of quickstart/overview/data.rst pages.

## Related issue number

<!-- For example: "Closes #1234" -->

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: Richard Liaw <[email protected]>
Co-authored-by: Balaji Veeramani <[email protected]>
richardliaw and bveeramani authored Feb 5, 2025
1 parent 3ac2417 commit 333155f
Showing 6 changed files with 171 additions and 18 deletions.
3 changes: 2 additions & 1 deletion .vale/styles/config/vocabularies/Data/accept.txt
@@ -2,7 +2,7 @@ autoscaler
[Aa]vro
[Bb]ackpressure
Dask
Data('s)?
[Dd]ata('s)?
[Dd]atasource
[Dd]iscretizer(s)?
dtype
@@ -16,6 +16,7 @@ Modin
[Mm]ultiget(s)?
ndarray(s)?
[Oo]utqueue(s)?
[Pp]ipelined
Predibase('s)?
[Pp]refetch
[Pp]refetching
35 changes: 18 additions & 17 deletions doc/source/data/data.rst
@@ -9,6 +9,7 @@ Ray Data: Scalable Datasets for ML

Overview <overview>
quickstart
key-concepts
user-guide
examples
api/api
@@ -58,8 +59,7 @@ Learn more
**Quickstart**
^^^

Understand the key concepts behind Ray Data. Learn what
Datasets are and how they're used.
Get started with Ray Data with a simple example.

+++
.. button-ref:: data_quickstart
@@ -69,6 +69,22 @@ Learn more

Quickstart

.. grid-item-card::

**Key Concepts**
^^^

Learn the key concepts behind Ray Data: what Datasets are and how they're used.

+++
.. button-ref:: data_key_concepts
:color: primary
:outline:
:expand:

Key Concepts

.. grid-item-card::

**User Guides**
@@ -113,18 +129,3 @@ Learn more
:expand:

Read the API Reference

.. grid-item-card::

**Ray Blogs**
^^^

Get the latest on engineering updates from the Ray team and how companies are using Ray Data.

+++
.. button-link:: https://www.anyscale.com/blog?tag=ray-datasets
:color: primary
:outline:
:expand:

Read the Ray blogs
1 change: 1 addition & 0 deletions doc/source/data/images/dataset-arch-with-blocks.svg
1 change: 1 addition & 0 deletions doc/source/data/images/get_execution_plan.svg
1 change: 1 addition & 0 deletions doc/source/data/images/streaming-topology.svg
148 changes: 148 additions & 0 deletions doc/source/data/key-concepts.rst
@@ -0,0 +1,148 @@
.. _data_key_concepts:

Key Concepts
============


Datasets and blocks
-------------------

There are two main concepts in Ray Data:

* Datasets
* Blocks

``Dataset`` is the main user-facing Python API. It represents a distributed data collection and defines data loading and processing operations. A typical workflow is to:

1. Create a :class:`Dataset <ray.data.Dataset>` from external storage or in-memory data.
2. Apply transformations to the data.
3. Write the outputs to external storage or feed the outputs to training workers.

The Dataset API is lazy, meaning that operations aren't executed until you materialize or consume the dataset,
for example with :meth:`~ray.data.Dataset.show`. This laziness allows Ray Data to optimize the execution plan
and execute operations in a pipelined, streaming fashion.
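
For example, here is a minimal sketch of that workflow. The CSV path is the same public example file used later on this page; the output path and the new column name are hypothetical:

.. code-block:: python

    import ray

    # 1. Create a Dataset from external storage.
    ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

    # 2. Apply a transformation. This only records the operation; nothing runs yet.
    ds = ds.map(lambda row: {"target_doubled": row["target"] * 2})

    # 3. Write the outputs to external storage (hypothetical local path).
    #    Writing consumes the dataset, so this is when the pipeline actually executes.
    ds.write_parquet("/tmp/iris_out")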

Each *Dataset* consists of *blocks*. A *block* is a contiguous subset of rows from a dataset.
Blocks are distributed across the cluster and processed independently in parallel.

The following figure visualizes a dataset with three blocks, each holding 1000 rows.
Ray Data holds the :class:`~ray.data.Dataset` on the process that triggers execution
(which is usually the entrypoint of the program, referred to as the :term:`driver`)
and stores the blocks as objects in Ray's shared-memory
:ref:`object store <objects-in-ray>`. Internally, Ray Data represents blocks with
pandas DataFrames or Arrow tables.

.. image:: images/dataset-arch-with-blocks.svg
..
https://docs.google.com/drawings/d/1kOYQqHdMrBp2XorDIn0u0G_MvFj-uSA4qm6xf9tsFLM/edit
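
As a rough illustration, you can materialize a dataset and inspect how many blocks back it. This is a sketch, assuming the ``num_blocks`` method on materialized datasets available in recent Ray versions:

.. code-block:: python

    import ray

    ds = ray.data.range(3000)

    # Materializing executes the plan and pins the resulting blocks
    # in Ray's shared-memory object store.
    materialized = ds.materialize()
    print(materialized.num_blocks())
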
Operators and Plans
-------------------

Ray Data uses a two-phase planning process to execute operations efficiently. When you write a program using the Dataset API, Ray Data first builds a *logical plan* - a high-level description of what operations to perform. When execution begins, it converts this into a *physical plan* that specifies exactly how to execute those operations.

This diagram illustrates the complete planning process:

.. https://docs.google.com/drawings/d/1WrVAg3LwjPo44vjLsn17WLgc3ta2LeQGgRfE8UHrDA0/edit
.. image:: images/get_execution_plan.svg
:width: 600
:align: center

The building blocks of these plans are operators:

* Logical plans consist of *logical operators* that describe *what* operation to perform. For example, ``ReadOp`` specifies what data to read.
* Physical plans consist of *physical operators* that describe *how* to execute the operation. For example, ``TaskPoolMapOperator`` launches Ray tasks to actually read the data.

Here is a simple example of how Ray Data builds a logical plan. As you chain operations together, Ray Data constructs the logical plan behind the scenes:

.. testcode::

    import ray

    dataset = ray.data.range(100)
    dataset = dataset.add_column("test", lambda x: x["id"] + 1)
    dataset = dataset.select_columns("test")

You can inspect the resulting logical plan by printing the dataset:

.. code-block::

    Project
    +- MapBatches(add_column)
    +- Dataset(schema={...})

When execution begins, Ray Data optimizes the logical plan, then translates it into a physical plan - a series of operators that implement the actual data transformations. During this translation:

1. A single logical operator may become multiple physical operators. For example, ``ReadOp`` becomes both ``InputDataBuffer`` and ``TaskPoolMapOperator``.
2. Both logical and physical plans go through optimization passes. For example, ``OperatorFusionRule`` combines map operators to reduce serialization overhead.

Physical operators work by:

* Taking in a stream of block references
* Performing their operation (either transforming data with Ray Tasks/Actors or manipulating references)
* Outputting another stream of block references

For more details on Ray Tasks and Actors, see :ref:`Ray Core Concepts <core-key-concepts>`.
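
To get a rough view of which physical operators actually ran, you can print the dataset's execution statistics after consuming it. This is a sketch; the exact operator names in the output depend on your Ray version:

.. code-block:: python

    import ray

    ds = ray.data.range(100)
    ds = ds.map_batches(lambda batch: batch)

    # Consuming the dataset triggers the physical plan.
    ds.take_all()

    # Per-operator execution statistics for the physical operators that ran.
    print(ds.stats())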

.. note:: A dataset's execution plan only runs when you materialize or consume the dataset through operations like :meth:`~ray.data.Dataset.show`.

.. _streaming_execution_model:

Streaming execution model
-------------------------

Ray Data uses a *streaming execution model* to efficiently process large datasets.

Rather than materializing the entire dataset in memory at once,
Ray Data can process data in a streaming fashion through a pipeline of operations.

This is useful for inference and training workloads where the dataset can be too large to fit in memory and the workload doesn't require the entire dataset to be in memory at once.

Here is an example of how the streaming execution model works. The code below reads a CSV file into a dataset, applies a chain of map and filter transformations, and then calls the ``show`` action to trigger the pipeline:

.. testcode::

    import ray

    # Create a dataset from a CSV file
    ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

    # Define a pipeline of operations
    ds = ds.map(lambda x: {"target1": x["target"] * 2})
    ds = ds.map(lambda x: {"target2": x["target1"] * 2})
    ds = ds.map(lambda x: {"target3": x["target2"] * 2})
    ds = ds.filter(lambda x: x["target3"] % 4 == 0)

    # Data starts flowing when you call a method like show()
    ds.show(5)

This creates a logical plan like the following:

.. code-block::

    Filter(<lambda>)
    +- Map(<lambda>)
    +- Map(<lambda>)
    +- Map(<lambda>)
    +- Dataset(schema={...})

The streaming topology looks like the following:

.. https://docs.google.com/drawings/d/10myFIVtpI_ZNdvTSxsaHlOhA_gHRdUde_aHRC9zlfOw/edit
.. image:: images/streaming-topology.svg
:width: 1000
:align: center

In the streaming execution model, operators are connected in a pipeline, with each operator's output queue feeding directly into the input queue of the next downstream operator. This creates an efficient flow of data through the execution plan.

The streaming execution model provides significant advantages for data processing.

In particular, the pipeline architecture enables multiple stages to execute concurrently, improving overall performance and resource utilization. For example, if a map operator requires GPU resources, the streaming execution model can run it concurrently with a filter operator that runs on CPUs, keeping the GPU busy for the entire duration of the pipeline.
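
For instance, here is a hedged sketch of such a pipeline. ``HypotheticalGpuModel`` is a stand-in for a real model class, the cluster is assumed to have at least one GPU, and parameter names such as ``concurrency`` may vary across Ray versions:

.. code-block:: python

    import ray

    class HypotheticalGpuModel:
        def __call__(self, batch):
            # Stand-in for running GPU inference on a batch of rows.
            batch["pred"] = batch["id"]
            return batch

    ds = ray.data.range(10_000)

    # GPU stage: runs as long-lived actors on GPU workers.
    ds = ds.map_batches(HypotheticalGpuModel, concurrency=2, num_gpus=1)

    # CPU stage: runs concurrently with the GPU stage as blocks stream through.
    ds = ds.filter(lambda row: row["pred"] % 2 == 0)

    ds.show(5)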

To summarize, Ray Data's streaming execution model can efficiently process datasets that are much larger than available memory while maintaining high performance through parallel execution across the cluster.

.. note::
Operations like :meth:`ds.sort() <ray.data.Dataset.sort>` and :meth:`ds.groupby() <ray.data.Dataset.groupby>` require materializing data, which may impact memory usage for very large datasets.
