Skip to content

Commit

Permalink
Improve request body streaming API to be less error prone an to be ex…
Browse files Browse the repository at this point in the history
…posed

as an io.Reader
  • Loading branch information
karlseguin committed Nov 10, 2024
1 parent 14ca8a0 commit 74fd85a
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 95 deletions.
61 changes: 42 additions & 19 deletions src/httpz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,12 @@ pub fn Server(comptime H: type) type {
res.write() catch {
conn.handover = .close;
};

if (req.unread_body > 0 and conn.handover == .keepalive) {
drain(&req) catch {
conn.handover = .close;
};
}
}

pub fn middleware(self: *Self, comptime M: type, config: M.Config) !Middleware(H) {
Expand Down Expand Up @@ -714,6 +720,21 @@ const FallbackAllocator = struct {
}
};

// Called when we have unread bytes on the request and want to keepalive the
// connection. Only happens when lazy_read_size is configured and the client
// didn't read the [whole] body
// There should already be a receive timeout on the socket since the only
// way for this to be
fn drain(req: *Request) !void {
var r = try req.reader(2000);
var buf: [4096]u8 = undefined;
while (true) {
if (try r.read(&buf) == 0) {
return;
}
}
}

const t = @import("t.zig");
var global_test_allocator = std.heap.GeneralPurposeAllocator(.{}){};

