diff --git a/.environment.yml b/.environment.yml index 323602f891..51a843255b 100644 --- a/.environment.yml +++ b/.environment.yml @@ -1,9 +1,9 @@ - name: py39 channels: - conda-forge - defaults dependencies: + - protobuf>=4.25.3 - ca-certificates>=2021.10.8 - certifi>=2021.10.8 - ipykernel @@ -27,89 +27,89 @@ dependencies: - xz>=5.2.5 - zlib>=1.2.12 - pip: - - aiohttp==3.9.2 - - aiosignal==1.3.1 - - appnope==0.1.3 - - asttokens==2.4.0 - - async-timeout==4.0.3 - - attrs==23.1.0 - - autodocsumm==0.2.11 - - backcall==0.2.0 - - backoff==2.2.1 - - bleach==6.0.0 - - botocore==1.31.53 - - certifi==2023.7.22 - - contourpy==1.1.1 - - cycler==0.11.0 - - decorator==5.1.1 - - defusedxml==0.7.1 - - docopt==0.6.2 - - exceptiongroup==1.1.3 - - executing==1.2.0 - - fastjsonschema==2.18.0 - - fonttools==4.42.1 - - frozenlist==1.4.0 - - gql==3.4.1 - - graphql-core==3.2.3 - - ipython==8.15.0 - - jedi==0.19.0 - - jmespath==1.0.1 - - jsonpickle==3.0.2 - - jsonschema==4.19.1 - - jsonschema-specifications==2023.7.1 - - jupyter_client==8.3.1 - - jupyter_core==5.3.1 - - jupyterlab-pygments==0.2.2 - - kiwisolver==1.4.5 - - matplotlib==3.8.0 - - matplotlib-inline==0.1.6 - - maturin==1.2.3 - - mistune==3.0.1 - - multidict==6.0.4 - - nbclient==0.8.0 - - nbconvert==7.8.0 - - nbformat==5.9.2 - - nbsphinx==0.9.3 - - networkx==3.1 - - numpy==1.26.0 - - numpydoc==1.5.0 - - pandas==2.1.1 - - pandocfilters==1.5.0 - - parso==0.8.3 - - pexpect==4.8.0 - - pickleshare==0.7.5 - - Pillow==10.0.1 - - pipreqs==0.4.13 - - platformdirs==3.10.0 - - prompt-toolkit==3.0.39 - - ptyprocess==0.7.0 - - pure-eval==0.2.2 - - pyarrow==13.0.0 - - pydata-sphinx-theme==0.14.1 - - pyparsing==3.1.1 - - python-dateutil==2.8.2 - - pyvis==0.3.2 - - pyzmq==25.1.1 - - referencing==0.30.2 - - requests-toolbelt==0.10.1 - - rpds-py==0.10.3 - - six==1.16.0 - - sphinx-automodapi==0.16.0 - - sphinx-autosummary-accessors==2023.4.0 - - sphinx-copybutton==0.5.2 - - sphinx-favicon==1.0.1 - - sphinx_design==0.5.0 - - sphinx-notfound-page==1.0.0 - - stack-data==0.6.2 - - tinycss2==1.2.1 - - tomli==2.0.1 - - tornado==6.3.3 - - traitlets==5.10.0 - - typing_extensions==4.8.0 - - tzdata==2023.3 - - urllib3==1.26.16 - - wcwidth==0.2.6 - - webencodings==0.5.1 - - websockets==10.4 - - yarg==0.1.9 - - yarl==1.9.2 \ No newline at end of file + - aiohttp==3.9.2 + - aiosignal==1.3.1 + - appnope==0.1.3 + - asttokens==2.4.0 + - async-timeout==4.0.3 + - attrs==23.1.0 + - autodocsumm==0.2.11 + - backcall==0.2.0 + - backoff==2.2.1 + - bleach==6.0.0 + - botocore==1.31.53 + - certifi==2023.7.22 + - contourpy==1.1.1 + - cycler==0.11.0 + - decorator==5.1.1 + - defusedxml==0.7.1 + - docopt==0.6.2 + - exceptiongroup==1.1.3 + - executing==1.2.0 + - fastjsonschema==2.18.0 + - fonttools==4.42.1 + - frozenlist==1.4.0 + - gql==3.4.1 + - graphql-core==3.2.3 + - ipython==8.15.0 + - jedi==0.19.0 + - jmespath==1.0.1 + - jsonpickle==3.0.2 + - jsonschema==4.19.1 + - jsonschema-specifications==2023.7.1 + - jupyter_client==8.3.1 + - jupyter_core==5.3.1 + - jupyterlab-pygments==0.2.2 + - kiwisolver==1.4.5 + - matplotlib==3.8.0 + - matplotlib-inline==0.1.6 + - maturin==1.2.3 + - mistune==3.0.1 + - multidict==6.0.4 + - nbclient==0.8.0 + - nbconvert==7.8.0 + - nbformat==5.9.2 + - nbsphinx==0.9.3 + - networkx==3.1 + - numpy==1.26.0 + - numpydoc==1.5.0 + - pandas==2.1.1 + - pandocfilters==1.5.0 + - parso==0.8.3 + - pexpect==4.8.0 + - pickleshare==0.7.5 + - Pillow==10.0.1 + - pipreqs==0.4.13 + - platformdirs==3.10.0 + - prompt-toolkit==3.0.39 + - ptyprocess==0.7.0 + - pure-eval==0.2.2 + - pyarrow==13.0.0 + 
- pydata-sphinx-theme==0.14.1 + - pyparsing==3.1.1 + - python-dateutil==2.8.2 + - pyvis==0.3.2 + - pyzmq==25.1.1 + - referencing==0.30.2 + - requests-toolbelt==0.10.1 + - rpds-py==0.10.3 + - six==1.16.0 + - sphinx-automodapi==0.16.0 + - sphinx-autosummary-accessors==2023.4.0 + - sphinx-copybutton==0.5.2 + - sphinx-favicon==1.0.1 + - sphinx_design==0.5.0 + - sphinx-notfound-page==1.0.0 + - stack-data==0.6.2 + - tinycss2==1.2.1 + - tomli==2.0.1 + - tornado==6.3.3 + - traitlets==5.10.0 + - typing_extensions==4.8.0 + - tzdata==2023.3 + - urllib3==1.26.16 + - wcwidth==0.2.6 + - webencodings==0.5.1 + - websockets==10.4 + - yarg==0.1.9 + - yarl==1.9.2 \ No newline at end of file diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 5bb8f9c225..40603c2933 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -47,8 +47,7 @@ jobs: - name: Run benchmark (Unix) run: | set -o pipefail - cargo bench --bench base -p raphtory-benchmark -- --output-format=bencher | tee benchmark-result.txt - cargo bench --bench algobench -p raphtory-benchmark -- --output-format=bencher | tee benchmark-result.txt + cargo bench --bench base --bench algobench -p raphtory-benchmark -- --output-format=bencher | tee benchmark-result.txt - name: Delete cargo.lock if it exists run: | rm -f Cargo.lock diff --git a/.github/workflows/test_rust_disk_storage_workflow.yml b/.github/workflows/test_rust_disk_storage_workflow.yml index 4fe4c62152..f57acb524f 100644 --- a/.github/workflows/test_rust_disk_storage_workflow.yml +++ b/.github/workflows/test_rust_disk_storage_workflow.yml @@ -18,9 +18,9 @@ jobs: strategy: matrix: include: - - os: macos-latest - - os: ubuntu-20.04 - - os: windows-latest + - { os: macos-latest, flags: "" } + - { os: ubuntu-20.04, flags: "-C link-arg=-fuse-ld=lld" } + - { os: windows-latest, flags: "" } steps: - uses: maxim-lobanov/setup-xcode@v1 name: Xcode version @@ -43,6 +43,10 @@ jobs: sudo rm -rf /usr/local/lib/android sudo rm -rf /opt/ghc sudo rm -rf "$AGENT_TOOLSDIRECTORY" + - name: Install LLD + if: "contains(matrix.os, 'ubuntu')" + run: | + sudo apt-get install lld - uses: webfactory/ssh-agent@v0.7.0 name: Load pometry-storage key with: @@ -56,14 +60,14 @@ jobs: - name: Install Protoc uses: arduino/setup-protoc@v3 with: - repo-token: ${{ secrets.GITHUB_TOKEN }} + repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Install nextest uses: taiki-e/install-action@nextest - name: Activate pometry-storage in Cargo.toml run: make pull-storage - name: Run all Tests (disk_graph) env: - RUSTFLAGS: -Awarnings + RUSTFLAGS: -Awarnings ${{ matrix.flags }} TEMPDIR: ${{ runner.temp }} run: | cargo nextest run --all --no-default-features --features "storage" diff --git a/Cargo.toml b/Cargo.toml index 0b1bbcfef1..534e76327b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,7 +84,7 @@ display-error-chain = "0.2.0" polars-arrow = "0.39.2" polars-parquet = "0.39.2" polars-utils = "0.39.2" -kdam = { version = "0.5.1", features = ["notebook"] } +kdam = { version = "0.5.1" } pretty_assertions = "1.4.0" quickcheck = "1.0.3" quickcheck_macros = "1.0.0" diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml index c7f9926b49..a280794697 100644 --- a/examples/rust/Cargo.toml +++ b/examples/rust/Cargo.toml @@ -30,10 +30,3 @@ name = "crypto" [[bin]] name = "pokec" - -[target.x86_64-unknown-linux-gnu] -linker = "/usr/bin/clang" -rustflags = ["-Clink-arg=-fuse-ld=lld", "-Clink-arg=-Wl,--no-rosegment"] - -[profile.release] -debug = true diff --git a/pometry-storage-private 
b/pometry-storage-private index 45f16ded88..88d24c596c 160000 --- a/pometry-storage-private +++ b/pometry-storage-private @@ -1 +1 @@ -Subproject commit 45f16ded88a34586c75a6e45133220a84cf0bdd1 +Subproject commit 88d24c596c1624b069cccb06661f739293352ed2 diff --git a/python/src/lib.rs b/python/src/lib.rs index c2afad853d..00456219e5 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1,5 +1,3 @@ -#![allow(non_local_definitions)] - extern crate core; use pyo3::prelude::*; use raphtory_core::python::packages::base_modules::{ diff --git a/raphtory-cypher/examples/raphtory_cypher.rs b/raphtory-cypher/examples/raphtory_cypher.rs index a9084dba71..fe41287075 100644 --- a/raphtory-cypher/examples/raphtory_cypher.rs +++ b/raphtory-cypher/examples/raphtory_cypher.rs @@ -17,7 +17,7 @@ mod cypher { use arrow::util::pretty::print_batches; use clap::Parser; use futures::{stream, StreamExt}; - use raphtory::disk_graph::graph_impl::{DiskGraph, ParquetLayerCols}; + use raphtory::disk_graph::{graph_impl::ParquetLayerCols, DiskGraph}; use raphtory_cypher::{run_cypher, run_cypher_to_streams, run_sql}; use serde::{de::DeserializeOwned, Deserialize}; diff --git a/raphtory-cypher/src/executor/table_provider/edge.rs b/raphtory-cypher/src/executor/table_provider/edge.rs index fe54dfe23e..673d8c6f53 100644 --- a/raphtory-cypher/src/executor/table_provider/edge.rs +++ b/raphtory-cypher/src/executor/table_provider/edge.rs @@ -28,7 +28,7 @@ use datafusion::{ }; use futures::Stream; use pometry_storage::prelude::*; -use raphtory::disk_graph::graph_impl::DiskGraph; +use raphtory::disk_graph::DiskGraph; use crate::executor::{arrow2_to_arrow_buf, ExecError}; @@ -99,7 +99,7 @@ impl EdgeListTableProvider { fn lift_nested_arrow_schema(graph: &DiskGraph, layer_id: usize) -> Result, ExecError> { let arrow2_fields = graph.as_ref().layer(layer_id).edges_data_type(); - let a2_dt = crate::arrow2::datatypes::ArrowDataType::Struct(arrow2_fields.clone()); + let a2_dt = crate::arrow2::datatypes::ArrowDataType::Struct(arrow2_fields.to_vec()); let a_dt: DataType = a2_dt.into(); let schema = match a_dt { DataType::Struct(fields) => { diff --git a/raphtory-cypher/src/executor/table_provider/node.rs b/raphtory-cypher/src/executor/table_provider/node.rs index 112afb7d84..9320dd79fb 100644 --- a/raphtory-cypher/src/executor/table_provider/node.rs +++ b/raphtory-cypher/src/executor/table_provider/node.rs @@ -24,7 +24,7 @@ use futures::Stream; use pometry_storage::properties::ConstProps; use raphtory::{ core::entities::VID, - disk_graph::{graph_impl::DiskGraph, prelude::*}, + disk_graph::{prelude::*, DiskGraph}, }; use std::{any::Any, fmt::Formatter, sync::Arc}; diff --git a/raphtory-cypher/src/hop/execution.rs b/raphtory-cypher/src/hop/execution.rs index 2a49112ddf..e4580e8806 100644 --- a/raphtory-cypher/src/hop/execution.rs +++ b/raphtory-cypher/src/hop/execution.rs @@ -38,17 +38,16 @@ use datafusion::{ use datafusion::physical_expr::Partitioning; use futures::{Stream, StreamExt}; +use crate::take_record_batch; use pometry_storage::graph_fragment::TempColGraphFragment; use raphtory::{ core::{entities::VID, Direction}, disk_graph::{ - graph_impl::DiskGraph, prelude::{ArrayOps, BaseArrayOps, PrimitiveCol}, + DiskGraph, }, }; -use crate::take_record_batch; - use super::operator::HopPlan; #[derive(Debug)] diff --git a/raphtory-cypher/src/hop/operator.rs b/raphtory-cypher/src/hop/operator.rs index c274cf56e9..b7d11928ad 100644 --- a/raphtory-cypher/src/hop/operator.rs +++ b/raphtory-cypher/src/hop/operator.rs @@ -5,7 +5,7 @@ use 
datafusion::{ logical_expr::{Expr, LogicalPlan, TableScan, UserDefinedLogicalNodeCore}, }; -use raphtory::{core::Direction, disk_graph::graph_impl::DiskGraph}; +use raphtory::{core::Direction, disk_graph::DiskGraph}; #[derive(Debug, PartialEq, Hash, Eq)] pub struct HopPlan { diff --git a/raphtory-cypher/src/hop/rule.rs b/raphtory-cypher/src/hop/rule.rs index 60efb7a94a..331beec2cf 100644 --- a/raphtory-cypher/src/hop/rule.rs +++ b/raphtory-cypher/src/hop/rule.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use crate::hop::operator::HopPlan; use async_trait::async_trait; use datafusion::{ common::Column, @@ -10,9 +11,7 @@ use datafusion::{ physical_plan::ExecutionPlan, physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner}, }; -use raphtory::{core::Direction, disk_graph::graph_impl::DiskGraph}; - -use crate::hop::operator::HopPlan; +use raphtory::{core::Direction, disk_graph::DiskGraph}; use super::execution::HopExec; @@ -141,7 +140,7 @@ impl ExtensionPlanner for HopPlanner { #[cfg(test)] mod test { use arrow::util::pretty::print_batches; - use raphtory::disk_graph::graph_impl::DiskGraph; + use raphtory::disk_graph::DiskGraph; use tempfile::tempdir; use crate::prepare_plan; diff --git a/raphtory-cypher/src/lib.rs b/raphtory-cypher/src/lib.rs index 0939e5972f..4cca9e433a 100644 --- a/raphtory-cypher/src/lib.rs +++ b/raphtory-cypher/src/lib.rs @@ -34,7 +34,7 @@ mod cypher { parser::ast::*, *, }; - use raphtory::disk_graph::graph_impl::DiskGraph; + use raphtory::disk_graph::DiskGraph; use crate::{ executor::table_provider::node::NodeTableProvider, @@ -185,9 +185,8 @@ mod cypher { use arrow_array::RecordBatch; use tempfile::tempdir; - use raphtory::{disk_graph::graph_impl::DiskGraph, prelude::*}; - use crate::run_cypher; + use raphtory::{disk_graph::DiskGraph, prelude::*}; lazy_static::lazy_static! 
{ static ref EDGES: Vec<(u64, u64, i64, f64)> = vec![ @@ -278,10 +277,9 @@ mod cypher { datatypes::*, }; use arrow::util::pretty::print_batches; + use raphtory::disk_graph::{graph_impl::ParquetLayerCols, DiskGraph}; use tempfile::tempdir; - use raphtory::disk_graph::graph_impl::{DiskGraph, ParquetLayerCols}; - use crate::run_cypher; fn schema() -> ArrowSchema { diff --git a/raphtory-cypher/src/transpiler/mod.rs b/raphtory-cypher/src/transpiler/mod.rs index a83d1f5f70..890cf3d8a6 100644 --- a/raphtory-cypher/src/transpiler/mod.rs +++ b/raphtory-cypher/src/transpiler/mod.rs @@ -14,7 +14,7 @@ use raphtory::{ Direction, }, db::{api::properties::internal::ConstPropertiesOps, graph::node::NodeView}, - disk_graph::graph_impl::DiskGraph, + disk_graph::DiskGraph, prelude::*, }; use sqlparser::ast::{ @@ -1143,18 +1143,15 @@ fn sql_like( #[cfg(test)] mod test { - use crate::{parser, transpiler}; - - use super::*; + use pretty_assertions::assert_eq; use raphtory::{ db::{api::mutation::AdditionOps, graph::graph::Graph}, + disk_graph::DiskGraph, prelude::NO_PROPS, }; use tempfile::tempdir; - use pretty_assertions::assert_eq; - #[test] fn count_all_nodes() { check_cypher_to_sql( diff --git a/raphtory-graphql/src/data.rs b/raphtory-graphql/src/data.rs index d3fde782b2..2d1eb51c23 100644 --- a/raphtory-graphql/src/data.rs +++ b/raphtory-graphql/src/data.rs @@ -1,6 +1,6 @@ use parking_lot::RwLock; #[cfg(feature = "storage")] -use raphtory::disk_graph::graph_impl::DiskGraph; +use raphtory::disk_graph::DiskGraph; use raphtory::{ core::Prop, db::api::view::MaterializedGraph, diff --git a/raphtory-graphql/src/lib.rs b/raphtory-graphql/src/lib.rs index d01c64f288..d17ee0ebfc 100644 --- a/raphtory-graphql/src/lib.rs +++ b/raphtory-graphql/src/lib.rs @@ -46,7 +46,7 @@ mod graphql_test { use async_graphql::UploadValue; use dynamic_graphql::{Request, Variables}; #[cfg(feature = "storage")] - use raphtory::disk_graph::graph_impl::DiskGraph; + use raphtory::disk_graph::DiskGraph; use raphtory::{ db::{api::view::IntoDynamic, graph::views::deletion_graph::PersistentGraph}, prelude::*, diff --git a/raphtory/Cargo.toml b/raphtory/Cargo.toml index 6b7994491d..9616a0d9ba 100644 --- a/raphtory/Cargo.toml +++ b/raphtory/Cargo.toml @@ -124,8 +124,8 @@ python = [ "dep:display-error-chain", "polars-arrow?/compute", "raphtory-api/python", - "dep:kdam", "dep:rpds", + "kdam?/notebook" ] # storage @@ -145,7 +145,8 @@ storage = [ arrow = [ "dep:polars-arrow", "dep:polars-parquet", - "polars-parquet?/compression" + "polars-parquet?/compression", + "dep:kdam", ] proto = [ diff --git a/raphtory/src/core/utils/errors.rs b/raphtory/src/core/utils/errors.rs index 95ac04c7ee..ae485258ee 100644 --- a/raphtory/src/core/utils/errors.rs +++ b/raphtory/src/core/utils/errors.rs @@ -95,11 +95,11 @@ pub enum GraphError { source: std::io::Error, }, - #[cfg(feature = "python")] + #[cfg(feature = "arrow")] #[error("Failed to load graph: {0}")] LoadFailure(String), - #[cfg(feature = "python")] + #[cfg(feature = "arrow")] #[error( "Failed to load graph as the following columns are not present within the dataframe: {0}" )] diff --git a/raphtory/src/db/api/properties/props.rs b/raphtory/src/db/api/properties/props.rs index db651b0981..9e8cc4df37 100644 --- a/raphtory/src/db/api/properties/props.rs +++ b/raphtory/src/db/api/properties/props.rs @@ -8,7 +8,7 @@ use raphtory_api::core::storage::arc_str::ArcStr; use std::collections::HashMap; /// View of the properties of an entity (graph|node|edge) -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct 
Properties<P> { pub(crate) props: P, } diff --git a/raphtory/src/db/api/view/internal/materialize.rs b/raphtory/src/db/api/view/internal/materialize.rs index d06f237001..9f94cf4032 100644 --- a/raphtory/src/db/api/view/internal/materialize.rs +++ b/raphtory/src/db/api/view/internal/materialize.rs @@ -42,7 +42,7 @@ use serde::{de::Error, Deserialize, Deserializer, Serialize}; use std::path::Path; #[cfg(feature = "storage")] -use crate::disk_graph::graph_impl::DiskGraph; +use crate::disk_graph::DiskGraph; #[enum_dispatch(CoreGraphOps)] #[enum_dispatch(InternalLayerOps)] diff --git a/raphtory/src/db/graph/node.rs b/raphtory/src/db/graph/node.rs index 0d1953db16..ce6b1e3372 100644 --- a/raphtory/src/db/graph/node.rs +++ b/raphtory/src/db/graph/node.rs @@ -36,6 +36,7 @@ use chrono::{DateTime, Utc}; use raphtory_api::core::storage::arc_str::ArcStr; use std::{ fmt, + fmt::Debug, hash::{Hash, Hasher}, sync::Arc, }; @@ -72,15 +73,12 @@ impl AsNodeRef for NodeView { } } -impl<'graph, G, GH: GraphViewOps<'graph>> fmt::Debug for NodeView<G, GH> { +impl<'graph, G, GH: GraphViewOps<'graph> + Debug> fmt::Debug for NodeView<G, GH> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "NodeView {{ graph: {}{}, node: {} }}", - self.graph.count_nodes(), - self.graph.count_edges(), - self.node.0 - ) + f.debug_struct("NodeView") + .field("node", &self.node) + .field("graph", &self.graph) + .finish() } } diff --git a/raphtory/src/disk_graph/graph_impl/const_properties_ops.rs b/raphtory/src/disk_graph/graph_impl/const_properties_ops.rs index be62971eed..743ef46043 100644 --- a/raphtory/src/disk_graph/graph_impl/const_properties_ops.rs +++ b/raphtory/src/disk_graph/graph_impl/const_properties_ops.rs @@ -1,7 +1,7 @@ use crate::{db::api::properties::internal::ConstPropertiesOps, prelude::Prop}; use raphtory_api::core::storage::arc_str::ArcStr; -use super::DiskGraph; +use crate::disk_graph::DiskGraph; impl ConstPropertiesOps for DiskGraph { #[doc = " Find id for property name (note this only checks the meta-data, not if the property actually exists for the entity)"] diff --git a/raphtory/src/disk_graph/graph_impl/core_ops.rs b/raphtory/src/disk_graph/graph_impl/core_ops.rs index 423f60d9ae..df4f86c800 100644 --- a/raphtory/src/disk_graph/graph_impl/core_ops.rs +++ b/raphtory/src/disk_graph/graph_impl/core_ops.rs @@ -22,18 +22,18 @@ use crate::{ view::{internal::CoreGraphOps, BoxedIter}, }, disk_graph::{ - graph_impl::DiskGraph, storage_interface::{ edge::DiskOwnedEdge, edges::DiskEdges, node::{DiskNode, DiskOwnedNode}, nodes::DiskNodesOwned, }, + DiskGraph, }, }; use itertools::Itertools; use polars_arrow::datatypes::ArrowDataType; -use pometry_storage::{properties::ConstProps, GidRef, GID}; +use pometry_storage::{properties::ConstProps, GidRef}; use raphtory_api::core::{input::input_node::InputNode, storage::arc_str::ArcStr}; use rayon::prelude::*; @@ -121,8 +121,8 @@ impl CoreGraphOps for DiskGraph { fn internalise_node(&self, v: NodeRef) -> Option<VID> { match v { NodeRef::Internal(vid) => Some(vid), - NodeRef::External(vid) => self.inner.find_node(&GID::U64(vid)), - NodeRef::ExternalStr(string) => self.inner.find_node(&GID::Str(string.into())), + NodeRef::External(vid) => self.inner.find_node(GidRef::U64(vid)), + NodeRef::ExternalStr(string) => self.inner.find_node(GidRef::Str(string)), } } diff --git a/raphtory/src/disk_graph/graph_impl/edge_filter_ops.rs b/raphtory/src/disk_graph/graph_impl/edge_filter_ops.rs index 5d7beed9f2..b3fd2dce54 100644 --- a/raphtory/src/disk_graph/graph_impl/edge_filter_ops.rs +++ 
b/raphtory/src/disk_graph/graph_impl/edge_filter_ops.rs @@ -1,7 +1,7 @@ -use super::DiskGraph; use crate::{ core::entities::LayerIds, db::api::{storage::edges::edge_ref::EdgeStorageRef, view::internal::EdgeFilterOps}, + disk_graph::DiskGraph, }; impl EdgeFilterOps for DiskGraph { diff --git a/raphtory/src/disk_graph/graph_impl/layer_ops.rs b/raphtory/src/disk_graph/graph_impl/layer_ops.rs index a0a6bf1b4f..30fb1bb032 100644 --- a/raphtory/src/disk_graph/graph_impl/layer_ops.rs +++ b/raphtory/src/disk_graph/graph_impl/layer_ops.rs @@ -1,7 +1,7 @@ -use super::DiskGraph; use crate::{ core::{entities::LayerIds, utils::errors::GraphError}, db::api::view::internal::InternalLayerOps, + disk_graph::DiskGraph, prelude::Layer, }; use itertools::Itertools; diff --git a/raphtory/src/disk_graph/graph_impl/list_ops.rs b/raphtory/src/disk_graph/graph_impl/list_ops.rs index b9b5a66d99..c948b17220 100644 --- a/raphtory/src/disk_graph/graph_impl/list_ops.rs +++ b/raphtory/src/disk_graph/graph_impl/list_ops.rs @@ -1,6 +1,6 @@ use crate::{ db::api::view::internal::{CoreGraphOps, EdgeList, ListOps, NodeList}, - disk_graph::graph_impl::DiskGraph, + disk_graph::DiskGraph, }; use rayon::prelude::*; diff --git a/raphtory/src/disk_graph/graph_impl/materialize.rs b/raphtory/src/disk_graph/graph_impl/materialize.rs index 4561428d9a..b497f50bbf 100644 --- a/raphtory/src/disk_graph/graph_impl/materialize.rs +++ b/raphtory/src/disk_graph/graph_impl/materialize.rs @@ -3,7 +3,7 @@ use crate::{ db::api::view::{internal::InternalMaterialize, MaterializedGraph}, }; -use super::DiskGraph; +use crate::disk_graph::DiskGraph; impl InternalMaterialize for DiskGraph { fn new_base_graph(&self, _graph: InternalGraph) -> MaterializedGraph { diff --git a/raphtory/src/disk_graph/graph_impl/mod.rs b/raphtory/src/disk_graph/graph_impl/mod.rs index 28809998c2..082d0d004b 100644 --- a/raphtory/src/disk_graph/graph_impl/mod.rs +++ b/raphtory/src/disk_graph/graph_impl/mod.rs @@ -1,35 +1,15 @@ use crate::{ - arrow2::{ - array::{PrimitiveArray, StructArray}, - datatypes::{ArrowDataType as DataType, Field}, - }, core::{ - entities::{ - properties::{graph_meta::GraphMeta, props::Meta}, - LayerIds, EID, VID, - }, + entities::{EID, VID}, utils::errors::GraphError, Prop, PropType, }, - db::api::{ - mutation::internal::{InternalAdditionOps, InternalPropertyAdditionOps}, - view::{internal::Immutable, DynamicGraph, IntoDynamic}, - }, - disk_graph::{graph_impl::prop_conversion::make_node_properties_from_graph, Error}, - prelude::{Graph, GraphViewOps}, -}; -use pometry_storage::{ - disk_hmap::DiskHashMap, graph::TemporalGraph, graph_fragment::TempColGraphFragment, - load::ExternalEdgeList, RAError, + db::api::mutation::internal::{InternalAdditionOps, InternalPropertyAdditionOps}, + disk_graph::{DiskGraph, DiskGraphError}, + prelude::Graph, }; use raphtory_api::core::storage::timeindex::TimeIndexEntry; -use rayon::prelude::*; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use std::{ - fmt::{Display, Formatter}, - path::{Path, PathBuf}, - sync::Arc, -}; +use std::path::Path; pub mod const_properties_ops; pub mod core_ops; @@ -55,302 +35,12 @@ pub struct ParquetLayerCols<'a> { pub time_col: &'a str, } -#[derive(Clone, Debug)] -pub struct DiskGraph { - pub(crate) inner: Arc, - node_meta: Arc, - edge_meta: Arc, - graph_props: Arc, - graph_dir: PathBuf, -} - -impl Serialize for DiskGraph { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let path = self.graph_dir.clone(); - path.serialize(serializer) - } -} - 
-impl<'de> Deserialize<'de> for DiskGraph { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let path = PathBuf::deserialize(deserializer)?; - let graph_result = DiskGraph::load_from_dir(&path).map_err(|err| { - serde::de::Error::custom(format!("Failed to load Diskgraph: {:?}", err)) - })?; - Ok(graph_result) - } -} - -impl Display for DiskGraph { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "Diskgraph(num_nodes={}, num_temporal_edges={}", - self.count_nodes(), - self.count_temporal_edges() - ) - } -} - -impl AsRef for DiskGraph { - fn as_ref(&self) -> &TemporalGraph { - &self.inner - } -} - impl Graph { - pub fn persist_as_disk_graph(&self, graph_dir: impl AsRef) -> Result { - DiskGraph::from_graph(self, graph_dir) - } -} - -impl Immutable for DiskGraph {} - -impl IntoDynamic for DiskGraph { - fn into_dynamic(self) -> DynamicGraph { - DynamicGraph::new(self) - } -} - -impl DiskGraph { - pub fn layer_from_ids(&self, layer_ids: &LayerIds) -> Option { - match layer_ids { - LayerIds::One(layer_id) => Some(*layer_id), - LayerIds::None => None, - LayerIds::All => match self.inner.layers().len() { - 0 => None, - 1 => Some(0), - _ => todo!("multilayer edge views not yet supported in Diskgraph"), - }, - LayerIds::Multiple(ids) => match ids.len() { - 0 => None, - 1 => Some(ids[0]), - _ => todo!("multilayer edge views not yet supported in Diskgraph"), - }, - } - } - - pub fn make_simple_graph( + pub fn persist_as_disk_graph( + &self, graph_dir: impl AsRef, - edges: &[(u64, u64, i64, f64)], - chunk_size: usize, - t_props_chunk_size: usize, - ) -> DiskGraph { - // unzip into 4 vectors - let (src, (dst, (time, weight))): (Vec<_>, (Vec<_>, (Vec<_>, Vec<_>))) = edges - .iter() - .map(|(a, b, c, d)| (*a, (*b, (*c, *d)))) - .unzip(); - - let edge_lists = vec![StructArray::new( - DataType::Struct(vec![ - Field::new("src", DataType::UInt64, false), - Field::new("dst", DataType::UInt64, false), - Field::new("time", DataType::Int64, false), - Field::new("weight", DataType::Float64, false), - ]), - vec![ - PrimitiveArray::from_vec(src).boxed(), - PrimitiveArray::from_vec(dst).boxed(), - PrimitiveArray::from_vec(time).boxed(), - PrimitiveArray::from_vec(weight).boxed(), - ], - None, - )]; - DiskGraph::load_from_edge_lists( - &edge_lists, - chunk_size, - t_props_chunk_size, - graph_dir.as_ref(), - 0, - 1, - 2, - ) - .expect("failed to create graph") - } - - fn new(inner_graph: TemporalGraph, graph_dir: PathBuf) -> Self { - let node_meta = Meta::new(); - let mut edge_meta = Meta::new(); - let graph_meta = GraphMeta::new(); - - for node_type in inner_graph.node_types().into_iter().flatten() { - if let Some(node_type) = node_type { - node_meta.get_or_create_node_type_id(node_type); - } else { - panic!("Node types cannot be null"); - } - } - - for layer in inner_graph.layers() { - let edge_props_fields = layer.edges_data_type(); - - for (id, field) in edge_props_fields.iter().enumerate() { - let prop_name = &field.name; - let data_type = field.data_type(); - - let resolved_id = edge_meta - .resolve_prop_id(prop_name, data_type.into(), false) - .expect("Arrow data types should without failing"); - if id != resolved_id { - println!("Warning: Layers with different edge properties are not supported by the high-level apis on top of the disk_graph graph yet, edge properties will not be available to high-level apis"); - edge_meta = Meta::new(); - break; - } - } - } - - for l_name in inner_graph.layer_names() { - 
edge_meta.layer_meta().get_or_create_id(l_name); - } - - if let Some(props) = &inner_graph.node_properties().const_props { - let node_const_props_fields = props.prop_dtypes(); - for field in node_const_props_fields { - node_meta - .resolve_prop_id(&field.name, field.data_type().into(), true) - .expect("Initial resolve should not fail"); - } - } - - if let Some(props) = &inner_graph.node_properties().temporal_props { - let node_temporal_props_fields = props.prop_dtypes(); - for field in node_temporal_props_fields { - node_meta - .resolve_prop_id(&field.name, field.data_type().into(), false) - .expect("Initial resolve should not fail"); - } - } - - Self { - inner: Arc::new(inner_graph), - node_meta: Arc::new(node_meta), - edge_meta: Arc::new(edge_meta), - graph_props: Arc::new(graph_meta), - graph_dir, - } - } - - pub fn from_graph(graph: &Graph, graph_dir: impl AsRef) -> Result { - let inner_graph = TemporalGraph::from_graph(graph, graph_dir.as_ref(), || { - make_node_properties_from_graph(graph, graph_dir.as_ref()) - })?; - Ok(Self::new(inner_graph, graph_dir.as_ref().to_path_buf())) - } - - pub fn load_from_edge_lists( - edge_list: &[StructArray], - chunk_size: usize, - t_props_chunk_size: usize, - graph_dir: impl AsRef + Sync, - src_col_idx: usize, - dst_col_idx: usize, - time_col_idx: usize, - ) -> Result { - let path = graph_dir.as_ref().to_path_buf(); - let inner = TemporalGraph::from_sorted_edge_list( - graph_dir, - src_col_idx, - dst_col_idx, - time_col_idx, - chunk_size, - t_props_chunk_size, - edge_list, - )?; - Ok(Self::new(inner, path)) - } - - pub fn load_from_dir(graph_dir: impl AsRef) -> Result { - let path = graph_dir.as_ref().to_path_buf(); - let inner = TemporalGraph::new(graph_dir)?; - Ok(Self::new(inner, path)) - } - - pub fn load_from_parquets>( - graph_dir: P, - layer_parquet_cols: Vec, - node_properties: Option
<P>
, - chunk_size: usize, - t_props_chunk_size: usize, - read_chunk_size: Option, - concurrent_files: Option, - num_threads: usize, - node_type_col: Option<&str>, - ) -> Result { - let layered_edge_list: Vec> = layer_parquet_cols - .iter() - .map( - |ParquetLayerCols { - parquet_dir, - layer, - src_col, - dst_col, - time_col, - }| { - ExternalEdgeList::new(layer, parquet_dir.as_ref(), src_col, dst_col, time_col) - .expect("Failed to load events") - }, - ) - .collect::>(); - - let t_graph = TemporalGraph::from_parquets( - num_threads, - chunk_size, - t_props_chunk_size, - read_chunk_size, - concurrent_files, - graph_dir.as_ref(), - layered_edge_list, - node_properties.as_ref().map(|p| p.as_ref()), - node_type_col, - )?; - Ok(Self::new(t_graph, graph_dir.as_ref().to_path_buf())) - } - - pub fn filtered_layers_par<'a>( - &'a self, - layer_ids: &'a LayerIds, - ) -> impl ParallelIterator + 'a { - self.inner - .layers() - .par_iter() - .enumerate() - .filter(|(l_id, _)| layer_ids.contains(l_id)) - .map(|(_, layer)| layer) - } - - pub fn filtered_layers_iter<'a>( - &'a self, - layer_ids: &'a LayerIds, - ) -> impl Iterator + 'a { - self.inner - .layers() - .iter() - .enumerate() - .filter(|(l_id, _)| layer_ids.contains(l_id)) - .map(|(_, layer)| layer) - } - - pub fn from_layer(layer: TempColGraphFragment) -> Self { - let path = layer.graph_dir().to_path_buf(); - let global_ordering = layer.nodes_storage().gids().clone(); - - let global_order = DiskHashMap::from_sorted_dedup(global_ordering.clone()) - .expect("Failed to create global order"); - - let inner = TemporalGraph::new_from_layers( - global_ordering, - Arc::new(global_order), - vec![layer], - vec!["_default".to_string()], - ); - Self::new(inner, path) + ) -> Result { + DiskGraph::from_graph(self, graph_dir) } } @@ -481,10 +171,12 @@ impl InternalPropertyAdditionOps for DiskGraph { #[cfg(test)] mod test { - use super::{DiskGraph, ParquetLayerCols}; + use super::ParquetLayerCols; use crate::{ - algorithms::components::weakly_connected_components, db::api::view::StaticGraphViewOps, - disk_graph::Time, prelude::*, + algorithms::components::weakly_connected_components, + db::api::view::StaticGraphViewOps, + disk_graph::{DiskGraph, Time}, + prelude::*, }; use itertools::{chain, Itertools}; use pometry_storage::{graph::TemporalGraph, properties::Properties}; @@ -1091,3 +783,208 @@ mod test { ); } } + +#[cfg(feature = "storage")] +#[cfg(test)] +mod storage_tests { + use crate::{ + core::Prop, + db::graph::graph::assert_graph_equal, + prelude::{AdditionOps, Graph, GraphViewOps, NodeViewOps, NO_PROPS, *}, + }; + use itertools::Itertools; + use proptest::prelude::*; + use raphtory_api::core::storage::arc_str::OptionAsStr; + use std::collections::BTreeSet; + use tempfile::TempDir; + + #[test] + fn test_merge() { + let g1 = Graph::new(); + g1.add_node(0, 0, [("node_prop", 0f64)], Some("1")).unwrap(); + g1.add_node(0, 1, NO_PROPS, None).unwrap(); + g1.add_node(0, 2, [("node_prop", 2f64)], Some("2")).unwrap(); + g1.add_edge(1, 0, 1, [("test", 1i32)], None).unwrap(); + g1.add_edge(2, 0, 1, [("test", 2i32)], Some("1")).unwrap(); + g1.add_edge(2, 1, 2, [("test2", "test")], None).unwrap(); + g1.node(1) + .unwrap() + .add_constant_properties([("const_str", "test")]) + .unwrap(); + g1.node(0) + .unwrap() + .add_updates(3, [("test", "test")]) + .unwrap(); + + let g2 = Graph::new(); + g2.add_node(1, 0, [("node_prop", 1f64)], None).unwrap(); + g2.add_node(0, 1, NO_PROPS, None).unwrap(); + g2.add_node(3, 2, [("node_prop", 3f64)], Some("3")).unwrap(); + g2.add_edge(1, 
0, 1, [("test", 2i32)], None).unwrap(); + g2.add_edge(3, 0, 1, [("test", 3i32)], Some("2")).unwrap(); + g2.add_edge(2, 1, 2, [("test2", "test")], None).unwrap(); + g2.node(1) + .unwrap() + .add_constant_properties([("const_str2", "test2")]) + .unwrap(); + g2.node(0) + .unwrap() + .add_updates(3, [("test", "test")]) + .unwrap(); + let g1_dir = TempDir::new().unwrap(); + let g2_dir = TempDir::new().unwrap(); + let gm_dir = TempDir::new().unwrap(); + + let g1_a = g1.persist_as_disk_graph(&g1_dir).unwrap(); + let g2_a = g2.persist_as_disk_graph(&g2_dir).unwrap(); + + let gm = g1_a.merge_by_sorted_gids(&g2_a, &gm_dir).unwrap(); + + let n0 = gm.node(0).unwrap(); + assert_eq!( + n0.properties() + .temporal() + .get("node_prop") + .unwrap() + .iter() + .collect_vec(), + [(0, Prop::F64(0.)), (1, Prop::F64(1.))] + ); + assert_eq!( + n0.properties() + .temporal() + .get("test") + .unwrap() + .iter() + .collect_vec(), + [(3, Prop::str("test")), (3, Prop::str("test"))] + ); + assert_eq!(n0.node_type().as_str(), Some("1")); + let n1 = gm.node(1).unwrap(); + assert_eq!(n1.properties().get("const_str"), Some(Prop::str("test"))); + assert_eq!(n1.properties().get("const_str2").unwrap_str(), "test2"); + assert!(n1 + .properties() + .temporal() + .values() + .all(|prop| prop.values().is_empty())); + let n2 = gm.node(2).unwrap(); + assert_eq!(n2.node_type().as_str(), Some("3")); // right has priority + + assert_eq!( + gm.default_layer().edges().id().collect::<Vec<_>>(), + [(0, 1), (1, 2)] + ); + assert_eq!( + gm.valid_layers("1").edges().id().collect::<Vec<_>>(), + [(0, 1)] + ); + assert_eq!( + gm.valid_layers("2").edges().id().collect::<Vec<_>>(), + [(0, 1)] + ); + } + + fn add_edges(g: &Graph, edges: &[(i64, u64, u64)]) { + let nodes: BTreeSet<_> = edges + .iter() + .flat_map(|(_, src, dst)| [*src, *dst]) + .collect(); + for n in nodes { + g.add_node(0, n, NO_PROPS, None).unwrap(); + } + for (t, src, dst) in edges { + g.add_edge(*t, *src, *dst, NO_PROPS, None).unwrap(); + } + } + + fn inner_merge_test(left_edges: &[(i64, u64, u64)], right_edges: &[(i64, u64, u64)]) { + let left_g = Graph::new(); + add_edges(&left_g, left_edges); + let right_g = Graph::new(); + add_edges(&right_g, right_edges); + let merged_g_expected = Graph::new(); + add_edges(&merged_g_expected, left_edges); + add_edges(&merged_g_expected, right_edges); + + let left_dir = TempDir::new().unwrap(); + let right_dir = TempDir::new().unwrap(); + let merged_dir = TempDir::new().unwrap(); + + let left_g_disk = left_g.persist_as_disk_graph(&left_dir).unwrap(); + let right_g_disk = right_g.persist_as_disk_graph(&right_dir).unwrap(); + + let merged_g_disk = left_g_disk + .merge_by_sorted_gids(&right_g_disk, &merged_dir) + .unwrap(); + assert_graph_equal(&merged_g_disk, &merged_g_expected) + } + + #[test] + fn test_merge_proptest() { + proptest!(|(left_edges in prop::collection::vec((0i64..10, 0u64..10, 0u64..10), 0..=100), right_edges in prop::collection::vec((0i64..10, 0u64..10, 0u64..10), 0..=100))| { + inner_merge_test(&left_edges, &right_edges) + }) + } + + #[test] + fn test_empty_graphs() { + inner_merge_test(&[], &[]) + } + + #[test] + fn test_one_empty_graph() { + inner_merge_test(&[], &[(0, 0, 0)]) + } + + #[test] + fn inbounds_not_merging() { + inner_merge_test(&[], &[(0, 0, 0), (0, 0, 1), (0, 0, 2)]) + } + + #[test] + fn inbounds_not_merging_take2() { + inner_merge_test( + &[(0, 0, 2)], + &[ + (0, 1, 0), + (0, 0, 0), + (0, 0, 0), + (0, 0, 0), + (0, 0, 0), + (0, 0, 0), + (0, 0, 0), + ], + ) + } + + #[test] + fn offsets_panic_overflow() { + inner_merge_test(
&[ + (0, 0, 4), + (0, 0, 4), + (0, 0, 0), + (0, 0, 4), + (0, 1, 2), + (0, 3, 4), + ], + &[(0, 0, 5), (0, 2, 0)], + ) + } + + #[test] + fn inbounds_not_merging_take3() { + inner_merge_test( + &[ + (0, 0, 4), + (0, 0, 4), + (0, 0, 0), + (0, 0, 4), + (0, 1, 2), + (0, 3, 4), + ], + &[(0, 0, 3), (0, 0, 4), (0, 2, 2), (0, 0, 5), (0, 0, 6)], + ) + } +} diff --git a/raphtory/src/disk_graph/graph_impl/node_filter_ops.rs b/raphtory/src/disk_graph/graph_impl/node_filter_ops.rs index bb699b700b..9091d54169 100644 --- a/raphtory/src/disk_graph/graph_impl/node_filter_ops.rs +++ b/raphtory/src/disk_graph/graph_impl/node_filter_ops.rs @@ -1,7 +1,7 @@ use crate::{ core::entities::LayerIds, db::api::{storage::nodes::node_ref::NodeStorageRef, view::internal::NodeFilterOps}, - disk_graph::graph_impl::DiskGraph, + disk_graph::DiskGraph, }; impl NodeFilterOps for DiskGraph { diff --git a/raphtory/src/disk_graph/graph_impl/temporal_properties_ops.rs b/raphtory/src/disk_graph/graph_impl/temporal_properties_ops.rs index 30b0e85816..0f4410e46b 100644 --- a/raphtory/src/disk_graph/graph_impl/temporal_properties_ops.rs +++ b/raphtory/src/disk_graph/graph_impl/temporal_properties_ops.rs @@ -4,7 +4,7 @@ use crate::{ }; use raphtory_api::core::storage::arc_str::ArcStr; -use super::DiskGraph; +use crate::disk_graph::DiskGraph; impl TemporalPropertiesOps for DiskGraph { fn get_temporal_prop_id(&self, name: &str) -> Option<usize> { diff --git a/raphtory/src/disk_graph/graph_impl/time_semantics.rs b/raphtory/src/disk_graph/graph_impl/time_semantics.rs index 2424193917..7cc8da9565 100644 --- a/raphtory/src/disk_graph/graph_impl/time_semantics.rs +++ b/raphtory/src/disk_graph/graph_impl/time_semantics.rs @@ -1,4 +1,3 @@ -use super::DiskGraph; use crate::{ core::{ entities::{edges::edge_ref::EdgeRef, LayerIds, VID}, @@ -12,7 +11,7 @@ use crate::{ }, view::{internal::TimeSemantics, BoxedIter}, }, - disk_graph::graph_impl::tprops::read_tprop_column, + disk_graph::{graph_impl::tprops::read_tprop_column, DiskGraph}, prelude::*, }; use itertools::Itertools; diff --git a/raphtory/src/disk_graph/mod.rs b/raphtory/src/disk_graph/mod.rs index 61e18c437b..657921dffc 100644 --- a/raphtory/src/disk_graph/mod.rs +++ b/raphtory/src/disk_graph/mod.rs @@ -1,3 +1,28 @@ +use crate::{ + core::entities::{ + properties::{graph_meta::GraphMeta, props::Meta}, + LayerIds, + }, + db::api::view::{internal::Immutable, DynamicGraph, IntoDynamic}, + disk_graph::graph_impl::{prop_conversion::make_node_properties_from_graph, ParquetLayerCols}, + prelude::{Graph, GraphViewOps}, +}; +use polars_arrow::{ + array::{PrimitiveArray, StructArray}, + datatypes::{ArrowDataType as DataType, Field}, +}; +use pometry_storage::{ + disk_hmap::DiskHashMap, graph::TemporalGraph, graph_fragment::TempColGraphFragment, + load::ExternalEdgeList, merge::merge_graph::merge_graphs, RAError, +}; +use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use std::{ + fmt::{Display, Formatter}, + path::{Path, PathBuf}, + sync::Arc, +}; + pub mod graph_impl; pub mod query; pub mod storage_interface; @@ -9,11 +34,319 @@ pub mod prelude { } #[derive(thiserror::Error, Debug)] -pub enum Error { +pub enum DiskGraphError { #[error("Raphtory Arrow Error: {0}")] RAError(#[from] pometry_storage::RAError), } +#[derive(Clone, Debug)] +pub struct DiskGraph { + pub(crate) inner: Arc<TemporalGraph>, + node_meta: Arc<Meta>, + edge_meta: Arc<Meta>, + graph_props: Arc<GraphMeta>, + graph_dir: PathBuf, +} + +impl Serialize for DiskGraph { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let path = self.graph_dir.clone(); + path.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for DiskGraph { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + let path = PathBuf::deserialize(deserializer)?; + let graph_result = DiskGraph::load_from_dir(&path).map_err(|err| { + serde::de::Error::custom(format!("Failed to load Diskgraph: {:?}", err)) + })?; + Ok(graph_result) + } +} + +impl Display for DiskGraph { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Diskgraph(num_nodes={}, num_temporal_edges={})", + self.count_nodes(), + self.count_temporal_edges() + ) + } +} + +impl AsRef<TemporalGraph> for DiskGraph { + fn as_ref(&self) -> &TemporalGraph { + &self.inner + } +} + +impl Immutable for DiskGraph {} + +impl IntoDynamic for DiskGraph { + fn into_dynamic(self) -> DynamicGraph { + DynamicGraph::new(self) + } +} + +impl DiskGraph { + pub fn graph_dir(&self) -> &Path { + &self.graph_dir + } + fn layer_from_ids(&self, layer_ids: &LayerIds) -> Option<usize> { + match layer_ids { + LayerIds::One(layer_id) => Some(*layer_id), + LayerIds::None => None, + LayerIds::All => match self.inner.layers().len() { + 0 => None, + 1 => Some(0), + _ => todo!("multilayer edge views not yet supported in Diskgraph"), + }, + LayerIds::Multiple(ids) => match ids.len() { + 0 => None, + 1 => Some(ids[0]), + _ => todo!("multilayer edge views not yet supported in Diskgraph"), + }, + } + } + + pub fn make_simple_graph( + graph_dir: impl AsRef<Path>, + edges: &[(u64, u64, i64, f64)], + chunk_size: usize, + t_props_chunk_size: usize, + ) -> DiskGraph { + // unzip into 4 vectors + let (src, (dst, (time, weight))): (Vec<_>, (Vec<_>, (Vec<_>, Vec<_>))) = edges + .iter() + .map(|(a, b, c, d)| (*a, (*b, (*c, *d)))) + .unzip(); + + let edge_lists = vec![StructArray::new( + DataType::Struct(vec![ + Field::new("src", DataType::UInt64, false), + Field::new("dst", DataType::UInt64, false), + Field::new("time", DataType::Int64, false), + Field::new("weight", DataType::Float64, false), + ]), + vec![ + PrimitiveArray::from_vec(src).boxed(), + PrimitiveArray::from_vec(dst).boxed(), + PrimitiveArray::from_vec(time).boxed(), + PrimitiveArray::from_vec(weight).boxed(), + ], + None, + )]; + DiskGraph::load_from_edge_lists( + &edge_lists, + chunk_size, + t_props_chunk_size, + graph_dir.as_ref(), + 0, + 1, + 2, + ) + .expect("failed to create graph") + } + + /// Merge this graph with another `DiskGraph`. Note that both graphs should have nodes that are + /// sorted by their global ids or the resulting graph will be nonsense!
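+ /// + /// A sketch of intended usage (the directory paths here are illustrative assumptions, not part of this change): + /// ```no_run + /// use raphtory::disk_graph::DiskGraph; + /// let left = DiskGraph::load_from_dir("graphs/left").unwrap(); + /// let right = DiskGraph::load_from_dir("graphs/right").unwrap(); + /// let merged = left.merge_by_sorted_gids(&right, "graphs/merged").unwrap(); + /// ```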
+ pub fn merge_by_sorted_gids( + &self, + other: &DiskGraph, + new_graph_dir: impl AsRef<Path>, + ) -> Result<DiskGraph, DiskGraphError> { + let graph_dir = new_graph_dir.as_ref(); + let inner = merge_graphs(graph_dir, &self.inner, &other.inner)?; + Ok(DiskGraph::new(inner, graph_dir.to_path_buf())) + } + + fn new(inner_graph: TemporalGraph, graph_dir: PathBuf) -> Self { + let node_meta = Meta::new(); + let mut edge_meta = Meta::new(); + let graph_meta = GraphMeta::new(); + + for node_type in inner_graph.node_types().into_iter().flatten() { + if let Some(node_type) = node_type { + node_meta.get_or_create_node_type_id(node_type); + } else { + panic!("Node types cannot be null"); + } + } + + for layer in inner_graph.layers() { + let edge_props_fields = layer.edges_data_type(); + + for (id, field) in edge_props_fields.iter().enumerate() { + let prop_name = &field.name; + let data_type = field.data_type(); + + let resolved_id = edge_meta + .resolve_prop_id(prop_name, data_type.into(), false) + .expect("Arrow data types should resolve without failing"); + if id != resolved_id { + println!("Warning: Layers with different edge properties are not supported by the high-level apis on top of the disk_graph graph yet, edge properties will not be available to high-level apis"); + edge_meta = Meta::new(); + break; + } + } + } + + for l_name in inner_graph.layer_names() { + edge_meta.layer_meta().get_or_create_id(l_name); + } + + if let Some(props) = &inner_graph.node_properties().const_props { + let node_const_props_fields = props.prop_dtypes(); + for field in node_const_props_fields { + node_meta + .resolve_prop_id(&field.name, field.data_type().into(), true) + .expect("Initial resolve should not fail"); + } + } + + if let Some(props) = &inner_graph.node_properties().temporal_props { + let node_temporal_props_fields = props.prop_dtypes(); + for field in node_temporal_props_fields { + node_meta + .resolve_prop_id(&field.name, field.data_type().into(), false) + .expect("Initial resolve should not fail"); + } + } + + Self { + inner: Arc::new(inner_graph), + node_meta: Arc::new(node_meta), + edge_meta: Arc::new(edge_meta), + graph_props: Arc::new(graph_meta), + graph_dir, + } + } + + pub fn from_graph(graph: &Graph, graph_dir: impl AsRef<Path>) -> Result<Self, DiskGraphError> { + let inner_graph = TemporalGraph::from_graph(graph, graph_dir.as_ref(), || { + make_node_properties_from_graph(graph, graph_dir.as_ref()) + })?; + Ok(Self::new(inner_graph, graph_dir.as_ref().to_path_buf())) + } + + pub fn load_from_edge_lists( + edge_list: &[StructArray], + chunk_size: usize, + t_props_chunk_size: usize, + graph_dir: impl AsRef<Path> + Sync, + src_col_idx: usize, + dst_col_idx: usize, + time_col_idx: usize, + ) -> Result<Self, DiskGraphError> { + let path = graph_dir.as_ref().to_path_buf(); + let inner = TemporalGraph::from_sorted_edge_list( + graph_dir, + src_col_idx, + dst_col_idx, + time_col_idx, + chunk_size, + t_props_chunk_size, + edge_list, + )?; + Ok(Self::new(inner, path)) + } + + pub fn load_from_dir(graph_dir: impl AsRef<Path>) -> Result<Self, DiskGraphError> { + let path = graph_dir.as_ref().to_path_buf(); + let inner = TemporalGraph::new(graph_dir)?; + Ok(Self::new(inner, path)) + } + + pub fn load_from_parquets<P: AsRef<Path>>( + graph_dir: P, + layer_parquet_cols: Vec<ParquetLayerCols>, + node_properties: Option
<P>
, + chunk_size: usize, + t_props_chunk_size: usize, + read_chunk_size: Option, + concurrent_files: Option, + num_threads: usize, + node_type_col: Option<&str>, + ) -> Result { + let layered_edge_list: Vec> = layer_parquet_cols + .iter() + .map( + |ParquetLayerCols { + parquet_dir, + layer, + src_col, + dst_col, + time_col, + }| { + ExternalEdgeList::new(layer, parquet_dir.as_ref(), src_col, dst_col, time_col) + .expect("Failed to load events") + }, + ) + .collect::>(); + + let t_graph = TemporalGraph::from_parquets( + num_threads, + chunk_size, + t_props_chunk_size, + read_chunk_size, + concurrent_files, + graph_dir.as_ref(), + layered_edge_list, + node_properties.as_ref().map(|p| p.as_ref()), + node_type_col, + )?; + Ok(Self::new(t_graph, graph_dir.as_ref().to_path_buf())) + } + + pub fn filtered_layers_par<'a>( + &'a self, + layer_ids: &'a LayerIds, + ) -> impl ParallelIterator + 'a { + self.inner + .layers() + .par_iter() + .enumerate() + .filter(|(l_id, _)| layer_ids.contains(l_id)) + .map(|(_, layer)| layer) + } + + pub fn filtered_layers_iter<'a>( + &'a self, + layer_ids: &'a LayerIds, + ) -> impl Iterator + 'a { + self.inner + .layers() + .iter() + .enumerate() + .filter(|(l_id, _)| layer_ids.contains(l_id)) + .map(|(_, layer)| layer) + } + + pub fn from_layer(layer: TempColGraphFragment) -> Self { + let path = layer.graph_dir().to_path_buf(); + let global_ordering = layer.nodes_storage().gids().clone(); + + let global_order = DiskHashMap::from_sorted_dedup(global_ordering.clone()) + .expect("Failed to create global order"); + + let inner = TemporalGraph::new_from_layers( + global_ordering, + Arc::new(global_order), + vec![layer], + vec!["_default".to_string()], + ); + Self::new(inner, path) + } +} + #[cfg(test)] mod test { use std::path::Path; diff --git a/raphtory/src/disk_graph/query/executors/rayon2.rs b/raphtory/src/disk_graph/query/executors/rayon2.rs index 091bfe8861..97632caef6 100644 --- a/raphtory/src/disk_graph/query/executors/rayon2.rs +++ b/raphtory/src/disk_graph/query/executors/rayon2.rs @@ -2,12 +2,12 @@ use crate::{ core::{entities::VID, Direction}, db::{api::view::StaticGraphViewOps, graph::node::NodeView}, disk_graph::{ - graph_impl::DiskGraph, query::{ ast::{Hop, Query, Sink}, state::{HopState, StaticGraphHopState}, NodeSource, }, + DiskGraph, }, prelude::*, }; diff --git a/raphtory/src/disk_graph/query/mod.rs b/raphtory/src/disk_graph/query/mod.rs index 2627ceb4f4..9f84be3284 100644 --- a/raphtory/src/disk_graph/query/mod.rs +++ b/raphtory/src/disk_graph/query/mod.rs @@ -9,7 +9,7 @@ use crate::{ use self::state::HopState; use crate::core::storage::timeindex::TimeIndexOps; -use super::graph_impl::DiskGraph; +use crate::disk_graph::DiskGraph; use pometry_storage::nodes::Node; pub mod ast; @@ -38,7 +38,7 @@ impl NodeSource { NodeSource::ExternalIds(ext_ids) => Box::new( ext_ids .into_iter() - .filter_map(move |gid| graph.inner.find_node(&gid)), + .filter_map(move |gid| graph.inner.find_node(gid.as_ref())), ), } } diff --git a/raphtory/src/lib.rs b/raphtory/src/lib.rs index ad409ba5c5..477cd96dba 100644 --- a/raphtory/src/lib.rs +++ b/raphtory/src/lib.rs @@ -137,7 +137,7 @@ mod serialise { #[cfg(test)] mod test_utils { #[cfg(feature = "storage")] - use crate::disk_graph::graph_impl::DiskGraph; + use crate::disk_graph::DiskGraph; use crate::prelude::Graph; #[cfg(feature = "storage")] use tempfile::TempDir; diff --git a/raphtory/src/python/graph/disk_graph.rs b/raphtory/src/python/graph/disk_graph.rs index 3bfe1460a5..c86481ef9b 100644 --- 
a/raphtory/src/python/graph/disk_graph.rs +++ b/raphtory/src/python/graph/disk_graph.rs @@ -1,5 +1,4 @@ -use std::{io::Write, sync::Arc}; - +use super::io::pandas_loaders::*; use crate::{ arrow2::{ array::StructArray, @@ -14,10 +13,11 @@ use crate::{ graph::{edge::EdgeView, node::NodeView}, }, disk_graph::{ - graph_impl::{DiskGraph, ParquetLayerCols}, + graph_impl::ParquetLayerCols, query::{ast::Query, executors::rayon2, state::StaticGraphHopState, NodeSource}, - Error, + DiskGraph, DiskGraphError, }, + io::arrow::dataframe::DFView, prelude::{EdgeViewOps, GraphViewOps, NodeViewOps, TimeOps}, python::{ graph::{edge::PyDirection, graph::PyGraph, views::graph_view::PyGraphView}, @@ -30,14 +30,12 @@ use pometry_storage::GID; /// A columnar temporal graph. use pyo3::{ prelude::*, - types::{IntoPyDict, PyDict, PyList, PyString}, + types::{PyDict, PyList, PyString}, }; +use std::{io::Write, path::Path, sync::Arc}; -use super::io::pandas_loaders::*; -use crate::io::arrow::dataframe::DFView; - -impl From for PyErr { - fn from(value: Error) -> Self { +impl From for PyErr { + fn from(value: DiskGraphError) -> Self { adapt_err_value(&value) } } @@ -143,13 +141,16 @@ impl<'a> FromPyObject<'a> for ParquetLayerColsList<'a> { #[pymethods] impl PyGraph { /// save graph in disk_graph format and memory map the result - pub fn persist_as_disk_graph(&self, graph_dir: &str) -> Result { + pub fn persist_as_disk_graph(&self, graph_dir: &str) -> Result { self.graph.persist_as_disk_graph(graph_dir) } } #[pymethods] impl PyDiskGraph { + pub fn graph_dir(&self) -> &Path { + self.graph.graph_dir() + } #[staticmethod] #[pyo3(signature = (graph_dir, edge_df, src_col, dst_col, time_col))] pub fn load_from_pandas( @@ -160,14 +161,6 @@ impl PyDiskGraph { time_col: &str, ) -> Result { let graph: Result = Python::with_gil(|py| { - let size: usize = py - .eval( - "index.__len__()", - Some([("index", edge_df.getattr("index")?)].into_py_dict(py)), - None, - )? - .extract()?; - let cols_to_check = vec![src_col, dst_col, time_col]; let df_columns: Vec = edge_df.getattr("columns")?.extract()?; @@ -224,6 +217,16 @@ impl PyDiskGraph { }) } + /// Merge this graph with another `DiskGraph`. Note that both graphs should have nodes that are + /// sorted by their global ids or the resulting graph will be nonsense! 
+ fn merge_by_sorted_gids( + &self, + other: &Self, + graph_dir: &str, + ) -> Result { + self.graph.merge_by_sorted_gids(&other.graph, graph_dir) + } + fn __repr__(&self) -> String { StructReprBuilder::new("DiskGraph") .add_field("number_of_nodes", self.graph.count_nodes()) diff --git a/raphtory/src/python/graph/mod.rs b/raphtory/src/python/graph/mod.rs index e32a05c680..008b470860 100644 --- a/raphtory/src/python/graph/mod.rs +++ b/raphtory/src/python/graph/mod.rs @@ -1,5 +1,3 @@ -#![allow(non_local_definitions)] - pub mod algorithm_result; #[cfg(feature = "storage")] pub mod disk_graph; diff --git a/raphtory/src/python/types/macros/trait_impl/node_state.rs b/raphtory/src/python/types/macros/trait_impl/node_state.rs index 5eb24d328f..8d3e3e1783 100644 --- a/raphtory/src/python/types/macros/trait_impl/node_state.rs +++ b/raphtory/src/python/types/macros/trait_impl/node_state.rs @@ -1,4 +1,3 @@ -#![allow(non_local_definitions)] use crate::{ core::entities::nodes::node_ref::NodeRef, db::{ diff --git a/raphtory/src/python/types/wrappers/document.rs b/raphtory/src/python/types/wrappers/document.rs index 6049268894..53644e5f3c 100644 --- a/raphtory/src/python/types/wrappers/document.rs +++ b/raphtory/src/python/types/wrappers/document.rs @@ -1,4 +1,3 @@ -#![allow(non_local_definitions)] use crate::core::{DocumentInput, Lifespan}; use itertools::Itertools; use pyo3::{exceptions::PyAttributeError, prelude::*, types::PyTuple};