docs: daft documentation v2 #3595

Merged
merged 12 commits on Dec 19, 2024
2 changes: 2 additions & 0 deletions .pre-commit-config.yaml
@@ -23,6 +23,8 @@ repos:
)$
- id: check-yaml
exclude: kubernetes-ops
args:
- --unsafe
- id: pretty-format-json
exclude: |
(?x)^(
1,797 changes: 1,797 additions & 0 deletions docs-v2/10min.ipynb

Large diffs are not rendered by default.

72 changes: 72 additions & 0 deletions docs-v2/advanced/distributed.md
@@ -0,0 +1,72 @@
# Distributed Computing

By default, Daft runs using your local machine's resources, so your operations are limited by the CPUs, memory and GPUs available on that single development machine.

However, Daft has strong integrations with [Ray](https://www.ray.io), a distributed computing framework for scaling computations across a cluster of machines. Here is a snippet showing how you can connect Daft to a Ray cluster:

=== "🐍 Python"

```python
import daft

daft.context.set_runner_ray()
```

By default, if no address is specified, Daft will spin up a Ray cluster locally on your machine. If you are running Daft on a powerful machine (such as an AWS P3 machine equipped with multiple GPUs), this is already very useful because Daft can parallelize its computations across your CPUs and GPUs. However, if you already have your own Ray cluster running remotely, you can connect Daft to it by supplying an address:

=== "🐍 Python"

```python
daft.context.set_runner_ray(address="ray://url-to-mycluster")
```

For more information about the `address` keyword argument, please see the [Ray documentation on initialization](https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html).
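
Since the `address` string is handed off to Ray, Ray's other address forms should also apply here. A minimal sketch, assuming `set_runner_ray` forwards the address to `ray.init` as the linked documentation describes, where `"auto"` attaches to a Ray cluster already running on the current machine:

=== "🐍 Python"

```python
import daft

# Assumption: the address is forwarded to `ray.init`, so Ray's special value
# "auto" connects to an already-running local Ray cluster.
daft.context.set_runner_ray(address="auto")
```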


If you want to start a single-node Ray cluster on your local machine, you can do the following:

```bash
> pip install ray[default]
> ray start --head --port=6379
```

This should output something like:

```
Usage stats collection is enabled. To disable this, add `--disable-usage-stats` to the command that starts the cluster, or run the following command: `ray disable-usage-stats` before starting the cluster. See https://docs.ray.io/en/master/cluster/usage-stats.html for more details.

Local node IP: 127.0.0.1

--------------------
Ray runtime started.
--------------------

...
```

You can then pass the IP address and port to Daft:

=== "🐍 Python"

```python
>>> import daft
>>> daft.context.set_runner_ray("127.0.0.1:6379")
DaftContext(_daft_execution_config=<daft.daft.PyDaftExecutionConfig object at 0x100fbd1f0>, _daft_planning_config=<daft.daft.PyDaftPlanningConfig object at 0x100fbd270>, _runner_config=_RayRunnerConfig(address='127.0.0.1:6379', max_task_backlog=None), _disallow_set_runner=True, _runner=None)
>>> df = daft.from_pydict({
... 'text': ['hello', 'world']
... })
2024-07-29 15:49:26,610 INFO worker.py:1567 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2024-07-29 15:49:26,622 INFO worker.py:1752 -- Connected to Ray cluster.
>>> print(df)
╭───────╮
│ text │
│ --- │
│ Utf8 │
╞═══════╡
│ hello │
├╌╌╌╌╌╌╌┤
│ world │
╰───────╯

(Showing first 2 of 2 rows)
```
66 changes: 66 additions & 0 deletions docs-v2/advanced/memory.md
@@ -0,0 +1,66 @@
# Managing Memory Usage

Managing memory usage and avoiding out-of-memory (OOM) issues while still maintaining efficient throughput is one of the biggest challenges when building a resilient big data processing system!

This page walks through how Daft handles these situations and the remedies available to you when you encounter them.

## Out-of-core Processing

Daft supports [out-of-core data processing](https://en.wikipedia.org/wiki/External_memory_algorithm) when running on the Ray runner by leveraging Ray's object spilling capabilities.

This means that when the total amount of data in Daft gets too large, Daft will spill data onto disk. This slows down the overall workload (because data now needs to be written to and read from disk) but frees up space in working memory for Daft to continue executing work without causing an OOM.

You will know that spilling is occurring from log messages that look like this:

```
(raylet, ip=xx.xx.xx.xx) Spilled 16920 MiB, 9 objects, write throughput 576 MiB/s.
...
```

**Troubleshooting**

Spilling to disk is a mechanism that Daft uses to ensure workload completion in an environment where there is insufficient memory, but in some cases this can cause issues.

1. If your cluster is extremely aggressive with spilling (e.g. spilling hundreds of gigabytes of data), it is possible that your machines eventually run out of disk space and are killed by your cloud provider

2. Overly aggressive spilling can also cause your overall workload to be much slower

There are some things you can do that will help with this.

1. Use machines with more available memory per CPU to increase each Ray worker's available memory (e.g. [AWS EC2 r5 instances](https://aws.amazon.com/ec2/instance-types/r5/))

2. Use more machines in your cluster to increase overall cluster memory size

3. Use machines with attached local NVMe SSD drives for higher throughput when spilling (e.g. AWS EC2 r5d instances)

For more troubleshooting, you may also wish to consult the [Ray documentation's recommendations for object spilling](https://docs.ray.io/en/latest/ray-core/objects/object-spilling.html).

## Dealing with out-of-memory (OOM) errors

While Daft is built to be extremely memory-efficient, there will inevitably be situations in which it has poorly estimated the amount of memory that it will require for a certain operation, or simply cannot do so (for example when running arbitrary user-defined Python functions).

Even with object spilling enabled, you may still sometimes see errors indicating OOMKill behavior on various levels such as your operating system, Ray or a higher-level cluster orchestrator such as Kubernetes:

1. On the local PyRunner, you may see that your operating system kills the process with an error message `OOMKilled`.

2. On the RayRunner, you may notice Ray logs indicating that workers are aggressively being killed by the Raylet with log messages such as: `Workers (tasks / actors) killed due to memory pressure (OOM)`

3. If you are running in an environment such as Kubernetes, you may notice that your pods are being killed or restarted with an `OOMKill` reason

These OOMKills are often recoverable (Daft-on-Ray will take care of retrying work after reviving the workers); however, they may significantly affect the runtime of your workload or, if recovery is not possible, fail the workload entirely.

**Troubleshooting**

There are some options available to you.

1. Use machines with more available memory per-CPU to increase each Ray worker's available memory (e.g. AWS EC2 r5 instances)

2. Use more machines in your cluster to increase overall cluster memory size

3. Aggressively filter your data so that Daft can avoid reading data that it does not have to (e.g. `df.where(...)`)

4. Request more memory for your UDFs (see [Resource Requests](../core_concepts/udf.md#resource-requests)) if your UDFs are memory-intensive (e.g. decompression of data, running large matrix computations, etc.)

5. Increase the number of partitions in your dataframe (hence making each partition smaller) using something like `df.into_partitions(df.num_partitions() * 2)`, as shown in the sketch after this list
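
Here is a minimal sketch combining remedies 3, 4 and 5. The bucket path, column names and the `memory_bytes` keyword are assumptions for illustration only, so check the linked Resource Requests page for the exact UDF resource-request API:

```python
import daft
from daft import DataType, col

# Remedy 3: filter early so Daft can avoid reading data it does not need.
df = daft.read_parquet("s3://bucket/large_dataset/**")  # hypothetical path
df = df.where(col("year") == 2024)                      # hypothetical column

# Remedy 5: more partitions means each partition is smaller and easier to hold in memory.
df = df.into_partitions(df.num_partitions() * 2)

# Remedy 4: request more memory for a memory-hungry UDF. The `memory_bytes`
# keyword is assumed here -- see the Resource Requests docs linked above.
@daft.udf(return_dtype=DataType.binary(), memory_bytes=4 * 1024**3)
def decompress(data):
    import zlib
    return [zlib.decompress(x) for x in data.to_pylist()]

df = df.with_column("decompressed", decompress(col("data")))  # assumes a `data` column
```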

If your workload continues to experience OOM issues, perhaps Daft could be doing a better job of estimating the memory required to run certain steps in your workload. Please contact the Daft developers on our forums!
113 changes: 113 additions & 0 deletions docs-v2/advanced/partitioning.md
@@ -0,0 +1,113 @@
# Partitioning

Daft is a **distributed** dataframe. This means that, internally, data is represented as partitions, which are then spread out across your system.

## Why do we need partitions?

When running in a distributed setting (a cluster of machines), Daft spreads your dataframe's data across these machines. This means that your workload is able to efficiently utilize all the resources in your cluster, because each machine is able to work on its assigned partition(s) independently.

Additionally, certain global operations in a distributed setting require data to be partitioned in a specific way for the operation to be correct, because all the data matching a certain criterion needs to be on the same machine and in the same partition. For example, in a groupby-aggregation, Daft needs to bring together all the data for a given key into the same partition before it can perform a local groupby-aggregation that is then also globally correct. Daft refers to this as a "clustering specification", and you are able to see it in the plans that Daft constructs as well.
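
As a minimal illustration (run locally here on hypothetical data, so your plan output will differ from a real cluster's), a groupby-aggregation introduces exactly such a clustering requirement, which you can inspect in the plan:

=== "🐍 Python"

```python
import daft

df = daft.from_pydict({"key": ["a", "b", "a"], "value": [1, 2, 3]})

# A global groupby-aggregation: all rows sharing the same `key` must end up in
# the same partition before the per-partition aggregation is globally correct.
result = df.groupby("key").sum("value")
result.explain(show_all=True)  # inspect the plan for the clustering requirement
```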

!!! note "Note"

When running locally on just a single machine, Daft currently still uses partitioning. This remains useful for controlling parallelism and how much data is materialized at a time.

However, Daft's new experimental execution engine removes the concept of partitioning entirely for local execution. You may enable it with `DAFT_RUNNER=native`. Instead of using partitioning to control parallelism, this new execution engine performs streaming-based execution on small "morsels" of data, which provides much more stable memory utilization while freeing you from having to worry about partitioning.

This user guide helps you think about how to correctly partition your data to improve performance as well as memory stability in Daft.

Some general rules of thumb:

1. **Have Enough Partitions**: our general recommendation for high throughput and maximal resource utilization is to have *at least* `2 x TOTAL_NUM_CPUS` partitions, which allows Daft to fully saturate your CPUs (see the sketch after this list).

2. **More Partitions**: if you are observing memory issues (excessive spilling or out-of-memory (OOM) issues) then you may choose to increase the number of partitions. This increases the amount of overhead in your system, but improves overall memory stability (since each partition will be smaller).

3. **Fewer Partitions**: if you are observing a large amount of overhead (e.g. if you observe that shuffle operations such as joins and sorts are taking too much time), then you may choose to decrease the number of partitions. This decreases the amount of overhead in the system, at the cost of using more memory (since each partition will be larger).
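
A minimal sketch of applying rule 1, assuming a hypothetical Parquet path and using the local machine's CPU count (on a Ray cluster, you would use the cluster-wide CPU count instead):

=== "🐍 Python"

```python
import multiprocessing

import daft

df = daft.read_parquet("s3://bucket/data/**")  # hypothetical path

# Rule of thumb: aim for at least 2 x the total number of CPUs available to Daft.
target_partitions = 2 * multiprocessing.cpu_count()
if df.num_partitions() < target_partitions:
    df = df.into_partitions(target_partitions)
```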

!!! tip "See Also"

[Managing Memory Usage](memory.md) - a guide for dealing with memory issues when using Daft

## How is my data partitioned?

Daft will automatically use certain heuristics to determine the number of partitions for you when you create a DataFrame. When reading data from files (e.g. Parquet, CSV or JSON), Daft will group small files and split large files appropriately into nicely-sized partitions based on their estimated in-memory data sizes.

To interrogate the partitioning of your current DataFrame, you may use the [`df.explain(show_all=True)`](https://www.getdaft.io/projects/docs/en/stable/api_docs/doc_gen/dataframe_methods/daft.DataFrame.explain.html#daft.DataFrame.explain) method. Here is an example output from a simple `df = daft.read_parquet(...)` call on a fairly large number of Parquet files.

=== "🐍 Python"

```python
df = daft.read_parquet("s3://bucket/path_to_100_parquet_files/**")
df.explain(show_all=True)
```

``` {title="Output"}

== Unoptimized Logical Plan ==

* GlobScanOperator
| Glob paths = [s3://bucket/path_to_100_parquet_files/**]
| ...


...


== Physical Plan ==

* TabularScan:
| Num Scan Tasks = 3
| Estimated Scan Bytes = 72000000
| Clustering spec = { Num partitions = 3 }
| ...
```

In the above example, the call to [`daft.read_parquet`](https://www.getdaft.io/projects/docs/en/stable/api_docs/doc_gen/io_functions/daft.read_parquet.html) read 100 Parquet files, but the Physical Plan indicates that Daft will only create 3 partitions. This is because these files are quite small (in this example, totalling about 72MB of data) and Daft recognizes that it should be able to read them as just 3 partitions, each containing about 33 files!

## How can I change the way my data is partitioned?

You can change the way your data is partitioned by leveraging certain DataFrame methods:

1. [`daft.DataFrame.repartition`](https://www.getdaft.io/projects/docs/en/stable/api_docs/doc_gen/dataframe_methods/daft.DataFrame.repartition.html#daft.DataFrame.repartition): repartitions your data into `N` partitions by performing hash-bucketing that ensures all data with the same values for the specified columns ends up in the same partition. This is expensive, as it requires moving data between partitions and machines.

2. [`daft.DataFrame.into_partitions`](https://www.getdaft.io/projects/docs/en/stable/api_docs/doc_gen/dataframe_methods/daft.DataFrame.into_partitions.html#daft.DataFrame.into_partitions): splits or coalesces adjacent partitions to meet the specified target number of total partitions. This is less expensive than a call to `df.repartition` because it does not require shuffling or moving data between partitions.

3. Many global dataframe operations such as [`daft.DataFrame.join`](https://www.getdaft.io/projects/docs/en/stable/api_docs/doc_gen/dataframe_methods/daft.DataFrame.join.html#daft.DataFrame.join), [`daft.DataFrame.sort`](https://www.getdaft.io/projects/docs/en/stable/api_docs/doc_gen/dataframe_methods/daft.DataFrame.sort.html#daft.DataFrame.sort) and [`daft.GroupedDataframe.agg`](https://www.getdaft.io/projects/docs/en/stable/api_docs/groupby.html#daft.dataframe.GroupedDataFrame.agg) will change the partitioning of your data. This is because they require shuffling data between partitions to be globally correct.

Note that many of these methods will change both the *number of partitions* and the *clustering specification* of the new partitioning. For example, when calling `df.repartition(8, col("x"))`, the resulting dataframe will have 8 partitions in total, with the additional guarantee that all rows with the same value of `col("x")` are in the same partition! This is called "hash partitioning".

=== "🐍 Python"

```python
df = df.repartition(8, daft.col("x"))
df.explain(show_all=True)
```

``` {title="Output"}

== Unoptimized Logical Plan ==

* Repartition: Scheme = Hash
| Num partitions = Some(8)
| By = col(x)
|
* GlobScanOperator
| Glob paths = [s3://bucket/path_to_1000_parquet_files/**]
| ...

...

== Physical Plan ==

* ReduceMerge
|
* FanoutByHash: 8
| Partition by = col(x)
|
* TabularScan:
| Num Scan Tasks = 3
| Estimated Scan Bytes = 72000000
| Clustering spec = { Num partitions = 3 }
| ...
```