Skip to content

Commit

Permalink
feat: optimize mutex in dynconfig and add keepalive to hyper proxy (d…
Browse files Browse the repository at this point in the history
…ragonflyoss#513)

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored May 30, 2024
1 parent 71a8892 commit e2a907a
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 53 deletions.
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.1.72"
version = "0.1.73"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.72" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.72" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.72" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.72" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.72" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.72" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.72" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.73" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.73" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.73" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.73" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.73" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.73" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.73" }
thiserror = "1.0"
dragonfly-api = "2.0.114"
reqwest = { version = "0.11.27", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
Expand Down
1 change: 0 additions & 1 deletion dragonfly-client/src/grpc/dfdaemon_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
error!("missing content length in the response");
return Err(Status::internal("missing content length in the response"));
};

info!("content length: {}", content_length);

// Download's range priority is higher than the request header's range.
Expand Down
78 changes: 46 additions & 32 deletions dragonfly-client/src/grpc/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ impl SchedulerClient {
pub async fn announce_peer(
&self,
task_id: &str,
peer_id: &str,
request: impl tonic::IntoStreamingRequest<Message = AnnouncePeerRequest>,
) -> Result<tonic::Response<tonic::codec::Streaming<AnnouncePeerResponse>>> {
let response = self
.client(task_id.to_string())
.client(task_id, Some(peer_id))
.await?
.announce_peer(request)
.await?;
Expand All @@ -99,19 +100,15 @@ impl SchedulerClient {
#[instrument(skip(self))]
pub async fn stat_peer(&self, task_id: &str, request: StatPeerRequest) -> Result<Peer> {
let request = Self::make_request(request);
let response = self
.client(task_id.to_string())
.await?
.stat_peer(request)
.await?;
let response = self.client(task_id, None).await?.stat_peer(request).await?;
Ok(response.into_inner())
}

// leave_peer tells the scheduler that the peer is leaving.
#[instrument(skip(self))]
pub async fn leave_peer(&self, task_id: &str, request: LeavePeerRequest) -> Result<()> {
let request = Self::make_request(request);
self.client(task_id.to_string())
self.client(task_id, None)
.await?
.leave_peer(request)
.await?;
Expand All @@ -127,7 +124,7 @@ impl SchedulerClient {
) -> Result<ExchangePeerResponse> {
let request = Self::make_request(request);
let response = self
.client(task_id.to_string())
.client(task_id, None)
.await?
.exchange_peer(request)
.await?;
Expand All @@ -138,19 +135,15 @@ impl SchedulerClient {
#[instrument(skip(self))]
pub async fn stat_task(&self, task_id: &str, request: StatTaskRequest) -> Result<Task> {
let request = Self::make_request(request);
let response = self
.client(task_id.to_string())
.await?
.stat_task(request)
.await?;
let response = self.client(task_id, None).await?.stat_task(request).await?;
Ok(response.into_inner())
}

// leave_task tells the scheduler that the task is leaving.
#[instrument(skip(self))]
pub async fn leave_task(&self, task_id: &str, request: LeaveTaskRequest) -> Result<()> {
let request = Self::make_request(request);
self.client(task_id.to_string())
self.client(task_id, None)
.await?
.leave_task(request)
.await?;
Expand All @@ -162,7 +155,10 @@ impl SchedulerClient {
pub async fn init_announce_host(&self, request: AnnounceHostRequest) -> Result<()> {
let mut join_set = JoinSet::new();
let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
for available_scheduler_addr in available_scheduler_addrs.iter() {
let available_scheduler_addrs_clone = available_scheduler_addrs.clone();
drop(available_scheduler_addrs);

for available_scheduler_addr in available_scheduler_addrs_clone.iter() {
let request = Self::make_request(request.clone());
async fn announce_host(
addr: SocketAddr,
Expand Down Expand Up @@ -214,7 +210,10 @@ impl SchedulerClient {
// Announce the host to the scheduler.
let mut join_set = JoinSet::new();
let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
for available_scheduler_addr in available_scheduler_addrs.iter() {
let available_scheduler_addrs_clone = available_scheduler_addrs.clone();
drop(available_scheduler_addrs);

for available_scheduler_addr in available_scheduler_addrs_clone.iter() {
let request = Self::make_request(request.clone());
async fn announce_host(
addr: SocketAddr,
Expand Down Expand Up @@ -265,7 +264,10 @@ impl SchedulerClient {
// Leave the host from the scheduler.
let mut join_set = JoinSet::new();
let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
for available_scheduler_addr in available_scheduler_addrs.iter() {
let available_scheduler_addrs_clone = available_scheduler_addrs.clone();
drop(available_scheduler_addrs);

for available_scheduler_addr in available_scheduler_addrs_clone.iter() {
let request = Self::make_request(request.clone());
async fn leave_host(
addr: SocketAddr,
Expand Down Expand Up @@ -309,16 +311,21 @@ impl SchedulerClient {

// client gets the grpc client of the scheduler.
#[instrument(skip(self))]
async fn client(&self, key: String) -> Result<SchedulerGRPCClient<Channel>> {
async fn client(
&self,
task_id: &str,
peer_id: Option<&str>,
) -> Result<SchedulerGRPCClient<Channel>> {
// Update scheduler addresses of the client.
self.update_available_scheduler_addrs().await?;

// Get the scheduler address from the hashring.
let addr = self.hashring.read().await;
let addr = addr
.get(&key[0..5].to_string())
.ok_or_else(|| Error::HashRing(key.clone()))?;
info!("{} picked {:?}", key, addr);
let addrs = self.hashring.read().await;
let addr = *addrs
.get(&task_id[0..5].to_string())
.ok_or_else(|| Error::HashRing(task_id.to_string()))?;
drop(addrs);
info!("picked {:?}", addr);

let channel = match Channel::from_shared(format!("http://{}", addr))
.map_err(|_| Error::InvalidURI(addr.to_string()))?
Expand Down Expand Up @@ -347,38 +354,40 @@ impl SchedulerClient {
async fn update_available_scheduler_addrs(&self) -> Result<()> {
// Get the endpoints of available schedulers.
let data = self.dynconfig.data.read().await;
let data_available_schedulers_clone = data.available_schedulers.clone();
drop(data);

// Check if the available schedulers is empty.
if data.available_schedulers.is_empty() {
if data_available_schedulers_clone.is_empty() {
return Err(Error::AvailableSchedulersNotFound);
}

// Get the available schedulers.
let available_schedulers = self.available_schedulers.read().await;
let available_schedulers_clone = available_schedulers.clone();
drop(available_schedulers);

// Check if the available schedulers is not changed.
if data.available_schedulers.len() == available_schedulers.len()
&& data
.available_schedulers
if data_available_schedulers_clone.len() == available_schedulers_clone.len()
&& data_available_schedulers_clone
.iter()
.zip(available_schedulers.iter())
.zip(available_schedulers_clone.iter())
.all(|(a, b)| a == b)
{
info!(
"available schedulers is not changed: {:?}",
data.available_schedulers
data_available_schedulers_clone
.iter()
.map(|s| s.ip.clone())
.collect::<Vec<String>>()
);
return Ok(());
}
drop(available_schedulers);

let mut new_available_schedulers = Vec::new();
let mut new_available_scheduler_addrs = Vec::new();
let mut new_hashring = HashRing::new();
for available_scheduler in data.available_schedulers.iter() {
for available_scheduler in data_available_schedulers_clone.iter() {
let ip = match IpAddr::from_str(&available_scheduler.ip) {
Ok(ip) => ip,
Err(err) => {
Expand All @@ -403,20 +412,25 @@ impl SchedulerClient {
// Update the available schedulers.
let mut available_schedulers = self.available_schedulers.write().await;
*available_schedulers = new_available_schedulers;
drop(available_schedulers);

// Update the addresses of available schedulers.
let mut available_scheduler_addrs = self.available_scheduler_addrs.write().await;
*available_scheduler_addrs = new_available_scheduler_addrs;
drop(available_scheduler_addrs);

// Update the hashring.
let mut hashring = self.hashring.write().await;
*hashring = new_hashring;
drop(hashring);

let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
info!(
"refresh available scheduler addresses: {:?}",
available_scheduler_addrs
.iter()
.map(|s| s.ip().to_string())
.collect::<Vec<String>>()
.collect::<Vec<String>>(),
);
Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion dragonfly-client/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ impl Proxy {
let server_ca_cert = self.server_ca_cert.clone();
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.keep_alive(true)
.preserve_header_case(true)
.title_case_headers(true)
.serve_connection(
Expand All @@ -184,7 +185,7 @@ impl Proxy {
.with_upgrades()
.await
{
error!("failed to serve connection: {}", err);
error!("failed to serve connection from {}: {}", remote_address, err);
}
});
}
Expand Down
12 changes: 9 additions & 3 deletions dragonfly-client/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ impl Task {
let mut finished_pieces: Vec<metadata::Piece> = Vec::new();

// Initialize stream channel.
let (in_stream_tx, in_stream_rx) = mpsc::channel(1024);
let (in_stream_tx, in_stream_rx) = mpsc::channel(4096);

// Send the register peer request.
in_stream_tx
Expand All @@ -444,10 +444,16 @@ impl Task {

// Initialize the stream.
let in_stream = ReceiverStream::new(in_stream_rx);
let request = Request::new(in_stream);
let response = self
.scheduler_client
.announce_peer(task.id.as_str(), Request::new(in_stream))
.await?;
.announce_peer(task.id.as_str(), peer_id, request)
.await
.map_err(|err| {
error!("announce peer failed: {:?}", err);
err
})?;
info!("announced peer has been connected");

let out_stream = response
.into_inner()
Expand Down

0 comments on commit e2a907a

Please sign in to comment.