Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the Clone bound #1

Merged
merged 1 commit into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 48 additions & 78 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,34 +50,34 @@
//!
//! // happy path: no cancelation
//! let request = Request::This(1, 2);
//! assert!(rq.request(&request).is_ok());
//! assert!(rq.request(request).is_ok());
//!
//! let request = rp.take_request().unwrap();
//! println!("rp got request: {:?}", &request);
//! println!("rp got request: {:?}", request);
//!
//! let response = Response::There(-1);
//! assert!(!rp.is_canceled());
//! assert!(rp.respond(&response).is_ok());
//! assert!(rp.respond(response).is_ok());
//!
//! let response = rq.take_response().unwrap();
//! println!("rq got response: {:?}", &response);
//! println!("rq got response: {:?}", response);
//!
//! // early cancelation path
//! assert!(rq.request(&request).is_ok());
//! assert!(rq.request(request).is_ok());
//!
//! let request = rq.cancel().unwrap().unwrap();
//! println!("responder could cancel: {:?}", &request);
//! println!("responder could cancel: {:?}", request);
//!
//! assert!(rp.take_request().is_none());
//! assert!(State::Idle == rq.state());
//!
//! // late cancelation
//! assert!(rq.request(&request).is_ok());
//! assert!(rq.request(request).is_ok());
//! let request = rp.take_request().unwrap();
//!
//! println!("responder could cancel: {:?}", &rq.cancel().unwrap().is_none());
//! assert!(rp.is_canceled());
//! assert!(rp.respond(&response).is_err());
//! assert!(rp.respond(response).is_err());
//! assert!(rp.acknowledge_cancel().is_ok());
//! assert!(State::Idle == rq.state());
//!
Expand All @@ -92,7 +92,7 @@
//! assert!(rq.send_request().is_ok());
//! let request = rp.take_request().unwrap();
//! assert_eq!(request, Request::This(1, 2));
//! println!("rp got request: {:?}", &request);
//! println!("rp got request: {:?}", request);
//!
//! // building into response buffer
//! impl Default for Response {
Expand Down Expand Up @@ -190,11 +190,7 @@ enum Message<Q, A> {
Response(A),
}

