From 482635c6216b7236013cba03c9d5c1dedd31b010 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 26 Jan 2023 19:04:22 -0800 Subject: [PATCH] Correctly support streaming responses with webrick. (#8) --- lib/rackup/handler/webrick.rb | 102 ++++++++++------- lib/rackup/stream.rb | 199 ++++++++++++++++++++++++++++++++++ rackup.gemspec | 2 +- test/spec_webrick.rb | 36 +++--- 4 files changed, 282 insertions(+), 57 deletions(-) create mode 100644 lib/rackup/stream.rb diff --git a/lib/rackup/handler/webrick.rb b/lib/rackup/handler/webrick.rb index 42b6bf1..7a7070f 100644 --- a/lib/rackup/handler/webrick.rb +++ b/lib/rackup/handler/webrick.rb @@ -11,28 +11,11 @@ require_relative '../handler' require_relative '../version' -# This monkey patch allows for applications to perform their own chunking -# through WEBrick::HTTPResponse if rack is set to true. -class WEBrick::HTTPResponse - attr_accessor :rack - - alias _rack_setup_header setup_header - def setup_header - app_chunking = rack && @header['transfer-encoding'] == 'chunked' - - @chunked = app_chunking if app_chunking - - _rack_setup_header - - @chunked = false if app_chunking - end -end +require_relative '../stream' module Rackup module Handler class WEBrick < ::WEBrick::HTTPServlet::AbstractServlet - include Rack - def self.run(app, **options) environment = ENV['RACK_ENV'] || 'development' default_host = environment == 'development' ? 'localhost' : nil @@ -73,43 +56,75 @@ def initialize(server, app) @app = app end + # This handles mapping the WEBrick request to a Rack input stream. + class Input + include Stream::Reader + + def initialize(request) + @request = request + + @reader = Fiber.new do + @request.body do |chunk| + Fiber.yield(chunk) + end + + Fiber.yield(nil) + + # End of stream: + @reader = nil + end + end + + def close + @request = nil + @reader = nil + end + + private + + # Read one chunk from the request body. + def read_next + @reader&.resume + end + end + def service(req, res) - res.rack = true env = req.meta_vars env.delete_if { |k, v| v.nil? } - rack_input = StringIO.new(req.body.to_s) - rack_input.set_encoding(Encoding::BINARY) + input = Input.new(req) env.update( - RACK_INPUT => rack_input, - RACK_ERRORS => $stderr, - RACK_URL_SCHEME => ["yes", "on", "1"].include?(env[HTTPS]) ? "https" : "http", - RACK_IS_HIJACK => true, + ::Rack::RACK_INPUT => input, + ::Rack::RACK_ERRORS => $stderr, + ::Rack::RACK_URL_SCHEME => ["yes", "on", "1"].include?(env[::Rack::HTTPS]) ? "https" : "http", + ::Rack::RACK_IS_HIJACK => true, ) - env[QUERY_STRING] ||= "" - unless env[PATH_INFO] == "" - path, n = req.request_uri.path, env[SCRIPT_NAME].length - env[PATH_INFO] = path[n, path.length - n] + env[::Rack::QUERY_STRING] ||= "" + unless env[::Rack::PATH_INFO] == "" + path, n = req.request_uri.path, env[::Rack::SCRIPT_NAME].length + env[::Rack::PATH_INFO] = path[n, path.length - n] end - env[REQUEST_PATH] ||= [env[SCRIPT_NAME], env[PATH_INFO]].join + env[::Rack::REQUEST_PATH] ||= [env[::Rack::SCRIPT_NAME], env[::Rack::PATH_INFO]].join status, headers, body = @app.call(env) begin res.status = status - if value = headers[RACK_HIJACK] + if value = headers[::Rack::RACK_HIJACK] io_lambda = value + body = nil elsif !body.respond_to?(:to_path) && !body.respond_to?(:each) io_lambda = body + body = nil end if value = headers.delete('set-cookie') res.cookies.concat(Array(value)) end - headers.each { |key, value| + headers.each do |key, value| # Skip keys starting with rack., per Rack SPEC next if key.start_with?('rack.') @@ -117,22 +132,27 @@ def service(req, res) # merge the values per RFC 1945 section 4.2. value = value.join(", ") if Array === value res[key] = value - } + end if io_lambda - rd, wr = IO.pipe - res.body = rd - res.chunked = true - io_lambda.call wr + protocol = headers['rack.protocol'] || headers['upgrade'] + + if protocol + # Set all the headers correctly for an upgrade response: + res.upgrade!(protocol) + end + res.body = io_lambda elsif body.respond_to?(:to_path) res.body = ::File.open(body.to_path, 'rb') else - body.each { |part| - res.body << part - } + buffer = String.new + body.each do |part| + buffer << part + end + res.body = buffer end ensure - body.close if body.respond_to? :close + body.close if body.respond_to?(:close) end end end diff --git a/lib/rackup/stream.rb b/lib/rackup/stream.rb new file mode 100644 index 0000000..256ce67 --- /dev/null +++ b/lib/rackup/stream.rb @@ -0,0 +1,199 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2019-2022, by Samuel Williams. + +module Rackup + # The input stream is an IO-like object which contains the raw HTTP POST data. When applicable, its external encoding must be “ASCII-8BIT” and it must be opened in binary mode, for Ruby 1.9 compatibility. The input stream must respond to gets, each, read and rewind. + class Stream + def initialize(input = nil, output = Buffered.new) + @input = input + @output = output + + raise ArgumentError, "Non-writable output!" unless output.respond_to?(:write) + + # Will hold remaining data in `#read`. + @buffer = nil + @closed = false + end + + attr :input + attr :output + + # This provides a read-only interface for data, which is surprisingly tricky to implement correctly. + module Reader + # rack.hijack_io must respond to: + # read, write, read_nonblock, write_nonblock, flush, close, close_read, close_write, closed? + + # read behaves like IO#read. Its signature is read([length, [buffer]]). If given, length must be a non-negative Integer (>= 0) or nil, and buffer must be a String and may not be nil. If length is given and not nil, then this method reads at most length bytes from the input stream. If length is not given or nil, then this method reads all data until EOF. When EOF is reached, this method returns nil if length is given and not nil, or “” if length is not given or is nil. If buffer is given, then the read data will be placed into buffer instead of a newly created String object. + # @param length [Integer] the amount of data to read + # @param buffer [String] the buffer which will receive the data + # @return a buffer containing the data + def read(length = nil, buffer = nil) + return '' if length == 0 + + buffer ||= String.new.force_encoding(Encoding::BINARY) + + # Take any previously buffered data and replace it into the given buffer. + if @buffer + buffer.replace(@buffer) + @buffer = nil + else + buffer.clear + end + + if length + while buffer.bytesize < length and chunk = read_next + buffer << chunk + end + + # This ensures the subsequent `slice!` works correctly. + buffer.force_encoding(Encoding::BINARY) + + # This will be at least one copy: + @buffer = buffer.byteslice(length, buffer.bytesize) + + # This should be zero-copy: + buffer.slice!(length, buffer.bytesize) + + if buffer.empty? + return nil + else + return buffer + end + else + while chunk = read_next + buffer << chunk + end + + return buffer + end + end + + # Read at most `length` bytes from the stream. Will avoid reading from the underlying stream if possible. + def read_partial(length = nil) + if @buffer + buffer = @buffer + @buffer = nil + else + buffer = read_next + end + + if buffer and length + if buffer.bytesize > length + # This ensures the subsequent `slice!` works correctly. + buffer.force_encoding(Encoding::BINARY) + + @buffer = buffer.byteslice(length, buffer.bytesize) + buffer.slice!(length, buffer.bytesize) + end + end + + return buffer + end + + def gets + read_partial + end + + def each + while chunk = read_partial + yield chunk + end + end + + def read_nonblock(length, buffer = nil) + @buffer ||= read_next + chunk = nil + + unless @buffer + buffer&.clear + return + end + + if @buffer.bytesize > length + chunk = @buffer.byteslice(0, length) + @buffer = @buffer.byteslice(length, @buffer.bytesize) + else + chunk = @buffer + @buffer = nil + end + + if buffer + buffer.replace(chunk) + else + buffer = chunk + end + + return buffer + end + end + + include Reader + + def write(buffer) + if @output + @output.write(buffer) + return buffer.bytesize + else + raise IOError, "Stream is not writable, output has been closed!" + end + end + + def write_nonblock(buffer) + write(buffer) + end + + def <<(buffer) + write(buffer) + end + + def flush + end + + def close_read + @input&.close + @input = nil + end + + # close must never be called on the input stream. huh? + def close_write + if @output.respond_to?(:close) + @output&.close + end + + @output = nil + end + + # Close the input and output bodies. + def close(error = nil) + self.close_read + self.close_write + + return nil + ensure + @closed = true + end + + # Whether the stream has been closed. + def closed? + @closed + end + + # Whether there are any output chunks remaining? + def empty? + @output.empty? + end + + private + + def read_next + if @input + return @input.read + else + @input = nil + raise IOError, "Stream is not readable, input has been closed!" + end + end + end +end diff --git a/rackup.gemspec b/rackup.gemspec index 39a92b9..abd3648 100644 --- a/rackup.gemspec +++ b/rackup.gemspec @@ -19,7 +19,7 @@ Gem::Specification.new do |spec| spec.required_ruby_version = ">= 2.4.0" spec.add_dependency "rack", ">= 3" - spec.add_dependency "webrick" + spec.add_dependency "webrick", "~> 1.8" spec.add_development_dependency "bundler" spec.add_development_dependency "minitest", "~> 5.0" diff --git a/test/spec_webrick.rb b/test/spec_webrick.rb index 1d1e85b..792de36 100644 --- a/test/spec_webrick.rb +++ b/test/spec_webrick.rb @@ -196,22 +196,28 @@ def is_running? } end - it "produce correct HTTP semantics with and without app chunking" do - @server.mount "/chunked", Rackup::Handler::WEBrick, - Rack::Lint.new(lambda{ |req| - [ - 200, - { "transfer-encoding" => "chunked" }, - ["7\r\nchunked\r\n0\r\n\r\n"] - ] - }) + it "produce correct HTTP semantics with upgrade response" do + app = proc do |env| + body = proc do |io| + io.write "hello" + io.close + end - Net::HTTP.start(@host, @port){ |http| - res = http.get("/chunked") - res["transfer-encoding"].must_equal "chunked" - res["content-length"].must_be_nil - res.body.must_equal "chunked" - } + [101, {"connection" => "upgrade", "upgrade" => "text"}, body] + end + + @server.mount "/app", Rackup::Handler::WEBrick, Rack::Lint.new(app) + + TCPSocket.open(@host, @port) do |socket| + socket.write "GET /app HTTP/1.1\r\n" + socket.write "Host: #{@host}\r\n\r\n" + + response = socket.read + response.must_match(/HTTP\/1.1 101 Switching Protocols/) + response.must_match(/Connection: upgrade/) + response.must_match(/Upgrade: text/) + response.must_match(/hello/) + end end after do