-
Notifications
You must be signed in to change notification settings - Fork 122
/
Copy pathhandle.rs
113 lines (99 loc) · 3.99 KB
/
handle.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//! Allows to accept connections
use crate::{
connection,
connection::Connection,
endpoint::{close, close::CloseHandle, connect},
};
use core::{
pin::Pin,
task::{Context, Poll, Waker},
};
use futures_channel::mpsc;
use futures_core::Stream;
/// Held by application. Used to accept new connections.
pub(crate) type AcceptorReceiver = mpsc::UnboundedReceiver<Connection>;
/// Held by library. Used to notify the application of newly-accepted connections.
pub(crate) type AcceptorSender = mpsc::UnboundedSender<Connection>;
/// Held by library. Used to receive connection attempts from the application.
pub(crate) type ConnectorReceiver = mpsc::Receiver<connect::Request>;
/// Held by application. Used to submit connection attempts to the library.
pub(crate) type ConnectorSender = mpsc::Sender<connect::Request>;
/// Held by library. Used to receive close attempts from the application.
pub(crate) type CloseReceiver = mpsc::Receiver<Waker>;
/// Held by the application. Used to submit connection close attempts to the library.
pub(crate) type CloseSender = mpsc::Sender<Waker>;
/// The [`Handle`] allows applications to accept and open QUIC connections on an `Endpoint`.
#[derive(Debug)]
pub(crate) struct Handle {
pub acceptor: Acceptor,
pub connector: Connector,
}
impl Handle {
/// Creates a new `Handle` with a limit opening connection limit.
pub(crate) fn new(
max_opening_connections: usize,
) -> (Self, AcceptorSender, ConnectorReceiver, CloseHandle) {
let (acceptor_sender, acceptor_receiver) = mpsc::unbounded();
let (connector_sender, connector_receiver) = mpsc::channel(max_opening_connections);
let (close_sender, close_receiver) = mpsc::channel(max_opening_connections);
let endpoint_state = close::EndpointState::default();
let closer = close::Closer::new(close_sender, endpoint_state.clone());
let handle = Self {
acceptor: Acceptor {
acceptor: acceptor_receiver,
},
connector: Connector {
connector: connector_sender,
closer,
},
};
(
handle,
acceptor_sender,
connector_receiver,
CloseHandle::new(close_receiver, endpoint_state),
)
}
}
#[derive(Debug)]
pub struct Acceptor {
acceptor: AcceptorReceiver,
}
impl Acceptor {
/// Polls for incoming connections and returns them.
///
/// The method will return
/// - `Poll::Ready(Some(connection))` if a connection was accepted.
/// - `Poll::Ready(None)` if the acceptor is closed.
/// - `Poll::Pending` if no new connection was accepted yet.
/// In this case the caller must retry polling as soon as a client
/// establishes a connection.
/// In order to notify the application of this condition,
/// the method will save the [`core::task::Waker`] which is provided as part of the
/// [`Context`] parameter, and notify it as soon as retrying
/// the method will yield a different result.
pub fn poll_accept(&mut self, context: &mut Context) -> Poll<Option<Connection>> {
match Stream::poll_next(Pin::new(&mut self.acceptor), context) {
Poll::Ready(Some(connection)) => Poll::Ready(Some(connection)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
#[derive(Clone, Debug)]
pub struct Connector {
connector: ConnectorSender,
closer: close::Closer,
}
impl Connector {
/// Attempts to establish a connection to an endpoint and returns a future to be awaited
pub fn connect(&self, connect: connect::Connect) -> connect::Attempt {
connect::Attempt::new(&self.connector, connect)
}
/// Polls to close the endpoint
pub fn poll_close(&mut self, context: &mut Context) -> Poll<Result<(), connection::Error>> {
self.closer.poll_close(context)
}
}