Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various improvements for disk graph #1866

Merged
merged 15 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ numpy = "0.22.1"
itertools = "0.13.0"
rand = "0.8.5"
rayon = "1.8.1"
roaring = "0.10.6"
sorted_vector_map = "0.2.0"
tokio = { version = "1.36.0", features = ["full"] }
once_cell = "1.19.0"
Expand Down
2 changes: 1 addition & 1 deletion pometry-storage-private
182 changes: 182 additions & 0 deletions raphtory-api/src/core/entities/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use super::input::input_node::parse_u64_strict;
use bytemuck::{Pod, Zeroable};
use edges::edge_ref::EdgeRef;
use num_traits::ToPrimitive;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::{
borrow::Cow,
fmt::{Display, Formatter},
sync::Arc,
};

pub mod edges;
Expand Down Expand Up @@ -311,3 +314,182 @@ impl<'a> GidRef<'a> {
}
}
}

/// A selection of graph layers to operate on.
#[derive(Clone, Debug)]
pub enum LayerIds {
    /// The empty selection: no layer matches.
    None,
    /// Every layer matches.
    All,
    /// Exactly one layer, identified by its id.
    One(usize),
    /// Several layers; ids are expected to be sorted and deduplicated
    /// (the `From<Vec<usize>>` conversions below enforce this).
    Multiple(Multiple),
}

/// A set of layer ids backed by a shared slice; cloning is an `Arc`
/// refcount bump, not a copy.
///
/// NOTE(review): `binary_search` on this type assumes the slice is sorted.
/// `LayerIds::from(Vec<usize>)` sorts before constructing, but the
/// `FromIterator`/`From<Vec<usize>>` impls on `Multiple` itself do not —
/// confirm all construction sites supply sorted ids.
#[derive(Clone, Debug, Default)]
pub struct Multiple(pub Arc<[usize]>);

impl Multiple {
    /// Binary-search for `pos`, returning its index in the slice when present.
    /// NOTE(review): meaningful only if the backing slice is sorted — see the
    /// type-level note on construction sites.
    #[inline]
    pub fn binary_search(&self, pos: &usize) -> Option<usize> {
        self.0.binary_search(pos).ok()
    }

    /// Owned iterator over the ids; clones the `Arc` (refcount bump), not the data.
    #[inline]
    pub fn into_iter(&self) -> impl Iterator<Item = usize> {
        let ids = self.0.clone();
        (0..ids.len()).map(move |i| ids[i])
    }

    /// Borrowing iterator over the ids.
    #[inline]
    pub fn iter(&self) -> impl Iterator<Item = usize> + '_ {
        self.0.iter().copied()
    }

    /// Return the id stored at position `id`, if in bounds.
    /// NOTE(review): this is positional lookup (`get(index)`), not a search
    /// for the value `id` — the name is misleading; confirm callers expect
    /// index semantics.
    #[inline]
    pub fn find(&self, id: usize) -> Option<usize> {
        self.0.get(id).copied()
    }

    /// Parallel owned iterator over the ids (rayon).
    #[inline]
    pub fn par_iter(&self) -> impl rayon::iter::ParallelIterator<Item = usize> {
        // Clone the Arc so the parallel iterator owns its data and needs no lifetime.
        let ids = self.0.clone();
        (0..ids.len()).into_par_iter().map(move |i| ids[i])
    }

    /// Number of ids in the set.
    #[inline]
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// True when the set holds no ids (companion to `len`; satisfies clippy's
    /// `len_without_is_empty`).
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
}

impl FromIterator<usize> for Multiple {
fn from_iter<I: IntoIterator<Item = usize>>(iter: I) -> Self {
Multiple(iter.into_iter().collect())
}
}

impl From<Vec<usize>> for Multiple {
fn from(v: Vec<usize>) -> Self {
v.into_iter().collect()
}
}

#[cfg(test)]
mod test {
    use crate::core::entities::Multiple;

