commoncrawl_dist.rs
//! # Distributed parsing and analysis of 3.25 billion webpages
//!
//! This example finds the 100 most prevalent host IP addresses, and the IP
//! addresses hosting the most distinct domains, in the 3.25 billion page,
//! 255 TiB Common Crawl dataset.
//!
//! The downloading, parsing and analysis are farmed out to a process pool
//! leveraging Amadeus, the distributed data processing library for Rust.
//!
//! ## Usage
//!
//! ```bash
//! cargo run --example commoncrawl_dist --release -- 16
//! ```
//!
//! where `16` is the number of processes with which to initialize the pool. If
//! omitted, it defaults to the maximum available.
//!
//! It can also be run distributed on a [`constellation`](https://github.com/constellation-rs/constellation)
//! cluster like so:
//!
//! ```bash
//! cargo deploy 10.0.0.1 --example commoncrawl_dist --release -- 1000
//! ```
//!
//! where `10.0.0.1` is the address of the master. See [here](https://github.com/constellation-rs/constellation)
//! for instructions on setting up the cluster.
use amadeus::{data::Webpage, dist::prelude::*};
use constellation::{init, Resources};
use std::env;

#[allow(unreachable_code)]
fn main() {
    init(Resources::default());

    tokio::runtime::Builder::new()
        .threaded_scheduler()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            return; // TODO: remove this early exit; the full run takes a long time

            // Accept the number of processes at the command line, defaulting to
            // the maximum available.
            let processes = env::args().nth(1).and_then(|arg| arg.parse::<usize>().ok());
            let pool = ProcessPool::new(processes, None, None, Resources::default()).unwrap();

            // Open the "CC-MAIN-2020-24" Common Crawl snapshot.
            let webpages = CommonCrawl::new("CC-MAIN-2020-24").await.unwrap();

            // `fork` runs multiple analyses over a single pass of the stream:
            // the total page count, plus two probabilistic top-100 statistics.
            let (count, (most_frequent_ips, most_diverse_ips)) = webpages
                .dist_stream()
                .map(FnMut!(|webpage: Result<_, _>| webpage.unwrap()))
                .fork(
                    &pool,
                    // Count the total number of webpages processed.
                    Identity
                        .map(FnMut!(|webpage: Webpage<'static>| webpage))
                        .count(),
                    (
                        // The 100 most frequent host IP addresses.
                        Identity
                            .map(FnMut!(|webpage: &Webpage<'static>| webpage.ip))
                            .most_frequent(100, 0.99, 2.0 / 1000.0),
                        // The 100 IP addresses hosting the most distinct hosts.
                        Identity
                            .map(FnMut!(|webpage: &Webpage<'static>| {
                                (webpage.ip, webpage.url.host_str().unwrap().to_owned())
                            }))
                            .most_distinct(100, 0.99, 2.0 / 1000.0, 0.0808),
                    ),
                )
                .await;

            println!(
                "Of the {} webpages processed, these are the most prevalent host IP addresses: {:?}",
                count, most_frequent_ips
            );
            println!(
                "and these are the IP addresses hosting the most distinct domains: {:?}",
                most_diverse_ips
            );
        })
}
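
// The `fork` above runs several reducers over one pass of the stream, and
// `most_frequent` computes an approximate top-k with bounded memory. As a
// point of reference, here is a minimal, std-only sketch of the exact
// semantics being approximated: a single pass that simultaneously counts all
// items and extracts the k most frequent. This helper is illustrative only
// and is not part of the Amadeus API.
#[allow(dead_code)]
fn exact_count_and_top_k<T: std::hash::Hash + Eq>(
    items: impl IntoIterator<Item = T>,
    k: usize,
) -> (usize, Vec<(T, usize)>) {
    use std::collections::HashMap;

    let mut total = 0;
    let mut counts: HashMap<T, usize> = HashMap::new();
    for item in items {
        total += 1; // first "fork": overall count
        *counts.entry(item).or_insert(0) += 1; // second "fork": per-item frequency
    }

    // Sort descending by frequency and keep the k most frequent items.
    let mut sorted: Vec<(T, usize)> = counts.into_iter().collect();
    sorted.sort_by(|a, b| b.1.cmp(&a.1));
    sorted.truncate(k);
    (total, sorted)
}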