diff --git a/CHANGELOG.md b/CHANGELOG.md index 2aa2eaef..a97f4f2d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ Kubeclient release versioning follows [SemVer](https://semver.org/). ## Unreleased ### Changed - `Kubeclient::Client.new` now always requires an api version, use for example: `Kubeclient::Client.new(uri, 'v1')` +- `socket_options` has been removed from `Kubeclient::Client.new` ## 4.9.1 — 2020-08-31 ### Fixed diff --git a/README.md b/README.md index 31e77db3..1a375a7d 100644 --- a/README.md +++ b/README.md @@ -160,24 +160,6 @@ namespace = File.read('/var/run/secrets/kubernetes.io/serviceaccount/namespace') ``` You can find information about tokens in [this guide](https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#accessing-the-api-from-a-pod) and in [this reference](http://kubernetes.io/docs/admin/authentication/). -### Non-blocking IO - -You can also use kubeclient with non-blocking sockets such as Celluloid::IO, see [here](https://github.com/httprb/http/wiki/Parallel-requests-with-Celluloid%3A%3AIO) -for details. For example: - -```ruby -require 'celluloid/io' -socket_options = { - socket_class: Celluloid::IO::TCPSocket, - ssl_socket_class: Celluloid::IO::SSLSocket -} -client = Kubeclient::Client.new( - 'https://localhost:8443/api', 'v1', socket_options: socket_options -) -``` - -This affects only `.watch_*` sockets, not one-off actions like `.get_*`, `.delete_*` etc. - ### Proxies You can also use kubeclient with an http proxy server such as tinyproxy. It can be entered as a string or a URI object. diff --git a/kubeclient.gemspec b/kubeclient.gemspec index bca26463..242f2f82 100644 --- a/kubeclient.gemspec +++ b/kubeclient.gemspec @@ -34,5 +34,4 @@ Gem::Specification.new do |spec| spec.add_dependency 'faraday_middleware', '~> 1.0' spec.add_dependency 'jsonpath', '~> 1.0' spec.add_dependency 'recursive-open-struct', '~> 1.1', '>= 1.1.1' - spec.add_dependency 'http', '>= 3.0', '< 5.0' end diff --git a/lib/kubeclient/common.rb b/lib/kubeclient/common.rb index 9ea84513..fcf5df38 100644 --- a/lib/kubeclient/common.rb +++ b/lib/kubeclient/common.rb @@ -26,11 +26,6 @@ module ClientMixin bearer_token_file: nil }.freeze - DEFAULT_SOCKET_OPTIONS = { - socket_class: nil, - ssl_socket_class: nil - }.freeze - DEFAULT_TIMEOUTS = { # These do NOT affect watch, watching never times out. open: Net::HTTP.new('127.0.0.1').open_timeout, # depends on ruby version @@ -68,7 +63,6 @@ def initialize_client( version, ssl_options: DEFAULT_SSL_OPTIONS, auth_options: DEFAULT_AUTH_OPTIONS, - socket_options: DEFAULT_SOCKET_OPTIONS, timeouts: DEFAULT_TIMEOUTS, http_proxy_uri: DEFAULT_HTTP_PROXY_URI, http_max_redirects: DEFAULT_HTTP_MAX_REDIRECTS, @@ -83,7 +77,6 @@ def initialize_client( @headers = {} @ssl_options = ssl_options @auth_options = auth_options - @socket_options = socket_options # Allow passing partial timeouts hash, without unspecified # @timeouts[:foo] == nil resulting in infinite timeout. @timeouts = DEFAULT_TIMEOUTS.merge(timeouts) @@ -348,7 +341,7 @@ def watch_entities(resource_name, options = {}, &block) watcher = Kubeclient::Common::WatchStream.new( uri, - http_options(uri), + watch_options(uri), formatter: ->(value) { format_response(options[:as] || @as, value) } ) @@ -501,7 +494,7 @@ def watch_pod_log(pod_name, namespace, container: nil, &block) uri.query = URI.encode_www_form(params) watcher = Kubeclient::Common::WatchStream.new( - uri, http_options(uri), formatter: ->(value) { value } + uri, watch_options(uri), formatter: ->(value) { value } ) return_or_yield_to_watcher(watcher, &block) end @@ -653,28 +646,30 @@ def return_or_yield_to_watcher(watcher, &block) end end - def http_options(uri) - options = { - basic_auth_user: @auth_options[:username], - basic_auth_password: @auth_options[:password], + def watch_options(uri) + watch_options = { + auth_options: { + username: @auth_options[:username], + password: @auth_options[:password] + }, + faraday_options: { + proxy: @http_proxy_uri + }, headers: @headers, - http_proxy_uri: @http_proxy_uri, http_max_redirects: http_max_redirects } if uri.scheme == 'https' - options[:ssl] = { + watch_options[:faraday_options][:ssl] = { ca_file: @ssl_options[:ca_file], - cert: @ssl_options[:client_cert], cert_store: @ssl_options[:cert_store], + client_cert: @ssl_options[:client_cert], key: @ssl_options[:client_key], - # ruby HTTP uses verify_mode instead of verify_ssl - # http://ruby-doc.org/stdlib-1.9.3/libdoc/openssl/rdoc/OpenSSL/SSL/SSLContext.html - verify_mode: @ssl_options[:verify_ssl] + verify: @ssl_options[:verify_ssl] } end - options.merge(@socket_options) + watch_options end def json_headers diff --git a/lib/kubeclient/watch_stream.rb b/lib/kubeclient/watch_stream.rb index aa43b5c9..e887a2a4 100644 --- a/lib/kubeclient/watch_stream.rb +++ b/lib/kubeclient/watch_stream.rb @@ -1,98 +1,71 @@ # frozen_string_literal: true require 'json' -require 'http' + module Kubeclient module Common # HTTP Stream used to watch changes on entities class WatchStream - def initialize(uri, http_options, formatter:) + def initialize(uri, options, formatter:) @uri = uri - @http_client = nil - @http_options = http_options - @http_options[:http_max_redirects] ||= Kubeclient::Client::DEFAULT_HTTP_MAX_REDIRECTS + @options = options + @headers = options[:headers] + @options[:http_max_redirects] ||= Kubeclient::Client::DEFAULT_HTTP_MAX_REDIRECTS @formatter = formatter + + @faraday_client = build_client end def each @finished = false - - @http_client = build_client - response = @http_client.request(:get, @uri, build_client_options) - unless response.code < 300 - raise Kubeclient::HttpError.new(response.code, response.reason, response) - end - buffer = +'' - response.body.each do |chunk| - buffer << chunk - while (line = buffer.slice!(/.+\n/)) - yield(@formatter.call(line.chomp)) + + begin + @faraday_client.get('', nil, @headers) do |request| + request.options.on_data = proc do |chunk| + buffer << chunk + while (line = buffer.slice!(/.+\n/)) + yield(@formatter.call(line.chomp)) + end + next if @finished + end end + rescue Faraday::Error => e + err_message = build_http_error_message(e) + response_code = e.response ? (e.response[:status] || e.response&.env&.status) : nil + error_klass = (response_code == 404 ? ResourceNotFoundError : HttpError) + raise error_klass.new(response_code, err_message, e.response) end - rescue StandardError - raise unless @finished end def finish @finished = true - @http_client&.close + @faraday_client.close end private - def max_hops - @http_options[:http_max_redirects] + 1 - end - - def follow_option - if max_hops > 1 - { max_hops: max_hops } - else - # i.e. Do not follow redirects as we have set http_max_redirects to 0 - # Setting `{ max_hops: 1 }` does not work FWIW - false - end - end - def build_client - client = HTTP::Client.new(follow: follow_option) + auth = @options[:auth_options] || {} + max_redirects = @options[:http_max_redirects] - if @http_options[:basic_auth_user] && @http_options[:basic_auth_password] - client = client.basic_auth( - user: @http_options[:basic_auth_user], - pass: @http_options[:basic_auth_password] - ) + Faraday.new(@uri, @options[:faraday_options] || {}) do |connection| + if auth[:username] && auth[:password] + connection.basic_auth(auth[:username], auth[:password]) + end + connection.use(FaradayMiddleware::FollowRedirects, limit: max_redirects) + connection.response(:raise_error) end - - client end - def using_proxy - proxy = @http_options[:http_proxy_uri] - return nil unless proxy - p_uri = URI.parse(proxy) - { - proxy_address: p_uri.hostname, - proxy_port: p_uri.port, - proxy_username: p_uri.user, - proxy_password: p_uri.password - } - end - - def build_client_options - client_options = { - headers: @http_options[:headers], - proxy: using_proxy - } - if @http_options[:ssl] - client_options[:ssl] = @http_options[:ssl] - socket_option = :ssl_socket_class - else - socket_option = :socket_class - end - client_options[socket_option] = @http_options[socket_option] if @http_options[socket_option] - client_options + def build_http_error_message(e) + json_error_msg = + begin + JSON.parse(e.response[:body] || '') || {} + rescue StandardError + {} + end + json_error_msg['message'] || e.message || '' end end end diff --git a/test/test_kubeclient.rb b/test/test_kubeclient.rb index b964d4e1..0219fe77 100644 --- a/test/test_kubeclient.rb +++ b/test/test_kubeclient.rb @@ -60,8 +60,10 @@ def test_pass_proxy assert_equal(proxy_uri.to_s, faraday_client.proxy.uri.to_s) watch_client = client.watch_pods - assert_equal(watch_client.send(:build_client_options)[:proxy][:proxy_address], proxy_uri.host) - assert_equal(watch_client.send(:build_client_options)[:proxy][:proxy_port], proxy_uri.port) + assert_equal( + watch_client.instance_variable_get('@faraday_client').proxy.uri.to_s, + proxy_uri.to_s + ) end def test_pass_max_redirects diff --git a/test/test_watch.rb b/test/test_watch.rb index d832cd27..765aa934 100644 --- a/test/test_watch.rb +++ b/test/test_watch.rb @@ -162,7 +162,7 @@ def test_watch_with_field_selector assert_requested(:get, "#{api_host}/v1/watch/events?fieldSelector=#{selector}", times: 1) end - def test_watch_with_finish_and_ebadf + def test_watch_with_finish api_host = 'http://localhost:8080/api' stub_core_api_list @@ -172,10 +172,30 @@ def test_watch_with_finish_and_ebadf client = Kubeclient::Client.new(api_host, 'v1') watcher = client.watch_events - # explodes when StandardError is not caught - watcher.each do # rubocop:disable Lint/UnreachableLoop + watcher.each do watcher.finish - raise StandardError + end + + assert_requested(:get, "#{api_host}/v1/watch/events", times: 1) + end + + def test_watch_with_finish_without_api_server_response + api_host = 'http://localhost:8080/api' + + stub_core_api_list + stub_request(:get, %r{.*/watch/events}) + .to_return(body: lambda { |request| sleep 3; open_test_file('watch_stream.json') }, status: 200) + + client = Kubeclient::Client.new(api_host, 'v1') + watcher = client.watch_events + + watch_killer = Thread.new(watcher) do |watcher| + sleep 1 + watcher.finish + end + + watcher.each do |event| + raise "Watching should finish before receiving an event" end assert_requested(:get, "#{api_host}/v1/watch/events", times: 1)