Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Epoll #1

Merged
merged 3 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions examples/internals-timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use pyroscope::timer::Timer;

fn main() {
// Initialize the Timer
let mut timer = Timer::default().initialize();
let mut timer = Timer::default().initialize().unwrap();

// Create a streaming channel
let (tx, rx): (Sender<u64>, Receiver<u64>) = channel();
Expand All @@ -24,11 +24,19 @@ fn main() {
timer.attach_listener(tx).unwrap();
timer.attach_listener(tx2).unwrap();

// Show current time
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();

println!("Current Time: {}", now);

// Listen to the Timer events
std::thread::spawn(move || {
while let result = rx.recv() {
match result {
Ok(time) => println!("Thread 2 Notification: {}", time),
Ok(time) => println!("Thread 1 Notification: {}", time),
Err(err) => {
println!("Error Thread 1");
break;
Expand Down
2 changes: 1 addition & 1 deletion src/pyroscope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl PyroscopeAgentBuilder {
backend.lock()?.initialize(self.config.sample_rate)?;

// Start Timer
let timer = Timer::default().initialize();
let timer = Timer::default().initialize()?;

// Return PyroscopeAgent
Ok(PyroscopeAgent {
Expand Down
119 changes: 117 additions & 2 deletions src/timer/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed
// except according to those terms.

use crate::utils::{epoll_create1, epoll_ctl, epoll_wait, read, timerfd_create, timerfd_settime};
use crate::Result;

use std::sync::{mpsc::Sender, Arc, Mutex};
Expand All @@ -19,8 +20,122 @@ pub struct Timer {
}

impl Timer {
pub fn initialize(self) -> Self {
self
pub fn initialize(self) -> Result<Self> {
let txs = Arc::clone(&self.txs);

let timer_fd = Timer::set_timerfd()?;
let epoll_fd = Timer::create_epollfd(timer_fd)?;

let handle = Some(thread::spawn(move || {
loop {
// Exit thread if there are no listeners
if txs.lock()?.len() == 0 {
// TODO: should close file descriptors?
return Ok(());
}

// Fire @ 10th sec
Timer::epoll_wait(timer_fd, epoll_fd)?;

// Get current time
let current = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs();

// Iterate through Senders
txs.lock()?.iter().for_each(|tx| {
// Send event to attached Sender
tx.send(current).unwrap();
});
}
}));

Ok(Self { handle, ..self })
}

/// create and set a timer file descriptor
fn set_timerfd() -> Result<libc::c_int> {
// Set the timer to use the system time.
let clockid: libc::clockid_t = libc::CLOCK_REALTIME;
// Non-blocking file descriptor
let clock_flags: libc::c_int = libc::TFD_NONBLOCK;

// Create timer fd
let tfd = timerfd_create(clockid, clock_flags)?;

// Get the next event time
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs();
let rem = 10u64.checked_sub(now.checked_rem(10).unwrap()).unwrap();
let first_fire = now + rem;

// new_value sets the Timer
let mut new_value = libc::itimerspec {
it_interval: libc::timespec {
tv_sec: 10,
tv_nsec: 0,
},
it_value: libc::timespec {
tv_sec: first_fire as i64,
tv_nsec: 0,
},
};

// Empty itimerspec object
let mut old_value = libc::itimerspec {
it_interval: libc::timespec {
tv_sec: 0,
tv_nsec: 0,
},
it_value: libc::timespec {
tv_sec: 0,
tv_nsec: 0,
},
};

let set_flags = libc::TFD_TIMER_ABSTIME;

// Set the timer
timerfd_settime(tfd, set_flags, &mut new_value, &mut old_value)?;

// Return file descriptor
Ok(tfd)
}

/// Create a new epoll file descriptor and add the timer to its interests
fn create_epollfd(timer_fd: libc::c_int) -> Result<libc::c_int> {
// create a new epoll fd
let epoll_fd = epoll_create1(0)?;

// event to pull
let mut event = libc::epoll_event {
events: libc::EPOLLIN as u32,
u64: 1,
};

let epoll_flags = libc::EPOLL_CTL_ADD;

// add event to the epoll
epoll_ctl(epoll_fd, epoll_flags, timer_fd, &mut event)?;

// return epoll fd
Ok(epoll_fd)
}

fn epoll_wait(timer_fd: libc::c_int, epoll_fd: libc::c_int) -> Result<()> {
// vector to store events
let mut events = Vec::with_capacity(1);

// wait for the timer to fire an event. This is function will block.
epoll_wait(epoll_fd, events.as_mut_ptr(), 1, -1)?;

// read the value from the timerfd. This is required to re-arm the timer.
let mut buffer: u64 = 0;
let bufptr: *mut _ = &mut buffer;
read(timer_fd, bufptr as *mut libc::c_void, 8)?;

Ok(())
}

/// Attach an mpsc::Sender to Timer
Expand Down
4 changes: 2 additions & 2 deletions src/timer/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ pub struct Timer {
}

impl Timer {
pub fn initialize(self) -> Self {
self
pub fn initialize(self) -> Result<Self> {
Ok(self)
}

/// Attach an mpsc::Sender to Timer
Expand Down
4 changes: 2 additions & 2 deletions src/timer/sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct Timer {

impl Timer {
/// Initialize Timer and run a thread to send events to attached listeners
pub fn initialize(self) -> Self {
pub fn initialize(self) -> Result<Self> {
let txs = Arc::clone(&self.txs);

// Add tx
Expand Down Expand Up @@ -69,7 +69,7 @@ impl Timer {
}
}));

Self { handle, ..self }
Ok(Self { handle, ..self })
}

/// Attach an mpsc::Sender to Timer
Expand Down
59 changes: 59 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// except according to those terms.

use crate::error::Result;
use crate::PyroscopeError;

use std::collections::HashMap;

Expand Down Expand Up @@ -54,3 +55,61 @@ mod tests {
)
}
}

/// Wrapper for libc functions.
///
/// Error wrapper for some libc functions used by the library. This only does
/// Error (-1 return) wrapping. Alternatively, the nix crate could be used
/// instead of expanding this wrappers (if more functions and types are used
/// from libc)

/// Error Wrapper for libc return. Only check for errors.
fn check_err<T: Ord + Default>(num: T) -> Result<T> {
if num < T::default() {
return Err(PyroscopeError::from(std::io::Error::last_os_error()));
}
Ok(num)
}

/// libc::timerfd wrapper
pub fn timerfd_create(clockid: libc::clockid_t, clock_flags: libc::c_int) -> Result<i32> {
check_err(unsafe { libc::timerfd_create(clockid, clock_flags) }).map(|timer_fd| timer_fd as i32)
}

/// libc::timerfd_settime wrapper
pub fn timerfd_settime(
timer_fd: i32, set_flags: libc::c_int, new_value: &mut libc::itimerspec,
old_value: &mut libc::itimerspec,
) -> Result<()> {
check_err(unsafe { libc::timerfd_settime(timer_fd, set_flags, new_value, old_value) })?;
Ok(())
}

/// libc::epoll_create1 wrapper
pub fn epoll_create1(epoll_flags: libc::c_int) -> Result<i32> {
check_err(unsafe { libc::epoll_create1(epoll_flags) }).map(|epoll_fd| epoll_fd as i32)
}

/// libc::epoll_ctl wrapper
pub fn epoll_ctl(epoll_fd: i32, epoll_flags: libc::c_int, timer_fd: i32, event: &mut libc::epoll_event) -> Result<()> {
check_err(unsafe {
libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event)
})?;
Ok(())
}

/// libc::epoll_wait wrapper
pub fn epoll_wait(epoll_fd: i32, events: *mut libc::epoll_event, maxevents: libc::c_int, timeout: libc::c_int) -> Result<()> {
check_err(unsafe {
libc::epoll_wait(epoll_fd, events, maxevents, timeout)
})?;
Ok(())
}

/// libc::read wrapper
pub fn read(timer_fd: i32, bufptr: *mut libc::c_void, count: libc::size_t) -> Result<()> {
check_err(unsafe {
libc::read(timer_fd, bufptr, count)
})?;
Ok(())
}