Expand All @@ -739,13 +760,10 @@ test "tests:beforeAll" {
const ga = global_test_allocator.allocator();

{
default_server = try Server(void).init(ga, .{
.port = 5992,
.request = .{
.lazy_read_size = 4_096,
.max_body_size = 1_048_576,
}
}, {});
default_server = try Server(void).init(ga, .{ .port = 5992, .request = .{
.lazy_read_size = 4_096,
.max_body_size = 1_048_576,
} }, {});

// only need to do this because we're using listenInNewThread instead
// of blocking here. So the array to hold the middleware needs to outlive
Expand All @@ -772,7 +790,7 @@ test "tests:beforeAll" {
router.method("PING", "/test/method", TestDummyHandler.method, .{});
router.get("/test/query", TestDummyHandler.reqQuery, .{});
router.get("/test/stream", TestDummyHandler.eventStream, .{});
router.get("/test/req_stream", TestDummyHandler.reqStream, .{});
router.get("/test/req_reader", TestDummyHandler.reqReader, .{});
router.get("/test/chunked", TestDummyHandler.chunked, .{});
router.get("/test/route_data", TestDummyHandler.routeData, .{ .data = &TestDummyHandler.RouteData{ .power = 12345 } });
router.all("/test/cors", TestDummyHandler.jsonRes, .{ .middlewares = cors });
Expand Down Expand Up @@ -1306,12 +1324,12 @@ test "httpz: custom handle" {
try t.expectString("HTTP/1.1 200 \r\nContent-Length: 9\r\n\r\nhello teg", testReadAll(stream, &buf));
}

test "httpz: request body streaming" {
test "httpz: request body reader" {
{
// no body
const stream = testStream(5992);
defer stream.close();
try stream.writeAll("GET /test/req_stream HTTP/1.1\r\nContent-Length: 0\r\n\r\n");
try stream.writeAll("GET /test/req_reader HTTP/1.1\r\nContent-Length: 0\r\n\r\n");

var res = testReadParsed(stream);
defer res.deinit();
Expand All @@ -1322,7 +1340,7 @@ test "httpz: request body streaming" {
// small body
const stream = testStream(5992);
defer stream.close();
try stream.writeAll("GET /test/req_stream HTTP/1.1\r\nContent-Length: 4\r\n\r\n123z");
try stream.writeAll("GET /test/req_reader HTTP/1.1\r\nContent-Length: 4\r\n\r\n123z");

var res = testReadParsed(stream);
defer res.deinit();
Expand All @@ -1336,7 +1354,7 @@ test "httpz: request body streaming" {
for (0..10) |_| {
const stream = testStream(5992);
defer stream.close();
var req: []const u8 = "GET /test/req_stream HTTP/1.1\r\nContent-Length: 20000\r\n\r\n" ++ ("a" ** 20_000);
var req: []const u8 = "GET /test/req_reader HTTP/1.1\r\nContent-Length: 20000\r\n\r\n" ++ ("a" ** 20_000);
while (req.len > 0) {
const len = random.uintAtMost(usize, req.len - 1) + 1;
const n = stream.write(req[0..len]) catch |err| switch (err) {
Expand All @@ -1351,7 +1369,6 @@ test "httpz: request body streaming" {
defer res.deinit();
try res.expectJson(.{ .length = 20_000 });
}

}

test "websocket: invalid request" {
Expand Down Expand Up @@ -1549,16 +1566,22 @@ const TestDummyHandler = struct {
try res.startEventStream(StreamContext{ .data = "hello" }, StreamContext.handle);
}

fn reqStream(req: *Request, res: *Response) !void {
var stream = try req.streamBody();
defer stream.deinit();
fn reqReader(req: *Request, res: *Response) !void {
var reader = try req.reader(2000);

var l: usize = 0;
var buf: [1024]u8 = undefined;
while (try stream.read(&buf)) |data| {
l += data.len;
while (true) {
const n = try reader.read(&buf);
if (n == 0) {
break;
}
if (req.body_len > 10 and std.mem.indexOfNonePos(u8, buf[0..n], 0, "a") != null) {
return error.InvalidData;
}
l += n;
}
return res.json(.{.length = l}, .{});
return res.json(.{ .length = l }, .{});
}

const StreamContext = struct {
Expand Down
81 changes: 44 additions & 37 deletions src/request.zig
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ pub const Request = struct {
body_buffer: ?buffer.Buffer = null,
body_len: usize = 0,

// True if we haven't read the [full] body yet. This can only happen when
// The number of unread bytes from the body. This can only happen when
// lazy_read_size is configured and the request is larger that this value.
// There can still be _part_ of the body in body_buffer.
lazy_body: bool,
unread_body: usize,

// cannot use an optional on qs, because it's pre-allocated so always exists
qs_read: bool = false,
Expand Down Expand Up @@ -95,7 +95,7 @@ pub const Request = struct {
.fd = &state.fd,
.mfd = &state.mfd,
.method = state.method.?,
.lazy_body = state.lazy_body,
.unread_body = state.unread_body,
.method_string = state.method_string orelse "",
.protocol = state.protocol.?,
.url = Url.parse(state.url.?),
Expand Down Expand Up @@ -175,24 +175,34 @@ pub const Request = struct {
return self.parseMultiFormData();
}

pub fn streamBody(self: *Request) !Stream {
pub const Reader = std.io.Reader(*BodyReader, BodyReader.Error, BodyReader.read);

pub fn reader(self: *Request, timeout_ms: usize) !Reader {
var buf: []const u8 = &.{};
if (self.body_buffer) |bb| {
std.debug.assert(bb.type == .static);
buf = bb.data;
}

const conn = self.conn;
if (self.lazy_body == true) {
if (self.unread_body > 0) {
try conn.blockingMode();
const timeval = std.mem.toBytes(std.posix.timeval{
.sec = @intCast(@divTrunc(timeout_ms, 1000)),
.usec = @intCast(@mod(timeout_ms, 1000) * 1000),
});
try std.posix.setsockopt(conn.stream.handle, std.posix.SOL.SOCKET, std.posix.SO.RCVTIMEO, &timeval);
}

return .{
const r = try self.arena.create(BodyReader);
r.* = .{
.req = self,
.buffer = buf,
.remaining = self.body_len,
.socket = conn.stream.handle,
};

return .{ .context = r};
}

// OK, this is a bit complicated.
Expand Down Expand Up @@ -493,39 +503,35 @@ pub const Request = struct {
};
}

pub const Stream = struct {
pub const BodyReader = struct {
req: *Request,
socket: std.posix.socket_t,
remaining: usize,
buffer: []const u8,
socket: std.posix.socket_t,

pub fn deinit(self: *Stream) void {
self.req.conn.nonblockingMode() catch {};
}
pub const Error = std.posix.ReadError;

pub fn read(self: *Stream, into: []u8) !?[]u8 {
pub fn read(self: *BodyReader, into: []u8) Error!usize {
const b = self.buffer;
const remaining = self.remaining;

if (b.len != 0) {
const l = @min(b.len, into.len);

const buf = into[0..l];
@memcpy(buf, b[0..l]);

@memcpy(into[0..l], b[0..l]);
self.buffer = b[l..];
self.remaining = remaining - l;

return buf;
return l;
}

if (remaining == 0) {
return null;
return 0;
}

var buf = if (into.len > remaining) into[0..remaining] else into;
const buf = if (into.len > remaining) into[0..remaining] else into;
const n = try std.posix.read(self.socket, buf);
self.remaining = remaining - n;
return if (n == 0) null else buf[0..n];
return n;
}
};
};
Expand Down Expand Up @@ -595,9 +601,9 @@ pub const State = struct {
// know what it is from the content-length header
body_len: usize,

// True if we aren't reading the body. Happens when lazy_read_size is enabled
// and we get a large body. It'll be up to the app to read it!
lazy_body: bool,
// Happens when lazy_read_size is enabled and we get a large body.
// It'll be up to the app to read it!
unread_body: usize,

middlewares: std.StringHashMap(*anyopaque),

Expand All @@ -614,7 +620,7 @@ pub const State = struct {
.method = null,
.method_string = "",
.protocol = null,
.lazy_body = false,
.unread_body = 0,
.buffer_pool = buffer_pool,
.lazy_read_size = config.lazy_read_size,
.max_body_size = config.max_body_size orelse 1_048_576,
Expand All @@ -641,7 +647,7 @@ pub const State = struct {
self.len = 0;
self.url = null;
self.method = null;
self.lazy_body = false;
self.unread_body = 0;
self.method_string = null;
self.protocol = null;

Expand Down Expand Up @@ -794,7 +800,7 @@ pub const State = struct {
self.pos = space + 1;
self.method = .OTHER;
self.method_string = candidate;
}
},
}
return true;
}
Expand Down Expand Up @@ -953,30 +959,31 @@ pub const State = struct {
const buf = self.buf;

// how much (if any) of the body we've already read
const read = len - pos;

if (read > cl) {
return error.InvalidContentLength;
}

// how much of the body are we missing
const missing = cl - read;

if (self.lazy_read_size) |lazy_read| {
if (cl >= lazy_read) {
self.pos = len;
self.lazy_body = true;
self.unread_body = missing;
self.body = .{ .type = .static, .data = buf[pos..len] };
return true;
}
}

const read = len - pos;
if (read == cl) {
if (missing == 0) {
// we've read the entire body into buf, point to that.
self.pos = len;
self.body = .{ .type = .static, .data = buf[pos..len] };
return true;
}

if (read > cl) {
return error.InvalidContentLength;
}

// how much of the body are we missing
const missing = cl - read;

// how much spare space we have in our static buffer
const spare = buf.len - len;
if (missing < spare) {
Expand Down Expand Up @@ -1606,7 +1613,7 @@ test "request: fuzz" {
const number_of_requests = random.uintAtMost(u8, 10) + 1;

for (0..number_of_requests) |_| {
defer ctx.conn.requestDone(4096);
defer ctx.conn.requestDone(4096, true) catch unreachable;
const method = randomMethod(random);
const url = t.randomString(random, aa, 20);

Expand Down
1 change: 0 additions & 1 deletion src/response.zig
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,6 @@ pub const Response = struct {
};
};


// All the upfront memory allocation that we can do. Gets re-used from request
// to request.
pub const State = struct {
Expand Down
1 change: 1 addition & 0 deletions src/t.zig
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub const Context = struct {
.ws_worker = undefined,
.conn_arena = ctx_arena,
.req_arena = std.heap.ArenaAllocator.init(aa),
._io_mode = if (httpz.blockingMode()) .blocking else .nonblocking,
};

return .{
Expand Down
Loading

0 comments on commit 74fd85a

Please sign in to comment.