[release] cherry-pick from master and release for 2.2.1 (dmlc#7388)
Co-authored-by: Muhammed Fatih BALIN <[email protected]>
Co-authored-by: Xinyu Yao <[email protected]>
Co-authored-by: Ubuntu <[email protected]>
4 people authored and lijialin03 committed Jan 6, 2025
1 parent 0a31e7b commit 998cd6f
Showing 7 changed files with 71 additions and 115 deletions.
4 changes: 4 additions & 0 deletions examples/graphbolt/pyg/node_classification_advanced.py
@@ -432,7 +432,11 @@ def main():
num_classes = dataset.tasks[0].metadata["num_classes"]

if args.gpu_cache_size > 0 and args.feature_device != "cuda":
features._features[("node", None, "feat")] = gb.GPUCachedFeature(
features._features[("node", None, "feat")],
args.gpu_cache_size,
)
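For context, here is a minimal sketch of the pattern this hunk applies: wrapping a host-resident node feature in a byte-sized GPU cache. The tensor shapes and cache size are illustrative, the constructor signature is the 2.2.1 one used in this commit, and a CUDA device is assumed.

import torch
import dgl.graphbolt as gb

# Illustrative host-resident node features: 10k nodes, 128-dim float32 rows.
cpu_feat = torch.randn(10_000, 128)
fallback = gb.TorchBasedFeature(cpu_feat)

# Keep up to 4 MiB of the hottest rows resident in GPU memory (LRU eviction).
cached = gb.GPUCachedFeature(fallback, 4 * 1024 * 1024)

ids = torch.tensor([0, 5, 42], device="cuda")
rows = cached.read(ids)  # cache hits come from the GPU, misses from cpu_feat
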
13 changes: 13 additions & 0 deletions python/dgl/graphbolt/base.py
@@ -6,6 +6,19 @@
import torch
from torch.torch_version import TorchVersion

if TorchVersion(torch.__version__) >= "2.3.0":
# [TODO][https://github.com/dmlc/dgl/issues/7387] Remove or refine below
# check.
# Due to https://github.com/dmlc/dgl/issues/7380, we need to check if dill
# is available before using it.
torch.utils.data.datapipes.utils.common.DILL_AVAILABLE = (
torch.utils._import_utils.dill_available()
)

# pylint: disable=wrong-import-position
from torch.utils.data import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe

if (
TorchVersion(torch.__version__) >= "2.3.0"
and TorchVersion(torch.__version__) < "2.3.1"
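Putting the hunk together, the import section touched here reads roughly as follows after the change (other module-level code omitted); the point is that the DILL_AVAILABLE flag is restored before the torchdata imports that may consult it.

import torch
from torch.torch_version import TorchVersion

if TorchVersion(torch.__version__) >= "2.3.0":
    # Workaround for dmlc/dgl#7380, tracked for removal in dmlc/dgl#7387:
    # newer torch exposes dill detection via torch.utils._import_utils, so
    # mirror it onto the attribute that older datapipe code still reads.
    torch.utils.data.datapipes.utils.common.DILL_AVAILABLE = (
        torch.utils._import_utils.dill_available()
    )

# pylint: disable=wrong-import-position
from torch.utils.data import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe
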
18 changes: 16 additions & 2 deletions python/dgl/graphbolt/feature_fetcher.py
@@ -237,9 +237,18 @@ def wait(self):
for type_name, nodes in input_nodes.items():
if type_name not in self.node_feature_keys or nodes is None:
continue
if nodes.is_cuda:
nodes.record_stream(torch.cuda.current_stream())
for feature_name in self.node_feature_keys[type_name]:
- node_features[(type_name, feature_name)] = read_helper(
- ("node", type_name, feature_name), nodes
+ node_features[
+ (type_name, feature_name)
+ ] = record_stream(
+ self.feature_store.read(
+ "node",
+ type_name,
+ feature_name,
+ nodes,
+ )
)
else:
for feature_name in self.node_feature_keys:
@@ -266,6 +275,11 @@
or edges is None
):
continue
if edges.is_cuda:
edges.record_stream(torch.cuda.current_stream())
for feature_name in self.edge_feature_keys[type_name]:
edge_features[i][
(type_name, feature_name)
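The record_stream calls added in this hunk matter because the index tensors can be produced on a side CUDA stream while the feature read consumes them on the current one. A minimal, DGL-independent sketch of the pattern (function and variable names are illustrative):

import torch

def gather_rows(table_on_gpu: torch.Tensor, ids: torch.Tensor) -> torch.Tensor:
    """Consume `ids` on the current stream, guarding its memory lifetime."""
    if ids.is_cuda:
        # If `ids` was allocated on another stream, tell the caching allocator
        # it is also in use here so its memory is not recycled too early.
        ids.record_stream(torch.cuda.current_stream())
    return table_on_gpu.index_select(0, ids)
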
37 changes: 35 additions & 2 deletions python/dgl/graphbolt/impl/gpu_cached_feature.py
@@ -15,6 +15,22 @@
__all__ = ["GPUCachedFeature", "gpu_cached_feature"]


def nbytes(tensor):
"""Returns the number of bytes to store the given tensor.
Needs to be defined only for torch versions less than 2.1. In torch >= 2.1,
we can simply use "tensor.nbytes".
"""
return tensor.numel() * tensor.element_size()


def num_cache_items(cache_capacity_in_bytes, single_item):
"""Returns the number of rows to be cached."""
item_bytes = nbytes(single_item)
# Round up so that we never get a size of 0, unless bytes is 0.
return (cache_capacity_in_bytes + item_bytes - 1) // item_bytes


class GPUCachedFeature(Feature):
r"""GPU cached feature wrapping a fallback feature. It uses the least
recently used (LRU) algorithm as the cache eviction policy. Use
@@ -26,12 +42,17 @@ class GPUCachedFeature(Feature):
----------
fallback_feature : Feature
The fallback feature.
max_cache_size_in_bytes : int
The capacity of the GPU cache in bytes.

Examples
--------
@@ -55,6 +76,7 @@ class GPUCachedFeature(Feature):
torch.Size([5])
"""

@@ -63,14 +85,25 @@ def __init__(
def __init__(self, fallback_feature: Feature, max_cache_size_in_bytes: int):
super(GPUCachedFeature, self).__init__()
assert isinstance(fallback_feature, Feature), (
f"The fallback_feature must be an instance of Feature, but got "
f"{type(fallback_feature)}."
)
self._fallback_feature = fallback_feature
self.max_cache_size_in_bytes = max_cache_size_in_bytes
# Fetching the feature dimension from the underlying feature.
feat0 = fallback_feature.read(torch.tensor([0]))
cache_size = num_cache_items(max_cache_size_in_bytes, feat0)
self._feature = GPUCache((cache_size,) + feat0.shape[1:], feat0.dtype)

def read(self, ids: torch.Tensor = None):
"""Read the feature by index.
@@ -236,11 +269,11 @@ def update(self, value: torch.Tensor, ids: torch.Tensor = None):
feat0 = value[:1]
self._fallback_feature.update(value)
cache_size = min(
- bytes_to_number_of_items(self.cache_size_in_bytes, feat0),
+ num_cache_items(self.max_cache_size_in_bytes, feat0),
value.shape[0],
)
self._feature = None  # Destroy the existing cache first.
- self._feature = self._cache_type(
+ self._feature = GPUCache(
(cache_size,) + feat0.shape[1:], feat0.dtype
)
else:
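The helpers added in this file turn a byte budget into a row count via ceiling division; a quick worked example with illustrative sizes:

import torch

row = torch.empty(1, 128, dtype=torch.float32)   # one feature row
row_bytes = row.numel() * row.element_size()     # 128 * 4 = 512 bytes

assert row_bytes == 512
assert (4096 + row_bytes - 1) // row_bytes == 8  # exact fit: 8 rows
assert (4097 + row_bytes - 1) // row_bytes == 9  # any remainder rounds up
assert (0 + row_bytes - 1) // row_bytes == 0     # only a zero budget yields 0
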
(Additional changed test file; file name not shown.)
@@ -1702,21 +1702,6 @@ def test_sample_neighbors_homo(
assert subgraph.original_row_node_ids is None


@pytest.mark.parametrize("labor", [False, True])
def test_sample_neighbors_hetero_single_fanout(labor):
u, i = torch.randint(20, size=(1000,)), torch.randint(10, size=(1000,))
graph = dgl.heterograph({("u", "w", "i"): (u, i), ("i", "b", "u"): (i, u)})

graph = gb.from_dglgraph(graph).to(F.ctx())

sampler = graph.sample_layer_neighbors if labor else graph.sample_neighbors

for i in range(11):
nodes = {"u": torch.randint(10, (100,), device=F.ctx())}
sampler(nodes, fanouts=torch.tensor([-1]))
# Should reach here without crashing.


@pytest.mark.parametrize("indptr_dtype", [torch.int32, torch.int64])
@pytest.mark.parametrize("indices_dtype", [torch.int32, torch.int64])
@pytest.mark.parametrize("labor", [False, True])
96 changes: 2 additions & 94 deletions tests/python/pytorch/graphbolt/impl/test_gpu_cached_feature.py
@@ -60,8 +60,8 @@ def test_gpu_cached_feature(dtype, cache_size_a, cache_size_b):
cache_size_a *= a[:1].element_size() * a[:1].numel()
cache_size_b *= b[:1].element_size() * b[:1].numel()

- feat_store_a = gb.gpu_cached_feature(gb.TorchBasedFeature(a), cache_size_a)
- feat_store_b = gb.gpu_cached_feature(gb.TorchBasedFeature(b), cache_size_b)
+ feat_store_a = gb.GPUCachedFeature(gb.TorchBasedFeature(a), cache_size_a)
+ feat_store_b = gb.GPUCachedFeature(gb.TorchBasedFeature(b), cache_size_b)

# Test read the entire feature.
assert torch.equal(feat_store_a.read(), a.to("cuda"))
@@ -125,95 +125,3 @@ def test_gpu_cached_feature(dtype, cache_size_a, cache_size_b):
# Test with different dimensionality
feat_store_a.update(b)
assert torch.equal(feat_store_a.read(), b.to("cuda"))


@unittest.skipIf(
_skip_condition_cached_feature(),
reason=_reason_to_skip_cached_feature(),
)
@pytest.mark.parametrize(
"dtype",
[
torch.bool,
torch.uint8,
torch.int8,
torch.int16,
torch.int32,
torch.int64,
torch.float16,
torch.bfloat16,
torch.float32,
torch.float64,
],
)
@pytest.mark.parametrize("pin_memory", [False, True])
def test_gpu_cached_feature_read_async(dtype, pin_memory):
a = torch.randint(0, 2, [1000, 13], dtype=dtype, pin_memory=pin_memory)
a_cuda = a.to(F.ctx())

cache_size = 256 * a[:1].nbytes

feat_store = gb.gpu_cached_feature(gb.TorchBasedFeature(a), cache_size)

# Test read with ids.
ids1 = torch.tensor([0, 15, 71, 101], device=F.ctx())
ids2 = torch.tensor([71, 101, 202, 303], device=F.ctx())
for ids in [ids1, ids2]:
reader = feat_store.read_async(ids)
for _ in range(feat_store.read_async_num_stages(ids.device)):
values = next(reader)
assert torch.equal(values.wait(), a_cuda[ids])


@unittest.skipIf(
_skip_condition_cached_feature(),
reason=_reason_to_skip_cached_feature(),
)
@unittest.skipIf(
not torch.ops.graphbolt.detect_io_uring(),
reason="DiskBasedFeature is not available on this system.",
)
@pytest.mark.parametrize(
"dtype",
[
torch.bool,
torch.uint8,
torch.int8,
torch.int16,
torch.int32,
torch.int64,
torch.float16,
torch.float32,
torch.float64,
],
)
def test_gpu_cached_nested_feature_async(dtype):
a = torch.randint(0, 2, [1000, 13], dtype=dtype, device=F.ctx())

cache_size = 256 * a[:1].nbytes

ids1 = torch.tensor([0, 15, 71, 101], device=F.ctx())
ids2 = torch.tensor([71, 101, 202, 303], device=F.ctx())

with tempfile.TemporaryDirectory() as test_dir:
path = to_on_disk_numpy(test_dir, "tensor", a)

disk_store = gb.DiskBasedFeature(path=path)
feat_store1 = gb.gpu_cached_feature(disk_store, cache_size)
feat_store2 = gb.gpu_cached_feature(
gb.cpu_cached_feature(disk_store, cache_size * 2), cache_size
)
feat_store3 = gb.gpu_cached_feature(
gb.cpu_cached_feature(disk_store, cache_size * 2, pin_memory=True),
cache_size,
)

# Test read feature.
for feat_store in [feat_store1, feat_store2, feat_store3]:
for ids in [ids1, ids2]:
reader = feat_store.read_async(ids)
for _ in range(feat_store.read_async_num_stages(ids.device)):
values = next(reader)
assert torch.equal(values.wait(), a[ids])

feat_store1 = feat_store2 = feat_store3 = disk_store = None
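
The test retained at the top of this file boils down to the following usage pattern; this is a hedged sketch with illustrative sizes and dtypes, assuming the 2.2.1 constructor and a CUDA-capable build.

import torch
import dgl.graphbolt as gb

a = torch.rand(1000, 13)
cache_size_a = 256 * a[:1].element_size() * a[:1].numel()  # room for 256 rows

feat_store_a = gb.GPUCachedFeature(gb.TorchBasedFeature(a), cache_size_a)

ids = torch.tensor([0, 15, 71, 101], device="cuda")
assert torch.equal(feat_store_a.read(ids), a.to("cuda")[ids])

# update() replaces the fallback tensor and re-sizes the cache to fit it,
# even when the new tensor has a different shape.
b = torch.rand(500, 7)
feat_store_a.update(b)
assert torch.equal(feat_store_a.read(), b.to("cuda"))
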
3 changes: 1 addition & 2 deletions tests/python/pytorch/graphbolt/test_feature_fetcher.py
@@ -236,8 +236,7 @@ def add_node_and_edge_ids(minibatch):
}
)
item_sampler_dp = gb.ItemSampler(itemset, batch_size=2)
- fn = partial(_func, add_node_and_edge_ids)
- converter_dp = Mapper(item_sampler_dp, fn)
+ converter_dp = Mapper(item_sampler_dp, add_node_and_edge_ids)
# "n3:e3:n3" is not in the sampled edges.
# Do not fetch feature for "n2:e2:n1".
node_feature_keys = {"n1": ["a"]}
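The change above drops a functools.partial wrapper because Mapper already calls the supplied function on every element it pulls from the upstream datapipe. A tiny stand-alone sketch of that behaviour (names illustrative, unrelated to the test's itemset):

from torchdata.datapipes.iter import IterableWrapper, Mapper

def add_one(x):
    return x + 1

source_dp = IterableWrapper(range(5))
mapped_dp = Mapper(source_dp, add_one)  # no partial(...) indirection needed

assert list(mapped_dp) == [1, 2, 3, 4, 5]
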
