Skip to content

Commit

Permalink
rename files dist -> par
Browse files Browse the repository at this point in the history
  • Loading branch information
alecmocatta committed Jun 17, 2020
1 parent 69d0279 commit 08c5e82
Show file tree
Hide file tree
Showing 44 changed files with 37 additions and 60 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ name = "common_crawl"
required-features = ["commoncrawl", "constellation"]

[[test]]
name = "into_dist_stream_dist"
name = "into_par_stream_dist"
harness = false

[[test]]
Expand Down
2 changes: 1 addition & 1 deletion amadeus-aws/src/cloudfront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
};

use amadeus_core::{
dist_stream::{DistributedStream, ParallelStream}, into_dist_stream::IntoDistributedStream, util::{DistParStream, ResultExpandIter}, Source
into_par_stream::IntoDistributedStream, par_stream::{DistributedStream, ParallelStream}, util::{DistParStream, ResultExpandIter}, Source
};
use amadeus_types::{DateTime, IpAddr, Url};

Expand Down
2 changes: 1 addition & 1 deletion amadeus-commoncrawl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use serde_closure::*;
use std::{io, time};

use amadeus_core::{
dist_stream::{DistributedStream, ParallelStream}, into_dist_stream::IntoDistributedStream, util::DistParStream, Source
into_par_stream::IntoDistributedStream, par_stream::{DistributedStream, ParallelStream}, util::DistParStream, Source
};
use amadeus_types::Webpage;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::dist_stream::{DistributedStream, ParallelStream, StreamTask, StreamTaskAsync};
use crate::par_stream::{DistributedStream, ParallelStream, StreamTask, StreamTaskAsync};

mod collections;
mod iterator;
Expand Down
File renamed without changes.
10 changes: 5 additions & 5 deletions amadeus-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ macro_rules! impl_par_dist {
}
mod impl_par_dist {
pub use crate::{
combiner_dist_sink as combiner_par_sink, dist_pipe::DistributedPipe as ParallelPipe, dist_sink::{DistributedSink as ParallelSink, FromDistributedStream as FromParallelStream}, dist_stream::DistributedStream as ParallelStream, folder_dist_sink as folder_par_sink, pool::ProcessSend as Send
combiner_dist_sink as combiner_par_sink, folder_dist_sink as folder_par_sink, par_pipe::DistributedPipe as ParallelPipe, par_sink::{DistributedSink as ParallelSink, FromDistributedStream as FromParallelStream}, par_stream::DistributedStream as ParallelStream, pool::ProcessSend as Send
};
}

Expand Down Expand Up @@ -59,12 +59,12 @@ macro_rules! rename {
);
}