    /// Iterating an empty set yields nothing.
    #[test]
    fn empty_bit_multiple() {
        // Use the imported `Multiple` consistently (was `super::Multiple`).
        let bm = Multiple::default();
        let actual = bm.into_iter().collect::<Vec<_>>();
        let expected: Vec<usize> = vec![];
        assert_eq!(actual, expected);
    }

    /// A single id round-trips through collect/iterate.
    #[test]
    fn set_one() {
        let bm: Multiple = [1].into_iter().collect();
        let actual = bm.into_iter().collect::<Vec<_>>();
        assert_eq!(actual, vec![1usize]);
    }

    /// Two ids round-trip in order.
    #[test]
    fn set_two() {
        let bm: Multiple = [1, 67].into_iter().collect();

        let actual = bm.into_iter().collect::<Vec<_>>();
        assert_eq!(actual, vec![1usize, 67]);
    }

    /// `binary_search` on sorted input finds present ids and rejects absent ones.
    #[test]
    fn binary_search_sorted() {
        let bm: Multiple = [1, 67].into_iter().collect();
        assert_eq!(bm.binary_search(&1), Some(0));
        assert_eq!(bm.binary_search(&67), Some(1));
        assert_eq!(bm.binary_search(&2), None);
    }
}

impl LayerIds {
    /// Resolve `layer_id` against this selection: `Some(layer_id)` when the
    /// layer is selected, `None` otherwise.
    pub fn find(&self, layer_id: usize) -> Option<usize> {
        match self {
            LayerIds::None => None,
            LayerIds::All => Some(layer_id),
            LayerIds::One(id) => (*id == layer_id).then_some(layer_id),
            LayerIds::Multiple(ids) => ids.binary_search(&layer_id).map(|_| layer_id),
        }
    }

    /// Set intersection of two selections, normalised to the smallest
    /// representation (`None`/`One`/`Multiple`).
    pub fn intersect(&self, other: &LayerIds) -> LayerIds {
        match (self, other) {
            // Empty absorbs everything; `All` is the identity.
            (LayerIds::None, _) | (_, LayerIds::None) => LayerIds::None,
            (LayerIds::All, rhs) => rhs.clone(),
            (lhs, LayerIds::All) => lhs.clone(),
            (LayerIds::One(id), rhs) => {
                if rhs.contains(id) {
                    LayerIds::One(*id)
                } else {
                    LayerIds::None
                }
            }
            (LayerIds::Multiple(ids), rhs) => {
                let kept: Vec<usize> = ids.iter().filter(|id| rhs.contains(id)).collect();
                match kept.as_slice() {
                    [] => LayerIds::None,
                    [only] => LayerIds::One(*only),
                    _ => LayerIds::Multiple(kept.into()),
                }
            }
        }
    }

    /// Narrow this selection to the layer carried by an edge reference.
    /// Borrows `self` unchanged when the edge has no explicit layer;
    /// otherwise returns an owned `One`/`None` selection.
    pub fn constrain_from_edge(&self, e: EdgeRef) -> Cow<LayerIds> {
        match e.layer() {
            None => Cow::Borrowed(self),
            Some(l) => match self.find(l) {
                Some(id) => Cow::Owned(LayerIds::One(id)),
                None => Cow::Owned(LayerIds::None),
            },
        }
    }

    /// True when `layer_id` is part of this selection.
    pub fn contains(&self, layer_id: &usize) -> bool {
        self.find(*layer_id).is_some()
    }

    /// True for the empty selection.
    pub fn is_none(&self) -> bool {
        matches!(self, LayerIds::None)
    }
}

impl From<Vec<usize>> for LayerIds {
    /// Build a selection from a list of layer ids.
    ///
    /// An empty list means "all layers"; a single id becomes `One`; anything
    /// longer is sorted and deduplicated into `Multiple`.
    fn from(mut v: Vec<usize>) -> Self {
        if v.is_empty() {
            return LayerIds::All;
        }
        if v.len() == 1 {
            return LayerIds::One(v[0]);
        }
        v.sort_unstable();
        v.dedup();
        LayerIds::Multiple(v.into())
    }
}

