-
Notifications
You must be signed in to change notification settings - Fork 89
/
Copy pathe2e_stream.rs
120 lines (101 loc) · 3.31 KB
/
e2e_stream.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use std::{net::SocketAddr, time::Duration};
use anyhow::Context;
use tempfile::TempDir;
use tokio::{io::AsyncReadExt, time::timeout};
use tracing::info;
use crate::{
create_torrent,
tests::test_util::{setup_test_logging, TestPeerMetadata},
AddTorrent, CreateTorrentOptions, Session,
};
use super::test_util::create_default_random_dir_with_torrents;
async fn e2e_stream() -> anyhow::Result<()> {
setup_test_logging();
let files = create_default_random_dir_with_torrents(1, 8192, Some("test_e2e_stream"));
let torrent = create_torrent(
files.path(),
CreateTorrentOptions {
name: None,
piece_length: Some(1024),
},
)
.await?;
let orig_content = std::fs::read(files.path().join("0.data")).unwrap();
let server_session = Session::new_with_opts(
files.path().into(),
crate::SessionOptions {
disable_dht: true,
peer_id: Some(TestPeerMetadata::good().as_peer_id()),
persistence: None,
listen_port_range: Some(16001..16100),
enable_upnp_port_forwarding: false,
..Default::default()
},
)
.await
.context("error creating server session")?;
info!("created server session");
timeout(
Duration::from_secs(5),
server_session
.add_torrent(
AddTorrent::from_bytes(torrent.as_bytes()?),
Some(crate::AddTorrentOptions {
paused: false,
output_folder: Some(files.path().to_str().unwrap().to_owned()),
overwrite: true,
..Default::default()
}),
)
.await?
.into_handle()
.unwrap()
.wait_until_completed(),
)
.await?
.context("error adding torrent")?;
info!("server torrent was completed");
let peer = SocketAddr::new(
"127.0.0.1".parse().unwrap(),
server_session.tcp_listen_port().unwrap(),
);
let client_dir = TempDir::with_prefix("test_e2e_stream_client")?;
let client_session = Session::new_with_opts(
client_dir.path().into(),
crate::SessionOptions {
disable_dht: true,
persistence: None,
peer_id: Some(TestPeerMetadata::good().as_peer_id()),
listen_port_range: None,
enable_upnp_port_forwarding: false,
..Default::default()
},
)
.await?;
info!("created client session");
let client_handle = client_session
.add_torrent(
AddTorrent::from_bytes(torrent.as_bytes()?),
Some(crate::AddTorrentOptions {
paused: false,
initial_peers: Some(vec![peer]),
..Default::default()
}),
)
.await?
.into_handle()
.unwrap();
client_handle.wait_until_initialized().await?;
info!("client torrent initialized, starting stream");
let mut stream = client_handle.stream(0)?;
let mut buf = Vec::<u8>::with_capacity(8192);
stream.read_to_end(&mut buf).await?;
if buf != orig_content {
panic!("contents differ")
}
Ok(())
}
/// Runs the [`e2e_stream`] scenario on the multi-threaded runtime, failing
/// the test if the scenario errors or does not finish within 10 seconds.
#[tokio::test(flavor = "multi_thread")]
async fn test_e2e_stream() -> anyhow::Result<()> {
    let overall_deadline = Duration::from_secs(10);
    // Outer `Result` is the timeout verdict; inner one is the scenario's own result.
    let outcome = timeout(overall_deadline, e2e_stream()).await;
    outcome?
}