pub mod dist_pipe;
pub mod dist_sink;
pub mod dist_stream;
pub mod file;
pub mod into_dist_stream;
pub mod into_par_stream;
pub mod misc_serde;
pub mod par_pipe;
pub mod par_sink;
pub mod par_stream;
pub mod pool;
pub mod sink;
mod source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{

use crate::{pool::ProcessSend, sink::Sink};

use super::{dist_sink::*, dist_stream::*};
use super::{par_sink::*, par_stream::*};

#[must_use]
pub trait PipeTask<Source> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{

use crate::pool::ProcessSend;

use super::dist_pipe::*;
use super::par_pipe::*;

pub use self::{
all::*, any::*, collect::*, combine::*, combiner::*, count::*, fold::*, folder::*, for_each::*, max::*, sample::*, sum::*, tuple::*
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ mod macros {
#[macro_export]
macro_rules! combiner_dist_sink {
($combiner:ty, $self:ident, $init:expr) => {
type Output = <Self::ReduceC as $crate::dist_sink::Reducer>::Output;
type Output = <Self::ReduceC as $crate::par_sink::Reducer>::Output;
type Pipe = I;
type ReduceAFactory = FolderSyncReducerFactory<I::Item, $combiner>;
type ReduceBFactory = FolderSyncReducerFactory<<Self::ReduceA as $crate::dist_sink::Reducer>::Output, $combiner>;
type ReduceBFactory = FolderSyncReducerFactory<<Self::ReduceA as $crate::par_sink::Reducer>::Output, $combiner>;
type ReduceA = FolderSyncReducer<I::Item, $combiner>;
type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::dist_sink::Reducer>::Output, $combiner>;
type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::dist_sink::Reducer>::Output, $combiner>;
type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer>::Output, $combiner>;
type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer>::Output, $combiner>;

fn reducers($self) -> (I, Self::ReduceAFactory, Self::ReduceBFactory, Self::ReduceC) {
let init = $init;
Expand All @@ -26,11 +26,11 @@ mod macros {
#[macro_export]
macro_rules! combiner_par_sink {
($combiner:ty, $self:ident, $init:expr) => {
type Output = <Self::ReduceC as $crate::dist_sink::Reducer>::Output;
type Output = <Self::ReduceC as $crate::par_sink::Reducer>::Output;
type Pipe = I;
type ReduceAFactory = FolderSyncReducerFactory<I::Item, $combiner>;
type ReduceA = FolderSyncReducer<I::Item, $combiner>;
type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::dist_sink::Reducer>::Output, $combiner>;
type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer>::Output, $combiner>;

fn reducers($self) -> (I, Self::ReduceAFactory, Self::ReduceC) {
let init = $init;
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ mod macros {
#[macro_export]
macro_rules! folder_dist_sink {
($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => {
type Output = <Self::ReduceC as $crate::dist_sink::Reducer>::Output;
type Output = <Self::ReduceC as $crate::par_sink::Reducer>::Output;
type Pipe = I;
type ReduceAFactory = FolderSyncReducerFactory<I::Item, $folder_a>;
type ReduceBFactory = FolderSyncReducerFactory<<Self::ReduceA as $crate::dist_sink::Reducer>::Output, $folder_b>;
type ReduceBFactory = FolderSyncReducerFactory<<Self::ReduceA as $crate::par_sink::Reducer>::Output, $folder_b>;
type ReduceA = FolderSyncReducer<I::Item, $folder_a>;
type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::dist_sink::Reducer>::Output, $folder_b>;
type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::dist_sink::Reducer>::Output, $folder_b>;
type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer>::Output, $folder_b>;
type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer>::Output, $folder_b>;

fn reducers($self) -> (I, Self::ReduceAFactory, Self::ReduceBFactory, Self::ReduceC) {
let init_a = $init_a;
Expand All @@ -39,11 +39,11 @@ mod macros {
#[macro_export]
macro_rules! folder_par_sink {
($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => {
type Output = <Self::ReduceC as $crate::dist_sink::Reducer>::Output;
type Output = <Self::ReduceC as $crate::par_sink::Reducer>::Output;
type Pipe = I;
type ReduceAFactory = FolderSyncReducerFactory<I::Item, $folder_a>;
type ReduceA = FolderSyncReducer<I::Item, $folder_a>;
type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::dist_sink::Reducer>::Output, $folder_b>;
type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer>::Output, $folder_b>;

fn reducers($self) -> (I, Self::ReduceAFactory, Self::ReduceC) {
let init_a = $init_a;
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ use std::{
};

use crate::{
into_dist_stream::IntoDistributedStream, pool::{ProcessPool, ProcessSend, ThreadPool}, sink::{Sink, SinkMap}, util::type_coerce
into_par_stream::IntoDistributedStream, pool::{ProcessPool, ProcessSend, ThreadPool}, sink::{Sink, SinkMap}, util::type_coerce
};

pub use self::{
chain::*, cloned::*, filter::*, flat_map::*, identity::*, inspect::*, map::*, update::*
};

use super::{dist_pipe::*, dist_sink::*};
use super::{par_pipe::*, par_sink::*};

#[must_use]
pub trait StreamTask {
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion amadeus-core/src/source.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::error::Error;

use crate::{
dist_sink::{DistributedSink, ParallelSink}, dist_stream::{DistributedStream, ParallelStream}
par_sink::{DistributedSink, ParallelSink}, par_stream::{DistributedStream, ParallelStream}
};

pub trait Source {
Expand Down
4 changes: 2 additions & 2 deletions amadeus-core/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use std::{
any::type_name, error, fmt, io, marker::PhantomData, pin::Pin, sync::Arc, task::{Context, Poll}
};

use crate::dist_stream::{DistributedStream, ParallelStream};
use crate::par_stream::{DistributedStream, ParallelStream};
#[cfg(feature = "doc")]
use crate::{
dist_stream::{DistributedStream, StreamTask, StreamTaskAsync}, sink::Sink
par_stream::{DistributedStream, StreamTask, StreamTaskAsync}, sink::Sink
};

pub struct ResultExpand<T, E>(pub Result<T, E>);
Expand Down
2 changes: 1 addition & 1 deletion amadeus-parquet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
};

use amadeus_core::{
dist_stream::{DistributedStream, ParallelStream}, file::{Directory, File, Page, Partition, PathBuf}, into_dist_stream::IntoDistributedStream, util::{DistParStream, ResultExpandIter}, Source
file::{Directory, File, Page, Partition, PathBuf}, into_par_stream::IntoDistributedStream, par_stream::{DistributedStream, ParallelStream}, util::{DistParStream, ResultExpandIter}, Source
};

pub use internal::record::ParquetData;
Expand Down
2 changes: 1 addition & 1 deletion amadeus-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
};

use amadeus_core::{
dist_stream::{DistributedStream, ParallelStream}, into_dist_stream::IntoDistributedStream, util::{DistParStream, IoError}, Source as DSource
into_par_stream::IntoDistributedStream, par_stream::{DistributedStream, ParallelStream}, util::{DistParStream, IoError}, Source as DSource
};

const MAGIC: &[u8] = b"PGCOPY\n\xff\r\n\0";
Expand Down
2 changes: 1 addition & 1 deletion amadeus-serde/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use amadeus_core::{
dist_stream::{DistributedStream, ParallelStream}, file::{File, Page, Partition}, into_dist_stream::IntoDistributedStream, util::{DistParStream, ResultExpandIter}, Source
file::{File, Page, Partition}, into_par_stream::IntoDistributedStream, par_stream::{DistributedStream, ParallelStream}, util::{DistParStream, ResultExpandIter}, Source
};

use super::{SerdeData, SerdeDeserializeGroup};
Expand Down
2 changes: 1 addition & 1 deletion amadeus-serde/src/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use amadeus_core::{
dist_stream::{DistributedStream, ParallelStream}, file::{File, Page, Partition}, into_dist_stream::IntoDistributedStream, util::{DistParStream, ResultExpandIter}, Source
file::{File, Page, Partition}, into_par_stream::IntoDistributedStream, par_stream::{DistributedStream, ParallelStream}, util::{DistParStream, ResultExpandIter}, Source
};

use super::{SerdeData, SerdeDeserialize};
Expand Down
23 changes: 0 additions & 23 deletions src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,26 +168,3 @@ impl From<amadeus_aws::CloudfrontRow> for CloudfrontRow {
}
}
}

// impl<T: ?Sized> Data for T where T: PartialEq + Eq + Clone + 'static {}

// pub trait DataSource: crate::dist_stream::DistributedStream<Item = <Self as DataSource>::Itemm> {
// type Itemm: Data;
// }

// impl<T: ?Sized> DataSource for T where T: crate::dist_stream::DistributedStream, <T as crate::dist_stream::DistributedStream>::Item: Data {
// type Itemm = <T as crate::dist_stream::DistributedStream>::Item;
// }

// pub trait DataSource
// where
// Self: amadeus_core::dist_stream::DistributedStream<Item = <Self as DataSource>::Item>,
// <Self as amadeus_core::dist_stream::DistributedStream>::Item: Data,
// {
// type Item;
// }

// impl<T: ?Sized> DataSource for T where T: crate::dist_stream::DistributedStream, <T as crate::dist_stream::DistributedStream>::Item: Data {
// // type Itemm = <T as crate::dist_stream::DistributedStream>::Item;
// type Item = <T as crate::dist_stream::DistributedStream>::Item;
// }
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ pub mod data;
pub mod pool;
pub mod source;

pub use amadeus_core::{dist_pipe, dist_sink, dist_stream, into_dist_stream};
pub use amadeus_core::{into_par_stream, par_pipe, par_sink, par_stream};

#[doc(inline)]
pub use crate::{
data::{Data, List, Value}, dist_sink::{FromDistributedStream, FromParallelStream}, dist_stream::{DistributedStream, ParallelStream}, into_dist_stream::{IntoDistributedStream, IntoParallelStream, IteratorExt}, source::Source
data::{Data, List, Value}, into_par_stream::{IntoDistributedStream, IntoParallelStream, IteratorExt}, par_sink::{FromDistributedStream, FromParallelStream}, par_stream::{DistributedStream, ParallelStream}, source::Source
};

pub mod dist {
Expand All @@ -65,7 +65,7 @@ pub mod dist {
pub use super::super::{
data, data::{
Date, DateTime, DateTimeWithoutTimezone, DateWithoutTimezone, Decimal, Downcast, DowncastFrom, Enum, Group, Time, TimeWithoutTimezone, Timezone
}, dist_pipe::DistributedPipe, dist_stream::Identity, pool::ThreadPool, source::*, Data, DistributedStream, FromDistributedStream, IntoDistributedStream, IteratorExt, List, Value
}, par_pipe::DistributedPipe, par_stream::Identity, pool::ThreadPool, source::*, Data, DistributedStream, FromDistributedStream, IntoDistributedStream, IteratorExt, List, Value
};
#[doc(no_inline)]
pub use serde_closure::{Fn, FnMut, FnOnce};
Expand All @@ -84,7 +84,7 @@ pub mod prelude {
pub use super::{
data, data::{
Date, DateTime, DateTimeWithoutTimezone, DateWithoutTimezone, Decimal, Downcast, DowncastFrom, Enum, Group, Time, TimeWithoutTimezone, Timezone
}, dist_pipe::ParallelPipe, dist_stream::Identity, pool::ThreadPool, source::*, Data, FromParallelStream, IntoParallelStream, IteratorExt, List, ParallelStream, Value
}, par_pipe::ParallelPipe, par_stream::Identity, pool::ThreadPool, source::*, Data, FromParallelStream, IntoParallelStream, IteratorExt, List, ParallelStream, Value
};
#[doc(no_inline)]
pub use serde_closure::{Fn, FnMut, FnOnce};
Expand Down
2 changes: 1 addition & 1 deletion src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};

use crate::{
dist_sink::{DistributedSink, ParallelSink}, dist_stream::{DistributedStream, ParallelStream, StreamTask, StreamTaskAsync}
par_sink::{DistributedSink, ParallelSink}, par_stream::{DistributedStream, ParallelStream, StreamTask, StreamTaskAsync}
};

#[cfg(feature = "aws")]
Expand Down
File renamed without changes.
File renamed without changes.

0 comments on commit 08c5e82

Please sign in to comment.