impl<const N: usize> From<[usize; N]> for LayerIds {
    /// Build a selection from a fixed-size array of layer ids, mirroring the
    /// `Vec<usize>` conversion: empty selects all, one id becomes `One`,
    /// longer arrays are sorted and deduplicated into `Multiple`.
    fn from(v: [usize; N]) -> Self {
        // The array length is the const parameter, so match on `N` directly.
        match N {
            0 => LayerIds::All,
            1 => LayerIds::One(v[0]),
            _ => {
                let mut ids = v.to_vec();
                ids.sort_unstable();
                ids.dedup();
                LayerIds::Multiple(ids.into())
            }
        }
    }
}

impl From<usize> for LayerIds {
    /// A bare layer id selects exactly that one layer.
    fn from(id: usize) -> Self {
        LayerIds::One(id)
    }
}
8 changes: 8 additions & 0 deletions raphtory-api/src/core/utils/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,11 @@ pub fn global_debug_logger() {
pub fn global_trace_logger() {
init_global_logger("TRACE".to_string())
}

/// Install a global tracing subscriber that emits DEBUG-level output,
/// suppresses event targets, and logs span enter/close events.
///
/// NOTE(review): `init()` panics if a global subscriber was already set, so
/// this must not be combined with the `global_*_logger` helpers above —
/// presumably intended for ad-hoc local debugging only; confirm call sites.
pub fn sysout_debug() {
    tracing_subscriber::fmt::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_target(false)
        .with_span_events(FmtSpan::ENTER | FmtSpan::CLOSE)
        .init();
}
1 change: 1 addition & 0 deletions raphtory-cypher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ arrow.workspace = true
arrow-buffer.workspace = true
arrow-schema.workspace = true
arrow-array.workspace = true
tracing-subscriber.workspace = true

pest.workspace = true
pest_derive.workspace = true
Expand Down
6 changes: 3 additions & 3 deletions raphtory-cypher/examples/raphtory_cypher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod cypher {
use futures::{stream, StreamExt};
use raphtory::{
disk_graph::{graph_impl::ParquetLayerCols, DiskGraphStorage},
logging::global_info_logger,
logging::{global_info_logger, sysout_debug},
};
use raphtory_cypher::{run_cypher, run_cypher_to_streams, run_sql};
use serde::{de::DeserializeOwned, Deserialize};
Expand Down Expand Up @@ -126,11 +126,11 @@ mod cypher {

// #[tokio::main]
pub async fn main() {
global_info_logger();
let args = Args::parse();

match args {
Args::Query(args) => {
global_info_logger();
let graph =
DiskGraphStorage::load_from_dir(&args.graph_dir).expect("Failed to load graph");

Expand All @@ -145,7 +145,6 @@ mod cypher {

let now = std::time::Instant::now();
let batches = df.collect().await.unwrap();
global_info_logger();
info!("Query execution time: {:?}", now.elapsed());
print_batches(&batches).expect("Failed to print batches");
} else {
Expand All @@ -161,6 +160,7 @@ mod cypher {
}

Args::Load(args) => {
sysout_debug();
let layers = args.layers;
let layer_parquet_cols = (0..layers.len())
.map(|layer_id| {
Expand Down
1 change: 1 addition & 0 deletions raphtory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pometry-storage = { workspace = true, optional = true }

prost = { workspace = true, optional = true }
prost-types = { workspace = true, optional = true }
roaring ={ workspace = true }

[dev-dependencies]
csv = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions raphtory/src/algorithms/community_detection/louvain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ mod test {
test_storage,
};
use proptest::prelude::*;
#[cfg(feature = "io")]
use tracing::info;

#[cfg(feature = "io")]
Expand Down
Loading