Skip to content

Commit

Permalink
Use weak ref to allow more muxification
Browse files Browse the repository at this point in the history
  • Loading branch information
quackzar committed Jun 13, 2024
1 parent 89136eb commit 37382e6
Showing 1 changed file with 25 additions and 43 deletions.
68 changes: 25 additions & 43 deletions src/net/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use std::{ops::RangeBounds, sync::Arc};

use std::future::Future;

use futures::{future::join_all, FutureExt};
use futures::future::join_all;

Check warning on line 13 in src/net/mux.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/caring/caring/src/net/mux.rs

Check warning on line 13 in src/net/mux.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/caring/caring/src/net/mux.rs
use itertools::{multiunzip, Itertools};

Check failure on line 14 in src/net/mux.rs

View workflow job for this annotation

GitHub Actions / cargo test

unused import: `multiunzip`

Check failure on line 14 in src/net/mux.rs

View workflow job for this annotation

GitHub Actions / cargo test

unused import: `multiunzip`
use thiserror::Error;
use tokio::{join, sync::{mpsc::{self, unbounded_channel}, oneshot}};
use tokio::{join, sync::{mpsc::{self, unbounded_channel, UnboundedSender, WeakUnboundedSender}, oneshot}};

Check failure on line 16 in src/net/mux.rs

View workflow job for this annotation

GitHub Actions / cargo test

unused import: `WeakUnboundedSender`

Check failure on line 16 in src/net/mux.rs

View workflow job for this annotation

GitHub Actions / cargo test

unused import: `WeakUnboundedSender`
use tokio_util::bytes::{Buf, BufMut, Bytes, BytesMut};

use crate::{
Expand Down Expand Up @@ -79,8 +79,6 @@ impl RecvBytes for MuxedReceiver {
async fn recv_bytes(&mut self) -> Result<tokio_util::bytes::BytesMut, Self::RecvError> {
tokio::select! {
msg = self.mailbox.recv() => {
let idx = self.id;
println!("Mux {idx}: Receiving!");
let msg = msg.ok_or(MuxError::DeadGateway)?;
Ok(msg)
},
Expand Down Expand Up @@ -143,7 +141,7 @@ where
mailboxes: Vec<mpsc::UnboundedSender<BytesMut>>,

Check warning on line 141 in src/net/mux.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/caring/caring/src/net/mux.rs

Check warning on line 141 in src/net/mux.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/caring/caring/src/net/mux.rs
inbox: mpsc::UnboundedReceiver<MultiplexedMessage>,
errors: Vec<[oneshot::Sender<MuxError>; 2]>,
//outbox: mpsc::WeakUnboundedSender<MultiplexedMessage>
outbox: mpsc::WeakUnboundedSender<MultiplexedMessage>
}

/// Gateway channel for multiplexed connections/channels ([MuxConn]),
Expand Down Expand Up @@ -207,34 +205,14 @@ impl<C: SplitChannel + Send> Gateway<C> {
/// ```

Check warning on line 205 in src/net/mux.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/caring/caring/src/net/mux.rs

Check warning on line 205 in src/net/mux.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/caring/caring/src/net/mux.rs
///
pub fn multiplex(con: C, n: usize) -> Self {
let (gateway, inbox) = unbounded_channel();

let (sends, channels, errors) = multiunzip((0..n).map(|id| {
let (send, recv) = unbounded_channel();
let gateway = gateway.clone();

let mailbox = recv;
let (error_coms1, error) = oneshot::channel();
let receiver = MuxedReceiver { id, mailbox, error };
let (error_coms2, error) = oneshot::channel();
let sender = MuxedSender { id, gateway, error };
let chan = MuxConn(sender, receiver);

//
(send, chan, [error_coms1, error_coms2])
}));

let inner = GatewayInner {
mailboxes: sends,
inbox,
channel: con,
errors,
//outbox: gateway.downgrade(),
};

let (mut gateway, con) = GatewayInner::new(con);
let mut muxes = vec![con];
for _ in 1..n {
muxes.push(gateway.muxify());
}
Self {
inner,
muxes: channels,
inner: gateway,
muxes,
}
}

Expand Down Expand Up @@ -305,28 +283,29 @@ impl<C: SplitChannel + Send> GatewayInner<C> {
gateway
}

pub fn new(channel: C) -> Self {
pub fn new(channel: C) -> (Self, MuxConn) {
let (outbox, inbox) = unbounded_channel();
let outbox = outbox.downgrade();
Self {
let gateway = outbox.clone();
let outbox= outbox.downgrade();
let mut new = Self {
channel,
mailboxes: vec![],
errors: vec![],
inbox,
//outbox,
}
outbox,
};
let con = new.add_mux(gateway);
(new, con)

}

pub fn muxify(&mut self) -> MuxConn {
fn add_mux(&mut self, gateway: UnboundedSender<MultiplexedMessage>) -> MuxConn {
let id = self.mailboxes.len();
//let gateway = self.outbox.clone().upgrade().expect("We are holding the receiver");
let gateway = todo!();
let (errors_coms1, error) = oneshot::channel();
let mx_sender = MuxedSender {
id,
gateway,
error,

};
let (outbox, mailbox) = tokio::sync::mpsc::unbounded_channel();
let (errors_coms2, error) = oneshot::channel();
Expand All @@ -335,12 +314,15 @@ impl<C: SplitChannel + Send> GatewayInner<C> {
mailbox,
error,
};

self.errors.push([errors_coms1, errors_coms2]);
self.mailboxes.push(outbox);

MuxConn(mx_sender, mx_receiver)
}

pub fn muxify(&mut self) -> MuxConn {
let gateway = self.outbox.clone().upgrade().expect("We are holding the receiver");
self.add_mux(gateway)
}
}

pub struct NetworkGateway<C: SplitChannel> {
Expand Down

0 comments on commit 37382e6

Please sign in to comment.