diff --git a/README.md b/README.md index 653812e6..401af8ff 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@

- 📖 Docs | 🌐 Home | 💬 Chat + 📖 Docs | 🌐 Home | 💬 Chat

## Amadeus provides: diff --git a/amadeus-aws/src/cloudfront.rs b/amadeus-aws/src/cloudfront.rs index 59779546..1e206b3b 100644 --- a/amadeus-aws/src/cloudfront.rs +++ b/amadeus-aws/src/cloudfront.rs @@ -10,7 +10,7 @@ use std::{ }; use amadeus_core::{ - into_par_stream::IntoDistributedStream, par_stream::{DistributedStream, ParallelStream}, util::{DistParStream, ResultExpandIter}, Source + into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::{DistParStream, ResultExpandIter}, Source }; use amadeus_types::{DateTime, IpAddr, Url}; @@ -55,9 +55,11 @@ impl Source for Cloudfront { type Error = AwsError; #[cfg(not(feature = "doc"))] - type ParStream = impl ParallelStream>; + type ParStream = + impl amadeus_core::par_stream::ParallelStream>; #[cfg(feature = "doc")] - type ParStream = amadeus_core::util::ImplParallelStream>; + type ParStream = + DistParStream>>; #[cfg(not(feature = "doc"))] type DistStream = impl DistributedStream>; #[cfg(feature = "doc")] diff --git a/amadeus-aws/src/file.rs b/amadeus-aws/src/file.rs index ddf3f02e..af3d77e2 100644 --- a/amadeus-aws/src/file.rs +++ b/amadeus-aws/src/file.rs @@ -73,11 +73,10 @@ impl Directory for S3Directory { let file_name = path.pop().unwrap(); skip = skip && path.len() >= current_path.depth() - && path + && current_path.iter().eq(path .iter() .take(current_path.depth()) - .copied() - .eq(current_path.iter()); + .copied()); if skip { return false; } diff --git a/amadeus-commoncrawl/src/lib.rs b/amadeus-commoncrawl/src/lib.rs index ad6a0a54..c9a6eb45 100644 --- a/amadeus-commoncrawl/src/lib.rs +++ b/amadeus-commoncrawl/src/lib.rs @@ -11,7 +11,7 @@ use serde_closure::*; use std::{io, time}; use amadeus_core::{ - into_par_stream::IntoDistributedStream, par_stream::{DistributedStream, ParallelStream}, util::DistParStream, Source + into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::DistParStream, Source }; use amadeus_types::Webpage; @@ -59,9 +59,11 @@ impl Source for CommonCrawl { type Error = io::Error; #[cfg(not(feature = "doc"))] - type ParStream = impl ParallelStream>; + type ParStream = + impl amadeus_core::par_stream::ParallelStream>; #[cfg(feature = "doc")] - type ParStream = amadeus_core::util::ImplParallelStream>; + type ParStream = + DistParStream>>; #[cfg(not(feature = "doc"))] type DistStream = impl DistributedStream>; #[cfg(feature = "doc")] diff --git a/amadeus-core/Cargo.toml b/amadeus-core/Cargo.toml index e1319ccb..d9280ac3 100644 --- a/amadeus-core/Cargo.toml +++ b/amadeus-core/Cargo.toml @@ -36,3 +36,4 @@ serde_closure = "0.2" streaming_algorithms = "0.2" sum = { version = "0.1", features = ["serde"] } walkdir = "2.2" +widestring = "0.4" diff --git a/amadeus-core/src/file.rs b/amadeus-core/src/file.rs index f4018ff5..6327c4b9 100644 --- a/amadeus-core/src/file.rs +++ b/amadeus-core/src/file.rs @@ -1,3 +1,5 @@ +// TODO: Use WTF-8 rather than UTF-16 + #![allow(clippy::type_complexity)] mod local; @@ -6,8 +8,9 @@ use async_trait::async_trait; use futures::{future::BoxFuture, ready}; use pin_project::pin_project; use std::{ - convert::TryFrom, error::Error, fmt, future::Future, io, pin::Pin, sync::Arc, task::{Context, Poll} + convert::TryFrom, error::Error, ffi, fmt, future::Future, io, pin::Pin, sync::Arc, task::{Context, Poll} }; +use widestring::U16String; use crate::pool::ProcessSend; @@ -15,100 +18,169 @@ pub use local::LocalFile; const PAGE_SIZE: usize = 10 * 1024 * 1024; // `Reader` reads this many bytes at a time -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct PathBuf { - wide: bool, // for if the Vec is actually utf16 - components: Vec>, - file_name: Option>, +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +pub struct OsString { + buf: U16String, } -impl PathBuf { +impl OsString { pub fn new() -> Self { Self { - wide: false, - components: Vec::new(), - file_name: None, + buf: U16String::new(), + } + } + pub fn to_string_lossy(&self) -> String { + self.buf.to_string_lossy() + } + pub fn display<'a>(&'a self) -> impl fmt::Display + 'a { + struct Display<'a>(&'a OsString); + impl<'a> fmt::Display for Display<'a> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.to_string_lossy().fmt(f) + } } + Display(self) } - pub fn new_wide() -> Self { +} +impl From> for OsString { + fn from(from: Vec) -> Self { + Self { + buf: String::from_utf8(from) + .expect("Not yet imlemented: Handling non-UTF-8") + .into(), + } // TODO + } +} +impl From for OsString { + fn from(from: String) -> Self { + Self { buf: from.into() } + } +} +impl From<&str> for OsString { + fn from(from: &str) -> Self { + Self { + buf: U16String::from_str(from), + } + } +} +impl From for OsString { + fn from(from: ffi::OsString) -> Self { + Self { + buf: U16String::from_os_str(&from), + } + } +} +impl From<&ffi::OsStr> for OsString { + fn from(from: &ffi::OsStr) -> Self { + Self { + buf: U16String::from_os_str(from), + } + } +} +pub struct InvalidOsString; +impl TryFrom for ffi::OsString { + type Error = InvalidOsString; + + fn try_from(from: OsString) -> Result { + Ok(from.buf.to_os_string()) // TODO: this is lossy but it should error + } +} +impl PartialEq> for OsString { + fn eq(&self, other: &Vec) -> bool { + self == &OsString::from(other.clone()) + } +} +impl PartialEq for OsString { + fn eq(&self, other: &String) -> bool { + self == &OsString::from(other.clone()) + } +} +impl PartialEq for OsString { + fn eq(&self, other: &str) -> bool { + self == &OsString::from(other) + } +} +impl PartialEq for OsString { + fn eq(&self, other: &ffi::OsString) -> bool { + self == &OsString::from(other.clone()) + } +} +impl PartialEq for OsString { + fn eq(&self, other: &ffi::OsStr) -> bool { + self == &OsString::from(other) + } +} +impl fmt::Debug for OsString { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.display()) + } +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +pub struct PathBuf { + components: Vec, + file_name: Option, +} +impl PathBuf { + pub fn new() -> Self { Self { - wide: true, components: Vec::new(), file_name: None, } } pub fn push(&mut self, component: S) where - S: Into>, + S: Into, { assert!(self.file_name.is_none()); self.components.push(component.into()); } - pub fn pop(&mut self) -> Option> { + pub fn pop(&mut self) -> Option { assert!(self.file_name.is_none()); self.components.pop() } - pub fn last(&self) -> Option { + pub fn last(&self) -> Option<&OsString> { assert!(self.file_name.is_none()); - self.components - .last() - .map(|bytes| String::from_utf8_lossy(bytes).into_owned()) + self.components.last() } pub fn set_file_name(&mut self, file_name: Option) where - S: Into>, + S: Into, { self.file_name = file_name.map(Into::into); } pub fn is_file(&self) -> bool { self.file_name.is_some() } - pub fn file_name(&self) -> Option { - self.file_name - .as_ref() - .map(|file_name| String::from_utf8_lossy(file_name).into_owned()) + pub fn file_name(&self) -> Option<&OsString> { + self.file_name.as_ref() } pub fn depth(&self) -> usize { self.components.len() } - pub fn iter<'a>(&'a self) -> impl Iterator + 'a { - self.components - .iter() - .map(|bytes| String::from_utf8_lossy(bytes).into_owned()) - } -} -impl Default for PathBuf { - fn default() -> Self { - Self::new() + pub fn iter<'a>(&'a self) -> impl Iterator + 'a { + self.components.iter() } -} -impl fmt::Display for PathBuf { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut res: fmt::Result = self - .iter() - .map(|component| write!(f, "{}/", component)) - .collect(); - if let Some(file_name) = self.file_name() { - res = res.and_then(|()| write!(f, "{}", file_name)); + pub fn display<'a>(&'a self) -> impl fmt::Display + 'a { + struct Display<'a>(&'a PathBuf); + impl<'a> fmt::Display for Display<'a> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut res: fmt::Result = self + .0 + .iter() + .map(|component| write!(f, "{}/", component.to_string_lossy())) + .collect(); + if let Some(file_name) = self.0.file_name() { + res = res.and_then(|()| write!(f, "{}", file_name.to_string_lossy())); + } + res + } } - res + Display(self) } } impl fmt::Debug for PathBuf { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Debug::fmt(&self.to_string(), f) - } -} -impl PartialEq for PathBuf { - fn eq(&self, other: &str) -> bool { - let mut vec: Vec = Vec::new(); - for component in self.components.iter() { - vec.extend(component); - vec.push(b'/'); - } - if let Some(file_name) = &self.file_name { - vec.extend(file_name); - } - &*vec == other.as_bytes() + write!(f, "{}", self.display()) } } diff --git a/amadeus-core/src/file/local.rs b/amadeus-core/src/file/local.rs index b6e10175..614cbbc3 100644 --- a/amadeus-core/src/file/local.rs +++ b/amadeus-core/src/file/local.rs @@ -99,34 +99,16 @@ impl Directory for &Path { return true; } let mut path = path.strip_prefix(self).unwrap(); - let mut path_buf = if !cfg!(windows) { - super::PathBuf::new() - } else { - super::PathBuf::new_wide() - }; + let mut path_buf = super::PathBuf::new(); let mut file_name = None; - #[cfg(unix)] - let into = |osstr: &OsStr| -> Vec { - std::os::unix::ffi::OsStrExt::as_bytes(osstr).to_owned() - }; - #[cfg(windows)] - let into = |osstr: &OsStr| -> Vec { - std::os::windows::ffi::OsStrExt::encode_wide(osstr) - .flat_map(|char| { - let char = char.to_be(); - std::iter::once(u8::try_from(char >> 8).unwrap()) - .chain(std::iter::once(u8::try_from(char & 0xff).unwrap())) - }) - .collect() - }; if !is_dir { file_name = Some(path.file_name().unwrap()); path = path.parent().unwrap(); } for component in path { - path_buf.push(into(component)); + path_buf.push(component); } - path_buf.set_file_name(file_name.map(into)); + path_buf.set_file_name(file_name); f(&path_buf) }) .filter_map(|e| match e { diff --git a/amadeus-core/src/util.rs b/amadeus-core/src/util.rs index 668a2ea0..bfd58b21 100644 --- a/amadeus-core/src/util.rs +++ b/amadeus-core/src/util.rs @@ -9,7 +9,7 @@ use std::{ use crate::par_stream::{DistributedStream, ParallelStream}; #[cfg(feature = "doc")] use crate::{ - par_stream::{DistributedStream, StreamTask, StreamTaskAsync}, sink::Sink + par_stream::{StreamTask, StreamTaskAsync}, sink::Sink }; pub struct ResultExpand(pub Result); @@ -110,30 +110,40 @@ where } } -impl_par_dist_rename! { - #[cfg(feature = "doc")] - #[doc(hidden)] - pub struct ImplParallelStream(PhantomData T>); - #[cfg(feature = "doc")] - impl ImplParallelStream { - pub fn new(_drop: U) -> Self - where - U: ParallelStream, - { - Self(PhantomData) - } +#[cfg(feature = "doc")] +#[doc(hidden)] +pub struct ImplDistributedStream(PhantomData T>); +#[cfg(feature = "doc")] +impl ImplDistributedStream { + pub fn new(_drop: U) -> Self + where + U: DistributedStream, + { + Self(PhantomData) } - #[cfg(feature = "doc")] - impl ParallelStream for ImplParallelStream { - type Item = T; - type Task = ImplTask; +} +#[cfg(feature = "doc")] +impl DistributedStream for ImplDistributedStream { + type Item = T; + type Task = ImplTask; - fn size_hint(&self) -> (usize, Option) { - unreachable!() - } - fn next_task(&mut self) -> Option { - unreachable!() - } + fn size_hint(&self) -> (usize, Option) { + unreachable!() + } + fn next_task(&mut self) -> Option { + unreachable!() + } +} +#[cfg(feature = "doc")] +impl ParallelStream for ImplDistributedStream { + type Item = T; + type Task = ImplTask; + + fn size_hint(&self) -> (usize, Option) { + unreachable!() + } + fn next_task(&mut self) -> Option { + unreachable!() } } diff --git a/amadeus-parquet/src/internal/file/reader.rs b/amadeus-parquet/src/internal/file/reader.rs index 31238cf8..9652109e 100644 --- a/amadeus-parquet/src/internal/file/reader.rs +++ b/amadeus-parquet/src/internal/file/reader.rs @@ -207,6 +207,7 @@ impl SerializedFileReader { fn parse_metadata(buf: &mut Rc>>) -> Result { let buf = &mut *buf.borrow_mut(); let file_size = buf.len(); + println!("file_size: {}", file_size); if file_size < (FOOTER_SIZE as u64) { return Err(general_err!( "Invalid Parquet file. Size is smaller than footer" diff --git a/amadeus-parquet/src/lib.rs b/amadeus-parquet/src/lib.rs index c82918b4..aab7c0bd 100644 --- a/amadeus-parquet/src/lib.rs +++ b/amadeus-parquet/src/lib.rs @@ -21,7 +21,7 @@ use std::{ }; use amadeus_core::{ - file::{Directory, File, Page, Partition, PathBuf}, into_par_stream::IntoDistributedStream, par_stream::{DistributedStream, ParallelStream}, util::{DistParStream, ResultExpandIter}, Source + file::{Directory, File, Page, Partition, PathBuf}, into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::{DistParStream, ResultExpandIter}, Source }; pub use internal::record::ParquetData; @@ -69,9 +69,11 @@ where >; #[cfg(not(feature = "doc"))] - type ParStream = impl ParallelStream>; + type ParStream = + impl amadeus_core::par_stream::ParallelStream>; #[cfg(feature = "doc")] - type ParStream = amadeus_core::util::ImplParallelStream>; + type ParStream = + DistParStream>>; #[cfg(not(feature = "doc"))] type DistStream = impl DistributedStream>; #[cfg(feature = "doc")] @@ -95,7 +97,7 @@ where ) .flat_map(|page| { async move { - let mut buf = Vec::new(); + let mut buf = Vec::with_capacity(10 * 1024 * 1024); let reader = Page::reader(page); pin_mut!(reader); let buf = PassError::new( @@ -169,11 +171,12 @@ where .partitions_filter(|path| { let skip; if !path.is_file() { - let dir_name = path.last().unwrap(); + let dir_name = path.last().unwrap().to_string_lossy(); + skip = dir_name.starts_with('.') // Hidden files || (dir_name.starts_with('_') && !dir_name.contains('=')) // ARROW-1079: Filter out "private" directories starting with underscore; } else { - let file_name = path.file_name().unwrap(); + let file_name = path.file_name().unwrap().to_string_lossy(); let extension = file_name.rfind('.').map(|offset| &file_name[offset + 1..]); skip = file_name.starts_with('.') // Hidden files || file_name == "_metadata" || file_name == "_common_metadata" // Summary metadata diff --git a/amadeus-postgres/src/impls.rs b/amadeus-postgres/src/impls.rs index 896b0111..f900c76a 100644 --- a/amadeus-postgres/src/impls.rs +++ b/amadeus-postgres/src/impls.rs @@ -62,7 +62,7 @@ impl PostgresData for Bson { name.unwrap().fmt(f) } fn decode(_type_: &Type, _buf: Option<&[u8]>) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } @@ -71,7 +71,7 @@ impl PostgresData for Json { name.unwrap().fmt(f) } fn decode(_type_: &Type, _buf: Option<&[u8]>) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } @@ -80,7 +80,7 @@ impl PostgresData for Enum { name.unwrap().fmt(f) } fn decode(_type_: &Type, _buf: Option<&[u8]>) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } @@ -89,7 +89,7 @@ impl PostgresData for Url { name.unwrap().fmt(f) } fn decode(_type_: &Type, _buf: Option<&[u8]>) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } @@ -98,7 +98,7 @@ impl PostgresData for Webpage<'static> { name.unwrap().fmt(f) } fn decode(_type_: &Type, _buf: Option<&[u8]>) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } @@ -107,7 +107,7 @@ impl PostgresData for IpAddr { name.unwrap().fmt(f) } fn decode(_type_: &Type, _buf: Option<&[u8]>) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } @@ -116,16 +116,16 @@ impl PostgresData for Decimal { name.unwrap().fmt(f) } fn decode(_type_: &Type, _buf: Option<&[u8]>) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } impl PostgresData for Group { fn query(_f: &mut fmt::Formatter, _name: Option<&Names<'_>>) -> fmt::Result { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } fn decode(_type_: &Type, _buf: Option<&[u8]>) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } @@ -134,12 +134,12 @@ where T: PostgresData, { default fn query(_f: &mut fmt::Formatter, _name: Option<&Names<'_>>) -> fmt::Result { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } default fn decode( _type_: &Type, _buf: Option<&[u8]>, ) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } /// BYTEA @@ -148,7 +148,7 @@ impl PostgresData for List { name.unwrap().fmt(f) } fn decode(_type_: &Type, _buf: Option<&[u8]>) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } @@ -159,10 +159,10 @@ where S: BuildHasher + Clone + 'static, { fn query(_f: &mut fmt::Formatter, _name: Option<&Names<'_>>) -> fmt::Result { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } fn decode(_type_: &Type, _buf: Option<&[u8]>) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } @@ -171,7 +171,7 @@ impl PostgresData for Date { name.unwrap().fmt(f) } fn decode(_type_: &Type, _buf: Option<&[u8]>) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } @@ -193,7 +193,7 @@ impl PostgresData for Time { name.unwrap().fmt(f) } fn decode(_type_: &Type, _buf: Option<&[u8]>) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } @@ -244,16 +244,16 @@ impl PostgresData for Timezone { name.unwrap().fmt(f) } fn decode(_type_: &Type, _buf: Option<&[u8]>) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } impl PostgresData for Value { fn query(_f: &mut fmt::Formatter, _name: Option<&Names<'_>>) -> fmt::Result { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } fn decode(_type_: &Type, _buf: Option<&[u8]>) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } @@ -270,7 +270,7 @@ macro_rules! array { fn decode( _type_: &Type, _buf: Option<&[u8]>, ) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } @@ -283,7 +283,7 @@ macro_rules! array { fn decode( _type_: &Type, _buf: Option<&[u8]>, ) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } )*}; @@ -295,10 +295,10 @@ macro_rules! tuple { ($len:tt $($t:ident $i:tt)*) => ( impl<$($t,)*> PostgresData for ($($t,)*) where $($t: PostgresData,)* { fn query(_f: &mut fmt::Formatter, _name: Option<&Names<'_>>) -> fmt::Result { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } fn decode(_type_: &Type, _buf: Option<&[u8]>) -> Result> { - unimplemented!() + todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/63") } } ); diff --git a/amadeus-postgres/src/lib.rs b/amadeus-postgres/src/lib.rs index 771c1dce..e7c33074 100644 --- a/amadeus-postgres/src/lib.rs +++ b/amadeus-postgres/src/lib.rs @@ -23,7 +23,7 @@ use std::{ }; use amadeus_core::{ - into_par_stream::IntoDistributedStream, par_stream::{DistributedStream, ParallelStream}, util::{DistParStream, IoError}, Source as DSource + into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::{DistParStream, IoError}, Source as DSource }; const MAGIC: &[u8] = b"PGCOPY\n\xff\r\n\0"; @@ -205,9 +205,11 @@ where type Error = PostgresError; #[cfg(not(feature = "doc"))] - type ParStream = impl ParallelStream>; + type ParStream = + impl amadeus_core::par_stream::ParallelStream>; #[cfg(feature = "doc")] - type ParStream = amadeus_core::util::ImplParallelStream>; + type ParStream = + DistParStream>>; #[cfg(not(feature = "doc"))] type DistStream = impl DistributedStream>; #[cfg(feature = "doc")] diff --git a/amadeus-serde/src/csv.rs b/amadeus-serde/src/csv.rs index 1c366bac..71fb4a8b 100644 --- a/amadeus-serde/src/csv.rs +++ b/amadeus-serde/src/csv.rs @@ -7,7 +7,7 @@ use std::{ }; use amadeus_core::{ - file::{File, Page, Partition}, into_par_stream::IntoDistributedStream, par_stream::{DistributedStream, ParallelStream}, util::{DistParStream, ResultExpandIter}, Source + file::{File, Page, Partition}, into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::{DistParStream, ResultExpandIter}, Source }; use super::{SerdeData, SerdeDeserializeGroup}; @@ -88,9 +88,11 @@ where >; #[cfg(not(feature = "doc"))] - type ParStream = impl ParallelStream>; + type ParStream = + impl amadeus_core::par_stream::ParallelStream>; #[cfg(feature = "doc")] - type ParStream = amadeus_core::util::ImplParallelStream>; + type ParStream = + DistParStream>>; #[cfg(not(feature = "doc"))] type DistStream = impl DistributedStream>; #[cfg(feature = "doc")] @@ -114,7 +116,7 @@ where ) .flat_map(|page| { async move { - let mut buf = Vec::new(); + let mut buf = Vec::with_capacity(10 * 1024 * 1024); let reader = Page::reader(page); pin_mut!(reader); let _ = reader diff --git a/amadeus-serde/src/json.rs b/amadeus-serde/src/json.rs index 1dfee4ef..92b85812 100644 --- a/amadeus-serde/src/json.rs +++ b/amadeus-serde/src/json.rs @@ -7,7 +7,7 @@ use std::{ }; use amadeus_core::{ - file::{File, Page, Partition}, into_par_stream::IntoDistributedStream, par_stream::{DistributedStream, ParallelStream}, util::{DistParStream, ResultExpandIter}, Source + file::{File, Page, Partition}, into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::{DistParStream, ResultExpandIter}, Source }; use super::{SerdeData, SerdeDeserialize}; @@ -47,9 +47,11 @@ where >; #[cfg(not(feature = "doc"))] - type ParStream = impl ParallelStream>; + type ParStream = + impl amadeus_core::par_stream::ParallelStream>; #[cfg(feature = "doc")] - type ParStream = amadeus_core::util::ImplParallelStream>; + type ParStream = + DistParStream>>; #[cfg(not(feature = "doc"))] type DistStream = impl DistributedStream>; #[cfg(feature = "doc")] @@ -73,7 +75,7 @@ where ) .flat_map(|page| { async move { - let mut buf = Vec::new(); + let mut buf = Vec::with_capacity(10 * 1024 * 1024); let reader = Page::reader(page); pin_mut!(reader); let buf = PassError::new(