Skip to content

Commit

Permalink
Split wasi-stream into separate input and output stream types. (bytec…
Browse files Browse the repository at this point in the history
…odealliance#73)

* Split wasi-stream into separate input and output stream types.

This syncs the prototype's streams with the upstream wasi-io repo.

Streams are unidirectional, so this allows us to statically describe
whether something is an input stream or an output stream in an
interface.

This differs a little from the component model async streams, which
don't have separate input and output streams, but it does partially
reflect how the component model async design differentiates between
input streams in type signatures, which are passed in as arguments,
and output streams, which are returned as return values.

* Fix compilation on Windows.
  • Loading branch information
sunfishcode authored Feb 6, 2023
1 parent bb95c24 commit 2ae727c
Show file tree
Hide file tree
Showing 19 changed files with 543 additions and 422 deletions.
14 changes: 7 additions & 7 deletions host/src/filesystem.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(unused_variables)]

use crate::wasi_poll::WasiStream;
use crate::wasi_poll::{InputStream, OutputStream};
use crate::{wasi_filesystem, HostResult, WasiCtx};
use std::{
io::{IoSlice, IoSliceMut},
Expand Down Expand Up @@ -656,7 +656,7 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx {
&mut self,
fd: wasi_filesystem::Descriptor,
offset: wasi_filesystem::Filesize,
) -> HostResult<WasiStream, wasi_filesystem::Errno> {
) -> HostResult<InputStream, wasi_filesystem::Errno> {
let f = self.table_mut().get_file_mut(fd).map_err(convert)?;

// Duplicate the file descriptor so that we get an indepenent lifetime.
Expand All @@ -666,7 +666,7 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx {
let reader = FileStream::new_reader(clone, offset);

// Box it up.
let boxed: Box<dyn wasi_common::WasiStream> = Box::new(reader);
let boxed: Box<dyn wasi_common::InputStream> = Box::new(reader);

// Insert the stream view into the table.
let index = self.table_mut().push(Box::new(boxed)).map_err(convert)?;
Expand All @@ -678,7 +678,7 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx {
&mut self,
fd: wasi_filesystem::Descriptor,
offset: wasi_filesystem::Filesize,
) -> HostResult<WasiStream, wasi_filesystem::Errno> {
) -> HostResult<OutputStream, wasi_filesystem::Errno> {
let f = self.table_mut().get_file_mut(fd).map_err(convert)?;

// Duplicate the file descriptor so that we get an indepenent lifetime.
Expand All @@ -688,7 +688,7 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx {
let writer = FileStream::new_writer(clone, offset);

// Box it up.
let boxed: Box<dyn wasi_common::WasiStream> = Box::new(writer);
let boxed: Box<dyn wasi_common::OutputStream> = Box::new(writer);

// Insert the stream view into the table.
let index = self.table_mut().push(Box::new(boxed)).map_err(convert)?;
Expand All @@ -699,7 +699,7 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx {
async fn append_via_stream(
&mut self,
fd: wasi_filesystem::Descriptor,
) -> HostResult<WasiStream, wasi_filesystem::Errno> {
) -> HostResult<OutputStream, wasi_filesystem::Errno> {
let f = self.table_mut().get_file_mut(fd).map_err(convert)?;

// Duplicate the file descriptor so that we get an indepenent lifetime.
Expand All @@ -709,7 +709,7 @@ impl wasi_filesystem::WasiFilesystem for WasiCtx {
let appender = FileStream::new_appender(clone);

// Box it up.
let boxed: Box<dyn wasi_common::WasiStream> = Box::new(appender);
let boxed: Box<dyn wasi_common::OutputStream> = Box::new(appender);

// Insert the stream view into the table.
let index = self.table_mut().push(Box::new(boxed)).map_err(convert)?;
Expand Down
154 changes: 154 additions & 0 deletions host/src/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use crate::{
wasi_io::{InputStream, OutputStream, StreamError, WasiIo},
HostResult, WasiCtx,
};
use wasi_common::stream::TableStreamExt;

fn convert(error: wasi_common::Error) -> anyhow::Error {
if let Some(_errno) = error.downcast_ref() {
anyhow::Error::new(StreamError {})
} else {
error.into()
}
}

#[async_trait::async_trait]
impl WasiIo for WasiCtx {
async fn drop_input_stream(&mut self, stream: InputStream) -> anyhow::Result<()> {
self.table_mut()
.delete::<Box<dyn wasi_common::InputStream>>(stream)
.map_err(convert)?;
Ok(())
}

async fn drop_output_stream(&mut self, stream: OutputStream) -> anyhow::Result<()> {
self.table_mut()
.delete::<Box<dyn wasi_common::OutputStream>>(stream)
.map_err(convert)?;
Ok(())
}

async fn read(
&mut self,
stream: InputStream,
len: u64,
) -> HostResult<(Vec<u8>, bool), StreamError> {
let s: &mut Box<dyn wasi_common::InputStream> = self
.table_mut()
.get_input_stream_mut(stream)
.map_err(convert)?;

let mut buffer = vec![0; len.try_into().unwrap()];

let (bytes_read, end) = s.read(&mut buffer).await.map_err(convert)?;

buffer.truncate(bytes_read as usize);

Ok(Ok((buffer, end)))
}

async fn write(
&mut self,
stream: OutputStream,
bytes: Vec<u8>,
) -> HostResult<u64, StreamError> {
let s: &mut Box<dyn wasi_common::OutputStream> = self
.table_mut()
.get_output_stream_mut(stream)
.map_err(convert)?;

let bytes_written: u64 = s.write(&bytes).await.map_err(convert)?;

Ok(Ok(u64::try_from(bytes_written).unwrap()))
}

async fn skip(
&mut self,
stream: InputStream,
len: u64,
) -> HostResult<(u64, bool), StreamError> {
let s: &mut Box<dyn wasi_common::InputStream> = self
.table_mut()
.get_input_stream_mut(stream)
.map_err(convert)?;

let (bytes_skipped, end) = s.skip(len).await.map_err(convert)?;

Ok(Ok((bytes_skipped, end)))
}

async fn write_repeated(
&mut self,
stream: OutputStream,
byte: u8,
len: u64,
) -> HostResult<u64, StreamError> {
let s: &mut Box<dyn wasi_common::OutputStream> = self
.table_mut()
.get_output_stream_mut(stream)
.map_err(convert)?;

let bytes_written: u64 = s.write_repeated(byte, len).await.map_err(convert)?;

Ok(Ok(bytes_written))
}

async fn splice(
&mut self,
_src: InputStream,
_dst: OutputStream,
_len: u64,
) -> HostResult<(u64, bool), StreamError> {
// TODO: We can't get two streams at the same time because they both
// carry the exclusive lifetime of `self`. When [`get_many_mut`] is
// stabilized, that could allow us to add a `get_many_stream_mut` or
// so which lets us do this.
//
// [`get_many_mut`]: https://doc.rust-lang.org/stable/std/collections/hash_map/struct.HashMap.html#method.get_many_mut
/*
let s: &mut Box<dyn wasi_common::InputStream> = self
.table_mut()
.get_input_stream_mut(src)
.map_err(convert)?;
let d: &mut Box<dyn wasi_common::OutputStream> = self
.table_mut()
.get_output_stream_mut(dst)
.map_err(convert)?;
let bytes_spliced: u64 = s.splice(&mut **d, len).await.map_err(convert)?;
Ok(bytes_spliced)
*/

todo!()
}

async fn forward(
&mut self,
_src: InputStream,
_dst: OutputStream,
) -> HostResult<u64, StreamError> {
// TODO: We can't get two streams at the same time because they both
// carry the exclusive lifetime of `self`. When [`get_many_mut`] is
// stabilized, that could allow us to add a `get_many_stream_mut` or
// so which lets us do this.
//
// [`get_many_mut`]: https://doc.rust-lang.org/stable/std/collections/hash_map/struct.HashMap.html#method.get_many_mut
/*
let s: &mut Box<dyn wasi_common::InputStream> = self
.table_mut()
.get_input_stream_mut(src)
.map_err(convert)?;
let d: &mut Box<dyn wasi_common::OutputStream> = self
.table_mut()
.get_output_stream_mut(dst)
.map_err(convert)?;
let bytes_spliced: u64 = s.splice(&mut **d, len).await.map_err(convert)?;
Ok(bytes_spliced)
*/

todo!()
}
}
2 changes: 2 additions & 0 deletions host/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod clocks;
mod exit;
mod filesystem;
mod io;
mod logging;
mod poll;
mod random;
Expand All @@ -27,6 +28,7 @@ pub fn add_to_linker<T: Send>(
wasi_logging::add_to_linker(l, f)?;
wasi_stderr::add_to_linker(l, f)?;
wasi_poll::add_to_linker(l, f)?;
wasi_io::add_to_linker(l, f)?;
wasi_random::add_to_linker(l, f)?;
wasi_tcp::add_to_linker(l, f)?;
wasi_exit::add_to_linker(l, f)?;
Expand Down
117 changes: 12 additions & 105 deletions host/src/poll.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use crate::{
wasi_clocks,
wasi_poll::{self, Pollable, Size, StreamError, WasiPoll, WasiStream},
HostResult, WasiCtx,
wasi_io::{InputStream, OutputStream, StreamError},
wasi_poll::{Pollable, WasiPoll},
WasiCtx,
};
use wasi_common::clocks::TableMonotonicClockExt;
use wasi_common::stream::TableStreamExt;

fn convert(error: wasi_common::Error) -> anyhow::Error {
if let Some(_errno) = error.downcast_ref() {
anyhow::Error::new(wasi_poll::StreamError {})
anyhow::Error::new(StreamError {})
} else {
error.into()
}
Expand All @@ -18,9 +19,9 @@ fn convert(error: wasi_common::Error) -> anyhow::Error {
#[derive(Copy, Clone)]
enum PollableEntry {
/// Poll for read events.
Read(WasiStream),
Read(InputStream),
/// Poll for write events.
Write(WasiStream),
Write(OutputStream),
/// Poll for a monotonic-clock timer.
MonotonicClock(wasi_clocks::MonotonicClock, wasi_clocks::Instant, bool),
}
Expand All @@ -43,13 +44,13 @@ impl WasiPoll for WasiCtx {
for (index, future) in futures.into_iter().enumerate() {
match *self.table().get(future).map_err(convert)? {
PollableEntry::Read(stream) => {
let wasi_stream: &dyn wasi_common::WasiStream =
self.table().get_stream(stream).map_err(convert)?;
let wasi_stream: &dyn wasi_common::InputStream =
self.table().get_input_stream(stream).map_err(convert)?;
poll.subscribe_read(wasi_stream, Userdata::from(index as u64));
}
PollableEntry::Write(stream) => {
let wasi_stream: &dyn wasi_common::WasiStream =
self.table().get_stream(stream).map_err(convert)?;
let wasi_stream: &dyn wasi_common::OutputStream =
self.table().get_output_stream(stream).map_err(convert)?;
poll.subscribe_write(wasi_stream, Userdata::from(index as u64));
}
PollableEntry::MonotonicClock(clock, when, absolute) => {
Expand All @@ -75,107 +76,13 @@ impl WasiPoll for WasiCtx {
Ok(results)
}

async fn drop_stream(&mut self, stream: WasiStream) -> anyhow::Result<()> {
self.table_mut()
.delete::<Box<dyn wasi_common::WasiStream>>(stream)
.map_err(convert)?;
Ok(())
}

async fn read_stream(
&mut self,
stream: WasiStream,
len: Size,
) -> HostResult<(Vec<u8>, bool), StreamError> {
let s: &mut Box<dyn wasi_common::WasiStream> =
self.table_mut().get_stream_mut(stream).map_err(convert)?;

let mut buffer = vec![0; len.try_into().unwrap()];

let (bytes_read, end) = s.read(&mut buffer).await.map_err(convert)?;

buffer.truncate(bytes_read as usize);

Ok(Ok((buffer, end)))
}

async fn write_stream(
&mut self,
stream: WasiStream,
bytes: Vec<u8>,
) -> HostResult<Size, StreamError> {
let s: &mut Box<dyn wasi_common::WasiStream> =
self.table_mut().get_stream_mut(stream).map_err(convert)?;

let bytes_written: u64 = s.write(&bytes).await.map_err(convert)?;

Ok(Ok(Size::try_from(bytes_written).unwrap()))
}

async fn skip_stream(
&mut self,
stream: WasiStream,
len: u64,
) -> HostResult<(u64, bool), StreamError> {
let s: &mut Box<dyn wasi_common::WasiStream> =
self.table_mut().get_stream_mut(stream).map_err(convert)?;

let (bytes_skipped, end) = s.skip(len).await.map_err(convert)?;

Ok(Ok((bytes_skipped, end)))
}

async fn write_repeated_stream(
&mut self,
stream: WasiStream,
byte: u8,
len: u64,
) -> HostResult<u64, StreamError> {
let s: &mut Box<dyn wasi_common::WasiStream> =
self.table_mut().get_stream_mut(stream).map_err(convert)?;

let bytes_written: u64 = s.write_repeated(byte, len).await.map_err(convert)?;

Ok(Ok(bytes_written))
}

async fn splice_stream(
&mut self,
_src: WasiStream,
_dst: WasiStream,
_len: u64,
) -> HostResult<(u64, bool), StreamError> {
// TODO: We can't get two streams at the same time because they both
// carry the exclusive lifetime of `self`. When [`get_many_mut`] is
// stabilized, that could allow us to add a `get_many_stream_mut` or
// so which lets us do this.
//
// [`get_many_mut`]: https://doc.rust-lang.org/stable/std/collections/hash_map/struct.HashMap.html#method.get_many_mut
/*
let s: &mut Box<dyn wasi_common::WasiStream> = self
.table_mut()
.get_stream_mut(src)
.map_err(convert)?;
let d: &mut Box<dyn wasi_common::WasiStream> = self
.table_mut()
.get_stream_mut(dst)
.map_err(convert)?;
let bytes_spliced: u64 = s.splice(&mut **d, len).await.map_err(convert)?;
Ok(bytes_spliced)
*/

todo!()
}

async fn subscribe_read(&mut self, stream: WasiStream) -> anyhow::Result<Pollable> {
async fn subscribe_read(&mut self, stream: InputStream) -> anyhow::Result<Pollable> {
Ok(self
.table_mut()
.push(Box::new(PollableEntry::Read(stream)))?)
}

async fn subscribe_write(&mut self, stream: WasiStream) -> anyhow::Result<Pollable> {
async fn subscribe_write(&mut self, stream: OutputStream) -> anyhow::Result<Pollable> {
Ok(self
.table_mut()
.push(Box::new(PollableEntry::Write(stream)))?)
Expand Down
Loading

0 comments on commit 2ae727c

Please sign in to comment.