Skip to content

Commit d8ff824

Browse files
committed
replace hook processes with tokio::process
Previously, handling too many librespot events in a row, the spawned hook might not have been finished. In this case, the events would stack up, since the event channel then would not be polled. By using the tokio::process module, we gain support for waiting for processes asynchronously and even if an event would not be handled directly, it will be handled after the previous hook process has finished.
1 parent ae6dac7 commit d8ff824

File tree

3 files changed

+74
-84
lines changed

3 files changed

+74
-84
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ serde = { version = "1.0.115", features = ["derive"] }
2828
sha-1 = "0.9"
2929
structopt = "0.3.17"
3030
syslog = "4"
31-
tokio = {version = "1.6.1", features = ["signal", "rt-multi-thread"] }
31+
tokio = {version = "1.6.1", features = ["signal", "rt-multi-thread", "process"] }
3232
tokio-compat = { version = "0.1.6", features = ["rt-current-thread"] }
3333
tokio-compat-02 = "0.2.0"
3434
tokio-stream = "0.1.7"

src/main_loop.rs

+22-9
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ pub(crate) struct MainLoopState {
8888
pub(crate) autoplay: bool,
8989
pub(crate) volume_ctrl: VolumeCtrl,
9090
pub(crate) initial_volume: Option<u16>,
91-
pub(crate) running_event_program: Option<Child>,
91+
pub(crate) running_event_program: Option<Pin<Box<Child>>>,
9292
pub(crate) shell: String,
9393
pub(crate) device_type: DeviceType,
9494
// Command line option should still be available without dbus_mpris feature
@@ -120,15 +120,20 @@ impl Future for MainLoopState {
120120
}
121121

122122
if let Some(mut child) = self.running_event_program.take() {
123-
match child.try_wait() {
124-
// Still running...
125-
Ok(None) => self.running_event_program = Some(child),
126-
// Exited with error...
127-
Err(e) => error!("{}", e),
128-
// Exited without error...
129-
Ok(Some(_)) => (),
123+
// check if child has already exited
124+
if let Poll::Ready(result) = child.as_mut().poll(cx) {
125+
match result {
126+
// Exited without error...
127+
Ok(_) => (),
128+
// Exited with error...
129+
Err(e) => error!("{}", e),
130+
}
131+
} else {
132+
// drop the Box that holds a reference to our child
133+
self.running_event_program = Some(child);
130134
}
131135
}
136+
132137
if self.running_event_program.is_none() {
133138
if let Some(ref mut player_event_channel) = self.spotifyd_state.player_event_channel
134139
{
@@ -139,7 +144,15 @@ impl Future for MainLoopState {
139144
}
140145
if let Some(ref cmd) = self.spotifyd_state.player_event_program {
141146
match spawn_program_on_event(&self.shell, cmd, event) {
142-
Ok(child) => self.running_event_program = Some(child),
147+
Ok(child) => {
148+
self.running_event_program = Some({
149+
let mut child = Box::pin(child);
150+
// We poll the child once, so the waker is awoken once the process finishes.
151+
// Polling on a Poll::Ready(...) is allowed in this case.
152+
let _ = child.as_mut().poll(cx);
153+
child
154+
})
155+
}
143156
Err(e) => error!("{}", e),
144157
}
145158
}

src/process.rs

+51-74
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
use crate::error::Error;
2+
use futures::Future;
23
use librespot_playback::player::PlayerEvent;
34
use log::info;
45
use std::{
56
collections::HashMap,
6-
io::{self, Read, Write},
7-
process::{Command, ExitStatus, Stdio},
7+
io::{self, Write},
8+
pin::Pin,
9+
process::{Output, Stdio},
10+
task::{Context, Poll},
811
};
12+
use tokio::process::Command;
913

1014
/// Blocks while provided command is run in a subprocess using the provided
1115
/// shell. If successful, returns the contents of the subprocess's `stdout` as a
1216
/// `String`.
1317
pub(crate) fn run_program(shell: &str, cmd: &str) -> Result<String, Error> {
1418
info!("Running {:?} using {:?}", cmd, shell);
15-
let output = Command::new(shell)
19+
let output = std::process::Command::new(shell)
1620
.arg("-c")
1721
.arg(cmd)
1822
.output()
@@ -150,96 +154,69 @@ pub(crate) fn spawn_program_on_event(
150154
spawn_program(shell, cmd, env)
151155
}
152156

153-
/// Same as a `std::process::Child` except when this `Child` exits:
157+
/// Wraps a process into a Future that executes something after the process has
158+
/// exited:
154159
/// * successfully: It writes the contents of it's stdout to the stdout of the
155160
/// main process.
156161
/// * unsuccesfully: It returns an error that includes the contents it's stderr
157162
/// as well as information on the command that was run and the shell that
158163
/// invoked it.
159-
#[derive(Debug)]
160164
pub(crate) struct Child {
161165
cmd: String,
162-
inner: std::process::Child,
166+
future: Pin<Box<dyn Future<Output = io::Result<Output>>>>,
163167
shell: String,
164168
}
165169

166170
impl Child {
167-
pub(crate) fn new(cmd: String, child: std::process::Child, shell: String) -> Self {
171+
pub(crate) fn new(cmd: String, child: tokio::process::Child, shell: String) -> Self {
168172
Self {
169173
cmd,
170-
inner: child,
174+
future: Box::pin(child.wait_with_output()),
171175
shell,
172176
}
173177
}
178+
}
174179

175-
#[allow(unused)]
176-
pub(crate) fn wait(&mut self) -> Result<(), Error> {
177-
match self.inner.wait() {
178-
Ok(status) => {
179-
self.write_output(status)?;
180-
Ok(())
181-
}
182-
Err(e) => Err(Error::subprocess_with_err(&self.shell, &self.cmd, e)),
183-
}
184-
}
185-
186-
#[allow(unused)]
187-
pub(crate) fn try_wait(&mut self) -> Result<Option<()>, Error> {
188-
match self.inner.try_wait() {
189-
Ok(Some(status)) => {
190-
self.write_output(status)?;
191-
Ok(Some(()))
180+
impl Future for Child {
181+
type Output = Result<(), Error>;
182+
183+
fn poll(mut self: Pin<&mut Child>, cx: &mut Context<'_>) -> Poll<Self::Output> {
184+
if let Poll::Ready(result) = self.future.as_mut().poll(cx) {
185+
let output = match result {
186+
Ok(output) => output,
187+
Err(err) => {
188+
return Poll::Ready(Err(Error::subprocess_with_err(
189+
&self.shell,
190+
&self.cmd,
191+
err,
192+
)));
193+
}
194+
};
195+
196+
if output.status.success() {
197+
// If successful, write subprocess's stdout to main process's stdout...
198+
let stdout = io::stdout();
199+
let mut writer = stdout.lock();
200+
201+
writer
202+
.write_all(&output.stdout)
203+
.map_err(|e| Error::subprocess_with_err(&self.shell, &self.cmd, e))?;
204+
205+
writer
206+
.flush()
207+
.map_err(|e| Error::subprocess_with_err(&self.shell, &self.cmd, e))?;
208+
209+
Poll::Ready(Ok(()))
210+
} else {
211+
// If unsuccessful, return an error that includes the contents of stderr...
212+
let stderr = String::from_utf8(output.stderr);
213+
match stderr {
214+
Ok(stderr) => Err(Error::subprocess_with_str(&self.shell, &self.cmd, &stderr)),
215+
Err(_) => Err(Error::subprocess(&self.shell, &self.cmd)),
216+
}?
192217
}
193-
Ok(None) => Ok(None),
194-
Err(e) => Err(Error::subprocess_with_err(&self.shell, &self.cmd, e)),
195-
}
196-
}
197-
198-
fn write_output(&mut self, status: ExitStatus) -> Result<(), Error> {
199-
if status.success() {
200-
// If successful, write subprocess's stdout to main process's stdout...
201-
let mut stdout_of_child = self.inner.stdout.as_mut().unwrap();
202-
let reader = &mut stdout_of_child;
203-
204-
let stdout_of_main = io::stdout();
205-
let mut guard = stdout_of_main.lock();
206-
let writer = &mut guard;
207-
208-
io::copy(reader, writer)
209-
.map_err(|e| Error::subprocess_with_err(&self.shell, &self.cmd, e))?;
210-
211-
writer
212-
.flush()
213-
.map_err(|e| Error::subprocess_with_err(&self.shell, &self.cmd, e))?;
214-
215-
Ok(())
216218
} else {
217-
// If unsuccessful, return an error that includes the contents of stderr...
218-
let mut buf = String::new();
219-
match self.inner.stderr.as_mut().unwrap().read_to_string(&mut buf) {
220-
Ok(_nread) => Err(Error::subprocess_with_str(&self.shell, &self.cmd, &buf)),
221-
Err(e) => Err(Error::subprocess_with_err(&self.shell, &self.cmd, e)),
222-
}
219+
Poll::Pending
223220
}
224221
}
225222
}
226-
227-
impl std::ops::Deref for Child {
228-
type Target = std::process::Child;
229-
230-
fn deref(&self) -> &Self::Target {
231-
&self.inner
232-
}
233-
}
234-
235-
impl std::ops::DerefMut for Child {
236-
fn deref_mut(&mut self) -> &mut Self::Target {
237-
&mut self.inner
238-
}
239-
}
240-
241-
impl From<Child> for std::process::Child {
242-
fn from(child: Child) -> Self {
243-
child.inner
244-
}
245-
}

0 commit comments

Comments
 (0)