Skip to content

Commit

Permalink
update websocket.zig for compressionsupport
Browse files Browse the repository at this point in the history
  • Loading branch information
karlseguin committed Feb 23, 2025
1 parent f16b296 commit d2b7f75
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 12 deletions.
4 changes: 2 additions & 2 deletions build.zig.zon
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
.hash = "122061f30077ef518dd435d397598ab3c45daa3d2c25e6b45383fb94d0bd2c3af1af"
},
.websocket = .{
.url = "https://github.com/karlseguin/websocket.zig/archive/c63acec1e31159647c66979571604097406d5647.tar.gz",
.hash = "1220cb75f778354bc894ceb824542c942b6f5f41d56052b2e146812cd323c3bb6b89"
.url = "https://github.com/karlseguin/websocket.zig/archive/b42030a9724b0babc937b771a9e79245f0401afb.tar.gz",
.hash = "1220ceeba494d30cfde1ea036f04e14de8be9b0350b5630918af60b0a04545e6b3ce"
},
},
}
4 changes: 2 additions & 2 deletions examples/08_websocket.zig
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ pub fn main() !void {
const allocator = gpa.allocator();

// For websocket support, you _must_ define a Handler, and your Handler _must_
// have a WebsocketHandler decleration
var server = try httpz.Server(Handler).init(allocator, .{ .port = PORT }, Handler{});
// have a WebsocketHandler declaration
var server = try httpz.Server(Handler).init(allocator, .{.port = PORT}, Handler{});

defer server.deinit();

Expand Down
6 changes: 6 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,12 @@ try httpz.listen(allocator, &router, .{
small_buffer_pool: ?usize = null,
large_buffer_size: ?usize = null,
large_buffer_pool: ?u16 = null,
compression: bool = false,
compression_retain_writer: bool = true,
// if compression is true, and this is null, then
// we accept compressed messaged from the client, but never send
// compressed messages
compression_write_treshold: ?usize = null,
},
});
```
Expand Down
3 changes: 3 additions & 0 deletions src/config.zig
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ pub const Config = struct {
small_buffer_pool: ?usize = null,
large_buffer_size: ?usize = null,
large_buffer_pool: ?u16 = null,
compression: bool = false,
compression_retain_writer: bool = true,
compression_write_treshold: ?usize = null,
};

pub fn threadPoolCount(self: *const Config) u32 {
Expand Down
34 changes: 26 additions & 8 deletions src/httpz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,14 @@ pub fn Server(comptime H: type) type {

// do not pass arena.allocator to WorkerState, it needs to be able to
// allocate and free at will.
const ws_config = config.websocket;
var websocket_state = try websocket.server.WorkerState.init(allocator, .{
.max_message_size = config.websocket.max_message_size,
.max_message_size = ws_config.max_message_size,
.buffers = .{
.small_size = if (has_websocket) config.websocket.small_buffer_size else 0,
.small_pool = if (has_websocket) config.websocket.small_buffer_pool else 0,
.large_size = if (has_websocket) config.websocket.large_buffer_size else 0,
.large_pool = if (has_websocket) config.websocket.large_buffer_pool else 0,
.small_size = if (has_websocket) ws_config.small_buffer_size else 0,
.small_pool = if (has_websocket) ws_config.small_buffer_pool else 0,
.large_size = if (has_websocket) ws_config.large_buffer_size else 0,
.large_pool = if (has_websocket) ws_config.large_buffer_pool else 0,
},
// disable handshake memory allocation since httpz is handling
// the handshake request directly
Expand All @@ -293,6 +294,10 @@ pub fn Server(comptime H: type) type {
.max_size = 0,
.max_headers = 0,
},
.compression = if (ws_config.compression) .{
.write_threshold = ws_config.compression_write_treshold,
.retain_write_buffer = ws_config.compression_retain_writer,
} else null,
});
errdefer websocket_state.deinit();

Expand Down Expand Up @@ -657,19 +662,32 @@ pub fn upgradeWebsocket(comptime H: type, req: *Request, res: *Response, ctx: an
const key = req.header("sec-websocket-key") orelse return false;

const http_conn = res.conn;

const ws_worker: *websocket.server.Worker(H) = @ptrCast(@alignCast(http_conn.ws_worker));

var hc = try ws_worker.createConn(http_conn.stream.handle, http_conn.address, worker.timestamp(0));
errdefer ws_worker.cleanupConn(hc);

hc.handler = try H.init(&hc.conn, ctx);
try http_conn.stream.writeAll(&websocket.Handshake.createReply(key));

var agreed_compression: ?websocket.Compression = null;
if (ws_worker.canCompress()) {
if (req.header("sec-websocket-extensions")) |ext| {
if (try websocket.Handshake.parseExtension(ext)) |request_compression| {
agreed_compression = .{
.client_no_context_takeover = request_compression.client_no_context_takeover,
.server_no_context_takeover = request_compression.server_no_context_takeover,
};
}
}
}

var reply_buf: [512]u8 = undefined;
try http_conn.stream.writeAll(websocket.Handshake.createReply(key, agreed_compression, &reply_buf));
if (comptime std.meta.hasFn(H, "afterInit")) {
const params = @typeInfo(@TypeOf(H.afterInit)).@"fn".params;
try if (comptime params.len == 1) hc.handler.?.afterInit() else hc.handler.?.afterInit(ctx);
}

try ws_worker.setupConnection(hc, agreed_compression);
res.written = true;
http_conn.handover = .{ .websocket = hc };
return true;
Expand Down

0 comments on commit d2b7f75

Please sign in to comment.