impl<Q, A> Message<Q, A>
where
Q: Clone,
A: Clone,
{
impl<Q, A> Message<Q, A> {
fn is_request_state(&self) -> bool {
matches!(self, Self::Request(_))
}
Expand All @@ -203,10 +199,10 @@ where
matches!(self, Self::Response(_))
}

#[allow(unused)]
unsafe fn rq(self) -> Q {
match self {
Self::Request(request) => request,
unsafe fn take_rq(&mut self) -> Q {
let this = core::mem::replace(self, Message::None);
match this {
Message::Request(r) => r,
_ => unreachable!(),
}
}
Expand All @@ -225,10 +221,10 @@ where
}
}

#[allow(unused)]
unsafe fn rp(self) -> A {
match self {
Self::Response(response) => response,
unsafe fn take_rp(&mut self) -> A {
let this = core::mem::replace(self, Message::None);
match this {
Message::Response(r) => r,
_ => unreachable!(),
}
}
Expand All @@ -247,12 +243,12 @@ where
}
}

fn from_rq(rq: &Q) -> Self {
Self::Request(rq.clone())
fn from_rq(rq: Q) -> Self {
Self::Request(rq)
}

fn from_rp(rp: &A) -> Self {
Self::Response(rp.clone())
fn from_rp(rp: A) -> Self {
Self::Response(rp)
}
}

Expand Down Expand Up @@ -285,34 +281,34 @@ where
///
/// // happy path: no cancelation
/// let request = Request::This(1, 2);
/// assert!(rq.request(&request).is_ok());
/// assert!(rq.request(request).is_ok());
///
/// let request = rp.take_request().unwrap();
/// println!("rp got request: {:?}", &request);
/// println!("rp got request: {:?}", request);
///
/// let response = Response::There(-1);
/// assert!(!rp.is_canceled());
/// assert!(rp.respond(&response).is_ok());
/// assert!(rp.respond(response).is_ok());
///
/// let response = rq.take_response().unwrap();
/// println!("rq got response: {:?}", &response);
/// println!("rq got response: {:?}", response);
///
/// // early cancelation path
/// assert!(rq.request(&request).is_ok());
/// assert!(rq.request(request).is_ok());
///
/// let request = rq.cancel().unwrap().unwrap();
/// println!("responder could cancel: {:?}", &request);
/// println!("responder could cancel: {:?}", request);
///
/// assert!(rp.take_request().is_none());
/// assert!(State::Idle == rq.state());
///
/// // late cancelation
/// assert!(rq.request(&request).is_ok());
/// assert!(rq.request(request).is_ok());
/// let request = rp.take_request().unwrap();
///
/// println!("responder could cancel: {:?}", &rq.cancel().unwrap().is_none());
/// assert!(rp.is_canceled());
/// assert!(rp.respond(&response).is_err());
/// assert!(rp.respond(response).is_err());
/// assert!(rp.acknowledge_cancel().is_ok());
/// assert!(State::Idle == rq.state());
///
Expand All @@ -327,7 +323,7 @@ where
/// assert!(rq.send_request().is_ok());
/// let request = rp.take_request().unwrap();
/// assert_eq!(request, Request::This(1, 2));
/// println!("rp got request: {:?}", &request);
/// println!("rp got request: {:?}", request);
///
/// // building into response buffer
/// impl Default for Response {
Expand All @@ -349,11 +345,7 @@ pub struct Channel<Q, A> {
responder_claimed: AtomicBool,
}

impl<Q, A> Channel<Q, A>
where
Q: Clone,
A: Clone,
{
impl<Q, A> Channel<Q, A> {
#[cfg(not(loom))]
const CHANNEL_INIT: Channel<Q, A> = Self::new();

Expand Down Expand Up @@ -410,11 +402,7 @@ where
}
}

impl<Q, A> Default for Channel<Q, A>
where
Q: Clone,
A: Clone,
{
impl<Q, A> Default for Channel<Q, A> {
fn default() -> Self {
Self::new()
}
Expand All @@ -436,11 +424,7 @@ impl<'i, Q, A> Drop for Requester<'i, Q, A> {
}
}

impl<'i, Q, A> Requester<'i, Q, A>
where
Q: Clone,
A: Clone,
{
impl<'i, Q, A> Requester<'i, Q, A> {
#[inline]
fn transition(&self, from: State, to: State) -> bool {
self.channel
Expand Down Expand Up @@ -497,7 +481,7 @@ where
///
/// If the RPC state is `Idle`, this always succeeds, else calling
/// is a logic error and the request is returned.
pub fn request(&mut self, request: &Q) -> Result<(), Error> {
pub fn request(&mut self, request: Q) -> Result<(), Error> {
if State::Idle == self.channel.state.load(Ordering::Acquire) {
unsafe {
self.with_data_mut(|i| *i = Message::from_rq(request));
Expand Down Expand Up @@ -525,7 +509,7 @@ where
self.channel
.state
.store(State::Idle as u8, Ordering::Release);
return Ok(Some(unsafe { self.with_data(|i| i.rq_ref().clone()) }));
return Ok(Some(unsafe { self.with_data_mut(|i| i.take_rq()) }));
}

// we canceled after the responder took the request, but before they answered.
Expand Down Expand Up @@ -576,7 +560,7 @@ where
// it seems unnecessary to model this.
pub fn take_response(&mut self) -> Option<A> {
if self.transition(State::Responded, State::Idle) {
Some(unsafe { self.with_data(|i| i.rp_ref().clone()) })
Some(unsafe { self.with_data_mut(|i| i.take_rp()) })
} else {
None
}
Expand All @@ -585,8 +569,7 @@ where

impl<'i, Q, A> Requester<'i, Q, A>
where
Q: Clone + Default,
A: Clone,
Q: Default,
{
/// Initialize a request with its default values and mutates it with `f`
///
Expand All @@ -598,7 +581,7 @@ where
let res = unsafe {
self.with_data_mut(|i| {
if !i.is_request_state() {
*i = Message::from_rq(&Q::default());
*i = Message::from_rq(Q::default());
}
f(i.rq_mut())
})
Expand All @@ -622,7 +605,7 @@ where
unsafe {
self.with_data_mut(|i| {
if !i.is_request_state() {
*i = Message::from_rq(&Q::default());
*i = Message::from_rq(Q::default());
}
})
}
Expand Down Expand Up @@ -662,11 +645,7 @@ impl<'i, Q, A> Drop for Responder<'i, Q, A> {
}
}

impl<'i, Q, A> Responder<'i, Q, A>
where
Q: Clone,
A: Clone,
{
impl<'i, Q, A> Responder<'i, Q, A> {
#[inline]
fn transition(&self, from: State, to: State) -> bool {
self.channel
Expand Down Expand Up @@ -748,7 +727,7 @@ where
/// If you need copies, clone the request.
pub fn take_request(&mut self) -> Option<Q> {
if self.transition(State::Requested, State::BuildingResponse) {
Some(unsafe { self.with_data(|i| i.rq_ref().clone()) })
Some(unsafe { self.with_data_mut(|i| i.take_rq()) })
} else {
None
}
Expand Down Expand Up @@ -785,7 +764,7 @@ where
/// If efficiency is a concern, or responses need multiple steps to
/// construct, use `with_response_mut` or `response_mut` and `send_response`.
///
pub fn respond(&mut self, response: &A) -> Result<(), Error> {
pub fn respond(&mut self, response: A) -> Result<(), Error> {
if State::BuildingResponse == self.channel.state.load(Ordering::Acquire) {
unsafe {
self.with_data_mut(|i| *i = Message::from_rp(response));
Expand All @@ -802,8 +781,7 @@ where

impl<'i, Q, A> Responder<'i, Q, A>
where
Q: Clone,
A: Clone + Default,
A: Default,
{
/// Initialize a response with its default values and mutates it with `f`
///
Expand All @@ -815,7 +793,7 @@ where
let res = unsafe {
self.with_data_mut(|i| {
if !i.is_response_state() {
*i = Message::from_rp(&A::default());
*i = Message::from_rp(A::default());
}
f(i.rp_mut())
})
Expand All @@ -839,7 +817,7 @@ where
unsafe {
self.with_data_mut(|i| {
if !i.is_response_state() {
*i = Message::from_rp(&A::default());
*i = Message::from_rp(A::default());
}
})
}
Expand Down Expand Up @@ -900,11 +878,7 @@ pub struct Interchange<Q, A, const N: usize> {
last_claimed: AtomicUsize,
}

impl<Q, A, const N: usize> Interchange<Q, A, N>
where
Q: Clone,
A: Clone,
{
impl<Q, A, const N: usize> Interchange<Q, A, N> {
/// Create a new Interchange
#[cfg(not(loom))]
pub const fn new() -> Self {
Expand Down Expand Up @@ -936,11 +910,7 @@ where
}

#[cfg(not(loom))]
impl<Q, A, const N: usize> Default for Interchange<Q, A, N>
where
Q: Clone,
A: Clone,
{
impl<Q, A, const N: usize> Default for Interchange<Q, A, N> {
fn default() -> Self {
Self::new()
}
Expand Down
4 changes: 2 additions & 2 deletions tests/loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn loom_interchange() {
}

fn requester_thread(mut requester: Requester<'static, u64, u64>) -> Option<()> {
requester.request(&53).unwrap();
requester.request(53).unwrap();
requester.with_response(|r| assert_eq!(*r, 63)).ok()?;
requester.with_response(|r| assert_eq!(*r, 63)).ok()?;
requester.take_response().unwrap();
Expand All @@ -63,7 +63,7 @@ fn requester_thread(mut requester: Requester<'static, u64, u64>) -> Option<()> {
fn responder_thread(mut responder: Responder<'static, u64, u64>) -> Option<()> {
let req = responder.take_request()?;
assert_eq!(req, 53);
responder.respond(&(req + 10)).unwrap();
responder.respond(req + 10).unwrap();
thread::yield_now();
responder
.with_request(|r| {
Expand Down