Skip to content

Commit

Permalink
peer_connection: Implement state changes after spec and fix race (#598)
Browse files Browse the repository at this point in the history
* peer_connection: Implement state changes after spec

https://www.w3.org/TR/webrtc/#rtcpeerconnectionstate-enum

Ported from PION
pion/webrtc#2435

* peer_connection: Fix state transition race

DTLS transport state was previously acquired and sent over to the async
context returned. This would in some rare cases obviously lead to
updating peer connection state with an old DTLS transport state.
This would in turn lead to PeerConnection not updating it's state
correctly. Sometimes we would see PeerConnection never reaching Connected.

* peer_connection: Initialize PeerConnectionInternal correctly

PeerConnectionInternal was initialized with default transports and then
mutated right after. This resulted in create_ice_transport() using the
default DTLS transport instance which again led to state transition
reading DTLS transport state from the wrong instance.
  • Loading branch information
haaspors authored Aug 2, 2024
1 parent fa42cad commit 54b5843
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 87 deletions.
19 changes: 13 additions & 6 deletions webrtc/src/peer_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -884,14 +884,21 @@ impl RTCPeerConnection {
// Any of the RTCIceTransports or RTCDtlsTransports are in the "disconnected"
// state and none of them are in the "failed" or "connecting" or "checking" state.
RTCPeerConnectionState::Disconnected
} else if ice_connection_state == RTCIceConnectionState::Connected && dtls_transport_state == RTCDtlsTransportState::Connected {
} else if (ice_connection_state == RTCIceConnectionState::New || ice_connection_state == RTCIceConnectionState::Closed) &&
(dtls_transport_state == RTCDtlsTransportState::New || dtls_transport_state == RTCDtlsTransportState::Closed) {
// None of the previous states apply and all RTCIceTransports are in the "new" or "closed" state,
// and all RTCDtlsTransports are in the "new" or "closed" state, or there are no transports.
RTCPeerConnectionState::New
} else if (ice_connection_state == RTCIceConnectionState::New || ice_connection_state == RTCIceConnectionState::Checking) ||
(dtls_transport_state == RTCDtlsTransportState::New || dtls_transport_state == RTCDtlsTransportState::Connecting) {
// None of the previous states apply and any RTCIceTransport is in the "new" or "checking" state or
// any RTCDtlsTransport is in the "new" or "connecting" state.
RTCPeerConnectionState::Connecting
} else if (ice_connection_state == RTCIceConnectionState::Connected || ice_connection_state == RTCIceConnectionState::Completed || ice_connection_state == RTCIceConnectionState::Closed) &&
(dtls_transport_state == RTCDtlsTransportState::Connected || dtls_transport_state == RTCDtlsTransportState::Closed) {
// All RTCIceTransports and RTCDtlsTransports are in the "connected", "completed" or "closed"
// state and at least one of them is in the "connected" or "completed" state.
// state and all RTCDtlsTransports are in the "connected" or "closed" state.
RTCPeerConnectionState::Connected
} else if ice_connection_state == RTCIceConnectionState::Checking && dtls_transport_state == RTCDtlsTransportState::Connecting {
// Any of the RTCIceTransports or RTCDtlsTransports are in the "connecting" or
// "checking" state and none of them is in the "failed" state.
RTCPeerConnectionState::Connecting
} else {
RTCPeerConnectionState::New
};
Expand Down
160 changes: 79 additions & 81 deletions webrtc/src/peer_connection/peer_connection_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,24 @@ impl PeerConnectionInternal {
stats_interceptor: Arc<stats::StatsInterceptor>,
mut configuration: RTCConfiguration,
) -> Result<(Arc<Self>, RTCConfiguration)> {
let mut pc = PeerConnectionInternal {
// Create the ice gatherer
let ice_gatherer = Arc::new(api.new_ice_gatherer(RTCIceGatherOptions {
ice_servers: configuration.get_ice_servers(),
ice_gather_policy: configuration.ice_transport_policy,
})?);

// Create the ICE transport
let ice_transport = Arc::new(api.new_ice_transport(Arc::clone(&ice_gatherer)));

// Create the DTLS transport
let certificates = configuration.certificates.drain(..).collect();
let dtls_transport =
Arc::new(api.new_dtls_transport(Arc::clone(&ice_transport), certificates)?);

// Create the SCTP transport
let sctp_transport = Arc::new(api.new_sctp_transport(Arc::clone(&dtls_transport))?);

let pc = Arc::new(PeerConnectionInternal {
greater_mid: AtomicIsize::new(-1),
sdp_origin: Mutex::new(Default::default()),
last_offer: Mutex::new("".to_owned()),
Expand All @@ -89,16 +106,16 @@ impl PeerConnectionInternal {
is_negotiation_needed: Arc::new(AtomicBool::new(false)),
negotiation_needed_state: Arc::new(AtomicU8::new(NegotiationNeededState::Empty as u8)),
signaling_state: Arc::new(AtomicU8::new(RTCSignalingState::Stable as u8)),
ice_transport: Arc::new(Default::default()),
dtls_transport: Arc::new(Default::default()),
ice_transport,
dtls_transport,
ice_connection_state: Arc::new(AtomicU8::new(RTCIceConnectionState::New as u8)),
sctp_transport: Arc::new(Default::default()),
sctp_transport,
rtp_transceivers: Arc::new(Default::default()),
on_track_handler: Arc::new(ArcSwapOption::empty()),
on_signaling_state_change_handler: ArcSwapOption::empty(),
on_ice_connection_state_change_handler: Arc::new(ArcSwapOption::empty()),
on_data_channel_handler: Arc::new(Default::default()),
ice_gatherer: Arc::new(Default::default()),
ice_gatherer,
current_local_description: Arc::new(Default::default()),
current_remote_description: Arc::new(Default::default()),
pending_local_description: Arc::new(Default::default()),
Expand All @@ -114,39 +131,77 @@ impl PeerConnectionInternal {
stats_interceptor,
on_peer_connection_state_change_handler: Arc::new(ArcSwapOption::empty()),
pending_remote_description: Arc::new(Default::default()),
};

// Create the ice gatherer
pc.ice_gatherer = Arc::new(api.new_ice_gatherer(RTCIceGatherOptions {
ice_servers: configuration.get_ice_servers(),
ice_gather_policy: configuration.ice_transport_policy,
})?);

// Create the ice transport
pc.ice_transport = pc.create_ice_transport(api).await;
});

// Create the DTLS transport
let certificates = configuration.certificates.drain(..).collect();
pc.dtls_transport =
Arc::new(api.new_dtls_transport(Arc::clone(&pc.ice_transport), certificates)?);
// Wire up the ice transport connection state change handler
let ice_connection_state = Arc::clone(&pc.ice_connection_state);
let peer_connection_state = Arc::clone(&pc.peer_connection_state);
let is_closed = Arc::clone(&pc.is_closed);
let dtls_transport = Arc::clone(&pc.dtls_transport);
let on_ice_connection_state_change_handler =
Arc::clone(&pc.on_ice_connection_state_change_handler);
let on_peer_connection_state_change_handler =
Arc::clone(&pc.on_peer_connection_state_change_handler);

pc.ice_transport.on_connection_state_change(Box::new(
move |state: RTCIceTransportState| {
let cs = match state {
RTCIceTransportState::New => RTCIceConnectionState::New,
RTCIceTransportState::Checking => RTCIceConnectionState::Checking,
RTCIceTransportState::Connected => RTCIceConnectionState::Connected,
RTCIceTransportState::Completed => RTCIceConnectionState::Completed,
RTCIceTransportState::Failed => RTCIceConnectionState::Failed,
RTCIceTransportState::Disconnected => RTCIceConnectionState::Disconnected,
RTCIceTransportState::Closed => RTCIceConnectionState::Closed,
_ => {
log::warn!("on_connection_state_change: unhandled ICE state: {}", state);
return Box::pin(async {});
}
};

// Create the SCTP transport
pc.sctp_transport = Arc::new(api.new_sctp_transport(Arc::clone(&pc.dtls_transport))?);
let dtls_transport = Arc::clone(&dtls_transport);
let ice_connection_state = Arc::clone(&ice_connection_state);
let on_ice_connection_state_change_handler =
Arc::clone(&on_ice_connection_state_change_handler);
let on_peer_connection_state_change_handler =
Arc::clone(&on_peer_connection_state_change_handler);
let is_closed = Arc::clone(&is_closed);
let peer_connection_state = Arc::clone(&peer_connection_state);
Box::pin(async move {
RTCPeerConnection::do_ice_connection_state_change(
&on_ice_connection_state_change_handler,
&ice_connection_state,
cs,
)
.await;

let dtls_transport_state = dtls_transport.state();
RTCPeerConnection::update_connection_state(
&on_peer_connection_state_change_handler,
&is_closed,
&peer_connection_state,
cs,
dtls_transport_state,
)
.await;
})
},
));

// Wire up the on datachannel handler
let on_data_channel_handler = Arc::clone(&pc.on_data_channel_handler);
pc.sctp_transport
.on_data_channel(Box::new(move |d: Arc<RTCDataChannel>| {
let on_data_channel_handler2 = Arc::clone(&on_data_channel_handler);
let on_data_channel_handler = Arc::clone(&on_data_channel_handler);
Box::pin(async move {
if let Some(handler) = &*on_data_channel_handler2.load() {
if let Some(handler) = &*on_data_channel_handler.load() {
let mut f = handler.lock().await;
f(d).await;
}
})
}));

Ok((Arc::new(pc), configuration))
Ok((pc, configuration))
}

pub(super) async fn start_rtp(
Expand Down Expand Up @@ -1141,63 +1196,6 @@ impl PeerConnectionInternal {
}
}

pub(super) async fn create_ice_transport(&self, api: &API) -> Arc<RTCIceTransport> {
let ice_transport = Arc::new(api.new_ice_transport(Arc::clone(&self.ice_gatherer)));

let ice_connection_state = Arc::clone(&self.ice_connection_state);
let peer_connection_state = Arc::clone(&self.peer_connection_state);
let is_closed = Arc::clone(&self.is_closed);
let dtls_transport = Arc::clone(&self.dtls_transport);
let on_ice_connection_state_change_handler =
Arc::clone(&self.on_ice_connection_state_change_handler);
let on_peer_connection_state_change_handler =
Arc::clone(&self.on_peer_connection_state_change_handler);

ice_transport.on_connection_state_change(Box::new(move |state: RTCIceTransportState| {
let cs = match state {
RTCIceTransportState::New => RTCIceConnectionState::New,
RTCIceTransportState::Checking => RTCIceConnectionState::Checking,
RTCIceTransportState::Connected => RTCIceConnectionState::Connected,
RTCIceTransportState::Completed => RTCIceConnectionState::Completed,
RTCIceTransportState::Failed => RTCIceConnectionState::Failed,
RTCIceTransportState::Disconnected => RTCIceConnectionState::Disconnected,
RTCIceTransportState::Closed => RTCIceConnectionState::Closed,
_ => {
log::warn!("on_connection_state_change: unhandled ICE state: {}", state);
return Box::pin(async {});
}
};

let ice_connection_state2 = Arc::clone(&ice_connection_state);
let on_ice_connection_state_change_handler2 =
Arc::clone(&on_ice_connection_state_change_handler);
let on_peer_connection_state_change_handler2 =
Arc::clone(&on_peer_connection_state_change_handler);
let is_closed2 = Arc::clone(&is_closed);
let dtls_transport_state = dtls_transport.state();
let peer_connection_state2 = Arc::clone(&peer_connection_state);
Box::pin(async move {
RTCPeerConnection::do_ice_connection_state_change(
&on_ice_connection_state_change_handler2,
&ice_connection_state2,
cs,
)
.await;

RTCPeerConnection::update_connection_state(
&on_peer_connection_state_change_handler2,
&is_closed2,
&peer_connection_state2,
cs,
dtls_transport_state,
)
.await;
})
}));

ice_transport
}

/// has_local_description_changed returns whether local media (rtp_transceivers) has changed
/// caller of this method should hold `pc.mu` lock
pub(super) async fn has_local_description_changed(&self, desc: &RTCSessionDescription) -> bool {
Expand Down
105 changes: 105 additions & 0 deletions webrtc/src/peer_connection/peer_connection_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,3 +635,108 @@ async fn test_peer_connection_simulcast_no_data_channel() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_peer_connection_state() -> Result<()> {
let mut m = MediaEngine::default();
m.register_default_codecs()?;
let api = APIBuilder::new().with_media_engine(m).build();
let pc = api.new_peer_connection(RTCConfiguration::default()).await?;

assert_eq!(pc.connection_state(), RTCPeerConnectionState::New);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Checking,
RTCDtlsTransportState::New,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connecting);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Connected,
RTCDtlsTransportState::New,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connecting);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Connected,
RTCDtlsTransportState::Connecting,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connecting);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Connected,
RTCDtlsTransportState::Connected,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connected);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Completed,
RTCDtlsTransportState::Connected,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connected);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Connected,
RTCDtlsTransportState::Closed,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connected);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Disconnected,
RTCDtlsTransportState::Connected,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Disconnected);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Failed,
RTCDtlsTransportState::Connected,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Failed);

RTCPeerConnection::update_connection_state(
&pc.internal.on_peer_connection_state_change_handler,
&pc.internal.is_closed,
&pc.internal.peer_connection_state,
RTCIceConnectionState::Connected,
RTCDtlsTransportState::Failed,
)
.await;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Failed);

pc.close().await?;
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Closed);

Ok(())
}

0 comments on commit 54b5843

Please sign in to comment.